Skip to content

Commit c5ae7b4

Browse files
authored
KAFKA-18794: Add check to send one ShareFetchEvent per poll in ShareConsumer. (#20794)
*What* - Currently, `ShareConsumerImpl` can send multiple `ShareFetchEvent`s for a single `poll()`. - This happens when the `ShareFetchBuffer` is still empty after the first wait and the `pollTimer` has not yet expired, so the poll loop runs again and sends another event. - Usually the `ShareFetchBuffer` waits until the poll timeout, so only one event is sent, but the buffer wait time is also bounded by `applicationEventHandler.maximumTimeToWait()`, which can return lower values (even 0) in some cases (especially during startup of the heartbeat request manager). - When this happens, multiple events are sent, and this can even lead to multiple fetches from the broker (effectively a fetch and a pre-fetch) if the response for the previous `ShareFetchEvent` arrives before the next `ShareFetchEvent` is processed. - To avoid this, a check has been added so that only one `ShareFetchEvent` is sent per poll. - This was the reason a couple of tests in `KafkaShareConsumerTest` were flaky - https://issues.apache.org/jira/browse/KAFKA-18794. This PR fixes the flakiness; the tests now pass reliably locally. Reviewers: Andrew Schofield <[email protected]>
1 parent cc869d9 commit c5ae7b4

File tree

7 files changed

+82
-91
lines changed

7 files changed

+82
-91
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,16 +283,14 @@ private boolean maybeAddAcknowledgements(ShareSessionHandler handler,
283283
}
284284
}
285285

286-
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
287-
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
286+
public void fetch(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
288287
if (!fetchMoreRecords) {
289288
log.debug("Fetch more data");
290289
fetchMoreRecords = true;
291290
}
292291

293-
// Process both acknowledgement maps and sends them in the next ShareFetch.
292+
// Store the acknowledgements and send them in the next ShareFetch.
294293
processAcknowledgementsMap(acknowledgementsMap);
295-
processAcknowledgementsMap(controlRecordAcknowledgements);
296294
}
297295

298296
private void processAcknowledgementsMap(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
203203
// and is used to prevent multithreaded access
204204
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
205205
private final AtomicInteger refCount = new AtomicInteger(0);
206+
private boolean shouldSendShareFetchEvent = false;
206207

207208
ShareConsumerImpl(final ConsumerConfig config,
208209
final Deserializer<K> keyDeserializer,
@@ -581,6 +582,8 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
581582
throw new IllegalStateException("Consumer is not subscribed to any topics.");
582583
}
583584

585+
shouldSendShareFetchEvent = true;
586+
584587
do {
585588
// Make sure the network thread can tell the application is actively polling
586589
applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
@@ -654,28 +657,29 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
654657
if (currentFetch.isEmpty()) {
655658
final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
656659
if (fetch.isEmpty()) {
657-
// Check for any acknowledgements which could have come from control records (GAP) and include them.
658-
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap, fetch.takeAcknowledgedRecords()));
660+
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements = fetch.takeAcknowledgedRecords();
659661

660-
// Notify the network thread to wake up and start the next round of fetching
661-
applicationEventHandler.wakeupNetworkThread();
662+
if (!controlRecordAcknowledgements.isEmpty()) {
663+
// Asynchronously commit any waiting acknowledgements from control records.
664+
sendShareAcknowledgeAsyncEvent(controlRecordAcknowledgements);
665+
}
666+
// We only send one ShareFetchEvent per poll call.
667+
if (shouldSendShareFetchEvent) {
668+
// Request the next fetch, carrying any pending acknowledgements. Control record (GAP) acknowledgements were already sent asynchronously above.
669+
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
670+
shouldSendShareFetchEvent = false;
671+
// Notify the network thread to wake up and start the next round of fetching
672+
applicationEventHandler.wakeupNetworkThread();
673+
}
662674
} else if (!acknowledgementsMap.isEmpty()) {
663675
// Asynchronously commit any waiting acknowledgements
664-
Timer timer = time.timer(defaultApiTimeoutMs);
665-
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
666-
667-
// Notify the network thread to wake up and start the next round of fetching
668-
applicationEventHandler.wakeupNetworkThread();
676+
sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
669677
}
670678
return fetch;
671679
} else {
672680
if (!acknowledgementsMap.isEmpty()) {
673681
// Asynchronously commit any waiting acknowledgements
674-
Timer timer = time.timer(defaultApiTimeoutMs);
675-
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
676-
677-
// Notify the network thread to wake up and start the next round of fetching
678-
applicationEventHandler.wakeupNetworkThread();
682+
sendShareAcknowledgeAsyncEvent(acknowledgementsMap);
679683
}
680684
if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
681685
// We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application.
@@ -685,6 +689,14 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
685689
}
686690
}
687691

692+
private void sendShareAcknowledgeAsyncEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
693+
Timer timer = time.timer(defaultApiTimeoutMs);
694+
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));
695+
696+
// Notify the network thread to wake up and start the next round of fetching
697+
applicationEventHandler.wakeupNetworkThread();
698+
}
699+
688700
/**
689701
* {@inheritDoc}
690702
*/

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ private void process(@SuppressWarnings("unused") final StopFindCoordinatorOnClos
514514
* Process event that tells the share consume request manager to fetch more records.
515515
*/
516516
private void process(final ShareFetchEvent event) {
517-
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap(), event.controlRecordAcknowledgements()));
517+
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.fetch(event.acknowledgementsMap()));
518518
}
519519

520520
/**

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareFetchEvent.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,15 @@ public class ShareFetchEvent extends ApplicationEvent {
2525

2626
private final Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap;
2727

28-
private final Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements;
29-
30-
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
31-
Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements) {
28+
public ShareFetchEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
3229
super(Type.SHARE_FETCH);
3330
this.acknowledgementsMap = acknowledgementsMap;
34-
this.controlRecordAcknowledgements = controlRecordAcknowledgements;
3531
}
3632

3733
public Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap() {
3834
return acknowledgementsMap;
3935
}
4036

41-
public Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements() {
42-
return controlRecordAcknowledgements;
43-
}
44-
4537
@Override
4638
protected String toStringBase() {
4739
return super.toStringBase() + ", acknowledgementsMap=" + acknowledgementsMap;

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.apache.kafka.common.utils.MockTime;
5050
import org.apache.kafka.common.utils.Time;
5151

52-
import org.junit.jupiter.api.Disabled;
5352
import org.junit.jupiter.api.Test;
5453
import org.junit.jupiter.api.Timeout;
5554

@@ -141,9 +140,6 @@ public void testVerifyHeartbeats() throws InterruptedException {
141140
}
142141
}
143142

144-
// This test is proving sufficiently flaky that it has been disabled pending investigation
145-
@Disabled
146-
// @Flaky("KAFKA-18488")
147143
@Test
148144
public void testVerifyFetchAndCommitSyncImplicit() {
149145
ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,
@@ -218,9 +214,6 @@ public void testVerifyFetchAndCommitSyncImplicit() {
218214
}
219215
}
220216

221-
// This test is proving sufficiently flaky that it has been disabled pending investigation
222-
@Disabled
223-
//@Flaky("KAFKA-18794")
224217
@Test
225218
public void testVerifyFetchAndCloseImplicit() {
226219
ShareConsumerMetadata metadata = new ShareConsumerMetadata(0, 0, Long.MAX_VALUE, false,

0 commit comments

Comments
 (0)