diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java index a2787ff82c96e..4f8c1c3b0cb57 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java @@ -99,6 +99,7 @@ public static ReadShareGroupStateSummaryResponseData toResponseData( Uuid topicId, int partition, long startOffset, + int deliveryCompleteCount, int leaderEpoch, int stateEpoch ) { @@ -110,6 +111,7 @@ public static ReadShareGroupStateSummaryResponseData toResponseData( new ReadShareGroupStateSummaryResponseData.PartitionResult() .setPartition(partition) .setStartOffset(startOffset) + .setDeliveryCompleteCount(deliveryCompleteCount) .setLeaderEpoch(leaderEpoch) .setStateEpoch(stateEpoch) )) diff --git a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json index cdbad63bfa22b..6e2bc52654a45 100644 --- a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json +++ b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryRequest.json @@ -18,7 +18,10 @@ "type": "request", "listeners": ["broker"], "name": "ReadShareGroupStateSummaryRequest", - "validVersions": "0", + // Version 0 is the initial version (KIP-932). + // + // Version 1 introduces DeliveryCompleteCount in the response (KIP-1226). + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", diff --git a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json index 81e3edc554ece..993043addeec5 100644 --- a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json +++ b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json @@ -17,7 +17,10 @@ "apiKey": 87, "type": "response", "name": "ReadShareGroupStateSummaryResponse", - "validVersions": "0", + // Version 0 is the initial version (KIP-932). + // + // Version 1 introduces DeliveryCompleteCount (KIP-1226). + "validVersions": "0-1", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) @@ -44,7 +47,9 @@ { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The leader epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", - "about": "The share-partition start offset." } + "about": "The share-partition start offset." }, + { "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", "ignorable": "true", "default": "-1", + "about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed." } ]} ]} ] diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 01daa19d7a01d..691814b94b419 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -12240,6 +12240,7 @@ class KafkaApisTest extends Logging { .setErrorMessage(null) .setStateEpoch(1) .setStartOffset(10) + .setDeliveryCompleteCount(0) )) ) @@ -12280,6 +12281,7 @@ class KafkaApisTest extends Logging { .setErrorMessage(null) .setStateEpoch(1) .setStartOffset(10) + .setDeliveryCompleteCount(0) )) ) diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java index 776c4f73bdf71..4d593a3bd98fc 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java @@ -490,6 +490,7 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult( partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), + partitionResult.deliveryCompleteCount(), partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage())) @@ -501,6 +502,7 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult( -1, -1, -1, + -1, Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException "Error reading state from share coordinator: " + e.getMessage())); } diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java index 908891dd463bc..270d1fabfa3e2 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java @@ -93,7 +93,8 @@ public CompletableFuture readSummary(ReadShare resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream(). map(partitionIdData -> PartitionFactory.newPartitionStateSummaryData( partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET, - PartitionFactory.DEFAULT_LEADER_EPOCH, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) + PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT, PartitionFactory.DEFAULT_LEADER_EPOCH, + PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE)) .collect(Collectors.toList()))); } return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build()); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java index f0612677fc904..ed998bb0b1f30 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java @@ -48,8 +48,8 @@ public static PartitionErrorData newPartitionErrorData(int partition, short erro return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null); } - public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) { - return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, leaderEpoch, null); + public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, short errorCode, String errorMessage) { + return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, errorCode, errorMessage, leaderEpoch, null); } public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, List stateBatches) { diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java index 58a9dc1061520..7cc8ed2cae167 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java @@ -28,6 +28,8 @@ public interface PartitionStateSummaryData extends PartitionInfoData, PartitionI long startOffset(); + int deliveryCompleteCount(); + short errorCode(); String errorMessage(); diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java index 249eb20ed94aa..81a1d200c9d48 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java @@ -39,7 +39,8 @@ public static ReadShareGroupStateSummaryResult from(ReadShareGroupStateSummaryRe readStateSummaryResult.partitions().stream() .map(partitionResult -> PartitionFactory.newPartitionStateSummaryData( partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), - partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage())) + partitionResult.deliveryCompleteCount(), partitionResult.leaderEpoch(), partitionResult.errorCode(), + partitionResult.errorMessage())) .collect(Collectors.toList()))) .collect(Collectors.toList())) .build(); diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java index 9dc1b7886ed96..51a617c43db63 100644 --- a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java @@ -869,7 +869,7 @@ public void testReadStateSummarySuccess() { return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1; }, - new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1, 1)), + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 0, 1, 1)), coordinatorNode1); client.prepareResponseFrom( @@ -881,7 +881,7 @@ public void testReadStateSummarySuccess() { return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2; }, - new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1, 1)), + new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 0, 1, 1)), coordinatorNode2); ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode); @@ -931,12 +931,12 @@ public void testReadStateSummarySuccess() { HashSet expectedResultMap = new HashSet<>(); expectedResultMap.add( - (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, Errors.NONE.code(), + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 0, 1, Errors.NONE.code(), null )); expectedResultMap.add( - (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, Errors.NONE.code(), + (PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 0, 1, Errors.NONE.code(), null )); @@ -1438,6 +1438,7 @@ public void testReadStateSummaryResponseToResultPartialResults() { tp1.topicId(), tp1.partition(), 1L, + 0, 1, 2 ) @@ -1470,7 +1471,7 @@ public void testReadStateSummaryResponseToResultPartialResults() { results.topicsData().contains( new TopicData<>( tp1.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null)) + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 0, 1, Errors.NONE.code(), null)) ) ) ); @@ -1478,7 +1479,7 @@ public void testReadStateSummaryResponseToResultPartialResults() { results.topicsData().contains( new TopicData<>( tp2.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, -1, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")) ) ) ); @@ -1498,6 +1499,7 @@ public void testReadStateSummaryResponseToResultFailedFuture() { tp1.topicId(), tp1.partition(), 1L, + 0, 1, 2 ) @@ -1520,7 +1522,7 @@ public void testReadStateSummaryResponseToResultFailedFuture() { results.topicsData().contains( new TopicData<>( tp1.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null)) + List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 0, 1, Errors.NONE.code(), null)) ) ) ); @@ -1528,7 +1530,7 @@ public void testReadStateSummaryResponseToResultFailedFuture() { results.topicsData().contains( new TopicData<>( tp2.topicId(), - List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff")) + List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff")) ) ) ); diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index db8819425d645..aa098090aef89 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -445,6 +445,7 @@ public CoordinatorResult