Skip to content

Commit cfe85c2

Browse files
KAFKA-19802: Admin client changes for KIP-1226 (#20771)
Admin client changes for KIP-1226 which adds lag information for share groups. Reviewers: Lianet Magrans <[email protected]>
1 parent 5571821 commit cfe85c2

File tree

9 files changed

+182
-87
lines changed

9 files changed

+182
-87
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3814,7 +3814,7 @@ public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId,
38143814
@Override
38153815
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs,
38163816
final ListShareGroupOffsetsOptions options) {
3817-
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
3817+
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
38183818
ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext);
38193819
invokeDriver(handler, future, options.timeoutMs);
38203820
return new ListShareGroupOffsetsResult(future.all());

clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.kafka.clients.admin;
1919

2020
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
21-
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2221
import org.apache.kafka.common.KafkaFuture;
2322
import org.apache.kafka.common.TopicPartition;
2423
import org.apache.kafka.common.annotation.InterfaceStability;
@@ -36,22 +35,22 @@
3635
@InterfaceStability.Evolving
3736
public class ListShareGroupOffsetsResult {
3837

39-
private final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;
38+
private final Map<String, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> futures;
4039

41-
ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
40+
ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> futures) {
4241
this.futures = futures.entrySet().stream()
4342
.collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue));
4443
}
4544

4645
/**
4746
* Return the future when the requests for all groups succeed.
4847
*
49-
* @return Future which yields all {@code Map<String, Map<TopicPartition, OffsetAndMetadata>>} objects, if requests for all the groups succeed.
48+
* @return Future which yields all {@code Map<String, Map<TopicPartition, SharePartitionOffsetInfo>>} objects, if requests for all the groups succeed.
5049
*/
51-
public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
50+
public KafkaFuture<Map<String, Map<TopicPartition, SharePartitionOffsetInfo>>> all() {
5251
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0])).thenApply(
5352
nil -> {
54-
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>(futures.size());
53+
Map<String, Map<TopicPartition, SharePartitionOffsetInfo>> offsets = new HashMap<>(futures.size());
5554
futures.forEach((groupId, future) -> {
5655
try {
5756
offsets.put(groupId, future.get());
@@ -67,9 +66,9 @@ public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
6766

6867
/**
6968
* Return a future which yields a map of topic partitions to offsets for the specified group. If the group doesn't
70-
* have a committed offset for a specific partition, the corresponding value in the returned map will be null.
69+
* have offset information for a specific partition, the corresponding value in the returned map will be null.
7170
*/
72-
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
71+
public KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>> partitionsToOffsetInfo(String groupId) {
7372
if (!futures.containsKey(groupId)) {
7473
throw new IllegalArgumentException("Group ID not found: " + groupId);
7574
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
import org.apache.kafka.common.annotation.InterfaceStability;
21+
22+
import java.util.Objects;
23+
import java.util.Optional;
24+
25+
/**
26+
* This class is used to contain the offset and lag information for a share-partition.
27+
*/
28+
@InterfaceStability.Evolving
29+
public class SharePartitionOffsetInfo {
30+
private final long startOffset;
31+
private final Optional<Integer> leaderEpoch;
32+
private final Optional<Long> lag;
33+
34+
/**
35+
* Construct a new SharePartitionOffsetInfo.
36+
*
37+
* @param startOffset The share-partition start offset
38+
* @param leaderEpoch The optional leader epoch of the share-partition
39+
* @param lag The optional lag for the share-partition
40+
*/
41+
public SharePartitionOffsetInfo(long startOffset, Optional<Integer> leaderEpoch, Optional<Long> lag) {
42+
this.startOffset = startOffset;
43+
this.leaderEpoch = leaderEpoch;
44+
this.lag = lag;
45+
}
46+
47+
public long startOffset() {
48+
return startOffset;
49+
}
50+
51+
public Optional<Integer> leaderEpoch() {
52+
return leaderEpoch;
53+
}
54+
55+
public Optional<Long> lag() {
56+
return lag;
57+
}
58+
59+
@Override
60+
public boolean equals(Object o) {
61+
if (this == o) return true;
62+
if (o == null || getClass() != o.getClass()) return false;
63+
SharePartitionOffsetInfo that = (SharePartitionOffsetInfo) o;
64+
return startOffset == that.startOffset &&
65+
Objects.equals(leaderEpoch, that.leaderEpoch) &&
66+
Objects.equals(lag, that.lag);
67+
}
68+
69+
@Override
70+
public int hashCode() {
71+
return Objects.hash(startOffset, leaderEpoch, lag);
72+
}
73+
74+
@Override
75+
public String toString() {
76+
return "SharePartitionOffsetInfo{" +
77+
"startOffset=" + startOffset +
78+
", leaderEpoch=" + leaderEpoch.orElse(null) +
79+
", lag=" + lag.orElse(null) +
80+
'}';
81+
}
82+
}

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.apache.kafka.clients.admin.KafkaAdminClient;
2020
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
2121
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
22-
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
22+
import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
2323
import org.apache.kafka.common.Node;
2424
import org.apache.kafka.common.TopicPartition;
2525
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
@@ -47,7 +47,7 @@
4747
/**
4848
* This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
4949
*/
50-
public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
50+
public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> {
5151

5252
private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
5353
private final Logger log;
@@ -60,7 +60,7 @@ public ListShareGroupOffsetsHandler(Map<String, ListShareGroupOffsetsSpec> group
6060
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
6161
}
6262

63-
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
63+
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> newFuture(Collection<String> groupIds) {
6464
return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
6565
}
6666

@@ -110,13 +110,13 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina
110110
}
111111

112112
@Override
113-
public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleResponse(Node coordinator,
114-
Set<CoordinatorKey> groupIds,
115-
AbstractResponse abstractResponse) {
113+
public ApiResult<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> handleResponse(Node coordinator,
114+
Set<CoordinatorKey> groupIds,
115+
AbstractResponse abstractResponse) {
116116
validateKeys(groupIds);
117117

118118
final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse;
119-
final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>();
119+
final Map<CoordinatorKey, Map<TopicPartition, SharePartitionOffsetInfo>> completed = new HashMap<>();
120120
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
121121
final List<CoordinatorKey> unmapped = new ArrayList<>();
122122

@@ -125,7 +125,7 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR
125125
if (response.hasGroupError(groupId)) {
126126
handleGroupError(coordinatorKey, response.groupError(groupId), failed, unmapped);
127127
} else {
128-
Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
128+
Map<TopicPartition, SharePartitionOffsetInfo> groupOffsetsListing = new HashMap<>();
129129
response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> {
130130
for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) {
131131
for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) {
@@ -137,7 +137,7 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR
137137
if (partitionResponse.startOffset() < 0) {
138138
groupOffsetsListing.put(tp, null);
139139
} else {
140-
groupOffsetsListing.put(tp, new OffsetAndMetadata(startOffset, leaderEpoch, ""));
140+
groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty()));
141141
}
142142
} else {
143143
log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage());

clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ public static ListConfigResourcesResult listConfigResourcesResult(KafkaException
186186
return new ListConfigResourcesResult(future);
187187
}
188188

189-
public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> groupOffsets) {
190-
Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> coordinatorFutures = groupOffsets.entrySet().stream()
189+
public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> groupOffsets) {
190+
Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> coordinatorFutures = groupOffsets.entrySet().stream()
191191
.collect(Collectors.toMap(
192192
entry -> CoordinatorKey.byGroupId(entry.getKey()),
193193
Map.Entry::getValue

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11186,15 +11186,15 @@ public void testListShareGroupOffsets() throws Exception {
1118611186
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
1118711187

1118811188
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
11189-
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
11189+
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
1119011190

11191-
assertEquals(6, partitionToOffsetAndMetadata.size());
11192-
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0));
11193-
assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition1));
11194-
assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition2));
11195-
assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition3));
11196-
assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition4));
11197-
assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), partitionToOffsetAndMetadata.get(myTopicPartition5));
11191+
assertEquals(6, partitionToOffsetInfo.size());
11192+
assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
11193+
assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
11194+
assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition2));
11195+
assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
11196+
assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition4));
11197+
assertEquals(new SharePartitionOffsetInfo(500, Optional.of(3), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition5));
1119811198
}
1119911199
}
1120011200

@@ -11257,17 +11257,17 @@ public void testListShareGroupOffsetsMultipleGroups() throws Exception {
1125711257
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
1125811258
assertEquals(2, result.all().get().size());
1125911259

11260-
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadataGroup0 = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
11261-
assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
11262-
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
11263-
assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
11264-
assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
11265-
assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
11260+
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfoGroup0 = result.partitionsToOffsetInfo(GROUP_ID).get();
11261+
assertEquals(4, partitionToOffsetInfoGroup0.size());
11262+
assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition0));
11263+
assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition1));
11264+
assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition2));
11265+
assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition3));
1126611266

11267-
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadataGroup1 = result.partitionsToOffsetAndMetadata("group-1").get();
11268-
assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
11269-
assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
11270-
assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
11267+
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfoGroup1 = result.partitionsToOffsetInfo("group-1").get();
11268+
assertEquals(2, partitionToOffsetInfoGroup1.size());
11269+
assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition4));
11270+
assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition5));
1127111271
}
1127211272
}
1127311273

@@ -11290,9 +11290,9 @@ public void testListShareGroupOffsetsEmpty() throws Exception {
1129011290
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
1129111291

1129211292
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
11293-
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
11293+
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
1129411294

11295-
assertEquals(0, partitionToOffsetAndMetadata.size());
11295+
assertEquals(0, partitionToOffsetInfo.size());
1129611296
}
1129711297
}
1129811298

@@ -11342,13 +11342,13 @@ public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception
1134211342
env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data));
1134311343

1134411344
final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs);
11345-
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get();
11345+
final Map<TopicPartition, SharePartitionOffsetInfo> partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
1134611346

1134711347
// For myTopicPartition2 we have set an error as the response. Thus, it should be skipped from the final result
11348-
assertEquals(3, partitionToOffsetAndMetadata.size());
11349-
assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0));
11350-
assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition1));
11351-
assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition3));
11348+
assertEquals(3, partitionToOffsetInfo.size());
11349+
assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
11350+
assertEquals(new SharePartitionOffsetInfo(11, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
11351+
assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
1135211352
}
1135311353
}
1135411354

0 commit comments

Comments
 (0)