Skip to content

Commit b900f2f

Browse files
KAFKA-19797: Added deliveryCompleteCount to writeState, ShareUpdate and ShareSnapshot schemas (#20810)
This PR is part of [KIP-1226](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1226%3A+Introducing+Share+Partition+Lag+Persistence+and+Retrieval). It introduces a new field called `deliveryCompleteCount` in writeShareGroupState RPC and `ShareUpdate` and `ShareSnapshot`. The share partition now maintains `inFlightTerminalCount` (introduced [here](#20787)) and it uses this exact value to pass to the writeState request for `deliveryCompleteCount`. Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>
1 parent c5ae7b4 commit b900f2f

File tree

23 files changed

+506
-372
lines changed

23 files changed

+506
-372
lines changed

clients/src/main/resources/common/message/WriteShareGroupStateRequest.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
"type": "request",
1919
"listeners": ["broker"],
2020
"name": "WriteShareGroupStateRequest",
21-
"validVersions": "0",
21+
// Version 0 is the initial version (KIP-932).
22+
//
23+
// Version 1 introduces DeliveryCompleteCount (KIP-1226).
24+
"validVersions": "0-1",
2225
"flexibleVersions": "0+",
2326
"fields": [
2427
{ "name": "GroupId", "type": "string", "versions": "0+",
@@ -37,6 +40,8 @@
3740
"about": "The leader epoch of the share-partition." },
3841
{ "name": "StartOffset", "type": "int64", "versions": "0+",
3942
"about": "The share-partition start offset, or -1 if the start offset is not being written." },
43+
{ "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", "ignorable": "true", "default": "-1",
44+
"about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed." },
4045
{ "name": "StateBatches", "type": "[]StateBatch", "versions": "0+",
4146
"about": "The state batches for the share-partition.", "fields": [
4247
{ "name": "FirstOffset", "type": "int64", "versions": "0+",

clients/src/main/resources/common/message/WriteShareGroupStateResponse.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
"apiKey": 85,
1818
"type": "response",
1919
"name": "WriteShareGroupStateResponse",
20-
"validVersions": "0",
20+
// Version 0 is the initial version (KIP-932).
21+
//
22+
// Version 1 introduces DeliveryCompleteCount in the request (KIP-1226).
23+
"validVersions": "0-1",
2124
"flexibleVersions": "0+",
2225
// - NOT_COORDINATOR (version 0+)
2326
// - COORDINATOR_NOT_AVAILABLE (version 0+)

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3737,6 +3737,7 @@ private WriteShareGroupStateRequest createWriteShareGroupStateRequest(short vers
37373737
.setPartition(0)
37383738
.setStateEpoch(0)
37393739
.setStartOffset(0)
3740+
.setDeliveryCompleteCount(0)
37403741
.setStateBatches(singletonList(new WriteShareGroupStateRequestData.StateBatch()
37413742
.setFirstOffset(0)
37423743
.setLastOffset(0)

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 40 additions & 40 deletions
Large diffs are not rendered by default.

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 313 additions & 313 deletions
Large diffs are not rendered by default.

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13299,6 +13299,7 @@ class KafkaApisTest extends Logging {
1329913299
.setLeaderEpoch(1)
1330013300
.setStateEpoch(2)
1330113301
.setStartOffset(10)
13302+
.setDeliveryCompleteCount(5)
1330213303
.setStateBatches(util.List.of(
1330313304
new WriteShareGroupStateRequestData.StateBatch()
1330413305
.setFirstOffset(11)
@@ -13346,6 +13347,7 @@ class KafkaApisTest extends Logging {
1334613347
.setLeaderEpoch(1)
1334713348
.setStateEpoch(2)
1334813349
.setStartOffset(10)
13350+
.setDeliveryCompleteCount(5)
1334913351
.setStateBatches(util.List.of(
1335013352
new WriteShareGroupStateRequestData.StateBatch()
1335113353
.setFirstOffset(11)

server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ stateManager.new WriteStateHandler(
150150
partitionData.stateEpoch(),
151151
partitionData.leaderEpoch(),
152152
partitionData.startOffset(),
153+
partitionData.deliveryCompleteCount(),
153154
partitionData.stateBatches(),
154155
future, null)
155156
);

server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionData.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,18 @@ public class PartitionData implements
3030
private final int partition;
3131
private final int stateEpoch;
3232
private final long startOffset;
33+
private final int deliveryCompleteCount;
3334
private final short errorCode;
3435
private final String errorMessage;
3536
private final int leaderEpoch;
3637
private final List<PersisterStateBatch> stateBatches;
3738

38-
public PartitionData(int partition, int stateEpoch, long startOffset, short errorCode,
39+
public PartitionData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, short errorCode,
3940
String errorMessage, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
4041
this.partition = partition;
4142
this.stateEpoch = stateEpoch;
4243
this.startOffset = startOffset;
44+
this.deliveryCompleteCount = deliveryCompleteCount;
4345
this.errorCode = errorCode;
4446
this.leaderEpoch = leaderEpoch;
4547
this.errorMessage = errorMessage;
@@ -58,6 +60,10 @@ public long startOffset() {
5860
return startOffset;
5961
}
6062

63+
public int deliveryCompleteCount() {
64+
return deliveryCompleteCount;
65+
}
66+
6167
public short errorCode() {
6268
return errorCode;
6369
}
@@ -82,6 +88,7 @@ public boolean equals(Object o) {
8288
return Objects.equals(partition, that.partition) &&
8389
Objects.equals(stateEpoch, that.stateEpoch) &&
8490
Objects.equals(startOffset, that.startOffset) &&
91+
Objects.equals(deliveryCompleteCount, that.deliveryCompleteCount) &&
8592
Objects.equals(errorCode, that.errorCode) &&
8693
Objects.equals(errorMessage, that.errorMessage) &&
8794
Objects.equals(leaderEpoch, that.leaderEpoch) &&
@@ -90,13 +97,14 @@ public boolean equals(Object o) {
9097

9198
@Override
9299
public int hashCode() {
93-
return Objects.hash(partition, stateEpoch, startOffset, errorCode, leaderEpoch, errorMessage, stateBatches);
100+
return Objects.hash(partition, stateEpoch, startOffset, deliveryCompleteCount, errorCode, leaderEpoch, errorMessage, stateBatches);
94101
}
95102

96103
public static class Builder {
97104
private int partition;
98105
private int stateEpoch;
99106
private long startOffset;
107+
private int deliveryCompleteCount;
100108
private short errorCode;
101109
private String errorMessage;
102110
private int leaderEpoch;
@@ -117,6 +125,11 @@ public Builder setStartOffset(long startOffset) {
117125
return this;
118126
}
119127

128+
public Builder setDeliveryCompleteCount(int deliveryCompleteCount) {
129+
this.deliveryCompleteCount = deliveryCompleteCount;
130+
return this;
131+
}
132+
120133
public Builder setErrorCode(short errorCode) {
121134
this.errorCode = errorCode;
122135
return this;
@@ -138,7 +151,7 @@ public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
138151
}
139152

140153
public PartitionData build() {
141-
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, stateBatches);
154+
return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, errorCode, errorMessage, leaderEpoch, stateBatches);
142155
}
143156
}
144157

@@ -148,6 +161,7 @@ public String toString() {
148161
"partition=" + partition + "," +
149162
"stateEpoch=" + stateEpoch + "," +
150163
"startOffset=" + startOffset + "," +
164+
"deliveryCompleteCount=" + deliveryCompleteCount + "," +
151165
"errorCode=" + errorCode + "," +
152166
"errorMessage=" + errorMessage + "," +
153167
"leaderEpoch=" + leaderEpoch + "," +

server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,35 +27,36 @@
2727
public class PartitionFactory {
2828
public static final int DEFAULT_STATE_EPOCH = 0;
2929
public static final int UNINITIALIZED_START_OFFSET = -1;
30+
public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
3031
public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
3132
public static final int DEFAULT_LEADER_EPOCH = 0;
3233
public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();
3334

3435
public static PartitionIdData newPartitionIdData(int partition) {
35-
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
36+
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
3637
}
3738

3839
public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int partition, int leaderEpoch) {
39-
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
40+
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
4041
}
4142

4243
public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) {
43-
return new PartitionData(partition, stateEpoch, startOffset, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
44+
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
4445
}
4546

4647
public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) {
47-
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
48+
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
4849
}
4950

5051
public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) {
51-
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, null);
52+
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, leaderEpoch, null);
5253
}
5354

54-
public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
55-
return new PartitionData(partition, stateEpoch, startOffset, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, stateBatches);
55+
public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
56+
return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, stateBatches);
5657
}
5758

5859
public static PartitionAllData newPartitionAllData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage, List<PersisterStateBatch> stateBatches) {
59-
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, stateBatches);
60+
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, stateBatches);
6061
}
6162
}

server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateBatchData.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public interface PartitionStateBatchData extends PartitionInfoData, PartitionIdD
2828

2929
long startOffset();
3030

31+
int deliveryCompleteCount();
32+
3133
int leaderEpoch();
3234

3335
List<PersisterStateBatch> stateBatches();

0 commit comments

Comments
 (0)