 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
@@ -124,7 +125,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequ
         try {
             LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
             if (partitionsAlreadyFetched.isEmpty())
-                responseData = readFromLog(topicPartitionData);
+                responseData = readFromLog(topicPartitionData, shareFetch.fetchParams().maxBytes / topicPartitionData.size());
             else
                 // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
                 // updated in a different tryComplete thread.
@@ -206,7 +207,6 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
         LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
@@ -219,7 +219,6 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
                         topicIdPartition.topicId(),
                         sharePartition.nextFetchOffset(),
                         0,
-                        partitionMaxBytes,
                         Optional.empty()
                     )
                 );
@@ -250,7 +249,7 @@ private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHa
             return new LinkedHashMap<>();
         }
         // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
-        return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
+        return readFromLog(partitionsNotMatchingFetchOffsetMetadata, shareFetch.fetchParams().maxBytes / topicPartitionData.size());
     }
 
     private void maybeUpdateFetchOffsetMetadata(
@@ -311,7 +310,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
                 return true;
             } else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
                 // we take the partition fetch size as upper bound when accumulating the bytes.
-                long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
+                long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), shareFetch.fetchParams().maxBytes / topicPartitionData.size());
                 accumulatedSize += bytesAvailable;
             }
         }
@@ -334,12 +333,13 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
 
     }
 
-    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData, int partitionMaxBytes) {
         // Filter if there already exists any erroneous topic partition.
         Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
         if (partitionsToFetch.isEmpty()) {
             return new LinkedHashMap<>();
         }
+        topicPartitionData.values().forEach(partitionData -> partitionData.updateMaxBytes(partitionMaxBytes));
 
         Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
             shareFetch.fetchParams(),
@@ -392,15 +392,21 @@ private void handleFetchException(
     LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
                                                                           LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
         LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
+        AtomicInteger totalPartitionMaxBytesUsed = new AtomicInteger();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
                 missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
+            } else {
+                totalPartitionMaxBytesUsed.addAndGet(partitionData.maxBytes);
             }
         });
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(
+            missingLogReadTopicPartitions,
+            (shareFetch.fetchParams().maxBytes - totalPartitionMaxBytesUsed.get()) / missingLogReadTopicPartitions.size()
+        );
         missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
         return missingTopicPartitionsLogReadResponse;
     }
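
For context on the arithmetic this diff introduces: the request-level budget (shareFetch.fetchParams().maxBytes) is now split evenly across the partitions acquired for the fetch when readFromLog is called, and combineLogReadResponse splits only the leftover budget across the partitions that still need a log read. Below is a minimal, standalone sketch of that division; ShareFetchBudgetSketch, evenShare, remainingShare, and the sample values are illustrative names and numbers, not identifiers from the actual class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ShareFetchBudgetSketch {

    // Initial read: the request-level maxBytes is divided evenly across all
    // partitions acquired for this fetch (mirrors the new readFromLog call sites).
    static int evenShare(int requestMaxBytes, int acquiredPartitionCount) {
        return requestMaxBytes / acquiredPartitionCount;
    }

    // Combine step: partitions that already have a log read result keep the bytes
    // they were assigned; the remaining budget is divided evenly across the
    // partitions that are still missing data.
    static int remainingShare(int requestMaxBytes, int alreadyUsedBytes, int missingPartitionCount) {
        return (requestMaxBytes - alreadyUsedBytes) / missingPartitionCount;
    }

    public static void main(String[] args) {
        int requestMaxBytes = 1_000_000;                 // stand-in for fetchParams().maxBytes
        Map<String, Integer> acquired = new LinkedHashMap<>();
        acquired.put("topicA-0", 0);
        acquired.put("topicA-1", 0);
        acquired.put("topicB-0", 0);

        // Every acquired partition gets the same per-partition byte budget.
        int perPartition = evenShare(requestMaxBytes, acquired.size());
        acquired.replaceAll((tp, ignored) -> perPartition);
        System.out.println("even split: " + acquired);   // 333333 bytes each

        // Suppose topicA-0 already has fetched data; its share counts as used and
        // the two remaining partitions divide what is left of the request budget.
        int used = perPartition;
        int topUp = remainingShare(requestMaxBytes, used, 2);
        System.out.println("remaining split: " + topUp + " bytes per missing partition");
    }
}
```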