diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 63be7d1c5ebb8..73aa43eda6fdb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2273,25 +2273,27 @@ private StreamsTopology maybeUpdateTopology(final String groupId, final Topology topology, final StreamsGroup group, final List records) { - StreamsTopology updatedTopology; if (topology != null) { - StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology); - - updatedTopology = StreamsTopology.fromHeartbeatRequest(topology); - + StreamsTopology streamsTopologyFromRequest = StreamsTopology.fromHeartbeatRequest(topology); if (group.topology().isEmpty()) { log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", groupId, memberId, topology.epoch()); - + StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology); records.add(newStreamsGroupTopologyRecord(groupId, recordValue)); - } else if (!updatedTopology.equals(group.topology().get())) { + return streamsTopologyFromRequest; + } else if (group.topology().get().topologyEpoch() > topology.epoch()) { + log.info("[GroupId {}][MemberId {}] Member joined with stale topology epoch {}", groupId, memberId, topology.epoch()); + return group.topology().get(); + } else if (!group.topology().get().equals(streamsTopologyFromRequest)) { throw new InvalidRequestException("Topology updates are not supported yet."); + } else { + log.debug("[GroupId {}][MemberId {}] Member joined with currently initialized topology {}", groupId, memberId, topology.epoch()); + return group.topology().get(); } } else if (group.topology().isPresent()) { - updatedTopology = group.topology().get(); + return group.topology().get(); } else { throw new IllegalStateException("The topology is null and the group topology is also null."); } - return updatedTopology; } private List createStreamsGroupHeartbeatResponseTaskIds(final Map> taskIds) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java index 71f31244d0091..1d14d9a8477d6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java @@ -156,7 +156,6 @@ private static Map decidePartitionCounts(final LogContext logCo enforceCopartitioning( topology, copartitionGroupsBySubtopology, - log, decidedPartitionCountsForInternalTopics, copartitionedTopicsEnforcer ); @@ -168,7 +167,6 @@ private static Map decidePartitionCounts(final LogContext logCo private static void enforceCopartitioning(final StreamsTopology topology, final Map>> copartitionGroupsBySubtopology, - final Logger log, final Map decidedPartitionCountsForInternalTopics, final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer) { final Set fixedRepartitionTopics = @@ -180,17 +178,13 @@ private static void enforceCopartitioning(final StreamsTopology topology, x.repartitionSourceTopics().stream().filter(y -> y.partitions() == 0) ).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet()); - if (fixedRepartitionTopics.isEmpty() && flexibleRepartitionTopics.isEmpty()) { - log.info("Skipping the repartition topic validation since there are no repartition topics."); - } else { - // ensure the co-partitioning topics within the group have the same number of partitions, - // and enforce the number of partitions for those repartition topics to be the same if they - // are co-partitioned as well. - for (Collection> copartitionGroups : copartitionGroupsBySubtopology.values()) { - for (Set copartitionGroup : copartitionGroups) { - decidedPartitionCountsForInternalTopics.putAll( - copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics)); - } + // ensure the co-partitioning topics within the group have the same number of partitions, + // and enforce the number of partitions for those repartition topics to be the same if they + // are co-partitioned as well. + for (Collection> copartitionGroups : copartitionGroupsBySubtopology.values()) { + for (Set copartitionGroup : copartitionGroups) { + decidedPartitionCountsForInternalTopics.putAll( + copartitionedTopicsEnforcer.enforce(copartitionGroup, fixedRepartitionTopics, flexibleRepartitionTopics)); } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4567cdd766af5..1e62be05753fc 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerProtocolAssignment; import org.apache.kafka.common.message.ConsumerProtocolSubscription; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -59,7 +60,9 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.CopartitionGroup; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Subtopology; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TopicInfo; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Topology; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; @@ -70,6 +73,7 @@ import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; @@ -15867,10 +15871,348 @@ public void testMemberJoinsEmptyStreamsGroup() { } @Test - public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() { + public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + String subtopology2 = "subtopology2"; + String barTopicName = "bar"; + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)), + new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .build(); + + // Member joins the streams group. + CoordinatorResult result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertEquals( + Map.of(), + result.response().creatableTopics() + ); + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(Status.MISSING_SOURCE_TOPICS.code()) + .setStatusDetail("Source topics bar are missing."))), + result.response().data() + ); + + StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(1500) + .build(); + + List expectedRecords = List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), + StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), + StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, + Map.of( + fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6) + ) + ), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology() + .setSubtopologyId(subtopology1) + .setSourceTopics(List.of(fooTopicName)) + .setStateChangelogTopics(List.of(new TopicInfo().setName(barTopicName))) + ) + ); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .build(); + + // Member joins the streams group. + CoordinatorResult result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertEquals( + Map.of(barTopicName, + new CreatableTopic() + .setName(barTopicName) + .setNumPartitions(6) + .setReplicationFactor((short) -1) + ), + result.response().creatableTopics() + ); + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code()) + .setStatusDetail("Internal topics are missing: [bar]"))), + result.response().data() + ); + + StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(1500) + .build(); + + List expectedRecords = List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), + StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), + StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of( + fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6) + )), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid barTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology() + .setSubtopologyId(subtopology1) + .setSourceTopics(List.of(fooTopicName, barTopicName)) + .setCopartitionGroups(List.of(new CopartitionGroup().setSourceTopics(List.of((short) 0, (short) 1)))) + ) + ); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build()) + .build(); + + // Member joins the streams group. + CoordinatorResult result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertEquals( + Map.of(), + result.response().creatableTopics() + ); + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code()) + .setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]"))), + result.response().data() + ); + + StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(1500) + .build(); + + List expectedRecords = List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), + StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), + StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of( + fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), + barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) + )), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testStreamsGroupMemberJoiningWithStaleTopology() { String groupId = "fooup"; String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + Uuid barTopicId = Uuid.randomUuid(); + Topology topology0 = new Topology().setEpoch(0).setSubtopologies(List.of( + new Subtopology() + .setSubtopologyId(subtopology1) + .setSourceTopics(List.of(fooTopicName)) + ) + ); + Topology topology1 = new Topology().setEpoch(1).setSubtopologies(List.of( + new Subtopology() + .setSubtopologyId(subtopology1) + .setSourceTopics(List.of(fooTopicName, barTopicName)) + ) + ); + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build()) + .withStreamsGroup( + new StreamsGroupBuilder(groupId, 10) + .withTopology(StreamsTopology.fromHeartbeatRequest(topology1)) + ) + .build(); + + assignor.prepareGroupAssignment(new org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment(Map.of( + memberId, org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment.empty() + ))); + + // Member joins the streams group. + CoordinatorResult result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology0) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertEquals( + Map.of(), + result.response().creatableTopics() + ); + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(Status.STALE_TOPOLOGY.code()) + .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))), + result.response().data() + ); + + StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(1500) + .build(); + + List expectedRecords = List.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), + StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, Map.of( + fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), + barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) + )), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -16074,7 +16416,6 @@ public void testStreamsNewJoiningMemberTriggersNewTargetAssignment() { String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); String memberId3 = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -16170,7 +16511,6 @@ public void testStreamsLeavingMemberRemovesMemberAndBumpsGroupEpoch() { String groupId = "fooup"; String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -16241,7 +16581,6 @@ public void testStreamsLeavingMemberRemovesMemberAndBumpsGroupEpoch() { public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { String groupId = "fooup"; String memberId = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -16311,7 +16650,6 @@ public void testStreamsReconciliationProcess() { String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); String memberId3 = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -16824,7 +17162,6 @@ public void testStreamsStreamsGroupStates() { public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() { String groupId = "fooup"; String memberId1 = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -16867,7 +17204,6 @@ public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() { public void testStreamsPartitionMetadataRefreshedAfterGroupIsLoaded() { String groupId = "fooup"; String memberId = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -16964,7 +17300,6 @@ public void testStreamsPartitionMetadataRefreshedAfterGroupIsLoaded() { public void testStreamsPartitionMetadataRefreshedAgainAfterWriteFailure() { String groupId = "fooup"; String memberId = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -17081,7 +17416,6 @@ public void testStreamsPartitionMetadataRefreshedAgainAfterWriteFailure() { public void testStreamsSessionTimeoutLifecycle() { String groupId = "fooup"; String memberId = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -17158,7 +17492,6 @@ public void testStreamsSessionTimeoutLifecycle() { public void testStreamsSessionTimeoutExpiration() { String groupId = "fooup"; String memberId = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -17224,7 +17557,6 @@ public void testStreamsRebalanceTimeoutLifecycle() { String groupId = "fooup"; String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid(); @@ -17381,7 +17713,6 @@ public void testStreamsRebalanceTimeoutExpiration() { String groupId = "fooup"; String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); - String subtopology1 = "subtopology1"; String fooTopicName = "foo"; Uuid fooTopicId = Uuid.randomUuid();