Skip to content

Commit 5571821

Browse files
KAFKA-19799: Added deliveryCompleteCount to ReadShareGroupStateSummary (#20820)
This PR is part of [KIP-1226](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1226%3A+Introducing+Share+Partition+Lag+Persistence+and+Retrieval). In [KAFKA-19797](#20810), a new field deliveryCompleteCount was being sent by Share Partition to the persister. This PR introduces deliveryCompleteCount in ReadShareGroupStateSummary RPC so that it can later be used to calculate the share partition lag. Reviewers: Abhinav Dixit <[email protected]>, Sushant Mahajan <[email protected]>, Andrew Schofield <[email protected]>
1 parent 750523f commit 5571821

File tree

12 files changed

+39
-15
lines changed

12 files changed

+39
-15
lines changed

clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public static ReadShareGroupStateSummaryResponseData toResponseData(
9999
Uuid topicId,
100100
int partition,
101101
long startOffset,
102+
int deliveryCompleteCount,
102103
int leaderEpoch,
103104
int stateEpoch
104105
) {
@@ -110,6 +111,7 @@ public static ReadShareGroupStateSummaryResponseData toResponseData(
110111
new ReadShareGroupStateSummaryResponseData.PartitionResult()
111112
.setPartition(partition)
112113
.setStartOffset(startOffset)
114+
.setDeliveryCompleteCount(deliveryCompleteCount)
113115
.setLeaderEpoch(leaderEpoch)
114116
.setStateEpoch(stateEpoch)
115117
))

clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
"type": "request",
1919
"listeners": ["broker"],
2020
"name": "ReadShareGroupStateSummaryRequest",
21-
"validVersions": "0",
21+
// Version 0 is the initial version (KIP-932).
22+
//
23+
// Version 1 introduces DeliveryCompleteCount in the response (KIP-1226).
24+
"validVersions": "0-1",
2225
"flexibleVersions": "0+",
2326
"fields": [
2427
{ "name": "GroupId", "type": "string", "versions": "0+",

clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
"apiKey": 87,
1818
"type": "response",
1919
"name": "ReadShareGroupStateSummaryResponse",
20-
"validVersions": "0",
20+
// Version 0 is the initial version (KIP-932).
21+
//
22+
// Version 1 introduces DeliveryCompleteCount (KIP-1226).
23+
"validVersions": "0-1",
2124
"flexibleVersions": "0+",
2225
// - NOT_COORDINATOR (version 0+)
2326
// - COORDINATOR_NOT_AVAILABLE (version 0+)
@@ -44,7 +47,9 @@
4447
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
4548
"about": "The leader epoch of the share-partition." },
4649
{ "name": "StartOffset", "type": "int64", "versions": "0+",
47-
"about": "The share-partition start offset." }
50+
"about": "The share-partition start offset." },
51+
{ "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", "ignorable": "true", "default": "-1",
52+
"about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed." }
4853
]}
4954
]}
5055
]

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12240,6 +12240,7 @@ class KafkaApisTest extends Logging {
1224012240
.setErrorMessage(null)
1224112241
.setStateEpoch(1)
1224212242
.setStartOffset(10)
12243+
.setDeliveryCompleteCount(0)
1224312244
))
1224412245
)
1224512246

@@ -12280,6 +12281,7 @@ class KafkaApisTest extends Logging {
1228012281
.setErrorMessage(null)
1228112282
.setStateEpoch(1)
1228212283
.setStartOffset(10)
12284+
.setDeliveryCompleteCount(0)
1228312285
))
1228412286
)
1228512287

server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
490490
partitionResult.partition(),
491491
partitionResult.stateEpoch(),
492492
partitionResult.startOffset(),
493+
partitionResult.deliveryCompleteCount(),
493494
partitionResult.leaderEpoch(),
494495
partitionResult.errorCode(),
495496
partitionResult.errorMessage()))
@@ -501,6 +502,7 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
501502
-1,
502503
-1,
503504
-1,
505+
-1,
504506
Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException
505507
"Error reading state from share coordinator: " + e.getMessage()));
506508
}

server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShare
9393
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
9494
map(partitionIdData -> PartitionFactory.newPartitionStateSummaryData(
9595
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET,
96-
PartitionFactory.DEFAULT_LEADER_EPOCH, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
96+
PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT, PartitionFactory.DEFAULT_LEADER_EPOCH,
97+
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
9798
.collect(Collectors.toList())));
9899
}
99100
return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());

server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ public static PartitionErrorData newPartitionErrorData(int partition, short erro
4848
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
4949
}
5050

51-
public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) {
52-
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, leaderEpoch, null);
51+
public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, short errorCode, String errorMessage) {
52+
return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, errorCode, errorMessage, leaderEpoch, null);
5353
}
5454

5555
public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, List<PersisterStateBatch> stateBatches) {

server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public interface PartitionStateSummaryData extends PartitionInfoData, PartitionI
2828

2929
long startOffset();
3030

31+
int deliveryCompleteCount();
32+
3133
short errorCode();
3234

3335
String errorMessage();

server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public static ReadShareGroupStateSummaryResult from(ReadShareGroupStateSummaryRe
3939
readStateSummaryResult.partitions().stream()
4040
.map(partitionResult -> PartitionFactory.newPartitionStateSummaryData(
4141
partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(),
42-
partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage()))
42+
partitionResult.deliveryCompleteCount(), partitionResult.leaderEpoch(), partitionResult.errorCode(),
43+
partitionResult.errorMessage()))
4344
.collect(Collectors.toList())))
4445
.collect(Collectors.toList()))
4546
.build();

server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ public void testReadStateSummarySuccess() {
869869

870870
return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1;
871871
},
872-
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1, 1)),
872+
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 0, 1, 1)),
873873
coordinatorNode1);
874874

875875
client.prepareResponseFrom(
@@ -881,7 +881,7 @@ public void testReadStateSummarySuccess() {
881881

882882
return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2;
883883
},
884-
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1, 1)),
884+
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 0, 1, 1)),
885885
coordinatorNode2);
886886

887887
ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
@@ -931,12 +931,12 @@ public void testReadStateSummarySuccess() {
931931

932932
HashSet<PartitionData> expectedResultMap = new HashSet<>();
933933
expectedResultMap.add(
934-
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, Errors.NONE.code(),
934+
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 0, 1, Errors.NONE.code(),
935935
null
936936
));
937937

938938
expectedResultMap.add(
939-
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, Errors.NONE.code(),
939+
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 0, 1, Errors.NONE.code(),
940940
null
941941
));
942942

@@ -1438,6 +1438,7 @@ public void testReadStateSummaryResponseToResultPartialResults() {
14381438
tp1.topicId(),
14391439
tp1.partition(),
14401440
1L,
1441+
0,
14411442
1,
14421443
2
14431444
)
@@ -1470,15 +1471,15 @@ public void testReadStateSummaryResponseToResultPartialResults() {
14701471
results.topicsData().contains(
14711472
new TopicData<>(
14721473
tp1.topicId(),
1473-
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null))
1474+
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 0, 1, Errors.NONE.code(), null))
14741475
)
14751476
)
14761477
);
14771478
assertTrue(
14781479
results.topicsData().contains(
14791480
new TopicData<>(
14801481
tp2.topicId(),
1481-
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
1482+
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, -1, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
14821483
)
14831484
)
14841485
);
@@ -1498,6 +1499,7 @@ public void testReadStateSummaryResponseToResultFailedFuture() {
14981499
tp1.topicId(),
14991500
tp1.partition(),
15001501
1L,
1502+
0,
15011503
1,
15021504
2
15031505
)
@@ -1520,15 +1522,15 @@ public void testReadStateSummaryResponseToResultFailedFuture() {
15201522
results.topicsData().contains(
15211523
new TopicData<>(
15221524
tp1.topicId(),
1523-
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null))
1525+
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 0, 1, Errors.NONE.code(), null))
15241526
)
15251527
)
15261528
);
15271529
assertTrue(
15281530
results.topicsData().contains(
15291531
new TopicData<>(
15301532
tp2.topicId(),
1531-
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff"))
1533+
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff"))
15321534
)
15331535
)
15341536
);

0 commit comments

Comments
 (0)