Skip to content

Commit 31f170d

Browse files
committed
Improve streams rebalance data
1 parent de35989 commit 31f170d

File tree

2 files changed

+28
-11
lines changed

2 files changed

+28
-11
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public String toString() {
250250
private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);
251251

252252
public StreamsRebalanceData(Map<String, Subtopology> subtopologies) {
253-
this.subtopologies = subtopologies;
253+
this.subtopologies = Map.copyOf(Objects.requireNonNull(subtopologies, "Subtopologies cannot be null"));
254254
}
255255

256256
public Map<String, Subtopology> subtopologies() {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java

+27-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.util.ArrayList;
77
import java.util.Collections;
8+
import java.util.HashMap;
89
import java.util.HashSet;
910
import java.util.List;
1011
import java.util.Map;
@@ -258,18 +259,34 @@ public void topicInfoShouldNotAcceptNulls() {
258259
}
259260

260261
@Test
261-
public void topicInfoShouldNotBeModifiable() {
262-
final StreamsRebalanceData.TopicInfo topicInfo = new StreamsRebalanceData.TopicInfo(
263-
Optional.of(1),
264-
Optional.of((short) 1),
265-
Map.of("key1", "value1")
266-
.entrySet().stream()
267-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
268-
);
262+
public void streamsRebalanceDataShouldNotHaveModifiableSubtopologies() {
263+
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>());
269264

270265
assertThrows(
271266
UnsupportedOperationException.class,
272-
() -> topicInfo.topicConfigs().put("key2", "value2")
267+
() -> streamsRebalanceData.subtopologies().put("subtopologyId2", new StreamsRebalanceData.Subtopology(
268+
Set.of(),
269+
Set.of(),
270+
Map.of(),
271+
Map.of(),
272+
List.of()
273+
))
273274
);
274275
}
275-
}
276+
277+
@Test
278+
public void streamsRebalanceDataShouldNotAcceptNulls() {
279+
final Exception exception = assertThrows(
280+
NullPointerException.class,
281+
() -> new StreamsRebalanceData(null)
282+
);
283+
assertEquals("Subtopologies cannot be null", exception.getMessage());
284+
}
285+
286+
@Test
287+
public void streamsRebalanceDataShouldBeConstructedWithEmptyAssignment() {
288+
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(new HashMap<>());
289+
290+
assertEquals(StreamsRebalanceData.Assignment.EMPTY, streamsRebalanceData.reconciledAssignment());
291+
}
292+
}

0 commit comments

Comments
 (0)