Skip to content

Commit 48a506b

Browse files
KAFKA-18522: Slice records for share fetch (#18804)
The PR handles slicing of fetched records based on the acquire response for share fetch. Additional bytes may be fetched from the log while the acquired offsets are only a subset of them, typically because of the `max fetch records` configuration. Rather than sending the additional bytes of fetched data to the client, we should slice the file and send only the needed batches over the wire. Note: If the acquired offsets fall within a batch, then we need to send the entire batch within the file records. Hence, rather than checking individual batches, the PR finds the first and last acquired offsets and trims the file to all batches between (inclusive) these two offsets. Reviewers: Christo Lolov <[email protected]>, Andrew Schofield <[email protected]>, Jun Rao <[email protected]>
1 parent 38c9843 commit 48a506b

File tree

7 files changed

+381
-72
lines changed

7 files changed

+381
-72
lines changed

core/src/main/java/kafka/server/share/ShareFetchUtils.java

+69-7
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
2626
import org.apache.kafka.common.errors.OffsetNotAvailableException;
2727
import org.apache.kafka.common.message.ShareFetchResponseData;
28+
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
2829
import org.apache.kafka.common.protocol.Errors;
30+
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
2931
import org.apache.kafka.common.record.FileRecords;
32+
import org.apache.kafka.common.record.Records;
3033
import org.apache.kafka.common.requests.ListOffsetsRequest;
3134
import org.apache.kafka.server.share.SharePartitionKey;
3235
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -39,6 +42,7 @@
3942

4043
import java.util.Collections;
4144
import java.util.HashMap;
45+
import java.util.Iterator;
4246
import java.util.LinkedHashMap;
4347
import java.util.List;
4448
import java.util.Map;
@@ -122,13 +126,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
122126
.setAcquiredRecords(Collections.emptyList());
123127
} else {
124128
partitionData
125-
// We set the records to the fetchPartitionData records. We do not alter the records
126-
// fetched from the replica manager as they follow zero copy buffer. The acquired records
127-
// might be a subset of the records fetched from the replica manager, depending
128-
// on the max fetch records or available records in the share partition. The client
129-
// sends the max bytes in request which should limit the bytes sent to the client
130-
// in the response.
131-
.setRecords(fetchPartitionData.records)
129+
.setRecords(maybeSliceFetchRecords(fetchPartitionData.records, shareAcquiredRecords))
132130
.setAcquiredRecords(shareAcquiredRecords.acquiredRecords());
133131
acquiredRecordsCount += shareAcquiredRecords.count();
134132
}
@@ -196,4 +194,68 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
196194
}
197195
return partition;
198196
}
197+
198+
/**
199+
* Slice the fetch records based on the acquired records. The slicing is done based on the first
200+
* and last offset of the acquired records from the list. The slicing doesn't consider individual
201+
* acquired batches rather the boundaries of the acquired list. The method expects the acquired
202+
* records list to be within the fetch records bounds.
203+
*
204+
* @param records The records to be sliced.
205+
* @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
206+
* @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
207+
* of the fetched records. Otherwise, the original records are returned.
208+
*/
209+
static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
210+
if (!(records instanceof FileRecords fileRecords)) {
211+
return records;
212+
}
213+
// The acquired records should be non-empty, do not check as the method is called only when the
214+
// acquired records are non-empty.
215+
List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
216+
try {
217+
final Iterator<FileChannelRecordBatch> iterator = fileRecords.batchIterator();
218+
// Track the first overlapping batch with the first acquired offset.
219+
FileChannelRecordBatch firstOverlapBatch = iterator.next();
220+
// If there exists single fetch batch, then return the original records.
221+
if (!iterator.hasNext()) {
222+
return records;
223+
}
224+
// Find the first and last acquired offset to slice the records.
225+
final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
226+
final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
227+
int startPosition = 0;
228+
int size = 0;
229+
// Start iterating from the second batch.
230+
while (iterator.hasNext()) {
231+
FileChannelRecordBatch batch = iterator.next();
232+
// Iterate until finds the first overlap batch with the first acquired offset. All the
233+
// batches before this first overlap batch should be sliced hence increment the start
234+
// position.
235+
if (batch.baseOffset() <= firstAcquiredOffset) {
236+
startPosition += firstOverlapBatch.sizeInBytes();
237+
firstOverlapBatch = batch;
238+
continue;
239+
}
240+
// Break if traversed all the batches till the last acquired offset.
241+
if (batch.baseOffset() > lastAcquiredOffset) {
242+
break;
243+
}
244+
size += batch.sizeInBytes();
245+
}
246+
// Include the first overlap batch as it's the last batch traversed which overlapped the first
247+
// acquired offset.
248+
size += firstOverlapBatch.sizeInBytes();
249+
// Check if we do not need slicing i.e. neither start position nor size changed.
250+
if (startPosition == 0 && size == fileRecords.sizeInBytes()) {
251+
return records;
252+
}
253+
return fileRecords.slice(startPosition, size);
254+
} catch (Exception e) {
255+
log.error("Error while checking batches for acquired records: {}, skipping slicing.", acquiredRecords, e);
256+
// If there is an exception while slicing, return the original records so that the fetch
257+
// can continue with the original records.
258+
return records;
259+
}
260+
}
199261
}

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.kafka.server.share.SharePartitionKey;
3636
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
3737
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
38-
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
3938
import org.apache.kafka.server.share.fetch.ShareFetch;
4039
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
4140
import org.apache.kafka.server.storage.log.FetchIsolation;
@@ -74,6 +73,7 @@
7473
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
7574
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
7675
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
76+
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
7777
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
7878
import static org.junit.jupiter.api.Assertions.assertEquals;
7979
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -186,7 +186,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
186186
when(sp0.canAcquireRecords()).thenReturn(true);
187187
when(sp1.canAcquireRecords()).thenReturn(false);
188188
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
189-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
189+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
190190

191191
// We are testing the case when the share partition is getting fetched for the first time, so for the first time
192192
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
@@ -259,7 +259,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
259259
when(sp0.canAcquireRecords()).thenReturn(true);
260260
when(sp1.canAcquireRecords()).thenReturn(false);
261261
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
262-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
262+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
263263

264264
// We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
265265
// functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
@@ -312,7 +312,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
312312
when(sp0.canAcquireRecords()).thenReturn(true);
313313
when(sp1.canAcquireRecords()).thenReturn(false);
314314
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
315-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
315+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
316316
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
317317

318318
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
@@ -427,7 +427,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
427427
when(sp0.canAcquireRecords()).thenReturn(true);
428428
when(sp1.canAcquireRecords()).thenReturn(false);
429429
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
430-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
430+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
431431
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
432432

433433
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@@ -591,7 +591,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
591591
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
592592
when(sp1.canAcquireRecords()).thenReturn(true);
593593
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
594-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
594+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
595595

596596
// when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
597597
// requests, it should add a "check and complete" action for request key tp1 on the purgatory.
@@ -689,7 +689,7 @@ public void testExceptionInMinBytesCalculation() {
689689

690690
when(sp0.canAcquireRecords()).thenReturn(true);
691691
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
692-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
692+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
693693
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
694694

695695
// Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot
@@ -897,15 +897,15 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab
897897
BROKER_TOPIC_STATS);
898898

899899
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
900-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
900+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
901901
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
902-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
902+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
903903
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
904-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
904+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
905905
when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
906-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
906+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
907907
when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
908-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
908+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
909909

910910
// All 5 partitions are acquirable.
911911
doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -995,9 +995,9 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab
995995
BROKER_TOPIC_STATS);
996996

997997
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
998-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
998+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
999999
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
1000-
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
1000+
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
10011001

10021002
// Only 2 out of 5 partitions are acquirable.
10031003
Set<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<>();

0 commit comments

Comments
 (0)