4141import  java .util .Map ;
4242import  java .util .Optional ;
4343import  java .util .Set ;
44+ import  java .util .concurrent .atomic .AtomicInteger ;
4445import  java .util .concurrent .locks .Lock ;
4546import  java .util .function .BiConsumer ;
4647import  java .util .stream .Collectors ;
@@ -125,7 +126,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequ
125126        try  {
126127            LinkedHashMap <TopicIdPartition , LogReadResult > responseData ;
127128            if  (partitionsAlreadyFetched .isEmpty ())
128-                 responseData  = readFromLog (topicPartitionData );
129+                 responseData  = readFromLog (topicPartitionData ,  shareFetch . fetchParams (). maxBytes  /  topicPartitionData . size () );
129130            else 
130131                // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting 
131132                // updated in a different tryComplete thread. 
@@ -207,7 +208,6 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
207208        LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData  = new  LinkedHashMap <>();
208209
209210        sharePartitions .forEach ((topicIdPartition , sharePartition ) -> {
210-             int  partitionMaxBytes  = shareFetch .partitionMaxBytes ().getOrDefault (topicIdPartition , 0 );
211211            // Add the share partition to the list of partitions to be fetched only if we can 
212212            // acquire the fetch lock on it. 
213213            if  (sharePartition .maybeAcquireFetchLock ()) {
@@ -220,7 +220,6 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
220220                                topicIdPartition .topicId (),
221221                                sharePartition .nextFetchOffset (),
222222                                0 ,
223-                                 partitionMaxBytes ,
224223                                Optional .empty ()
225224                            )
226225                        );
@@ -251,7 +250,7 @@ private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHa
251250            return  new  LinkedHashMap <>();
252251        }
253252        // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata. 
254-         return  readFromLog (partitionsNotMatchingFetchOffsetMetadata );
253+         return  readFromLog (partitionsNotMatchingFetchOffsetMetadata ,  shareFetch . fetchParams (). maxBytes  /  topicPartitionData . size () );
255254    }
256255
257256    private  void  maybeUpdateFetchOffsetMetadata (
@@ -312,7 +311,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
312311                    return  true ;
313312                } else  if  (fetchOffsetMetadata .onSameSegment (endOffsetMetadata )) {
314313                    // we take the partition fetch size as upper bound when accumulating the bytes. 
315-                     long  bytesAvailable  = Math .min (endOffsetMetadata .positionDiff (fetchOffsetMetadata ), partitionData . maxBytes );
314+                     long  bytesAvailable  = Math .min (endOffsetMetadata .positionDiff (fetchOffsetMetadata ), shareFetch . fetchParams (). maxBytes  /  topicPartitionData . size () );
316315                    accumulatedSize  += bytesAvailable ;
317316                }
318317            }
@@ -335,12 +334,13 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
335334
336335    }
337336
338-     private  LinkedHashMap <TopicIdPartition , LogReadResult > readFromLog (LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData ) {
337+     private  LinkedHashMap <TopicIdPartition , LogReadResult > readFromLog (LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData ,  int   partitionMaxBytes ) {
339338        // Filter if there already exists any erroneous topic partition. 
340339        Set <TopicIdPartition > partitionsToFetch  = shareFetch .filterErroneousTopicPartitions (topicPartitionData .keySet ());
341340        if  (partitionsToFetch .isEmpty ()) {
342341            return  new  LinkedHashMap <>();
343342        }
343+         topicPartitionData .values ().forEach (partitionData  -> partitionData .updateMaxBytes (partitionMaxBytes ));
344344
345345        Seq <Tuple2 <TopicIdPartition , LogReadResult >> responseLogResult  = replicaManager .readFromLog (
346346            shareFetch .fetchParams (),
@@ -393,15 +393,21 @@ private void handleFetchException(
393393    LinkedHashMap <TopicIdPartition , LogReadResult > combineLogReadResponse (LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData ,
394394                                                                LinkedHashMap <TopicIdPartition , LogReadResult > existingFetchedData ) {
395395        LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > missingLogReadTopicPartitions  = new  LinkedHashMap <>();
396+         AtomicInteger  totalPartitionMaxBytesUsed  = new  AtomicInteger ();
396397        topicPartitionData .forEach ((topicIdPartition , partitionData ) -> {
397398            if  (!existingFetchedData .containsKey (topicIdPartition )) {
398399                missingLogReadTopicPartitions .put (topicIdPartition , partitionData );
400+             } else  {
401+                 totalPartitionMaxBytesUsed .addAndGet (partitionData .maxBytes );
399402            }
400403        });
401404        if  (missingLogReadTopicPartitions .isEmpty ()) {
402405            return  existingFetchedData ;
403406        }
404-         LinkedHashMap <TopicIdPartition , LogReadResult > missingTopicPartitionsLogReadResponse  = readFromLog (missingLogReadTopicPartitions );
407+         LinkedHashMap <TopicIdPartition , LogReadResult > missingTopicPartitionsLogReadResponse  = readFromLog (
408+             missingLogReadTopicPartitions ,
409+             (shareFetch .fetchParams ().maxBytes  - totalPartitionMaxBytesUsed .get ()) / missingLogReadTopicPartitions .size ()
410+             );
405411        missingTopicPartitionsLogReadResponse .putAll (existingFetchedData );
406412        return  missingTopicPartitionsLogReadResponse ;
407413    }
0 commit comments