Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
"type": "request",
"listeners": ["broker"],
"name": "WriteShareGroupStateRequest",
"validVersions": "0",
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces DeliveryCompleteCount (KIP-1226).
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
Expand All @@ -37,6 +40,8 @@
"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, or -1 if the start offset is not being written." },
{ "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", "ignorable": "true", "default": "-1",
"about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed." },
{ "name": "StateBatches", "type": "[]StateBatch", "versions": "0+",
"about": "The state batches for the share-partition.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
"apiKey": 85,
"type": "response",
"name": "WriteShareGroupStateResponse",
"validVersions": "0",
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces DeliveryCompleteCount in the request (KIP-1226).
"validVersions": "0-1",
"flexibleVersions": "0+",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3737,6 +3737,7 @@ private WriteShareGroupStateRequest createWriteShareGroupStateRequest(short vers
.setPartition(0)
.setStateEpoch(0)
.setStartOffset(0)
.setDeliveryCompleteCount(0)
.setStateBatches(singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(0)
Expand Down
80 changes: 40 additions & 40 deletions core/src/main/java/kafka/server/share/SharePartition.java

Large diffs are not rendered by default.

626 changes: 313 additions & 313 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13299,6 +13299,7 @@ class KafkaApisTest extends Logging {
.setLeaderEpoch(1)
.setStateEpoch(2)
.setStartOffset(10)
.setDeliveryCompleteCount(5)
.setStateBatches(util.List.of(
new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
Expand Down Expand Up @@ -13346,6 +13347,7 @@ class KafkaApisTest extends Logging {
.setLeaderEpoch(1)
.setStateEpoch(2)
.setStartOffset(10)
.setDeliveryCompleteCount(5)
.setStateBatches(util.List.of(
new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ stateManager.new WriteStateHandler(
partitionData.stateEpoch(),
partitionData.leaderEpoch(),
partitionData.startOffset(),
partitionData.deliveryCompleteCount(),
partitionData.stateBatches(),
future, null)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ public class PartitionData implements
private final int partition;
private final int stateEpoch;
private final long startOffset;
private final int deliveryCompleteCount;
private final short errorCode;
private final String errorMessage;
private final int leaderEpoch;
private final List<PersisterStateBatch> stateBatches;

public PartitionData(int partition, int stateEpoch, long startOffset, short errorCode,
public PartitionData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, short errorCode,
String errorMessage, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
this.partition = partition;
this.stateEpoch = stateEpoch;
this.startOffset = startOffset;
this.deliveryCompleteCount = deliveryCompleteCount;
this.errorCode = errorCode;
this.leaderEpoch = leaderEpoch;
this.errorMessage = errorMessage;
Expand All @@ -58,6 +60,10 @@ public long startOffset() {
return startOffset;
}

public int deliveryCompleteCount() {
return deliveryCompleteCount;
}

public short errorCode() {
return errorCode;
}
Expand All @@ -82,6 +88,7 @@ public boolean equals(Object o) {
return Objects.equals(partition, that.partition) &&
Objects.equals(stateEpoch, that.stateEpoch) &&
Objects.equals(startOffset, that.startOffset) &&
Objects.equals(deliveryCompleteCount, that.deliveryCompleteCount) &&
Objects.equals(errorCode, that.errorCode) &&
Objects.equals(errorMessage, that.errorMessage) &&
Objects.equals(leaderEpoch, that.leaderEpoch) &&
Expand All @@ -90,13 +97,14 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(partition, stateEpoch, startOffset, errorCode, leaderEpoch, errorMessage, stateBatches);
return Objects.hash(partition, stateEpoch, startOffset, deliveryCompleteCount, errorCode, leaderEpoch, errorMessage, stateBatches);
}

public static class Builder {
private int partition;
private int stateEpoch;
private long startOffset;
private int deliveryCompleteCount;
private short errorCode;
private String errorMessage;
private int leaderEpoch;
Expand All @@ -117,6 +125,11 @@ public Builder setStartOffset(long startOffset) {
return this;
}

public Builder setDeliveryCompleteCount(int deliveryCompleteCount) {
this.deliveryCompleteCount = deliveryCompleteCount;
return this;
}

public Builder setErrorCode(short errorCode) {
this.errorCode = errorCode;
return this;
Expand All @@ -138,7 +151,7 @@ public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
}

public PartitionData build() {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, stateBatches);
return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, errorCode, errorMessage, leaderEpoch, stateBatches);
}
}

Expand All @@ -148,6 +161,7 @@ public String toString() {
"partition=" + partition + "," +
"stateEpoch=" + stateEpoch + "," +
"startOffset=" + startOffset + "," +
"deliveryCompleteCount=" + deliveryCompleteCount + "," +
"errorCode=" + errorCode + "," +
"errorMessage=" + errorMessage + "," +
"leaderEpoch=" + leaderEpoch + "," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,36 @@
public class PartitionFactory {
public static final int DEFAULT_STATE_EPOCH = 0;
public static final int UNINITIALIZED_START_OFFSET = -1;
public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
public static final int DEFAULT_LEADER_EPOCH = 0;
public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();

public static PartitionIdData newPartitionIdData(int partition) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
}

public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int partition, int leaderEpoch) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
}

public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) {
return new PartitionData(partition, stateEpoch, startOffset, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
}

public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
return new PartitionData(partition, DEFAULT_STATE_EPOCH, UNINITIALIZED_START_OFFSET, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}

public static PartitionStateSummaryData newPartitionStateSummaryData(int partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, String errorMessage) {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, null);
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, leaderEpoch, null);
}

public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
return new PartitionData(partition, stateEpoch, startOffset, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, stateBatches);
public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int deliveryCompleteCount, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, stateBatches);
}

public static PartitionAllData newPartitionAllData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage, List<PersisterStateBatch> stateBatches) {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, stateBatches);
return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, stateBatches);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface PartitionStateBatchData extends PartitionInfoData, PartitionIdD

long startOffset();

int deliveryCompleteCount();

int leaderEpoch();

List<PersisterStateBatch> stateBatches();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ public class WriteStateHandler extends PersisterStateManagerHandler {
private final int stateEpoch;
private final int leaderEpoch;
private final long startOffset;
private final int deliveryCompleteCount;
private final List<PersisterStateBatch> batches;
private final CompletableFuture<WriteShareGroupStateResponse> result;
private final BackoffManager writeStateBackoff;
Expand All @@ -667,6 +668,7 @@ public WriteStateHandler(
int stateEpoch,
int leaderEpoch,
long startOffset,
int deliveryCompleteCount,
List<PersisterStateBatch> batches,
CompletableFuture<WriteShareGroupStateResponse> result,
long backoffMs,
Expand All @@ -677,6 +679,7 @@ public WriteStateHandler(
this.stateEpoch = stateEpoch;
this.leaderEpoch = leaderEpoch;
this.startOffset = startOffset;
this.deliveryCompleteCount = deliveryCompleteCount;
this.batches = batches;
this.result = result;
this.writeStateBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
Expand All @@ -689,6 +692,7 @@ public WriteStateHandler(
int stateEpoch,
int leaderEpoch,
long startOffset,
int deliveryCompleteCount,
List<PersisterStateBatch> batches,
CompletableFuture<WriteShareGroupStateResponse> result,
Consumer<ClientResponse> onCompleteCallback
Expand All @@ -700,6 +704,7 @@ public WriteStateHandler(
stateEpoch,
leaderEpoch,
startOffset,
deliveryCompleteCount,
batches,
result,
REQUEST_BACKOFF_MS,
Expand Down Expand Up @@ -1456,6 +1461,7 @@ private static AbstractRequest.Builder<? extends AbstractRequest> coalesceWrites
.setStateEpoch(handler.stateEpoch)
.setLeaderEpoch(handler.leaderEpoch)
.setStartOffset(handler.startOffset)
.setDeliveryCompleteCount(handler.deliveryCompleteCount)
.setStateBatches(handler.batches.stream()
.map(batch -> new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(batch.firstOffset())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public static WriteShareGroupStateParameters from(WriteShareGroupStateRequestDat
.setGroupTopicPartitionData(new GroupTopicPartitionData<>(data.groupId(), data.topics().stream()
.map(writeStateData -> new TopicData<>(writeStateData.topicId(),
writeStateData.partitions().stream()
.map(partitionData -> PartitionFactory.newPartitionStateBatchData(partitionData.partition(), partitionData.stateEpoch(), partitionData.startOffset(),
.map(partitionData -> PartitionFactory.newPartitionStateBatchData(
partitionData.partition(),
partitionData.stateEpoch(),
partitionData.startOffset(),
partitionData.deliveryCompleteCount(),
partitionData.leaderEpoch(),
partitionData.stateBatches().stream()
.map(PersisterStateBatch::from)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void testWriteStateValidate() {
.setGroupId(groupId)
.setTopicsData(List.of(new TopicData<>(null,
List.of(PartitionFactory.newPartitionStateBatchData(
partition, 1, 0, 0, null))))).build()).build());
partition, 1, 0, 0, 0, null))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
Expand All @@ -200,7 +200,7 @@ public void testWriteStateValidate() {
.setGroupId(groupId)
.setTopicsData(List.of(new TopicData<>(topicId,
List.of(PartitionFactory.newPartitionStateBatchData(
incorrectPartition, 1, 0, 0, null))))).build()).build());
incorrectPartition, 1, 0, 0, 0, null))))).build()).build());
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(IllegalArgumentException.class, result);
Expand Down Expand Up @@ -600,6 +600,7 @@ public void testWriteStateSuccess() {
.setStateEpoch(0)
.setLeaderEpoch(1)
.setStartOffset(0)
.setDeliveryCompleteCount(11)
.setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ public void testWriteStateRequestCoordinatorFoundSuccessfully() {
0,
0,
0,
0,
stateBatches,
future,
REQUEST_BACKOFF_MS,
Expand Down Expand Up @@ -865,6 +866,7 @@ public void testWriteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() thro
0,
0,
0,
0,
stateBatches,
future,
REQUEST_BACKOFF_MS,
Expand Down Expand Up @@ -985,6 +987,7 @@ public void testWriteStateRequestCoordinatorFoundOnRetry() {
0,
0,
0,
0,
stateBatches,
future,
REQUEST_BACKOFF_MS,
Expand Down Expand Up @@ -1077,6 +1080,7 @@ public void testWriteStateRequestWithCoordinatorNodeLookup() {
0,
0,
0,
0,
stateBatches,
future,
REQUEST_BACKOFF_MS,
Expand Down Expand Up @@ -1191,6 +1195,7 @@ public void testWriteStateRequestWithRetryAndCoordinatorNodeLookup() {
0,
0,
0,
0,
stateBatches,
future,
REQUEST_BACKOFF_MS,
Expand Down Expand Up @@ -1326,6 +1331,7 @@ public void testWriteStateRequestFailedMaxRetriesExhausted() {
0,
0,
0,
0,
stateBatches,
future,
REQUEST_BACKOFF_MS,
Expand Down Expand Up @@ -1430,6 +1436,7 @@ public void testWriteStateRequestBatchingWithCoordinatorNodeLookup() throws Exec
0,
0,
0,
0,
stateBatches,
future,
REQUEST_BACKOFF_MS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public static CoordinatorRecord newShareSnapshotRecord(String groupId, Uuid topi
.setStateEpoch(offsetData.stateEpoch())
.setLeaderEpoch(offsetData.leaderEpoch())
.setStartOffset(offsetData.startOffset())
.setDeliveryCompleteCount(offsetData.deliveryCompleteCount())
.setStateBatches(offsetData.stateBatches().stream()
.map(batch -> new ShareSnapshotValue.StateBatch()
.setFirstOffset(batch.firstOffset())
Expand All @@ -61,6 +62,7 @@ public static CoordinatorRecord newShareUpdateRecord(String groupId, Uuid topicI
.setSnapshotEpoch(offsetData.snapshotEpoch())
.setLeaderEpoch(offsetData.leaderEpoch())
.setStartOffset(offsetData.startOffset())
.setDeliveryCompleteCount(offsetData.deliveryCompleteCount())
.setStateBatches(offsetData.stateBatches().stream()
.map(batch -> new ShareUpdateValue.StateBatch()
.setFirstOffset(batch.firstOffset())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ public CompletableFuture<WriteShareGroupStateResponseData> writeState(RequestCon
.setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(partitionData.partition())
.setStartOffset(partitionData.startOffset())
.setDeliveryCompleteCount(partitionData.deliveryCompleteCount())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateEpoch(partitionData.stateEpoch())
.setStateBatches(partitionData.stateBatches())))))))
Expand Down
Loading