27
27
import org .apache .kafka .server .purgatory .DelayedOperation ;
28
28
import org .apache .kafka .server .share .SharePartitionKey ;
29
29
import org .apache .kafka .server .share .fetch .DelayedShareFetchGroupKey ;
30
- import org .apache .kafka .server .share .fetch .PartitionMaxBytesStrategy ;
31
30
import org .apache .kafka .server .share .fetch .ShareFetch ;
32
31
import org .apache .kafka .server .storage .log .FetchIsolation ;
33
32
import org .apache .kafka .server .storage .log .FetchPartitionData ;
@@ -61,35 +60,24 @@ public class DelayedShareFetch extends DelayedOperation {
61
60
private final ShareFetch shareFetch ;
62
61
private final ReplicaManager replicaManager ;
63
62
private final BiConsumer <SharePartitionKey , Throwable > exceptionHandler ;
64
- private final PartitionMaxBytesStrategy partitionMaxBytesStrategy ;
65
63
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
66
64
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
67
65
private final LinkedHashMap <TopicIdPartition , SharePartition > sharePartitions ;
68
- private LinkedHashMap <TopicIdPartition , Long > partitionsAcquired ;
66
+ private LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > partitionsAcquired ;
69
67
private LinkedHashMap <TopicIdPartition , LogReadResult > partitionsAlreadyFetched ;
70
68
71
69
DelayedShareFetch (
72
70
ShareFetch shareFetch ,
73
71
ReplicaManager replicaManager ,
74
72
BiConsumer <SharePartitionKey , Throwable > exceptionHandler ,
75
73
LinkedHashMap <TopicIdPartition , SharePartition > sharePartitions ) {
76
- this (shareFetch , replicaManager , exceptionHandler , sharePartitions , PartitionMaxBytesStrategy .type (PartitionMaxBytesStrategy .StrategyType .UNIFORM ));
77
- }
78
-
79
- DelayedShareFetch (
80
- ShareFetch shareFetch ,
81
- ReplicaManager replicaManager ,
82
- BiConsumer <SharePartitionKey , Throwable > exceptionHandler ,
83
- LinkedHashMap <TopicIdPartition , SharePartition > sharePartitions ,
84
- PartitionMaxBytesStrategy partitionMaxBytesStrategy ) {
85
74
super (shareFetch .fetchParams ().maxWaitMs , Optional .empty ());
86
75
this .shareFetch = shareFetch ;
87
76
this .replicaManager = replicaManager ;
88
77
this .partitionsAcquired = new LinkedHashMap <>();
89
78
this .partitionsAlreadyFetched = new LinkedHashMap <>();
90
79
this .exceptionHandler = exceptionHandler ;
91
80
this .sharePartitions = sharePartitions ;
92
- this .partitionMaxBytesStrategy = partitionMaxBytesStrategy ;
93
81
}
94
82
95
83
@ Override
@@ -111,7 +99,7 @@ public void onComplete() {
111
99
partitionsAcquired .keySet ());
112
100
113
101
try {
114
- LinkedHashMap <TopicIdPartition , Long > topicPartitionData ;
102
+ LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > topicPartitionData ;
115
103
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
116
104
if (partitionsAcquired .isEmpty ())
117
105
topicPartitionData = acquirablePartitions ();
@@ -133,13 +121,11 @@ public void onComplete() {
133
121
}
134
122
}
135
123
136
- private void completeShareFetchRequest (LinkedHashMap <TopicIdPartition , Long > topicPartitionData ) {
124
+ private void completeShareFetchRequest (LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > topicPartitionData ) {
137
125
try {
138
126
LinkedHashMap <TopicIdPartition , LogReadResult > responseData ;
139
127
if (partitionsAlreadyFetched .isEmpty ())
140
- responseData = readFromLog (
141
- topicPartitionData ,
142
- partitionMaxBytesStrategy .maxBytes (shareFetch .fetchParams ().maxBytes , topicPartitionData .keySet (), topicPartitionData .size ()));
128
+ responseData = readFromLog (topicPartitionData );
143
129
else
144
130
// There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
145
131
// updated in a different tryComplete thread.
@@ -172,7 +158,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> top
172
158
*/
173
159
@ Override
174
160
public boolean tryComplete () {
175
- LinkedHashMap <TopicIdPartition , Long > topicPartitionData = acquirablePartitions ();
161
+ LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > topicPartitionData = acquirablePartitions ();
176
162
177
163
try {
178
164
if (!topicPartitionData .isEmpty ()) {
@@ -181,7 +167,7 @@ public boolean tryComplete() {
181
167
// those topic partitions.
182
168
LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponse = maybeReadFromLog (topicPartitionData );
183
169
maybeUpdateFetchOffsetMetadata (topicPartitionData , replicaManagerReadResponse );
184
- if (anyPartitionHasLogReadError (replicaManagerReadResponse ) || isMinBytesSatisfied (topicPartitionData , partitionMaxBytesStrategy . maxBytes ( shareFetch . fetchParams (). maxBytes , topicPartitionData . keySet (), topicPartitionData . size ()) )) {
170
+ if (anyPartitionHasLogReadError (replicaManagerReadResponse ) || isMinBytesSatisfied (topicPartitionData )) {
185
171
partitionsAcquired = topicPartitionData ;
186
172
partitionsAlreadyFetched = replicaManagerReadResponse ;
187
173
boolean completedByMe = forceComplete ();
@@ -216,18 +202,28 @@ public boolean tryComplete() {
216
202
* Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
217
203
*/
218
204
// Visible for testing
219
- LinkedHashMap <TopicIdPartition , Long > acquirablePartitions () {
205
+ LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > acquirablePartitions () {
220
206
// Initialize the topic partitions for which the fetch should be attempted.
221
- LinkedHashMap <TopicIdPartition , Long > topicPartitionData = new LinkedHashMap <>();
207
+ LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > topicPartitionData = new LinkedHashMap <>();
222
208
223
209
sharePartitions .forEach ((topicIdPartition , sharePartition ) -> {
210
+ int partitionMaxBytes = shareFetch .partitionMaxBytes ().getOrDefault (topicIdPartition , 0 );
224
211
// Add the share partition to the list of partitions to be fetched only if we can
225
212
// acquire the fetch lock on it.
226
213
if (sharePartition .maybeAcquireFetchLock ()) {
227
214
try {
228
215
// If the share partition is already at capacity, we should not attempt to fetch.
229
216
if (sharePartition .canAcquireRecords ()) {
230
- topicPartitionData .put (topicIdPartition , sharePartition .nextFetchOffset ());
217
+ topicPartitionData .put (
218
+ topicIdPartition ,
219
+ new FetchRequest .PartitionData (
220
+ topicIdPartition .topicId (),
221
+ sharePartition .nextFetchOffset (),
222
+ 0 ,
223
+ partitionMaxBytes ,
224
+ Optional .empty ()
225
+ )
226
+ );
231
227
} else {
232
228
sharePartition .releaseFetchLock ();
233
229
log .trace ("Record lock partition limit exceeded for SharePartition {}, " +
@@ -243,28 +239,24 @@ LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() {
243
239
return topicPartitionData ;
244
240
}
245
241
246
- private LinkedHashMap <TopicIdPartition , LogReadResult > maybeReadFromLog (LinkedHashMap <TopicIdPartition , Long > topicPartitionData ) {
247
- LinkedHashMap <TopicIdPartition , Long > partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap <>();
248
- topicPartitionData .forEach ((topicIdPartition , fetchOffset ) -> {
242
+ private LinkedHashMap <TopicIdPartition , LogReadResult > maybeReadFromLog (LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > topicPartitionData ) {
243
+ LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap <>();
244
+ topicPartitionData .forEach ((topicIdPartition , partitionData ) -> {
249
245
SharePartition sharePartition = sharePartitions .get (topicIdPartition );
250
- if (sharePartition .fetchOffsetMetadata (fetchOffset ).isEmpty ()) {
251
- partitionsNotMatchingFetchOffsetMetadata .put (topicIdPartition , fetchOffset );
246
+ if (sharePartition .fetchOffsetMetadata (partitionData . fetchOffset ).isEmpty ()) {
247
+ partitionsNotMatchingFetchOffsetMetadata .put (topicIdPartition , partitionData );
252
248
}
253
249
});
254
250
if (partitionsNotMatchingFetchOffsetMetadata .isEmpty ()) {
255
251
return new LinkedHashMap <>();
256
252
}
257
253
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
258
- // Although we are fetching partition max bytes for partitionsNotMatchingFetchOffsetMetadata,
259
- // we will take acquired partitions size = topicPartitionData.size() because we do not want to let the
260
- // leftover partitions to starve which will be fetched later.
261
- return readFromLog (
262
- partitionsNotMatchingFetchOffsetMetadata ,
263
- partitionMaxBytesStrategy .maxBytes (shareFetch .fetchParams ().maxBytes , partitionsNotMatchingFetchOffsetMetadata .keySet (), topicPartitionData .size ()));
254
+ return readFromLog (partitionsNotMatchingFetchOffsetMetadata );
264
255
}
265
256
266
- private void maybeUpdateFetchOffsetMetadata (LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
267
- LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponseData ) {
257
+ private void maybeUpdateFetchOffsetMetadata (
258
+ LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData ,
259
+ LinkedHashMap <TopicIdPartition , LogReadResult > replicaManagerReadResponseData ) {
268
260
for (Map .Entry <TopicIdPartition , LogReadResult > entry : replicaManagerReadResponseData .entrySet ()) {
269
261
TopicIdPartition topicIdPartition = entry .getKey ();
270
262
SharePartition sharePartition = sharePartitions .get (topicIdPartition );
@@ -275,18 +267,17 @@ private void maybeUpdateFetchOffsetMetadata(LinkedHashMap<TopicIdPartition, Long
275
267
continue ;
276
268
}
277
269
sharePartition .updateFetchOffsetMetadata (
278
- topicPartitionData .get (topicIdPartition ),
270
+ topicPartitionData .get (topicIdPartition ). fetchOffset ,
279
271
replicaManagerLogReadResult .info ().fetchOffsetMetadata );
280
272
}
281
273
}
282
274
283
275
// minByes estimation currently assumes the common case where all fetched data is acquirable.
284
- private boolean isMinBytesSatisfied (LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
285
- LinkedHashMap <TopicIdPartition , Integer > partitionMaxBytes ) {
276
+ private boolean isMinBytesSatisfied (LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData ) {
286
277
long accumulatedSize = 0 ;
287
- for (Map .Entry <TopicIdPartition , Long > entry : topicPartitionData .entrySet ()) {
278
+ for (Map .Entry <TopicIdPartition , FetchRequest . PartitionData > entry : topicPartitionData .entrySet ()) {
288
279
TopicIdPartition topicIdPartition = entry .getKey ();
289
- long fetchOffset = entry .getValue ();
280
+ FetchRequest . PartitionData partitionData = entry .getValue ();
290
281
291
282
LogOffsetMetadata endOffsetMetadata ;
292
283
try {
@@ -303,7 +294,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, Long> topicP
303
294
304
295
SharePartition sharePartition = sharePartitions .get (topicIdPartition );
305
296
306
- Optional <LogOffsetMetadata > optionalFetchOffsetMetadata = sharePartition .fetchOffsetMetadata (fetchOffset );
297
+ Optional <LogOffsetMetadata > optionalFetchOffsetMetadata = sharePartition .fetchOffsetMetadata (partitionData . fetchOffset );
307
298
if (optionalFetchOffsetMetadata .isEmpty () || optionalFetchOffsetMetadata .get () == LogOffsetMetadata .UNKNOWN_OFFSET_METADATA )
308
299
continue ;
309
300
LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata .get ();
@@ -321,7 +312,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, Long> topicP
321
312
return true ;
322
313
} else if (fetchOffsetMetadata .onSameSegment (endOffsetMetadata )) {
323
314
// we take the partition fetch size as upper bound when accumulating the bytes.
324
- long bytesAvailable = Math .min (endOffsetMetadata .positionDiff (fetchOffsetMetadata ), partitionMaxBytes . get ( topicIdPartition ) );
315
+ long bytesAvailable = Math .min (endOffsetMetadata .positionDiff (fetchOffsetMetadata ), partitionData . maxBytes );
325
316
accumulatedSize += bytesAvailable ;
326
317
}
327
318
}
@@ -344,25 +335,13 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
344
335
345
336
}
346
337
347
- private LinkedHashMap <TopicIdPartition , LogReadResult > readFromLog (LinkedHashMap <TopicIdPartition , Long > topicPartitionFetchOffsets ,
348
- LinkedHashMap <TopicIdPartition , Integer > partitionMaxBytes ) {
338
+ private LinkedHashMap <TopicIdPartition , LogReadResult > readFromLog (LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData ) {
349
339
// Filter if there already exists any erroneous topic partition.
350
- Set <TopicIdPartition > partitionsToFetch = shareFetch .filterErroneousTopicPartitions (topicPartitionFetchOffsets .keySet ());
340
+ Set <TopicIdPartition > partitionsToFetch = shareFetch .filterErroneousTopicPartitions (topicPartitionData .keySet ());
351
341
if (partitionsToFetch .isEmpty ()) {
352
342
return new LinkedHashMap <>();
353
343
}
354
344
355
- LinkedHashMap <TopicIdPartition , FetchRequest .PartitionData > topicPartitionData = new LinkedHashMap <>();
356
-
357
- topicPartitionFetchOffsets .forEach ((topicIdPartition , fetchOffset ) -> topicPartitionData .put (topicIdPartition ,
358
- new FetchRequest .PartitionData (
359
- topicIdPartition .topicId (),
360
- fetchOffset ,
361
- 0 ,
362
- partitionMaxBytes .get (topicIdPartition ),
363
- Optional .empty ())
364
- ));
365
-
366
345
Seq <Tuple2 <TopicIdPartition , LogReadResult >> responseLogResult = replicaManager .readFromLog (
367
346
shareFetch .fetchParams (),
368
347
CollectionConverters .asScala (
@@ -411,21 +390,18 @@ private void handleFetchException(
411
390
}
412
391
413
392
// Visible for testing.
414
- LinkedHashMap <TopicIdPartition , LogReadResult > combineLogReadResponse (LinkedHashMap <TopicIdPartition , Long > topicPartitionData ,
415
- LinkedHashMap <TopicIdPartition , LogReadResult > existingFetchedData ) {
416
- LinkedHashMap <TopicIdPartition , Long > missingLogReadTopicPartitions = new LinkedHashMap <>();
417
- topicPartitionData .forEach ((topicIdPartition , fetchOffset ) -> {
393
+ LinkedHashMap <TopicIdPartition , LogReadResult > combineLogReadResponse (LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > topicPartitionData ,
394
+ LinkedHashMap <TopicIdPartition , LogReadResult > existingFetchedData ) {
395
+ LinkedHashMap <TopicIdPartition , FetchRequest . PartitionData > missingLogReadTopicPartitions = new LinkedHashMap <>();
396
+ topicPartitionData .forEach ((topicIdPartition , partitionData ) -> {
418
397
if (!existingFetchedData .containsKey (topicIdPartition )) {
419
- missingLogReadTopicPartitions .put (topicIdPartition , fetchOffset );
398
+ missingLogReadTopicPartitions .put (topicIdPartition , partitionData );
420
399
}
421
400
});
422
401
if (missingLogReadTopicPartitions .isEmpty ()) {
423
402
return existingFetchedData ;
424
403
}
425
-
426
- LinkedHashMap <TopicIdPartition , LogReadResult > missingTopicPartitionsLogReadResponse = readFromLog (
427
- missingLogReadTopicPartitions ,
428
- partitionMaxBytesStrategy .maxBytes (shareFetch .fetchParams ().maxBytes , missingLogReadTopicPartitions .keySet (), topicPartitionData .size ()));
404
+ LinkedHashMap <TopicIdPartition , LogReadResult > missingTopicPartitionsLogReadResponse = readFromLog (missingLogReadTopicPartitions );
429
405
missingTopicPartitionsLogReadResponse .putAll (existingFetchedData );
430
406
return missingTopicPartitionsLogReadResponse ;
431
407
}
0 commit comments