Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18887 Implement Streams Admin APIs #19049

Merged
merged 4 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,31 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, List
return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
}

/**
* List the Streams group offsets available in the cluster for the specified Streams groups.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#listConsumerGroupOffsets} does.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
*
* @param options The options to use when listing the Streams group offsets.
* @return The ListStreamsGroupOffsetsResult
*/
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);

/**
* List the Streams group offsets available in the cluster for the specified groups with the default options.
* <p>
* This is a convenience method for
* {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} with default options.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @return The ListStreamsGroupOffsetsResult.
*/
default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
return listStreamsGroupOffsets(groupSpecs, new ListStreamsGroupOffsetsOptions());
}

/**
* Delete consumer groups from the cluster.
*
Expand All @@ -962,6 +987,25 @@ default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> group
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
}

/**
* Delete Streams groups from the cluster.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroups} does.
*
* @param options The options to use when deleting a Streams group.
* @return The DeleteStreamsGroupsResult.
*/
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);

/**
* Delete Streams groups from the cluster with the default options.
*
* @return The DeleteStreamsGroupResult.
*/
default DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds) {
return deleteStreamsGroups(groupIds, new DeleteStreamsGroupsOptions());
}

/**
* Delete committed offsets for a set of partitions in a consumer group. This will
* succeed at the partition level only if the group is not actively subscribed
Expand All @@ -985,6 +1029,31 @@ default DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String group
return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
}

/**
* Delete committed offsets for a set of partitions in a Streams group. This will
* succeed at the partition level only if the group is not actively subscribed
* to the corresponding topic.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroupOffsets} does.
*
* @param options The options to use when deleting offsets in a Streams group.
* @return The DeleteStreamsGroupOffsetsResult.
*/
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
Set<TopicPartition> partitions,
DeleteStreamsGroupOffsetsOptions options);

/**
* Delete committed offsets for a set of partitions in a Streams group with the default
* options. This will succeed at the partition level only if the group is not actively
* subscribed to the corresponding topic.
*
* @return The DeleteStreamsGroupOffsetsResult.
*/
default DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId, Set<TopicPartition> partitions) {
return deleteStreamsGroupOffsets(groupId, partitions, new DeleteStreamsGroupOffsetsOptions());
}

/**
* List the groups available in the cluster with the default options.
*
Expand Down Expand Up @@ -1211,6 +1280,34 @@ default AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId
*/
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options);

/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This is a convenience method for {@link #alterStreamsGroupOffsets(String, Map, AlterStreamsGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata.
* @return The AlterOffsetsResult.
*/
default AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
return alterStreamsGroupOffsets(groupId, offsets, new AlterStreamsGroupOffsetsOptions());
}

/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.
*
* <p>This operation is not transactional so it may succeed for some partitions while fail for others.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#alterConsumerGroupOffsets} does.
*
* @param groupId The group for which to alter offsets.
* @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
* @param options The options to use when altering the offsets.
* @return The AlterOffsetsResult.
*/
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterStreamsGroupOffsetsOptions options);

/**
* <p>List offset for the specified partitions and OffsetSpec. This operation enables to find
* the beginning offset, end offset as well as the offset matching a timestamp in partitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public KafkaFuture<Void> all() {
for (Errors error : topicPartitionErrorsMap.values()) {
if (error != Errors.NONE) {
throw error.exception(
"Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
"Failed altering group offsets for the following partitions: " + partitionsFailed);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;


/**
* Options for the {@link Admin#alterStreamsGroupOffsets(String groupId, Map), AlterStreamsGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsOptions extends AbstractOptions<AlterStreamsGroupOffsetsOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;

/**
* The result of the {@link AdminClient#alterStreamsGroupOffsets(String, Map)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class AlterStreamsGroupOffsetsResult {

private final AlterConsumerGroupOffsetsResult delegate;

AlterStreamsGroupOffsetsResult(final AlterConsumerGroupOffsetsResult delegate) {
this.delegate = delegate;
}

/**
* Return a future which can be used to check the result for a given partition.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
return delegate.partitionResult(partition);
}

/**
* Return a future which succeeds if all the alter offsets succeed.
*/
public KafkaFuture<Void> all() {
return delegate.all();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Set;

/**
* Options for the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsOptions extends AbstractOptions<DeleteStreamsGroupOffsetsOptions> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;

import java.util.Map;
import java.util.Set;

/**
* The result of the {@link Admin#deleteStreamsGroupOffsets(String, Set, DeleteStreamsGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupOffsetsResult {
private final DeleteConsumerGroupOffsetsResult delegate;

DeleteStreamsGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
delegate = new DeleteConsumerGroupOffsetsResult(future, partitions);
}

DeleteStreamsGroupOffsetsResult(final DeleteConsumerGroupOffsetsResult delegate) {
this.delegate = delegate;
}

/**
* Return a future which succeeds only if all the deletions succeed.
*/
public KafkaFuture<Void> all() {
return delegate.all();
}

/**
* Return a future which can be used to check the result for a given topic.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition topicPartition) {
return delegate.partitionResult(topicPartition);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;

/**
* Options for the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupsOptions extends AbstractOptions<DeleteStreamsGroupsOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Map;

/**
* The result of the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteStreamsGroupsResult {

private final DeleteConsumerGroupsResult delegate;

DeleteStreamsGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
delegate = new DeleteConsumerGroupsResult(futures);
}

DeleteStreamsGroupsResult(final DeleteConsumerGroupsResult delegate) {
this.delegate = delegate;
}

/**
* Return a future which succeeds only if all the deletions succeed.
*/
public KafkaFuture<Void> all() {
return delegate.all();
}

/**
* Return a map from group id to futures which can be used to check the status of individual deletions.
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
return delegate.deletedGroups();
}
}
Loading