Skip to content

KAFKA-18925: Add streams groups support to Admin.listGroups #19155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions clients/src/main/java/org/apache/kafka/common/GroupState.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@
* The following table shows the correspondence between the group states and types.
* <table>
* <thead>
* <tr><th>State</th><th>Classic group</th><th>Consumer group</th><th>Share group</th></tr>
* <tr><th>State</th><th>Classic group</th><th>Consumer group</th><th>Share group</th><th>Streams group</th></tr>
* </thead>
* <tbody>
* <tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
* <tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
* <tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td></tr>
* <tr><td>RECONCILING</td><td></td><td>Yes</td><td></td></tr>
* <tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
* <tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
* <tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
* <tr><td>RECONCILING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
* <tr><td>NOT_READY</td><td></td><td></td><td></td><td>Yes</td></tr>
* </tbody>
* </table>
*/
Expand All @@ -55,7 +56,8 @@ public enum GroupState {
DEAD("Dead"),
EMPTY("Empty"),
ASSIGNING("Assigning"),
RECONCILING("Reconciling");
RECONCILING("Reconciling"),
NOT_READY("NotReady");

private static final Map<String, GroupState> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
Expand All @@ -79,6 +81,8 @@ public static Set<GroupState> groupStatesForType(GroupType type) {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY);
} else if (type == GroupType.CONSUMER) {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING);
} else if (type == GroupType.STREAMS) {
return Set.of(STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING, NOT_READY);
} else if (type == GroupType.SHARE) {
return Set.of(STABLE, DEAD, EMPTY);
} else {
Expand Down
3 changes: 2 additions & 1 deletion clients/src/main/java/org/apache/kafka/common/GroupType.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public enum GroupType {
UNKNOWN("Unknown"),
CONSUMER("Consumer"),
CLASSIC("Classic"),
SHARE("Share");
SHARE("Share"),
STREAMS("Streams");

private static final Map<String, GroupType> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT), Function.identity()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
Expand Down Expand Up @@ -5991,6 +5992,189 @@ public void testDescribeMultipleStreamsGroups() {
}
}

@Test
public void testListStreamsGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

// Empty metadata response should be retried
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,
Collections.emptyList()));

env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
Collections.emptyList()));

env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(singletonList(
new ListedGroup()
.setGroupId("streams-group-1")
.setGroupType(GroupType.STREAMS.toString())
.setGroupState("Stable")
))),
env.cluster().nodeById(0));

// handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(Arrays.asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("streams-group-2")
.setGroupType(GroupType.STREAMS.toString())
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("streams-group-3")
.setGroupType(GroupType.STREAMS.toString())
.setGroupState("Stable")
))),
env.cluster().nodeById(1));

env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(singletonList(
new ListedGroup()
.setGroupId("streams-group-4")
.setGroupType(GroupType.STREAMS.toString())
.setGroupState("Stable")
))),
env.cluster().nodeById(2));

// fatal error
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setGroups(Collections.emptyList())),
env.cluster().nodeById(3));

final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
TestUtils.assertFutureThrows(UnknownServerException.class, result.all());

Collection<GroupListing> listings = result.valid().get();
assertEquals(4, listings.size());

Set<String> groupIds = new HashSet<>();
for (GroupListing listing : listings) {
groupIds.add(listing.groupId());
assertTrue(listing.groupState().isPresent());
}

assertEquals(Set.of("streams-group-1", "streams-group-2", "streams-group-3", "streams-group-4"), groupIds);
assertEquals(1, result.errors().get().size());
}
}

@Test
public void testListStreamsGroupsMetadataFailure() throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();

try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRIES_CONFIG, "0")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

// Empty metadata causes the request to fail since we have no list of brokers
// to send the ListGroups requests to
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,
Collections.emptyList()));

final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
TestUtils.assertFutureThrows(KafkaException.class, result.all());
}
}

@Test
public void testListStreamsGroupsWithStates() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));

env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(Arrays.asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("streams-group-1")
.setGroupType(GroupType.STREAMS.toString())
.setProtocolType("streams")
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("streams-group-2")
.setGroupType(GroupType.STREAMS.toString())
.setProtocolType("streams")
.setGroupState("NotReady")))),
env.cluster().nodeById(0));

final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
Collection<GroupListing> listings = result.valid().get();

assertEquals(2, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("streams-group-1", Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)));
expected.add(new GroupListing("streams-group-2", Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY)));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
}

@Test
public void testListStreamsGroupsWithStatesOlderBrokerVersion() {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 4);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));

env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));

// Check we should not be able to list streams groups with broker having version < 5
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(Collections.singletonList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("streams-group-1")))),
env.cluster().nodeById(0));
ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
}

@Test
public void testDescribeShareGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
Expand Down
Loading