From 1f0af912019a68966a36097edb7233dca2fbc2ab Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 17 Mar 2025 16:33:25 +0100 Subject: [PATCH 1/3] KAFKA-19001: Use streams group-level configurations in heartbeat Implements the use of session timeout, standby tasks and heartbeat interval configurations in the streams group heartbeat. Piggy-backed is another test that streams groups react to changes in the topic metadata --- .../group/GroupMetadataManager.java | 35 ++- .../group/GroupMetadataManagerTest.java | 269 +++++++++++++++++- .../group/streams/MockTaskAssignor.java | 6 + 3 files changed, 306 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 73aa43eda6fdb..af2434aa01c08 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1812,6 +1812,27 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } + /** + * Checks whether the streams group can accept a new member or not based on the + * max group size defined. + * + * @param group The streams group. + * @param memberId The member id. + * + * @throws GroupMaxSizeReachedException if the maximum capacity has been reached. + */ + private void throwIfStreamsGroupIsFull( + StreamsGroup group, + String memberId + ) throws GroupMaxSizeReachedException { + // If the streams group has reached its maximum capacity, the member is rejected if it is not + // already a member of the streams group. + if (group.numMembers() >= config.streamsGroupMaxSize() && (memberId.isEmpty() || !group.hasMember(memberId))) { + throw new GroupMaxSizeReachedException("The streams group has reached its maximum capacity of " + + config.streamsGroupMaxSize() + " members."); + } + } + /** * Validates the member epoch provided in the heartbeat request. * @@ -2079,6 +2100,7 @@ private CoordinatorResult stream // Get or create the streams group. boolean isJoining = memberEpoch == 0; final StreamsGroup group = isJoining ? getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId); + throwIfStreamsGroupIsFull(group, memberId); // Get or create the member. StreamsGroupMember member; @@ -8127,14 +8149,18 @@ private int shareGroupHeartbeatIntervalMs(String groupId) { * Get the session timeout of the provided streams group. */ private int streamsGroupSessionTimeoutMs(String groupId) { - return 45000; + Optional groupConfig = groupConfigManager.groupConfig(groupId); + return groupConfig.map(GroupConfig::streamsSessionTimeoutMs) + .orElse(config.streamsGroupSessionTimeoutMs()); } /** * Get the heartbeat interval of the provided streams group. */ private int streamsGroupHeartbeatIntervalMs(String groupId) { - return 5000; + Optional groupConfig = groupConfigManager.groupConfig(groupId); + return groupConfig.map(GroupConfig::streamsHeartbeatIntervalMs) + .orElse(config.streamsGroupHeartbeatIntervalMs()); } /** @@ -8148,7 +8174,10 @@ private TaskAssignor streamsGroupAssignor(String groupId) { * Get the assignor of the provided streams group. */ private Map streamsGroupAssignmentConfigs(String groupId) { - return Map.of("group.streams.num.standby.replicas", "0"); + Optional groupConfig = groupConfigManager.groupConfig(groupId); + final Integer numStandbyReplicas = groupConfig.map(GroupConfig::streamsNumStandbyReplicas) + .orElse(config.streamsGroupNumStandbyReplicas()); + return Map.of("num.standby.replicas", numStandbyReplicas.toString()); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 94464eb42fdcf..9afa6c72fac6e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -174,6 +174,9 @@ import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord; import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT; import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError; @@ -4069,7 +4072,7 @@ private StreamsGroupMember.Builder streamsGroupMemberBuilderWithDefaults(String .setProcessId(DEFAULT_PROCESS_ID) .setUserEndpoint(null); } - + @Test public void testGenerateRecordsOnNewClassicGroup() throws Exception { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() { assertEquals(100, result.response().data().memberEpoch()); } + @Test + public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + String subtopology2 = "subtopology2"; + String barTopicName = "bar"; + Uuid barTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)), + new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName)) + )); + + // Create a context with one streams group containing two members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2) + .withStreamsGroup(new StreamsGroupBuilder(groupId, 10) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2), + TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1))) + .build()) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5), + TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) + .build()) + .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2), + TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1))) + .withTargetAssignment(memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5), + TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) + .withTargetAssignmentEpoch(10) + .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) + .withPartitionMetadata(Map.of( + fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), + barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) + )) + ) + .build(); + + assertThrows(GroupMaxSizeReachedException.class, () -> + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(0) + .setProcessId("process-id") + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + )); + } + @Test public void testMemberJoinsEmptyStreamsGroup() { String groupId = "fooup"; @@ -17848,6 +17921,105 @@ public void testStreamsRebalanceTimeoutExpiration() { context.assertNoRebalanceTimeout(groupId, memberId1); } + @Test + public void testStreamsOnNewMetadataImage() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); + String subtopology1 = "subtopology1"; + + // Topology of group 1 uses a and b. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group1", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1) + .setSourceTopics(List.of("a")) + .setRepartitionSourceTopics(List.of(new TopicInfo().setName("b")) + )) + ))); + + // Topology of group 2 uses b and c. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group2", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1) + .setSourceTopics(List.of("b")) + .setStateChangelogTopics(List.of(new TopicInfo().setName("c"))) + )) + )); + + // Topology of group 3 uses d. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group3", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1) + .setSourceTopics(List.of("d")) + )) + )); + + // Topology of group 4 subscribes to e. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group4", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1) + .setSourceTopics(List.of("e")) + )) + )); + + // Topology of group 5 subscribes to f. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group5", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1) + .setSourceTopics(List.of("f")) + )) + )); + + // Ensures that all refresh flags are set to the future. + List.of("group1", "group2", "group3", "group4", "group5").forEach(groupId -> { + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + group.setMetadataRefreshDeadline(context.time.milliseconds() + 5000L, 0); + assertFalse(group.hasMetadataExpired(context.time.milliseconds())); + }); + + // Update the metadata image. + Uuid topicA = Uuid.randomUuid(); + Uuid topicB = Uuid.randomUuid(); + Uuid topicC = Uuid.randomUuid(); + Uuid topicD = Uuid.randomUuid(); + Uuid topicE = Uuid.randomUuid(); + + // Create a first base image with topic a, b, c and d. + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + delta.replay(new TopicRecord().setTopicId(topicA).setName("a")); + delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicB).setName("b")); + delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicC).setName("c")); + delta.replay(new PartitionRecord().setTopicId(topicC).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicD).setName("d")); + delta.replay(new PartitionRecord().setTopicId(topicD).setPartitionId(0)); + MetadataImage image = delta.apply(MetadataProvenance.EMPTY); + + // Create a delta which updates topic B, deletes topic D and creates topic E. + delta = new MetadataDelta(image); + delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2)); + delta.replay(new RemoveTopicRecord().setTopicId(topicD)); + delta.replay(new TopicRecord().setTopicId(topicE).setName("e")); + delta.replay(new PartitionRecord().setTopicId(topicE).setPartitionId(1)); + image = delta.apply(MetadataProvenance.EMPTY); + + // Update metadata image with the delta. + context.groupMetadataManager.onNewMetadataImage(image, delta); + + // Verify the groups. + List.of("group1", "group2", "group3", "group4").forEach(groupId -> { + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + assertTrue(group.hasMetadataExpired(context.time.milliseconds()), groupId); + }); + + List.of("group5").forEach(groupId -> { + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + assertFalse(group.hasMetadataExpired(context.time.milliseconds())); + }); + + // Verify image. + assertEquals(image, context.groupMetadataManager.image()); + } + @Test public void testConsumerGroupDynamicConfigs() { String groupId = "fooup"; @@ -18070,6 +18242,101 @@ public void testShareGroupDynamicConfigs() { context.assertNoRebalanceTimeout(groupId, memberId); } + @Test + public void testStreamsGroupDynamicConfigs() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build()) + .build(); + + assignor.prepareGroupAssignment( + Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))); + + // Session timer is scheduled on first heartbeat. + CoordinatorResult result = + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(10000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + assertEquals(1, result.response().data().memberEpoch()); + assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs()); + + // Verify heartbeat interval + assertEquals(5000, result.response().data().heartbeatIntervalMs()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Advance time. + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Dynamic update group config + Properties newGroupConfig = new Properties(); + newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 50000); + newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 10000); + newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 2); + context.updateGroupConfig(groupId, newGroupConfig); + + // Session timer is rescheduled on second heartbeat, new assignment with new parameter is calculated. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(result.response().data().memberEpoch()) + .setRackId("bla")); + assertEquals(2, result.response().data().memberEpoch()); + + // Verify heartbeat interval + assertEquals(10000, result.response().data().heartbeatIntervalMs()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, 50000); + + // Verify that the new number of standby replicas is used + assertEquals(Map.of("num.standby.replicas", "2"), assignor.lastPassedAssignmentConfigs()); + + // Advance time. + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Session timer is cancelled on leave. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)); + assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().data().memberEpoch()); + + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId); + context.assertNoRebalanceTimeout(groupId, memberId); + } + @Test public void testReplayConsumerGroupMemberMetadata() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java index ee38d6b130045..f2bde9f76fa13 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java @@ -31,6 +31,7 @@ public class MockTaskAssignor implements TaskAssignor { private final String name; private GroupAssignment preparedGroupAssignment = null; + private Map assignmentConfigs = Map.of(); public MockTaskAssignor(String name) { this.name = name; @@ -52,6 +53,10 @@ public void prepareGroupAssignment(Map memberAssignments) { }))); } + public Map lastPassedAssignmentConfigs() { + return assignmentConfigs; + } + @Override public String name() { return name; @@ -60,6 +65,7 @@ public String name() { @Override public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException { + assignmentConfigs = groupSpec.assignmentConfigs(); return preparedGroupAssignment; } } From 4b33e6cf40fb455d4f5d8925cac3e6d74d2f3b27 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 31 Mar 2025 11:52:17 +0200 Subject: [PATCH 2/3] comments --- .../coordinator/group/GroupMetadataManagerTest.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 9afa6c72fac6e..7305dfd50268a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -17924,12 +17924,11 @@ public void testStreamsRebalanceTimeoutExpiration() { @Test public void testStreamsOnNewMetadataImage() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); - String subtopology1 = "subtopology1"; // Topology of group 1 uses a and b. context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group1", new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopology1) + new Subtopology().setSubtopologyId("subtopology1") .setSourceTopics(List.of("a")) .setRepartitionSourceTopics(List.of(new TopicInfo().setName("b")) )) @@ -17938,7 +17937,7 @@ public void testStreamsOnNewMetadataImage() { // Topology of group 2 uses b and c. context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group2", new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopology1) + new Subtopology().setSubtopologyId("subtopology2") .setSourceTopics(List.of("b")) .setStateChangelogTopics(List.of(new TopicInfo().setName("c"))) )) @@ -17947,7 +17946,7 @@ public void testStreamsOnNewMetadataImage() { // Topology of group 3 uses d. context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group3", new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopology1) + new Subtopology().setSubtopologyId("subtopology3") .setSourceTopics(List.of("d")) )) )); @@ -17955,7 +17954,7 @@ public void testStreamsOnNewMetadataImage() { // Topology of group 4 subscribes to e. context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group4", new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopology1) + new Subtopology().setSubtopologyId("subtopology4") .setSourceTopics(List.of("e")) )) )); @@ -17963,7 +17962,7 @@ public void testStreamsOnNewMetadataImage() { // Topology of group 5 subscribes to f. context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group5", new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopology1) + new Subtopology().setSubtopologyId("subtopology5") .setSourceTopics(List.of("f")) )) )); From 982a5e4b1abf2af7f6b8ce442649eaaa66b12b7b Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 4 Apr 2025 12:09:07 +0200 Subject: [PATCH 3/3] comments --- .../group/GroupMetadataManager.java | 15 ++++--- .../group/GroupMetadataManagerTest.java | 39 +++---------------- 2 files changed, 14 insertions(+), 40 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index af2434aa01c08..6b6257179c719 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1817,17 +1817,15 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri * max group size defined. * * @param group The streams group. - * @param memberId The member id. * * @throws GroupMaxSizeReachedException if the maximum capacity has been reached. */ private void throwIfStreamsGroupIsFull( - StreamsGroup group, - String memberId + StreamsGroup group ) throws GroupMaxSizeReachedException { // If the streams group has reached its maximum capacity, the member is rejected if it is not // already a member of the streams group. - if (group.numMembers() >= config.streamsGroupMaxSize() && (memberId.isEmpty() || !group.hasMember(memberId))) { + if (group.numMembers() >= config.streamsGroupMaxSize()) { throw new GroupMaxSizeReachedException("The streams group has reached its maximum capacity of " + config.streamsGroupMaxSize() + " members."); } @@ -2099,8 +2097,13 @@ private CoordinatorResult stream // Get or create the streams group. boolean isJoining = memberEpoch == 0; - final StreamsGroup group = isJoining ? getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId); - throwIfStreamsGroupIsFull(group, memberId); + StreamsGroup group; + if (isJoining) { + group = getOrCreateStreamsGroup(groupId); + throwIfStreamsGroupIsFull(group); + } else { + group = getStreamsGroupOrThrow(groupId); + } // Get or create the member. StreamsGroupMember member; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 7305dfd50268a..8cc3eb32671d3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -15785,52 +15785,24 @@ public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() { String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); String memberId3 = Uuid.randomUuid().toString(); - - String subtopology1 = "subtopology1"; - String fooTopicName = "foo"; - Uuid fooTopicId = Uuid.randomUuid(); - String subtopology2 = "subtopology2"; - String barTopicName = "bar"; - Uuid barTopicId = Uuid.randomUuid(); - Topology topology = new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)), - new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName)) - )); + Topology topology = new Topology().setSubtopologies(List.of()); // Create a context with one streams group containing two members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build()) + .withMetadataImage(new MetadataImageBuilder().build()) .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2) .withStreamsGroup(new StreamsGroupBuilder(groupId, 10) .withMember(streamsGroupMemberBuilderWithDefaults(memberId1) .setMemberEpoch(10) .setPreviousMemberEpoch(9) - .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, - TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2), - TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1))) .build()) .withMember(streamsGroupMemberBuilderWithDefaults(memberId2) .setMemberEpoch(10) .setPreviousMemberEpoch(9) - .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, - TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5), - TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) .build()) - .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, - TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2), - TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1))) - .withTargetAssignment(memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, - TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5), - TaskAssignmentTestUtil.mkTasks(subtopology2, 2))) .withTargetAssignmentEpoch(10) .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) - .withPartitionMetadata(Map.of( - fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, barTopicName, 3) - )) + .withPartitionMetadata(Map.of()) ) .build(); @@ -18281,10 +18253,10 @@ public void testStreamsGroupDynamicConfigs() { assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs()); // Verify heartbeat interval - assertEquals(5000, result.response().data().heartbeatIntervalMs()); + assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, result.response().data().heartbeatIntervalMs()); // Verify that there is a session time. - context.assertSessionTimeout(groupId, memberId, 45000); + context.assertSessionTimeout(groupId, memberId, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT); // Advance time. assertEquals( @@ -18306,7 +18278,6 @@ public void testStreamsGroupDynamicConfigs() { .setMemberId(memberId) .setMemberEpoch(result.response().data().memberEpoch()) .setRackId("bla")); - assertEquals(2, result.response().data().memberEpoch()); // Verify heartbeat interval assertEquals(10000, result.response().data().heartbeatIntervalMs());