Skip to content

Commit dfcf22c

Browse files
authoredJul 9, 2024
AKCORE-243: Tidy up sending of acknowledgements when consumer is closed (#1412)
* AKCORE-243: Tidy up sending of acknowledgements when consumer is closed

File tree

14 files changed

+491
-396
lines changed

14 files changed

+491
-396
lines changed
 

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -293,12 +293,12 @@ private void closeInternal(final Duration timeout) {
293293
* Check the unsent queue one last time and poll until all requests are sent or the timer runs out.
294294
*/
295295
private void sendUnsentRequests(final Timer timer) {
296-
if (networkClientDelegate.unsentRequests().isEmpty())
296+
if (!networkClientDelegate.hasAnyPendingRequests())
297297
return;
298298
do {
299299
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
300300
timer.update();
301-
} while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty());
301+
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());
302302
}
303303

304304
void cleanup() {

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java

+7
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
131131
checkDisconnects(currentTimeMs);
132132
}
133133

134+
/**
135+
* Return true if there is at least one in-flight request or unsent request.
136+
*/
137+
public boolean hasAnyPendingRequests() {
138+
return client.hasInFlightRequests() || !unsentRequests.isEmpty();
139+
}
140+
134141
/**
135142
* Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will
136143
* find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

+224-145
Large diffs are not rendered by default.

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

+35-16
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@
4040
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
4141
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
4242
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
43-
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
43+
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
4444
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
45+
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
4546
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
46-
import org.apache.kafka.clients.consumer.internals.events.ShareLeaveOnCloseEvent;
47-
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeApplicationEvent;
48-
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeApplicationEvent;
47+
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
48+
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
4949
import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
5050
import org.apache.kafka.common.KafkaException;
5151
import org.apache.kafka.common.Metric;
@@ -527,7 +527,7 @@ public void subscribe(final Collection<String> topics) {
527527

528528
// Trigger subscribe event to effectively join the group if not already part of it,
529529
// or just send the new subscription to the broker.
530-
applicationEventHandler.add(new ShareSubscriptionChangeApplicationEvent());
530+
applicationEventHandler.add(new ShareSubscriptionChangeEvent());
531531
}
532532
} finally {
533533
release();
@@ -541,7 +541,8 @@ public void subscribe(final Collection<String> topics) {
541541
public void unsubscribe() {
542542
acquireAndEnsureOpen();
543543
try {
544-
ShareUnsubscribeApplicationEvent unsubscribeApplicationEvent = new ShareUnsubscribeApplicationEvent();
544+
Timer timer = time.timer(Long.MAX_VALUE);
545+
ShareUnsubscribeEvent unsubscribeApplicationEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer));
545546
applicationEventHandler.add(unsubscribeApplicationEvent);
546547
log.info("Unsubscribing all topics");
547548

@@ -836,14 +837,19 @@ private void close(final Duration timeout, final boolean swallowException) {
836837
closeTimer.update();
837838

838839
// Prepare shutting down the network thread
839-
prepareShutdown(closeTimer, firstException);
840-
closeTimer.update();
840+
swallow(log, Level.ERROR, "Failed to release assignment before closing consumer",
841+
() -> sendAcknowledgementsAndLeaveGroup(closeTimer, firstException), firstException);
842+
swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback",
843+
this::handleCompletedAcknowledgements, firstException);
841844
if (applicationEventHandler != null)
842845
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
843-
swallow(log, Level.ERROR, "Failed invoking acknowledgement commit callback.", this::handleCompletedAcknowledgements,
844-
firstException);
845846
closeTimer.update();
846847

848+
// close() can be called from inside one of the constructors. In that case, it's possible that neither
849+
// the reaper nor the background event queue were constructed, so check them first to avoid NPE.
850+
if (backgroundEventReaper != null && backgroundEventQueue != null)
851+
backgroundEventReaper.reap(backgroundEventQueue);
852+
847853
closeQuietly(kafkaShareConsumerMetrics, "kafka share consumer metrics", firstException);
848854
closeQuietly(metrics, "consumer metrics", firstException);
849855
closeQuietly(deserializers, "consumer deserializers", firstException);
@@ -863,14 +869,27 @@ private void close(final Duration timeout, final boolean swallowException) {
863869
/**
864870
* Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
865871
* 1. commit pending acknowledgements and close any share sessions
866-
* 2. send leave group
872+
* 2. leave the group
867873
*/
868-
void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
874+
private void sendAcknowledgementsAndLeaveGroup(final Timer timer, final AtomicReference<Throwable> firstException) {
869875
completeQuietly(
870-
() -> {
871-
applicationEventHandler.addAndGet(new ShareLeaveOnCloseEvent(acknowledgementsToSend(), calculateDeadlineMs(timer)));
872-
},
873-
"Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
876+
() -> {
877+
applicationEventHandler.addAndGet(new ShareAcknowledgeOnCloseEvent(acknowledgementsToSend(), calculateDeadlineMs(timer)));
878+
},
879+
"Failed to send pending acknowledgements with a timeout(ms)=" + timer.timeoutMs(), firstException);
880+
timer.update();
881+
882+
ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(calculateDeadlineMs(timer));
883+
applicationEventHandler.add(unsubscribeEvent);
884+
try {
885+
processBackgroundEvents(unsubscribeEvent.future(), timer);
886+
log.info("Completed releasing assignment and leaving group to close consumer.");
887+
} catch (TimeoutException e) {
888+
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
889+
"complete it within {} ms. It will proceed to close.", timer.timeoutMs());
890+
} finally {
891+
timer.update();
892+
}
874893
}
875894

876895
/**

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public enum Type {
3333
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
3434
COMMIT_ON_CLOSE, LEAVE_ON_CLOSE,
3535
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
36-
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, SHARE_LEAVE_ON_CLOSE
36+
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
37+
SHARE_ACKNOWLEDGE_ON_CLOSE
3738
}
3839

3940
private final Type type;

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

+19-19
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636

3737
import java.util.List;
3838
import java.util.Map;
39-
import java.util.Objects;
4039
import java.util.concurrent.CompletableFuture;
4140
import java.util.function.BiConsumer;
4241
import java.util.function.Supplier;
@@ -136,15 +135,15 @@ public void process(ApplicationEvent event) {
136135
return;
137136

138137
case SHARE_SUBSCRIPTION_CHANGE:
139-
process((ShareSubscriptionChangeApplicationEvent) event);
138+
process((ShareSubscriptionChangeEvent) event);
140139
return;
141140

142141
case SHARE_UNSUBSCRIBE:
143-
process((ShareUnsubscribeApplicationEvent) event);
142+
process((ShareUnsubscribeEvent) event);
144143
return;
145144

146-
case SHARE_LEAVE_ON_CLOSE:
147-
process((ShareLeaveOnCloseEvent) event);
145+
case SHARE_ACKNOWLEDGE_ON_CLOSE:
146+
process((ShareAcknowledgeOnCloseEvent) event);
148147
return;
149148

150149
default:
@@ -328,7 +327,7 @@ private void process(final ShareAcknowledgeAsyncEvent event) {
328327
* consumer join the share group if it is not part of it yet, or send the updated subscription if
329328
* it is already a member.
330329
*/
331-
private void process(final ShareSubscriptionChangeApplicationEvent ignored) {
330+
private void process(final ShareSubscriptionChangeEvent ignored) {
332331
if (!requestManagers.shareHeartbeatRequestManager.isPresent()) {
333332
log.warn("Group membership manager not present when processing a subscribe event");
334333
return;
@@ -345,7 +344,7 @@ private void process(final ShareSubscriptionChangeApplicationEvent ignored) {
345344
* execution for releasing the assignment completes, and the request to leave
346345
* the group is sent out.
347346
*/
348-
private void process(final ShareUnsubscribeApplicationEvent event) {
347+
private void process(final ShareUnsubscribeEvent event) {
349348
if (!requestManagers.shareHeartbeatRequestManager.isPresent()) {
350349
KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event");
351350
event.future().completeExceptionally(error);
@@ -357,21 +356,22 @@ private void process(final ShareUnsubscribeApplicationEvent event) {
357356
future.whenComplete(complete(event.future()));
358357
}
359358

360-
private void process(final ShareLeaveOnCloseEvent event) {
361-
if (!requestManagers.shareHeartbeatRequestManager.isPresent()) {
362-
event.future().complete(null);
359+
/**
360+
* Process event indicating that the consumer is closing. This will make the consumer
361+
* complete pending acknowledgements.
362+
*
363+
* @param event Acknowledge-on-close event containing a future that will complete when
364+
* the acknowledgements have responses.
365+
*/
366+
private void process(final ShareAcknowledgeOnCloseEvent event) {
367+
if (!requestManagers.shareConsumeRequestManager.isPresent()) {
368+
KafkaException error = new KafkaException("Group membership manager not present when processing an acknowledge-on-close event");
369+
event.future().completeExceptionally(error);
363370
return;
364371
}
365372

366-
requestManagers.shareConsumeRequestManager.ifPresent(scrm -> scrm.acknowledgeOnClose(event.acknowledgementsMap()));
367-
368-
ShareMembershipManager membershipManager =
369-
Objects.requireNonNull(requestManagers.shareHeartbeatRequestManager.get().membershipManager(),
370-
"Expecting membership manager to be non-null");
371-
372-
log.debug("Leaving group before closing");
373-
CompletableFuture<Void> future = membershipManager.leaveGroup();
374-
// The future will be completed on heartbeat sent
373+
ShareConsumeRequestManager manager = requestManagers.shareConsumeRequestManager.get();
374+
CompletableFuture<Void> future = manager.acknowledgeOnClose(event.acknowledgementsMap(), event.deadlineMs());
375375
future.whenComplete(complete(event.future()));
376376
}
377377

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareLeaveOnCloseEvent.java ‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgeOnCloseEvent.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121

2222
import java.util.Map;
2323

24-
public class ShareLeaveOnCloseEvent extends CompletableApplicationEvent<Void> {
24+
public class ShareAcknowledgeOnCloseEvent extends CompletableApplicationEvent<Void> {
2525

2626
private Map<TopicIdPartition, Acknowledgements> acknowledgementsMap;
2727

28-
public ShareLeaveOnCloseEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, final long deadlineMs) {
29-
super(Type.SHARE_LEAVE_ON_CLOSE, deadlineMs);
28+
public ShareAcknowledgeOnCloseEvent(final Map<TopicIdPartition, Acknowledgements> acknowledgementsMap, final long deadlineMs) {
29+
super(Type.SHARE_ACKNOWLEDGE_ON_CLOSE, deadlineMs);
3030
this.acknowledgementsMap = acknowledgementsMap;
3131
}
3232

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
* calls the subscribe API. This will make the consumer join a share group if not part of it
2323
* yet, or just send the updated subscription to the broker if it's already a member of the group.
2424
*/
25-
public class ShareSubscriptionChangeApplicationEvent extends ApplicationEvent {
25+
public class ShareSubscriptionChangeEvent extends ApplicationEvent {
2626

27-
public ShareSubscriptionChangeApplicationEvent() {
27+
public ShareSubscriptionChangeEvent() {
2828
super(Type.SHARE_SUBSCRIPTION_CHANGE);
2929
}
3030
}

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeApplicationEvent.java ‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareUnsubscribeEvent.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
* complete and the heartbeat to leave the group is sent out (minimal effort to send the
2525
* leave group heartbeat, without waiting for any response or considering timeouts).
2626
*/
27-
public class ShareUnsubscribeApplicationEvent extends CompletableApplicationEvent<Void> {
28-
public ShareUnsubscribeApplicationEvent() {
29-
super(Type.SHARE_UNSUBSCRIBE, Integer.MAX_VALUE);
27+
public class ShareUnsubscribeEvent extends CompletableApplicationEvent<Void> {
28+
public ShareUnsubscribeEvent(final long deadlineMs) {
29+
super(Type.SHARE_UNSUBSCRIBE, deadlineMs);
3030
}
3131
}

‎clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

+106-76
Large diffs are not rendered by default.

‎clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

+23-19
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
2525
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
2626
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
27-
import org.apache.kafka.clients.consumer.internals.events.ShareLeaveOnCloseEvent;
28-
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeApplicationEvent;
29-
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeApplicationEvent;
27+
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
28+
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
29+
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
3030
import org.apache.kafka.common.KafkaException;
3131
import org.apache.kafka.common.TopicIdPartition;
3232
import org.apache.kafka.common.TopicPartition;
@@ -153,7 +153,8 @@ private ShareConsumerImpl<String, String> newConsumer(
153153
@Test
154154
public void testSuccessfulStartupShutdown() {
155155
consumer = newConsumer();
156-
completeShareLeaveOnCloseApplicationEventSuccessfully();
156+
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
157+
completeShareUnsubscribeApplicationEventSuccessfully();
157158
assertDoesNotThrow(() -> consumer.close());
158159
}
159160

@@ -179,7 +180,8 @@ public void testWakeupBeforeCallingPoll() {
179180
@Test
180181
public void testFailOnClosedConsumer() {
181182
consumer = newConsumer();
182-
completeShareLeaveOnCloseApplicationEventSuccessfully();
183+
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
184+
completeShareUnsubscribeApplicationEventSuccessfully();
183185
consumer.close();
184186
final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::subscription);
185187
assertEquals("This consumer has already been closed.", res.getMessage());
@@ -188,10 +190,11 @@ public void testFailOnClosedConsumer() {
188190
@Test
189191
public void testVerifyApplicationEventOnShutdown() {
190192
consumer = newConsumer();
191-
completeShareLeaveOnCloseApplicationEventSuccessfully();
192-
doReturn(null).when(applicationEventHandler).addAndGet(any());
193+
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
194+
completeShareUnsubscribeApplicationEventSuccessfully();
193195
consumer.close();
194-
verify(applicationEventHandler).addAndGet(any(ShareLeaveOnCloseEvent.class));
196+
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
197+
verify(applicationEventHandler).add(any(ShareUnsubscribeEvent.class));
195198
}
196199

197200
@Test
@@ -215,7 +218,7 @@ public void testSubscribeGeneratesEvent() {
215218
String topic = "topic1";
216219
consumer.subscribe(singletonList(topic));
217220
assertEquals(singleton(topic), consumer.subscription());
218-
verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareSubscriptionChangeApplicationEvent.class));
221+
verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareSubscriptionChangeEvent.class));
219222
}
220223

221224
@Test
@@ -226,7 +229,7 @@ public void testUnsubscribeGeneratesUnsubscribeEvent() {
226229
consumer.unsubscribe();
227230

228231
assertTrue(consumer.subscription().isEmpty());
229-
verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeApplicationEvent.class));
232+
verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
230233
}
231234

232235
@Test
@@ -236,7 +239,7 @@ public void testSubscribeToEmptyListActsAsUnsubscribe() {
236239

237240
consumer.subscribe(Collections.emptyList());
238241
assertTrue(consumer.subscription().isEmpty());
239-
verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeApplicationEvent.class));
242+
verify(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
240243
}
241244

242245
@Test
@@ -348,11 +351,12 @@ public void testEnsurePollEventSentOnConsumerPoll() {
348351
consumer.subscribe(singletonList("topic1"));
349352
consumer.poll(Duration.ofMillis(100));
350353
verify(applicationEventHandler).add(any(PollEvent.class));
351-
verify(applicationEventHandler).add(any(ShareSubscriptionChangeApplicationEvent.class));
354+
verify(applicationEventHandler).add(any(ShareSubscriptionChangeEvent.class));
352355

353-
completeShareLeaveOnCloseApplicationEventSuccessfully();
356+
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
357+
completeShareUnsubscribeApplicationEventSuccessfully();
354358
consumer.close();
355-
verify(applicationEventHandler).addAndGet(any(ShareLeaveOnCloseEvent.class));
359+
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
356360
}
357361

358362
private Properties requiredConsumerPropertiesAndGroupId(final String groupId) {
@@ -448,17 +452,17 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
448452

449453
private void completeShareUnsubscribeApplicationEventSuccessfully() {
450454
doAnswer(invocation -> {
451-
ShareUnsubscribeApplicationEvent event = invocation.getArgument(0);
455+
ShareUnsubscribeEvent event = invocation.getArgument(0);
452456
event.future().complete(null);
453457
return null;
454-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeApplicationEvent.class));
458+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
455459
}
456460

457-
private void completeShareLeaveOnCloseApplicationEventSuccessfully() {
461+
private void completeShareAcknowledgeOnCloseApplicationEventSuccessfully() {
458462
doAnswer(invocation -> {
459-
ShareLeaveOnCloseEvent event = invocation.getArgument(0);
463+
ShareAcknowledgeOnCloseEvent event = invocation.getArgument(0);
460464
event.future().complete(null);
461465
return null;
462-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(ShareLeaveOnCloseEvent.class));
466+
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ShareAcknowledgeOnCloseEvent.class));
463467
}
464468
}

‎core/src/main/java/kafka/server/SharePartitionManager.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ public void acknowledgeSessionUpdate(String groupId, ShareFetchMetadata reqMetad
510510
if (shareSession.epoch != reqMetadata.epoch()) {
511511
log.debug("Share session error for {}: expected epoch {}, but got {} instead", key,
512512
shareSession.epoch, reqMetadata.epoch());
513-
throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
513+
throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
514514
} else {
515515
cache.touch(shareSession, time.milliseconds());
516516
shareSession.epoch = ShareFetchMetadata.nextEpoch(shareSession.epoch);
@@ -742,8 +742,6 @@ void shareAcknowledgement() {
742742
void recordAcknowledgement(byte ackType) {
743743
if (recordAcksSensorMap.containsKey(ackType)) {
744744
recordAcksSensorMap.get(ackType).record(1.0);
745-
} else {
746-
log.error("Unknown ack type {}", ackType);
747745
}
748746
}
749747

‎core/src/test/java/kafka/test/api/ShareConsumerTest.java

+62-105
Large diffs are not rendered by default.

‎server-common/src/main/java/org/apache/kafka/server/group/share/PersisterStateManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,9 @@ protected void handleFindCoordinatorResponse(ClientResponse response) {
276276
enqueue(this);
277277
break;
278278

279-
case COORDINATOR_NOT_AVAILABLE: // retryable error codes
279+
case COORDINATOR_NOT_AVAILABLE: // retriable error codes
280280
case COORDINATOR_LOAD_IN_PROGRESS:
281-
log.warn("Received retryable error in find coordinator {}", error.message());
281+
log.warn("Received retriable error in find coordinator: {}", error.message());
282282
if (findCoordattempts >= this.maxFindCoordAttempts) {
283283
log.error("Exhausted max retries to find coordinator without success.");
284284
findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success."));

0 commit comments

Comments
 (0)
Please sign in to comment.