Skip to content

Commit c205e69

Browse files
authored
KAFKA-19020 [1/n]: Handle strict max fetch records in share fetch (#20246)
JIRA: [https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-19020](https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-19020) KIP : [here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch) The `maxFetchRecords` parameter in shared fetch operations currently acts as a soft limit, meaning the actual number of records returned to the client may exceed this threshold under certain conditions. This PR introduces an `AcquireMode` in the `ShareFetchRequest` to offer clients a more flexible method for acquiring records. By setting this value to strict, consumers can ensure that the actual number of records received will never exceed the limit specified by the max.poll.records configuration. Reviewers: Apoorv Mittal <[email protected]>, Lan Ding <[email protected]>, Abhinav Dixit <[email protected]>, Andrew Schofield <[email protected]>
1 parent baec231 commit c205e69

File tree

31 files changed

+1571
-233
lines changed

31 files changed

+1571
-233
lines changed

checkstyle/import-control.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@
183183
<allow pkg="org.apache.kafka.server.authorizer" />
184184
<!-- for IncrementalAlterConfigsRequest Builder -->
185185
<allow pkg="org.apache.kafka.clients.admin" />
186+
<allow pkg="org.apache.kafka.clients.consumer" />
186187
<!-- for testing -->
187188
<!-- for testing -->
188189
<allow pkg="io.opentelemetry.proto"/>

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public class ShareConsumerTest {
140140
private static final String VALUE = "application/octet-stream";
141141
private static final String EXPLICIT = "explicit";
142142
private static final String IMPLICIT = "implicit";
143+
private static final String RECORD_LIMIT = "record_limit";
144+
private static final String BATCH_OPTIMIZED = "batch_optimized";
143145

144146
public ShareConsumerTest(ClusterInstance cluster) {
145147
this.cluster = cluster;
@@ -2756,6 +2758,154 @@ record = records.iterator().next();
27562758
verifyShareGroupStateTopicRecordsProduced();
27572759
}
27582760

2761+
@ClusterTest
2762+
public void testPollInBatchOptimizedMode() {
2763+
alterShareAutoOffsetReset("group1", "earliest");
2764+
try (Producer<byte[], byte[]> producer = createProducer();
2765+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
2766+
"group1",
2767+
Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5, ConsumerConfig.SHARE_ACQUIRE_MODE_CONFIG, BATCH_OPTIMIZED))
2768+
) {
2769+
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
2770+
for (int i = 0; i < 10; i++) {
2771+
producer.send(record);
2772+
}
2773+
producer.flush();
2774+
shareConsumer.subscribe(List.of(tp.topic()));
2775+
2776+
// although max.poll.records is set to 5, in batch optimized mode we will still get all 10 records.
2777+
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
2778+
assertEquals(10, records.count());
2779+
verifyShareGroupStateTopicRecordsProduced();
2780+
}
2781+
}
2782+
2783+
@ClusterTest
2784+
public void testPollInRecordLimitMode() {
2785+
alterShareAutoOffsetReset("group1", "earliest");
2786+
try (Producer<byte[], byte[]> producer = createProducer();
2787+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
2788+
"group1",
2789+
Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5, ConsumerConfig.SHARE_ACQUIRE_MODE_CONFIG, RECORD_LIMIT))
2790+
) {
2791+
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
2792+
for (int i = 0; i < 10; i++) {
2793+
producer.send(record);
2794+
}
2795+
producer.flush();
2796+
shareConsumer.subscribe(List.of(tp.topic()));
2797+
2798+
// In record limit mode we will get only up to max.poll.records number of records.
2799+
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 5);
2800+
assertEquals(5, records.count());
2801+
verifyShareGroupStateTopicRecordsProduced();
2802+
}
2803+
}
2804+
2805+
@ClusterTest
2806+
public void testPollAndExplicitAcknowledgeSingleMessageInRecordLimitMode() {
2807+
alterShareAutoOffsetReset("group1", "earliest");
2808+
try (Producer<byte[], byte[]> producer = createProducer();
2809+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
2810+
"group1",
2811+
Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
2812+
ConsumerConfig.SHARE_ACQUIRE_MODE_CONFIG, RECORD_LIMIT,
2813+
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
2814+
) {
2815+
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
2816+
for (int i = 0; i < 10; i++) {
2817+
producer.send(record);
2818+
}
2819+
producer.flush();
2820+
shareConsumer.subscribe(List.of(tp.topic()));
2821+
2822+
for (int i = 0; i < 10; i++) {
2823+
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
2824+
assertEquals(1, records.count());
2825+
for (ConsumerRecord<byte[], byte[]> rec : records) {
2826+
shareConsumer.acknowledge(rec, AcknowledgeType.ACCEPT);
2827+
}
2828+
shareConsumer.commitSync();
2829+
}
2830+
verifyShareGroupStateTopicRecordsProduced();
2831+
}
2832+
}
2833+
2834+
@ClusterTest
2835+
public void testExplicitAcknowledgeSuccessInRecordLimitMode() {
2836+
alterShareAutoOffsetReset("group1", "earliest");
2837+
try (Producer<byte[], byte[]> producer = createProducer();
2838+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
2839+
"group1",
2840+
Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10,
2841+
ConsumerConfig.SHARE_ACQUIRE_MODE_CONFIG, RECORD_LIMIT,
2842+
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
2843+
) {
2844+
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
2845+
for (int i = 0; i < 15; i++) {
2846+
producer.send(record);
2847+
}
2848+
producer.flush();
2849+
shareConsumer.subscribe(List.of(tp.topic()));
2850+
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
2851+
assertEquals(10, records.count());
2852+
for (ConsumerRecord<byte[], byte[]> rec : records) {
2853+
shareConsumer.acknowledge(rec, AcknowledgeType.ACCEPT);
2854+
}
2855+
shareConsumer.commitSync();
2856+
2857+
records = waitedPoll(shareConsumer, 2500L, 5);
2858+
assertEquals(5, records.count());
2859+
verifyShareGroupStateTopicRecordsProduced();
2860+
}
2861+
}
2862+
2863+
@ClusterTest
2864+
public void testExplicitAcknowledgeReleaseAcceptInRecordLimitMode() {
2865+
alterShareAutoOffsetReset("group1", "earliest");
2866+
try (Producer<byte[], byte[]> producer = createProducer();
2867+
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
2868+
"group1",
2869+
Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10,
2870+
ConsumerConfig.SHARE_ACQUIRE_MODE_CONFIG, RECORD_LIMIT,
2871+
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))
2872+
) {
2873+
for (int i = 0; i < 20; i++) {
2874+
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(), ("Message " + i).getBytes());
2875+
producer.send(record);
2876+
}
2877+
producer.flush();
2878+
shareConsumer.subscribe(List.of(tp2.topic()));
2879+
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
2880+
assertEquals(10, records.count());
2881+
2882+
int count = 0;
2883+
Map<TopicIdPartition, Optional<KafkaException>> result;
2884+
for (ConsumerRecord<byte[], byte[]> record : records) {
2885+
if (count % 2 == 0) {
2886+
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
2887+
} else {
2888+
shareConsumer.acknowledge(record, AcknowledgeType.RELEASE);
2889+
}
2890+
result = shareConsumer.commitSync();
2891+
assertEquals(1, result.size());
2892+
count++;
2893+
}
2894+
2895+
// Poll again to get 10 records.
2896+
records = waitedPoll(shareConsumer, 2500L, 10);
2897+
assertEquals(10, records.count());
2898+
for (ConsumerRecord<byte[], byte[]> record : records) {
2899+
shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
2900+
}
2901+
2902+
// Get the rest of all 5 records.
2903+
records = waitedPoll(shareConsumer, 2500L, 5);
2904+
assertEquals(5, records.count());
2905+
verifyShareGroupStateTopicRecordsProduced();
2906+
}
2907+
}
2908+
27592909
/**
27602910
* Util class to encapsulate state for a consumer/producer
27612911
* being executed by an {@link ExecutorService}.

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,14 +380,25 @@ public class ConsumerConfig extends AbstractConfig {
380380
" If set to <code>explicit</code>, the acknowledgement mode of the consumer is explicit and it must use" +
381381
" <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records.";
382382

383+
/**
384+
* <code>share.acquire.mode</code>
385+
*/
386+
public static final String SHARE_ACQUIRE_MODE_CONFIG = "share.acquire.mode";
387+
private static final String SHARE_ACQUIRE_MODE_DOC = "Controls the acquire mode for a share consumer." +
388+
" If set to <code>record_limit</code>, the number of records returned in each poll() will not exceed the value of <code>max.poll.records</code>." +
389+
" If set to <code>batch_optimized</code>, the number of records returned in each poll() call may exceed <code>max.poll.records</code>" +
390+
" to align with batch boundaries for optimization.";
391+
public static final String DEFAULT_SHARE_ACQUIRE_MODE = ShareAcquireMode.BATCH_OPTIMIZED.name();
392+
383393
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
384394

385395
/**
386396
* A list of configuration keys not supported for CLASSIC protocol.
387397
*/
388398
private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
389399
GROUP_REMOTE_ASSIGNOR_CONFIG,
390-
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
400+
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
401+
SHARE_ACQUIRE_MODE_CONFIG
391402
);
392403

393404
/**
@@ -397,7 +408,8 @@ public class ConsumerConfig extends AbstractConfig {
397408
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
398409
HEARTBEAT_INTERVAL_MS_CONFIG,
399410
SESSION_TIMEOUT_MS_CONFIG,
400-
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
411+
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
412+
SHARE_ACQUIRE_MODE_CONFIG
401413
);
402414

403415
static {
@@ -683,6 +695,12 @@ public class ConsumerConfig extends AbstractConfig {
683695
new ShareAcknowledgementMode.Validator(),
684696
Importance.MEDIUM,
685697
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC)
698+
.define(ConsumerConfig.SHARE_ACQUIRE_MODE_CONFIG,
699+
Type.STRING,
700+
DEFAULT_SHARE_ACQUIRE_MODE,
701+
new ShareAcquireMode.Validator(),
702+
Importance.MEDIUM,
703+
ConsumerConfig.SHARE_ACQUIRE_MODE_DOC)
686704
.define(CONFIG_PROVIDERS_CONFIG,
687705
ConfigDef.Type.LIST,
688706
List.of(),
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer;
18+
19+
import org.apache.kafka.common.config.ConfigDef;
20+
import org.apache.kafka.common.config.ConfigException;
21+
22+
import java.util.Arrays;
23+
import java.util.Locale;
24+
import java.util.stream.Collectors;
25+
26+
public enum ShareAcquireMode {
27+
BATCH_OPTIMIZED("batch_optimized", (byte) 0),
28+
RECORD_LIMIT("record_limit", (byte) 1);
29+
30+
public final String name;
31+
32+
public final byte id;
33+
34+
ShareAcquireMode(final String name, final byte id) {
35+
this.name = name;
36+
this.id = id;
37+
}
38+
39+
/**
40+
* Case-insensitive acquire mode lookup by string name.
41+
*/
42+
public static ShareAcquireMode of(final String name) {
43+
try {
44+
return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT));
45+
} catch (IllegalArgumentException e) {
46+
throw new IllegalArgumentException("Invalid value `" + name + "` for configuration " +
47+
name + ". The value must either be 'batch_optimized' or 'record_limit'.");
48+
}
49+
}
50+
51+
public byte id() {
52+
return id;
53+
}
54+
55+
public static ShareAcquireMode forId(byte id) {
56+
switch (id) {
57+
case 0:
58+
return BATCH_OPTIMIZED;
59+
case 1:
60+
return RECORD_LIMIT;
61+
default:
62+
throw new IllegalArgumentException("Unknown share acquire mode id: " + id);
63+
}
64+
}
65+
66+
@Override
67+
public String toString() {
68+
return "ShareAcquireMode(" + name + " (" + id + "))";
69+
}
70+
71+
public static class Validator implements ConfigDef.Validator {
72+
@Override
73+
public void ensureValid(String name, Object value) {
74+
String acquireMode = (String) value;
75+
try {
76+
of(acquireMode);
77+
} catch (Exception e) {
78+
throw new ConfigException(name, value, "Invalid value `" + acquireMode + "` for configuration " +
79+
name + ". The value must either be 'batch_optimized' or 'record_limit'.");
80+
}
81+
}
82+
83+
@Override
84+
public String toString() {
85+
String values = Arrays.stream(ShareAcquireMode.values())
86+
.map(ShareAcquireMode::toString).collect(Collectors.joining(", "));
87+
return "[" + values + "]";
88+
}
89+
}
90+
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ public static Supplier<RequestManagers> supplier(final Time time,
332332
protected RequestManagers create() {
333333
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
334334
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
335-
FetchConfig fetchConfig = new FetchConfig(config);
335+
ShareFetchConfig shareFetchConfig = new ShareFetchConfig(config);
336336

337337
CoordinatorRequestManager coordinator = new CoordinatorRequestManager(
338338
logContext,
@@ -369,7 +369,7 @@ protected RequestManagers create() {
369369
groupRebalanceConfig.groupId,
370370
metadata,
371371
subscriptions,
372-
fetchConfig,
372+
shareFetchConfig,
373373
fetchBuffer,
374374
backgroundEventHandler,
375375
shareFetchMetricsManager,

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
8080
private final String groupId;
8181
private final ShareConsumerMetadata metadata;
8282
private final SubscriptionState subscriptions;
83-
private final FetchConfig fetchConfig;
83+
private final ShareFetchConfig shareFetchConfig;
8484
protected final ShareFetchBuffer shareFetchBuffer;
8585
private final BackgroundEventHandler backgroundEventHandler;
8686
private final Map<Integer, ShareSessionHandler> sessionHandlers;
@@ -105,7 +105,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
105105
final String groupId,
106106
final ShareConsumerMetadata metadata,
107107
final SubscriptionState subscriptions,
108-
final FetchConfig fetchConfig,
108+
final ShareFetchConfig shareFetchConfig,
109109
final ShareFetchBuffer shareFetchBuffer,
110110
final BackgroundEventHandler backgroundEventHandler,
111111
final ShareFetchMetricsManager metricsManager,
@@ -117,7 +117,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
117117
this.groupId = groupId;
118118
this.metadata = metadata;
119119
this.subscriptions = subscriptions;
120-
this.fetchConfig = fetchConfig;
120+
this.shareFetchConfig = shareFetchConfig;
121121
this.shareFetchBuffer = shareFetchBuffer;
122122
this.backgroundEventHandler = backgroundEventHandler;
123123
this.metricsManager = metricsManager;
@@ -244,7 +244,7 @@ public PollResult poll(long currentTimeMs) {
244244
ShareSessionHandler handler = entry.getValue();
245245

246246
log.trace("Building ShareFetch request to send to node {}", target.id());
247-
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, fetchConfig);
247+
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, shareFetchConfig);
248248

249249
nodesWithPendingRequests.add(target.id());
250250

@@ -1191,7 +1191,7 @@ UnsentRequest buildRequest() {
11911191
sessionHandler.addPartitionToFetch(entry.getKey(), entry.getValue());
11921192
}
11931193

1194-
ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
1194+
ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, shareFetchConfig);
11951195

11961196
isProcessed = false;
11971197
Node nodeToSend = metadata.fetch().nodeById(nodeId);

0 commit comments

Comments
 (0)