Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static ReadShareGroupStateSummaryResponseData toResponseData(
Uuid topicId,
int partition,
long startOffset,
int deliveryCompleteCount,
int leaderEpoch,
int stateEpoch
) {
Expand All @@ -110,6 +111,7 @@ public static ReadShareGroupStateSummaryResponseData toResponseData(
new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(startOffset)
.setDeliveryCompleteCount(deliveryCompleteCount)
.setLeaderEpoch(leaderEpoch)
.setStateEpoch(stateEpoch)
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
"type": "request",
"listeners": ["broker"],
"name": "ReadShareGroupStateSummaryRequest",
"validVersions": "0",
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces DeliveryCompleteCount in the response (KIP-1226).
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
"apiKey": 87,
"type": "response",
"name": "ReadShareGroupStateSummaryResponse",
"validVersions": "0",
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces DeliveryCompleteCount (KIP-1226).
"validVersions": "0-1",
"flexibleVersions": "0+",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
Expand All @@ -44,7 +47,9 @@
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." }
"about": "The share-partition start offset." },
{ "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", "ignorable": "true", "default": "-1",
"about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed." }
]}
]}
]
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12240,6 +12240,7 @@ class KafkaApisTest extends Logging {
.setErrorMessage(null)
.setStateEpoch(1)
.setStartOffset(10)
.setDeliveryCompleteCount(0)
))
)

Expand Down Expand Up @@ -12280,6 +12281,7 @@ class KafkaApisTest extends Logging {
.setErrorMessage(null)
.setStateEpoch(1)
.setStartOffset(10)
.setDeliveryCompleteCount(0)
))
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
partitionResult.partition(),
partitionResult.stateEpoch(),
partitionResult.startOffset(),
partitionResult.deliveryCompleteCount(),
partitionResult.leaderEpoch(),
partitionResult.errorCode(),
partitionResult.errorMessage()))
Expand All @@ -501,6 +502,7 @@ ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
-1,
-1,
-1,
-1,
Errors.UNKNOWN_SERVER_ERROR.code(), // No specific public error code exists for InterruptedException / ExecutionException
"Error reading state from share coordinator: " + e.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShare
resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream().
map(partitionIdData -> PartitionFactory.newPartitionStateSummaryData(
partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_LEADER_EPOCH, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT, PartitionFactory.DEFAULT_LEADER_EPOCH,
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public static PartitionErrorData newPartitionErrorData(int partition, short erro
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}

public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) {
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, leaderEpoch, null);
public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, short errorCode, String errorMessage) {
return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, errorCode, errorMessage, leaderEpoch, null);
}

public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface PartitionStateSummaryData extends PartitionInfoData, PartitionI

long startOffset();

int deliveryCompleteCount();

short errorCode();

String errorMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public static ReadShareGroupStateSummaryResult from(ReadShareGroupStateSummaryRe
readStateSummaryResult.partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionStateSummaryData(
partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(),
partitionResult.leaderEpoch(), partitionResult.errorCode(), partitionResult.errorMessage()))
partitionResult.deliveryCompleteCount(), partitionResult.leaderEpoch(), partitionResult.errorCode(),
partitionResult.errorMessage()))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ public void testReadStateSummarySuccess() {

return requestGroupId.equals(groupId) && requestTopicId == topicId1 && requestPartition == partition1;
},
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 1, 1)),
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1, partition1, 0, 0, 1, 1)),
coordinatorNode1);

client.prepareResponseFrom(
Expand All @@ -881,7 +881,7 @@ public void testReadStateSummarySuccess() {

return requestGroupId.equals(groupId) && requestTopicId == topicId2 && requestPartition == partition2;
},
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 1, 1)),
new ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2, partition2, 0, 0, 1, 1)),
coordinatorNode2);

ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
Expand Down Expand Up @@ -931,12 +931,12 @@ public void testReadStateSummarySuccess() {

HashSet<PartitionData> expectedResultMap = new HashSet<>();
expectedResultMap.add(
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, Errors.NONE.code(),
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 0, 1, Errors.NONE.code(),
null
));

expectedResultMap.add(
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, Errors.NONE.code(),
(PartitionData) PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 0, 1, Errors.NONE.code(),
null
));

Expand Down Expand Up @@ -1438,6 +1438,7 @@ public void testReadStateSummaryResponseToResultPartialResults() {
tp1.topicId(),
tp1.partition(),
1L,
0,
1,
2
)
Expand Down Expand Up @@ -1470,15 +1471,15 @@ public void testReadStateSummaryResponseToResultPartialResults() {
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null))
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 0, 1, Errors.NONE.code(), null))
)
)
);
assertTrue(
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, -1, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
)
)
);
Expand All @@ -1498,6 +1499,7 @@ public void testReadStateSummaryResponseToResultFailedFuture() {
tp1.topicId(),
tp1.partition(),
1L,
0,
1,
2
)
Expand All @@ -1520,15 +1522,15 @@ public void testReadStateSummaryResponseToResultFailedFuture() {
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 1, Errors.NONE.code(), null))
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 0, 1, Errors.NONE.code(), null))
)
)
);
assertTrue(
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff"))
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, -1, -1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share coordinator: java.lang.Exception: scary stuff"))
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ public CoordinatorResult<ReadShareGroupStateSummaryResponseData, CoordinatorReco
topicId,
partitionId,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.UNINITIALIZED_DELIVERY_COMPLETE_COUNT,
PartitionFactory.DEFAULT_LEADER_EPOCH,
PartitionFactory.DEFAULT_STATE_EPOCH
);
Expand All @@ -463,6 +464,7 @@ public CoordinatorResult<ReadShareGroupStateSummaryResponseData, CoordinatorReco
topicId,
partitionId,
offsetValue.startOffset(),
offsetValue.deliveryCompleteCount(),
offsetValue.leaderEpoch(),
offsetValue.stateEpoch()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ private void writeAndReplayRecord(ShareCoordinatorShard shard, int leaderEpoch)
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setDeliveryCompleteCount(0)
.setStateEpoch(0)
.setLeaderEpoch(leaderEpoch)
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
Expand Down Expand Up @@ -613,6 +614,7 @@ public void testReadStateSummarySuccess() {
PARTITION,
0,
0,
0,
0
), result.response());

Expand Down