 import org.apache.kafka.server.purgatory.DelayedOperation;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
 import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -60,24 +61,35 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetch shareFetch;
     private final ReplicaManager replicaManager;
     private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
+    private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
     // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
     // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
     private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
-    private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
+    private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired;
     private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
 
     DelayedShareFetch(
         ShareFetch shareFetch,
         ReplicaManager replicaManager,
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
+        this(shareFetch, replicaManager, exceptionHandler, sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM));
+    }
+
+    DelayedShareFetch(
+        ShareFetch shareFetch,
+        ReplicaManager replicaManager,
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
+        PartitionMaxBytesStrategy partitionMaxBytesStrategy) {
         super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
         this.shareFetch = shareFetch;
         this.replicaManager = replicaManager;
         this.partitionsAcquired = new LinkedHashMap<>();
         this.partitionsAlreadyFetched = new LinkedHashMap<>();
         this.exceptionHandler = exceptionHandler;
         this.sharePartitions = sharePartitions;
+        this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
     }
 
     @Override
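Note on the hunk above: the original constructor now delegates to a new one that pins the default strategy to UNIFORM. As a rough mental model only (the shipped implementation lives in org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy; the helper name UniformMaxBytesSketch below is hypothetical), a uniform split of the request budget could look like this:

import java.util.LinkedHashMap;
import java.util.Set;

import org.apache.kafka.common.TopicIdPartition;

// Hypothetical helper, for illustration only: evenly splits the request's
// maxBytes across the given partitions, dividing by the acquired-partition
// count so partitions fetched in a later pass keep their share of the budget.
final class UniformMaxBytesSketch {
    static LinkedHashMap<TopicIdPartition, Integer> maxBytes(int requestMaxBytes,
                                                             Set<TopicIdPartition> partitions,
                                                             int acquiredPartitionsSize) {
        LinkedHashMap<TopicIdPartition, Integer> limits = new LinkedHashMap<>();
        int perPartition = requestMaxBytes / acquiredPartitionsSize;
        partitions.forEach(tp -> limits.put(tp, perPartition));
        return limits;
    }
}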
@@ -99,7 +111,7 @@ public void onComplete() {
             partitionsAcquired.keySet());
 
         try {
-            LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
+            LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
             // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
             if (partitionsAcquired.isEmpty())
                 topicPartitionData = acquirablePartitions();
@@ -121,11 +133,13 @@ public void onComplete() {
         }
     }
 
-    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
         try {
             LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
             if (partitionsAlreadyFetched.isEmpty())
-                responseData = readFromLog(topicPartitionData);
+                responseData = readFromLog(
+                    topicPartitionData,
+                    partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()));
             else
                 // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
                 // updated in a different tryComplete thread.
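For intuition about the maxBytes(...) call introduced above, here is a self-contained example with made-up topic names and sizes (the strategy API shape is taken from this diff, not verified against trunk):

import java.util.LinkedHashMap;
import java.util.Set;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;

public class UniformSplitExample {
    public static void main(String[] args) {
        PartitionMaxBytesStrategy strategy =
            PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM);
        Set<TopicIdPartition> acquired = Set.of(
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0)),
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 1)),
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 2)),
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 3)));
        // A 1 MiB request budget spread over 4 acquired partitions: 262144 bytes each.
        LinkedHashMap<TopicIdPartition, Integer> limits =
            strategy.maxBytes(1024 * 1024, acquired, acquired.size());
        limits.forEach((tp, bytes) -> System.out.println(tp + " -> " + bytes + " bytes"));
    }
}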
@@ -158,7 +172,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequ
      */
     @Override
     public boolean tryComplete() {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();
 
         try {
             if (!topicPartitionData.isEmpty()) {
@@ -167,7 +181,7 @@ public boolean tryComplete() {
                 // those topic partitions.
                 LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
                 maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
-                if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+                if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
                     partitionsAcquired = topicPartitionData;
                     partitionsAlreadyFetched = replicaManagerReadResponse;
                     boolean completedByMe = forceComplete();
@@ -202,28 +216,18 @@ public boolean tryComplete() {
      * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
      */
     // Visible for testing
-    LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
+    LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() {
         // Initialize the topic partitions for which the fetch should be attempted.
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
-            int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
             // Add the share partition to the list of partitions to be fetched only if we can
             // acquire the fetch lock on it.
             if (sharePartition.maybeAcquireFetchLock()) {
                 try {
                     // If the share partition is already at capacity, we should not attempt to fetch.
                     if (sharePartition.canAcquireRecords()) {
-                        topicPartitionData.put(
-                            topicIdPartition,
-                            new FetchRequest.PartitionData(
-                                topicIdPartition.topicId(),
-                                sharePartition.nextFetchOffset(),
-                                0,
-                                partitionMaxBytes,
-                                Optional.empty()
-                            )
-                        );
+                        topicPartitionData.put(topicIdPartition, sharePartition.nextFetchOffset());
                     } else {
                         sharePartition.releaseFetchLock();
                         log.trace("Record lock partition limit exceeded for SharePartition {}, " +
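The hunk above changes what acquirablePartitions() returns: instead of fully built FetchRequest.PartitionData entries with a fixed per-partition limit from shareFetch.partitionMaxBytes(), it now records only each partition's next fetch offset; byte budgets are assigned later by the strategy inside readFromLog. A small illustration of the new map shape, with hypothetical topics and offsets:

import java.util.LinkedHashMap;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

public class AcquirableShapeExample {
    public static void main(String[] args) {
        // Mirror of the new return type: partition -> next fetch offset.
        LinkedHashMap<TopicIdPartition, Long> acquired = new LinkedHashMap<>();
        acquired.put(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0)), 42L);
        acquired.put(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 1)), 7L);
        acquired.forEach((tp, offset) -> System.out.println(tp + " next fetch offset = " + offset));
    }
}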
@@ -239,24 +243,28 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
         return topicPartitionData;
     }
 
-    private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
-        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+    private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
+        LinkedHashMap<TopicIdPartition, Long> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
-            if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
-                partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData);
+            if (sharePartition.fetchOffsetMetadata(fetchOffset).isEmpty()) {
+                partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, fetchOffset);
             }
         });
         if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
             return new LinkedHashMap<>();
         }
         // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
-        return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
+        // Although we only fetch data for partitionsNotMatchingFetchOffsetMetadata here, we pass
+        // acquired partitions size = topicPartitionData.size() so that the leftover partitions,
+        // which will be fetched later, do not starve.
+        return readFromLog(
+            partitionsNotMatchingFetchOffsetMetadata,
+            partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, partitionsNotMatchingFetchOffsetMetadata.keySet(), topicPartitionData.size()));
     }
 
-    private void maybeUpdateFetchOffsetMetadata(
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
-        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+    private void maybeUpdateFetchOffsetMetadata(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+                                                LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
         for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
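The starvation comment in the maybeReadFromLog hunk above is easiest to see with numbers. A minimal sketch, assuming a 1 MiB request budget, 10 acquired partitions, and 4 of them missing fetch offset metadata (all figures hypothetical):

public class StarvationAvoidanceExample {
    public static void main(String[] args) {
        int requestMaxBytes = 1024 * 1024; // shareFetch.fetchParams().maxBytes
        int acquiredPartitions = 10;       // topicPartitionData.size()
        int missingMetadata = 4;           // partitionsNotMatchingFetchOffsetMetadata.size()

        // Dividing by the acquired count (10), not the fetched count (4),
        // keeps budget in reserve for the 6 partitions read later.
        int perPartition = requestMaxBytes / acquiredPartitions;  // 104857 bytes
        int usedNow = perPartition * missingMetadata;             // 419428 bytes
        System.out.println("per partition now: " + perPartition);
        System.out.println("reserved for later: " + (requestMaxBytes - usedNow)); // 629148
    }
}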
@@ -267,17 +275,18 @@ private void maybeUpdateFetchOffsetMetadata(
                 continue;
             }
             sharePartition.updateFetchOffsetMetadata(
-                topicPartitionData.get(topicIdPartition).fetchOffset,
+                topicPartitionData.get(topicIdPartition),
                 replicaManagerLogReadResult.info().fetchOffsetMetadata);
         }
     }
 
     // minBytes estimation currently assumes the common case where all fetched data is acquirable.
-    private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+                                        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
         long accumulatedSize = 0;
-        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+        for (Map.Entry<TopicIdPartition, Long> entry : topicPartitionData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
-            FetchRequest.PartitionData partitionData = entry.getValue();
+            long fetchOffset = entry.getValue();
 
             LogOffsetMetadata endOffsetMetadata;
             try {
@@ -294,7 +303,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
 
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
 
-            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(fetchOffset);
             if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
                 continue;
             LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
@@ -312,7 +321,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest
                 return true;
             } else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
                 // we take the partition fetch size as upper bound when accumulating the bytes.
-                long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionData.maxBytes);
+                long bytesAvailable = Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), partitionMaxBytes.get(topicIdPartition));
                 accumulatedSize += bytesAvailable;
             }
         }
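The accumulation rule in this hunk caps each partition's contribution at its strategy-assigned budget before comparing the running total against the request's minBytes. A toy illustration with invented values:

public class MinBytesAccumulationExample {
    public static void main(String[] args) {
        // 800000 bytes sit between the fetch offset and the end offset, but the
        // partition's uniform budget is 262144, so only 262144 count toward minBytes.
        long positionDiff = 800_000L;
        int partitionBudget = 262_144;
        long bytesAvailable = Math.min(positionDiff, partitionBudget);
        System.out.println("bytesAvailable = " + bytesAvailable); // 262144
    }
}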
@@ -335,13 +344,25 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
 
     }
 
-    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
+                                                                       LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
         // Filter if there already exists any erroneous topic partition.
-        Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
+        Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet());
         if (partitionsToFetch.isEmpty()) {
             return new LinkedHashMap<>();
         }
 
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+
+        topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> topicPartitionData.put(topicIdPartition,
+            new FetchRequest.PartitionData(
+                topicIdPartition.topicId(),
+                fetchOffset,
+                0,
+                partitionMaxBytes.get(topicIdPartition),
+                Optional.empty())
+        ));
+
         Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
             shareFetch.fetchParams(),
             CollectionConverters.asScala(
@@ -390,18 +411,21 @@ private void handleFetchException(
     }
 
     // Visible for testing.
-    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
-                                                                          LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
-        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+                                                                          LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
+        LinkedHashMap<TopicIdPartition, Long> missingLogReadTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, fetchOffset) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
-                missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
+                missingLogReadTopicPartitions.put(topicIdPartition, fetchOffset);
             }
         });
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+
+        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(
+            missingLogReadTopicPartitions,
+            partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, missingLogReadTopicPartitions.keySet(), topicPartitionData.size()));
         missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
         return missingTopicPartitionsLogReadResponse;
     }
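One subtlety in the merge above: the fresh readFromLog results for the missing partitions are combined with existingFetchedData via putAll, so on any key collision the already-fetched data wins. A minimal stand-in with string keys and values:

import java.util.LinkedHashMap;
import java.util.Map;

public class CombineReadResponseExample {
    public static void main(String[] args) {
        LinkedHashMap<String, String> missingReads = new LinkedHashMap<>(Map.of("orders-1", "fresh-read"));
        LinkedHashMap<String, String> existing = new LinkedHashMap<>(Map.of("orders-0", "cached-read"));
        missingReads.putAll(existing); // existing entries overwrite any duplicate keys
        System.out.println(missingReads); // {orders-1=fresh-read, orders-0=cached-read}
    }
}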