Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18887 Implement Streams Admin APIs #19049

Merged
merged 4 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,29 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, List
return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
}

/**
* List the Streams group offsets available in the cluster for the specified Streams groups.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
*
* @param options The options to use when listing the Streams group offsets.
* @return The ListStreamsGroupOffsetsResult
*/
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);

/**
* List the Streams group offsets available in the cluster for the specified groups with the default options.
* <p>
* This is a convenience method for
* {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} with default options.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @return The ListStreamsGroupOffsetsResult.
*/
default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
return listStreamsGroupOffsets(groupSpecs, new ListStreamsGroupOffsetsOptions());
}

/**
* Delete consumer groups from the cluster.
*
Expand All @@ -962,6 +985,23 @@ default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> group
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
}

/**
* Delete Streams groups from the cluster.
*
* @param options The options to use when deleting a Streams group.
* @return The DeleteStreamsGroupsResult.
*/
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);

/**
* Delete Streams groups from the cluster with the default options.
*
* @return The DeleteStreamsGroupResult.
*/
default DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) {
return deleteStreamsGroups(groupIds, new DeleteStreamsGroupsOptions());
}

/**
* Delete committed offsets for a set of partitions in a consumer group. This will
* succeed at the partition level only if the group is not actively subscribed
Expand All @@ -985,6 +1025,29 @@ default DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String group
return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
}

/**
* Delete committed offsets for a set of partitions in a Streams group. This will
* succeed at the partition level only if the group is not actively subscribed
* to the corresponding topic.
*
* @param options The options to use when deleting offsets in a Streams group.
* @return The DeleteStreamsGroupOffsetsResult.
*/
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
Set<TopicPartition> partitions,
DeleteStreamsGroupOffsetsOptions options);

/**
* Delete committed offsets for a set of partitions in a Streams group with the default
* options. This will succeed at the partition level only if the group is not actively
* subscribed to the corresponding topic.
*
* @return The DeleteStreamsGroupOffsetsResult.
*/
default DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions) {
return deleteStreamsGroupOffsets(groupId, partitions, new DeleteStreamsGroupOffsetsOptions());
}

/**
* List the groups available in the cluster with the default options.
*
Expand Down Expand Up @@ -1211,6 +1274,32 @@ default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId
*/
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);

/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This is a convenience method for {@link #alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata.
* @return The AlterOffsetsResult.
*/
default AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
return alterStreamsGroupOffsets(groupId, offsets, new AlterStreamsGroupOffsetsOptions());
}

/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This operation is not transactional so it may succeed for some partitions while fail for others.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
* @param options The options to use when altering the offsets.
* @return The AlterOffsetsResult.
*/
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options);

/**
* <p>List offset for the specified partitions and OffsetSpec. This operation enables to find
* the beginning offset, end offset as well as the offset matching a timestamp in partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class AlterConsumerGroupOffsetsResult {
this.future = future;
}

KafkaFuture<Map<TopicPartition, Errors>> future() {
return future;
}

/**
* Return a future which can be used to check the result for a given partition.
*/
Expand Down Expand Up @@ -78,7 +82,7 @@ public KafkaFuture<Void> all() {
for (Errors error : topicPartitionErrorsMap.values()) {
if (error != Errors.NONE) {
throw error.exception(
"Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
"Failed altering group offsets for the following partitions: " + partitionsFailed);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;

/**
* Options for the {@link AdminClient#alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsOptions extends AlterConsumerGroupOffsetsOptions {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;

import java.util.Map;

/**
* The result of the {@link AdminClient#alterConsumerGroupOffsets(String, Map)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsResult extends AlterConsumerGroupOffsetsResult {

AlterStreamsGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
super(future);
}

AlterStreamsGroupOffsetsResult(AlterConsumerGroupOffsetsResult parent) {
super(parent.future());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public class DeleteConsumerGroupOffsetsResult {
this.partitions = partitions;
}

KafkaFuture<Map<TopicPartition, Errors>> future() {
return future;
}

Set<TopicPartition> partitions() {
return partitions;
}

/**
* Return a future which can be used to check the result for a given partition.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public abstract class DeleteGroupsResult {
this.futures = futures;
}

Map<String, KafkaFuture<Void>> futures() {
return futures;
}

/**
* Return a map from group id to futures which can be used to check the status of
* individual deletions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Set;

/**
* Options for the {@link Admin#deleteStreamsGroupOffsets(String, Set)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsOptions extends DeleteConsumerGroupOffsetsOptions {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;

import java.util.Map;
import java.util.Set;

/**
* The result of the {@link Admin#deleteConsumerGroupOffsets(String, Set)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsResult extends DeleteConsumerGroupOffsetsResult {

DeleteStreamsGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
super(future, partitions);
}

DeleteStreamsGroupOffsetsResult(DeleteConsumerGroupOffsetsResult parent) {
super(parent.future(), parent.partitions());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;

/**
* Options for the {@link Admin#deleteStreamsGroups(Collection)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupsOptions extends DeleteConsumerGroupsOptions {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;

import java.util.Collection;
import java.util.Map;

/**
* The result of the {@link Admin#deleteStreamsGroups(Collection <String>, DeleteStreamsGroupsOptions)} call.
*/
public class DeleteStreamsGroupsResult extends DeleteConsumerGroupsResult {
public DeleteStreamsGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
super(futures);
}

public DeleteStreamsGroupsResult(final DeleteConsumerGroupsResult parent) {
super(parent.futures());
}
}
Loading