Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 100 additions & 52 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager.SharePartitionListener;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
Expand Down Expand Up @@ -138,6 +139,16 @@ enum SharePartitionState {
FENCED
}

/**
* To provide static mapping between acknowledgement type bytes to RecordState.
*/
private static final Map<Byte, RecordState> ACK_TYPE_TO_RECORD_STATE = Map.of(
(byte) 0, RecordState.ARCHIVED, // Represents gap
AcknowledgeType.ACCEPT.id, RecordState.ACKNOWLEDGED,
AcknowledgeType.RELEASE.id, RecordState.AVAILABLE,
AcknowledgeType.REJECT.id, RecordState.ARCHIVED
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether logically a fifth entry of AcknowledgeType.RENEW.id, RecordState.ACQUIRED makes sense. wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it is correct, we currently have no use for the the recordState value corresponding to to RENEW ack type. So it felt a bit more clean to leave it out.

);

/**
* The group id of the share partition belongs to.
*/
Expand Down Expand Up @@ -916,9 +927,9 @@ public CompletableFuture<Void> acknowledge(
for (ShareAcknowledgementBatch batch : acknowledgementBatches) {
// Client can either send a single entry in acknowledgeTypes which represents the state
// of the complete batch or can send individual offsets state.
Map<Long, RecordState> recordStateMap;
Map<Long, Byte> ackTypeMap;
try {
recordStateMap = fetchRecordStateMapForAcknowledgementBatch(batch);
ackTypeMap = fetchAckTypeMapForBatch(batch);
} catch (IllegalArgumentException e) {
log.debug("Invalid acknowledge type: {} for share partition: {}-{}",
batch.acknowledgeTypes(), groupId, topicIdPartition);
Expand Down Expand Up @@ -946,7 +957,7 @@ public CompletableFuture<Void> acknowledge(
Optional<Throwable> ackThrowable = acknowledgeBatchRecords(
memberId,
batch,
recordStateMap,
ackTypeMap,
subMap,
persisterBatches
);
Expand Down Expand Up @@ -1856,26 +1867,21 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
}

private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
ShareAcknowledgementBatch batch) {
// Visibility for test
static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch batch) {
// Client can either send a single entry in acknowledgeTypes which represents the state
// of the complete batch or can send individual offsets state. Construct a map with record state
// for each offset in the batch, if single acknowledge type is sent, the map will have only one entry.
Map<Long, RecordState> recordStateMap = new HashMap<>();
Map<Long, Byte> ackTypeMap = new HashMap<>();
for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
recordStateMap.put(batch.firstOffset() + index,
fetchRecordState(batch.acknowledgeTypes().get(index)));
byte ackType = batch.acknowledgeTypes().get(index);
// Validate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is going to throw an exception if the ackType is invalid, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is going to throw an exception if the ackType is invalid, right?

Yes, correct AcknowledgeType.forId(ackType) will throw the exception. In this case the caller is SharePartition.acknowledge which surrounds the call to this method in a try catch.

if (ackType != 0) {
AcknowledgeType.forId(ackType);
}
ackTypeMap.put(batch.firstOffset() + index, ackType);
}
return recordStateMap;
}

private static RecordState fetchRecordState(byte acknowledgeType) {
return switch (acknowledgeType) {
case 1 /* ACCEPT */ -> RecordState.ACKNOWLEDGED;
case 2 /* RELEASE */ -> RecordState.AVAILABLE;
case 3, 0 /* REJECT / GAP */ -> RecordState.ARCHIVED;
default -> throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType);
};
return ackTypeMap;
}

private NavigableMap<Long, InFlightBatch> fetchSubMapForAcknowledgementBatch(
Expand Down Expand Up @@ -1930,7 +1936,7 @@ private NavigableMap<Long, InFlightBatch> fetchSubMapForAcknowledgementBatch(
private Optional<Throwable> acknowledgeBatchRecords(
String memberId,
ShareAcknowledgementBatch batch,
Map<Long, RecordState> recordStateMap,
Map<Long, Byte> ackTypeMap,
NavigableMap<Long, InFlightBatch> subMap,
List<PersisterBatch> persisterBatches
) {
Expand Down Expand Up @@ -1994,11 +2000,11 @@ private Optional<Throwable> acknowledgeBatchRecords(
}

throwable = acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch,
recordStateMap, persisterBatches);
ackTypeMap, persisterBatches);
} else {
// The in-flight batch is a full match hence change the state of the complete batch.
throwable = acknowledgeCompleteBatch(batch, inFlightBatch,
recordStateMap.get(batch.firstOffset()), persisterBatches);
ackTypeMap.get(batch.firstOffset()), persisterBatches, memberId);
}

if (throwable.isPresent()) {
Expand Down Expand Up @@ -2034,14 +2040,11 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
String memberId,
ShareAcknowledgementBatch batch,
InFlightBatch inFlightBatch,
Map<Long, RecordState> recordStateMap,
Map<Long, Byte> ackTypeMap,
List<PersisterBatch> persisterBatches
) {
lock.writeLock().lock();
try {
// Fetch the first record state from the map to be used as default record state in case the
// offset record state is not provided by client.
RecordState recordStateDefault = recordStateMap.get(batch.firstOffset());
for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState().entrySet()) {

// 1. For the first batch which might have offsets prior to the request base
Expand Down Expand Up @@ -2081,31 +2084,50 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
new InvalidRecordStateException("Member is not the owner of offset"));
}

// Determine the record state for the offset. If the per offset record state is not provided
// by the client, then use the batch record state.
RecordState recordState =
recordStateMap.size() > 1 ? recordStateMap.get(offsetState.getKey()) :
recordStateDefault;
InFlightState updateResult = offsetState.getValue().startStateTransition(
recordState,
DeliveryCountOps.NO_OP,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);
if (updateResult == null) {
log.debug("Unable to acknowledge records for the offset: {} in batch: {}"
+ " for the share partition: {}-{}", offsetState.getKey(),
inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException(
"Unable to acknowledge records for the batch"));
}
// Successfully updated the state of the offset and created a persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(),
offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount())));
if (isStateTerminal(updateResult.state())) {
deliveryCompleteCount.incrementAndGet();
// In case of 0 size ackTypeMap, we have already validated the batch.acknowledgeTypes.
byte ackType = ackTypeMap.size() > 1 ? ackTypeMap.get(offsetState.getKey()) : batch.acknowledgeTypes().get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just confirming, we could have used ackTypeMap.get(batch.firstOffset()) instead of batch.acknowledgeTypes().get(0) as well, right?

Copy link
Collaborator Author

@smjn smjn Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent Yes correct - just avoiding a lookup


if (ackType == AcknowledgeType.RENEW.id) {
// If RENEW, renew the acquisition lock timer for this offset and continue without changing state.
// We do not care about recordState map here.
// Only valid for ACQUIRED offsets; the check above ensures this.
long key = offsetState.getKey();
InFlightState state = offsetState.getValue();
log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
log.debug("Renewing acq lock for {}-{} with offset {} in batch {} for member {}.",
log.debug("Renewing acquisition lock for {}-{} with offset {} in batch {} for member {}.",

groupId, topicIdPartition, key, inFlightBatch, memberId);
state.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId, key, key);
state.updateAcquisitionLockTimeoutTask(renewalTask);
} else {
// Determine the record state for the offset. If the per offset record state is not provided
// by the client, then use the batch record state. This will always be present as it is a static
// mapping between bytes and record state type. All ack types have been added except for RENEW which
// has been handled above.
RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
Objects.requireNonNull(recordState);
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, though post validation of ackType we should not get null recordState though but as we have check for Objects.requireNonNull in code then it can throw null pointer exception, is it the correct way to handle the incorrect ackType?


InFlightState updateResult = offsetState.getValue().startStateTransition(
recordState,
DeliveryCountOps.NO_OP,
this.maxDeliveryCount,
EMPTY_MEMBER_ID
);

if (updateResult == null) {
log.debug("Unable to acknowledge records for the offset: {} in batch: {}"
+ " for the share partition: {}-{}", offsetState.getKey(),
inFlightBatch, groupId, topicIdPartition);
return Optional.of(new InvalidRecordStateException(
"Unable to acknowledge records for the batch"));
}
// Successfully updated the state of the offset and created a persister state batch for write to persister.
persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(),
offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount())));
if (isStateTerminal(updateResult.state())) {
deliveryCompleteCount.incrementAndGet();
}
// Do not update the nextFetchOffset as the offset has not completed the transition yet.
}
// Do not update the nextFetchOffset as the offset has not completed the transition yet.
}
} finally {
lock.writeLock().unlock();
Expand All @@ -2116,8 +2138,9 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
private Optional<Throwable> acknowledgeCompleteBatch(
ShareAcknowledgementBatch batch,
InFlightBatch inFlightBatch,
RecordState recordState,
List<PersisterBatch> persisterBatches
byte ackType,
List<PersisterBatch> persisterBatches,
String memberId
) {
lock.writeLock().lock();
try {
Expand All @@ -2131,10 +2154,30 @@ private Optional<Throwable> acknowledgeCompleteBatch(
"The batch cannot be acknowledged. The batch is not in the acquired state."));
}

// If the request is a full-batch RENEW acknowledgement (ack type 4), then renew the
// acquisition lock without changing the state or persisting anything.
// Before reaching this point, it should be verified that it is full batch ack and
// not per offset ack as well as startOffset not moved.
if (ackType == AcknowledgeType.RENEW.id) {
// Renew the acquisition lock timer for the complete batch. We have already
// checked that the batchState is ACQUIRED above.
log.debug("Renewing acq lock for {}-{} with batch {}-{} for member {}.",
groupId, topicIdPartition, inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), memberId);
inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask = scheduleAcquisitionLockTimeout(memberId,
inFlightBatch.firstOffset(), inFlightBatch.lastOffset());
inFlightBatch.updateAcquisitionLockTimeout(renewalTask);
// Nothing to persist.
return Optional.empty();
}

// Change the state of complete batch since the same state exists for the entire inFlight batch.
// The member id is reset to EMPTY_MEMBER_ID irrespective of the acknowledge type as the batch is
// The member id is reset to EMPTY_MEMBER_ID irrespective of the ack type as the batch is
// either released or moved to a state where member id existence is not important. The member id
// is only important when the batch is acquired.
RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
Objects.requireNonNull(recordState);

InFlightState updateResult = inFlightBatch.startBatchStateTransition(
recordState,
DeliveryCountOps.NO_OP,
Expand Down Expand Up @@ -3121,4 +3164,9 @@ private record LastOffsetAndMaxRecords(
long lastOffset,
int maxRecords
) { }

// Visibility for testing
static Map<Byte, RecordState> ackTypeToRecordStateMapping() {
return ACK_TYPE_TO_RECORD_STATE;
}
}
Loading
Loading