diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 73aa43eda6fdb..6b6257179c719 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1812,6 +1812,25 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } + /** + * Checks whether the streams group can accept a new member or not based on the + * max group size defined. + * + * @param group The streams group. + * + * @throws GroupMaxSizeReachedException if the maximum capacity has been reached. + */ + private void throwIfStreamsGroupIsFull( + StreamsGroup group + ) throws GroupMaxSizeReachedException { + // If the streams group has reached its maximum capacity, the member is rejected if it is not + // already a member of the streams group. + if (group.numMembers() >= config.streamsGroupMaxSize()) { + throw new GroupMaxSizeReachedException("The streams group has reached its maximum capacity of " + + config.streamsGroupMaxSize() + " members."); + } + } + /** * Validates the member epoch provided in the heartbeat request. * @@ -2078,7 +2097,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream // Get or create the streams group. boolean isJoining = memberEpoch == 0; - final StreamsGroup group = isJoining ? getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId); + StreamsGroup group; + if (isJoining) { + group = getOrCreateStreamsGroup(groupId); + throwIfStreamsGroupIsFull(group); + } else { + group = getStreamsGroupOrThrow(groupId); + } // Get or create the member. StreamsGroupMember member; @@ -8127,14 +8152,18 @@ private int shareGroupHeartbeatIntervalMs(String groupId) { * Get the session timeout of the provided streams group. */ private int streamsGroupSessionTimeoutMs(String groupId) { - return 45000; + Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId); + return groupConfig.map(GroupConfig::streamsSessionTimeoutMs) + .orElse(config.streamsGroupSessionTimeoutMs()); } /** * Get the heartbeat interval of the provided streams group. */ private int streamsGroupHeartbeatIntervalMs(String groupId) { - return 5000; + Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId); + return groupConfig.map(GroupConfig::streamsHeartbeatIntervalMs) + .orElse(config.streamsGroupHeartbeatIntervalMs()); } /** @@ -8148,7 +8177,10 @@ private TaskAssignor streamsGroupAssignor(String groupId) { * Get the assignor of the provided streams group. */ private Map<String, String> streamsGroupAssignmentConfigs(String groupId) { - return Map.of("group.streams.num.standby.replicas", "0"); + Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId); + final Integer numStandbyReplicas = groupConfig.map(GroupConfig::streamsNumStandbyReplicas) + .orElse(config.streamsGroupNumStandbyReplicas()); + return Map.of("num.standby.replicas", numStandbyReplicas.toString()); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 94464eb42fdcf..8cc3eb32671d3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -174,6 +174,9 @@ import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord; import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT; import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError; @@ -4069,7 +4072,7 @@ private StreamsGroupMember.Builder streamsGroupMemberBuilderWithDefaults(String .setProcessId(DEFAULT_PROCESS_ID) .setUserEndpoint(null); } - + @Test public void testGenerateRecordsOnNewClassicGroup() throws Exception { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -15776,6 +15779,48 @@ public void testStreamsGroupMemberEpochValidation() { assertEquals(100, result.response().data().memberEpoch()); } + @Test + public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + Topology topology = new Topology().setSubtopologies(List.of()); + + // Create a context with one streams group containing two members. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder().build()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2) + .withStreamsGroup(new StreamsGroupBuilder(groupId, 10) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .build()) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId2) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .build()) + .withTargetAssignmentEpoch(10) + .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) + .withPartitionMetadata(Map.of()) + ) + .build(); + + assertThrows(GroupMaxSizeReachedException.class, () -> + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(0) + .setProcessId("process-id") + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + )); + } + @Test public void testMemberJoinsEmptyStreamsGroup() { String groupId = "fooup"; @@ -17848,6 +17893,104 @@ public void testStreamsRebalanceTimeoutExpiration() { context.assertNoRebalanceTimeout(groupId, memberId1); } + @Test + public void testStreamsOnNewMetadataImage() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build(); + + // Topology of group 1 uses a and b. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group1", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId("subtopology1") + .setSourceTopics(List.of("a")) + .setRepartitionSourceTopics(List.of(new TopicInfo().setName("b")) + )) + ))); + + // Topology of group 2 uses b and c. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group2", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId("subtopology2") + .setSourceTopics(List.of("b")) + .setStateChangelogTopics(List.of(new TopicInfo().setName("c"))) + )) + )); + + // Topology of group 3 uses d. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group3", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId("subtopology3") + .setSourceTopics(List.of("d")) + )) + )); + + // Topology of group 4 subscribes to e. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group4", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId("subtopology4") + .setSourceTopics(List.of("e")) + )) + )); + + // Topology of group 5 subscribes to f. + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord("group5", + new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId("subtopology5") + .setSourceTopics(List.of("f")) + )) + )); + + // Ensures that all refresh flags are set to the future. + List.of("group1", "group2", "group3", "group4", "group5").forEach(groupId -> { + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + group.setMetadataRefreshDeadline(context.time.milliseconds() + 5000L, 0); + assertFalse(group.hasMetadataExpired(context.time.milliseconds())); + }); + + // Update the metadata image. + Uuid topicA = Uuid.randomUuid(); + Uuid topicB = Uuid.randomUuid(); + Uuid topicC = Uuid.randomUuid(); + Uuid topicD = Uuid.randomUuid(); + Uuid topicE = Uuid.randomUuid(); + + // Create a first base image with topic a, b, c and d. + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + delta.replay(new TopicRecord().setTopicId(topicA).setName("a")); + delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicB).setName("b")); + delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicC).setName("c")); + delta.replay(new PartitionRecord().setTopicId(topicC).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicD).setName("d")); + delta.replay(new PartitionRecord().setTopicId(topicD).setPartitionId(0)); + MetadataImage image = delta.apply(MetadataProvenance.EMPTY); + + // Create a delta which updates topic B, deletes topic D and creates topic E. + delta = new MetadataDelta(image); + delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2)); + delta.replay(new RemoveTopicRecord().setTopicId(topicD)); + delta.replay(new TopicRecord().setTopicId(topicE).setName("e")); + delta.replay(new PartitionRecord().setTopicId(topicE).setPartitionId(1)); + image = delta.apply(MetadataProvenance.EMPTY); + + // Update metadata image with the delta. + context.groupMetadataManager.onNewMetadataImage(image, delta); + + // Verify the groups. + List.of("group1", "group2", "group3", "group4").forEach(groupId -> { + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + assertTrue(group.hasMetadataExpired(context.time.milliseconds()), groupId); + }); + + List.of("group5").forEach(groupId -> { + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + assertFalse(group.hasMetadataExpired(context.time.milliseconds())); + }); + + // Verify image. + assertEquals(image, context.groupMetadataManager.image()); + } + @Test public void testConsumerGroupDynamicConfigs() { String groupId = "fooup"; @@ -18070,6 +18213,100 @@ public void testShareGroupDynamicConfigs() { context.assertNoRebalanceTimeout(groupId, memberId); } + @Test + public void testStreamsGroupDynamicConfigs() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build()) + .build(); + + assignor.prepareGroupAssignment( + Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))); + + // Session timer is scheduled on first heartbeat. + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(10000) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + assertEquals(1, result.response().data().memberEpoch()); + assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs()); + + // Verify heartbeat interval + assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, result.response().data().heartbeatIntervalMs()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT); + + // Advance time. + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Dynamic update group config + Properties newGroupConfig = new Properties(); + newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 50000); + newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 10000); + newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 2); + context.updateGroupConfig(groupId, newGroupConfig); + + // Session timer is rescheduled on second heartbeat, new assignment with new parameter is calculated. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(result.response().data().memberEpoch()) + .setRackId("bla")); + + // Verify heartbeat interval + assertEquals(10000, result.response().data().heartbeatIntervalMs()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, 50000); + + // Verify that the new number of standby replicas is used + assertEquals(Map.of("num.standby.replicas", "2"), assignor.lastPassedAssignmentConfigs()); + + // Advance time. + assertEquals( + List.of(), + context.sleep(result.response().data().heartbeatIntervalMs()) + ); + + // Session timer is cancelled on leave. + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)); + assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().data().memberEpoch()); + + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId); + context.assertNoRebalanceTimeout(groupId, memberId); + } + @Test public void testReplayConsumerGroupMemberMetadata() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java index ee38d6b130045..f2bde9f76fa13 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java @@ -31,6 +31,7 @@ public class MockTaskAssignor implements TaskAssignor { private final String name; private GroupAssignment preparedGroupAssignment = null; + private Map<String, String> assignmentConfigs = Map.of(); public MockTaskAssignor(String name) { this.name = name; @@ -52,6 +53,10 @@ public void prepareGroupAssignment(Map<String, TasksTuple> memberAssignments) { }))); } + public Map<String, String> lastPassedAssignmentConfigs() { + return assignmentConfigs; + } + @Override public String name() { return name; @@ -60,6 +65,7 @@ public String name() { @Override public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException { + assignmentConfigs = groupSpec.assignmentConfigs(); return preparedGroupAssignment; } }