Skip to content

Commit 017692e

Browse files
authored
KAFKA-18887: Implement Streams Admin APIs (#19049)
Implement Admin API extensions beyond list/describe group (delete group, offset-related APIs). * adds methods for describing and manipulating offsets, as described in KIP-1071 * adds corresponding unit tests These are doing the exact same thing as the corresponding consumer group counter-parts. Reviewers: Lucas Brutschy <[email protected]>
1 parent 3d7ac0c commit 017692e

16 files changed

+1460
-7
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

+97
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,31 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, List
940940
return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
941941
}
942942

943+
/**
944+
* List the Streams group offsets available in the cluster for the specified Streams groups.
945+
*
946+
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#listConsumerGroupOffsets} does.
947+
*
948+
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
949+
*
950+
* @param options The options to use when listing the Streams group offsets.
951+
* @return The ListStreamsGroupOffsetsResult
952+
*/
953+
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);
954+
955+
/**
956+
* List the Streams group offsets available in the cluster for the specified groups with the default options.
957+
* <p>
958+
* This is a convenience method for
959+
* {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} with default options.
960+
*
961+
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
962+
* @return The ListStreamsGroupOffsetsResult.
963+
*/
964+
default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
965+
return listStreamsGroupOffsets(groupSpecs, new ListStreamsGroupOffsetsOptions());
966+
}
967+
943968
/**
944969
* Delete consumer groups from the cluster.
945970
*
@@ -957,6 +982,25 @@ default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> group
957982
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
958983
}
959984

985+
/**
986+
* Delete Streams groups from the cluster.
987+
*
988+
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroups} does.
989+
*
990+
* @param options The options to use when deleting a Streams group.
991+
* @return The DeleteStreamsGroupsResult.
992+
*/
993+
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);
994+
995+
/**
996+
* Delete Streams groups from the cluster with the default options.
997+
*
998+
* @return The DeleteStreamsGroupResult.
999+
*/
1000+
default DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) {
1001+
return deleteStreamsGroups(groupIds, new DeleteStreamsGroupsOptions());
1002+
}
1003+
9601004
/**
9611005
* Delete committed offsets for a set of partitions in a consumer group. This will
9621006
* succeed at the partition level only if the group is not actively subscribed
@@ -980,6 +1024,31 @@ default DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String group
9801024
return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
9811025
}
9821026

1027+
/**
1028+
* Delete committed offsets for a set of partitions in a Streams group. This will
1029+
* succeed at the partition level only if the group is not actively subscribed
1030+
* to the corresponding topic.
1031+
*
1032+
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroupOffsets} does.
1033+
*
1034+
* @param options The options to use when deleting offsets in a Streams group.
1035+
* @return The DeleteStreamsGroupOffsetsResult.
1036+
*/
1037+
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
1038+
Set<TopicPartition> partitions,
1039+
DeleteStreamsGroupOffsetsOptions options);
1040+
1041+
/**
1042+
* Delete committed offsets for a set of partitions in a Streams group with the default
1043+
* options. This will succeed at the partition level only if the group is not actively
1044+
* subscribed to the corresponding topic.
1045+
*
1046+
* @return The DeleteStreamsGroupOffsetsResult.
1047+
*/
1048+
default DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions) {
1049+
return deleteStreamsGroupOffsets(groupId, partitions, new DeleteStreamsGroupOffsetsOptions());
1050+
}
1051+
9831052
/**
9841053
* List the groups available in the cluster with the default options.
9851054
*
@@ -1213,6 +1282,34 @@ default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId
12131282
*/
12141283
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);
12151284

1285+
/**
1286+
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
1287+
*
1288+
* <p>This is a convenience method for {@link #alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} with default options.
1289+
* See the overload for more details.
1290+
*
1291+
* @param groupId The group for which to alter offsets.
1292+
* @param offsets A map of offsets by partition with associated metadata.
1293+
* @return The AlterOffsetsResult.
1294+
*/
1295+
default AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
1296+
return alterStreamsGroupOffsets(groupId, offsets, new AlterStreamsGroupOffsetsOptions());
1297+
}
1298+
1299+
/**
1300+
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
1301+
*
1302+
* <p>This operation is not transactional so it may succeed for some partitions while fail for others.
1303+
*
1304+
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#alterConsumerGroupOffsets} does.
1305+
*
1306+
* @param groupId The group for which to alter offsets.
1307+
* @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
1308+
* @param options The options to use when altering the offsets.
1309+
* @return The AlterOffsetsResult.
1310+
*/
1311+
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options);
1312+
12161313
/**
12171314
* <p>List offset for the specified partitions and OffsetSpec. This operation enables to find
12181315
* the beginning offset, end offset as well as the offset matching a timestamp in partitions.

clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public KafkaFuture<Void> all() {
7474
for (Errors error : topicPartitionErrorsMap.values()) {
7575
if (error != Errors.NONE) {
7676
throw error.exception(
77-
"Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
77+
"Failed altering group offsets for the following partitions: " + partitionsFailed);
7878
}
7979
}
8080
return null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.annotation.InterfaceStability;
20+
21+
22+
/**
23+
* Options for the {@link Admin#alterStreamsGroupOffsets(String groupId, Map), AlterStreamsGroupOffsetsOptions)} call.
24+
* <p>
25+
* The API of this class is evolving, see {@link Admin} for details.
26+
*/
27+
@InterfaceStability.Evolving
28+
public class AlterStreamsGroupOffsetsOptions extends AbstractOptions<AlterStreamsGroupOffsetsOptions> {
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.KafkaFuture;
20+
import org.apache.kafka.common.TopicPartition;
21+
import org.apache.kafka.common.annotation.InterfaceStability;
22+
23+
import java.util.Map;
24+
25+
/**
26+
* The result of the {@link AdminClient#alterStreamsGroupOffsets(String, Map)} call.
27+
*
28+
* The API of this class is evolving, see {@link AdminClient} for details.
29+
*/
30+
@InterfaceStability.Evolving
31+
public class AlterStreamsGroupOffsetsResult {
32+
33+
private final AlterConsumerGroupOffsetsResult delegate;
34+
35+
AlterStreamsGroupOffsetsResult(final AlterConsumerGroupOffsetsResult delegate) {
36+
this.delegate = delegate;
37+
}
38+
39+
/**
40+
* Return a future which can be used to check the result for a given partition.
41+
*/
42+
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
43+
return delegate.partitionResult(partition);
44+
}
45+
46+
/**
47+
* Return a future which succeeds if all the alter offsets succeed.
48+
*/
49+
public KafkaFuture<Void> all() {
50+
return delegate.all();
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.annotation.InterfaceStability;
20+
21+
import java.util.Set;
22+
23+
/**
24+
* Options for the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
25+
* <p>
26+
* The API of this class is evolving, see {@link Admin} for details.
27+
*/
28+
@InterfaceStability.Evolving
29+
public class DeleteStreamsGroupOffsetsOptions extends AbstractOptions<DeleteStreamsGroupOffsetsOptions> {
30+
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.KafkaFuture;
20+
import org.apache.kafka.common.TopicPartition;
21+
import org.apache.kafka.common.annotation.InterfaceStability;
22+
import org.apache.kafka.common.protocol.Errors;
23+
24+
import java.util.Map;
25+
import java.util.Set;
26+
27+
/**
28+
* The result of the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
29+
* <p>
30+
* The API of this class is evolving, see {@link Admin} for details.
31+
*/
32+
@InterfaceStability.Evolving
33+
public class DeleteStreamsGroupOffsetsResult {
34+
private final DeleteConsumerGroupOffsetsResult delegate;
35+
36+
DeleteStreamsGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
37+
delegate = new DeleteConsumerGroupOffsetsResult(future, partitions);
38+
}
39+
40+
DeleteStreamsGroupOffsetsResult(final DeleteConsumerGroupOffsetsResult delegate) {
41+
this.delegate = delegate;
42+
}
43+
44+
/**
45+
* Return a future which succeeds only if all the deletions succeed.
46+
*/
47+
public KafkaFuture<Void> all() {
48+
return delegate.all();
49+
}
50+
51+
/**
52+
* Return a future which can be used to check the result for a given topic.
53+
*/
54+
public KafkaFuture<Void> partitionResult(final TopicPartition topicPartition) {
55+
return delegate.partitionResult(topicPartition);
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.annotation.InterfaceStability;
20+
21+
import java.util.Collection;
22+
23+
/**
24+
* Options for the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
25+
* <p>
26+
* The API of this class is evolving, see {@link Admin} for details.
27+
*/
28+
@InterfaceStability.Evolving
29+
public class DeleteStreamsGroupsOptions extends AbstractOptions<DeleteStreamsGroupsOptions> {
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
import org.apache.kafka.common.KafkaFuture;
21+
import org.apache.kafka.common.annotation.InterfaceStability;
22+
23+
import java.util.Collection;
24+
import java.util.Map;
25+
26+
/**
27+
* The result of the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
28+
* <p>
29+
* The API of this class is evolving, see {@link Admin} for details.
30+
*/
31+
@InterfaceStability.Evolving
32+
public class DeleteStreamsGroupsResult {
33+
34+
private final DeleteConsumerGroupsResult delegate;
35+
36+
DeleteStreamsGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
37+
delegate = new DeleteConsumerGroupsResult(futures);
38+
}
39+
40+
DeleteStreamsGroupsResult(final DeleteConsumerGroupsResult delegate) {
41+
this.delegate = delegate;
42+
}
43+
44+
/**
45+
* Return a future which succeeds only if all the deletions succeed.
46+
*/
47+
public KafkaFuture<Void> all() {
48+
return delegate.all();
49+
}
50+
51+
/**
52+
* Return a map from group id to futures which can be used to check the status of individual deletions.
53+
*/
54+
public Map<String, KafkaFuture<Void>> deletedGroups() {
55+
return delegate.deletedGroups();
56+
}
57+
}

0 commit comments

Comments
 (0)