File tree 1 file changed +4
-4
lines changed
clients/src/main/java/org/apache/kafka/clients/consumer/internals
1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -916,8 +916,8 @@ private SortedSet<TopicPartition> topicPartitionsForActiveTasks(final Map<String
916
916
final SortedSet <TopicPartition > topicPartitions = new TreeSet <>(TOPIC_PARTITION_COMPARATOR );
917
917
activeTasks .forEach ((subtopologyId , partitionIds ) ->
918
918
Stream .concat (
919
- streamsRebalanceData .subtopologies ().get (subtopologyId ).sourceTopics .stream (),
920
- streamsRebalanceData .subtopologies ().get (subtopologyId ).repartitionSourceTopics .keySet ().stream ()
919
+ streamsRebalanceData .subtopologies ().get (subtopologyId ).sourceTopics () .stream (),
920
+ streamsRebalanceData .subtopologies ().get (subtopologyId ).repartitionSourceTopics () .keySet ().stream ()
921
921
).forEach (topic -> {
922
922
for (final int partitionId : partitionIds ) {
923
923
topicPartitions .add (new TopicPartition (topic , partitionId ));
@@ -931,8 +931,8 @@ private SortedSet<TopicPartition> topicPartitionsForActiveTasks(final SortedSet<
931
931
final SortedSet <TopicPartition > topicPartitions = new TreeSet <>(TOPIC_PARTITION_COMPARATOR );
932
932
activeTasks .forEach (task ->
933
933
Stream .concat (
934
- streamsRebalanceData .subtopologies ().get (task .subtopologyId ()).sourceTopics .stream (),
935
- streamsRebalanceData .subtopologies ().get (task .subtopologyId ()).repartitionSourceTopics .keySet ().stream ()
934
+ streamsRebalanceData .subtopologies ().get (task .subtopologyId ()).sourceTopics () .stream (),
935
+ streamsRebalanceData .subtopologies ().get (task .subtopologyId ()).repartitionSourceTopics () .keySet ().stream ()
936
936
).forEach (topic -> {
937
937
topicPartitions .add (new TopicPartition (topic , task .partitionId ()));
938
938
})
You can’t perform that action at this time.
0 commit comments