Skip to content

Commit f3a9355

Browse files
authored
Revert "KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18050)" (#18544)
This reverts commit 70d6312. Reviewers: Luke Chen <showuon@gmail.com>
1 parent 1672a4b commit f3a9355

File tree

9 files changed

+38
-77
lines changed

9 files changed

+38
-77
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java

-6
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
161161
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
162162
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
163163
membershipManager().onHeartbeatRequestSkipped();
164-
maybePropagateCoordinatorFatalErrorEvent();
165164
return NetworkClientDelegate.PollResult.EMPTY;
166165
}
167166
pollTimer.update(currentTimeMs);
@@ -264,11 +263,6 @@ public void resetPollTimer(final long pollMs) {
264263
pollTimer.reset(maxPollIntervalMs);
265264
}
266265

267-
private void maybePropagateCoordinatorFatalErrorEvent() {
268-
coordinatorRequestManager.fatalError()
269-
.ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
270-
}
271-
272266
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) {
273267
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
274268
heartbeatRequestState.onSendAttempt(currentTimeMs);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,9 @@ public CommitRequestManager(
176176
*/
177177
@Override
178178
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
179-
// poll when the coordinator node is known and fatal error is not present
180-
if (coordinatorRequestManager.coordinator().isEmpty()) {
181-
pendingRequests.maybeFailOnCoordinatorFatalError();
179+
// poll only when the coordinator node is known.
180+
if (coordinatorRequestManager.coordinator().isEmpty())
182181
return EMPTY;
183-
}
184182

185183
if (closing) {
186184
return drainPendingOffsetCommitRequests();
@@ -1248,16 +1246,6 @@ private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
12481246
clearAll();
12491247
return res;
12501248
}
1251-
1252-
private void maybeFailOnCoordinatorFatalError() {
1253-
coordinatorRequestManager.fatalError().ifPresent(error -> {
1254-
log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error);
1255-
unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error));
1256-
unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error));
1257-
clearAll();
1258-
}
1259-
);
1260-
}
12611249
}
12621250

12631251
/**

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java

+7-17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.kafka.clients.consumer.internals;
1818

19+
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
20+
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
1921
import org.apache.kafka.common.KafkaException;
2022
import org.apache.kafka.common.Node;
2123
import org.apache.kafka.common.errors.DisconnectException;
@@ -51,27 +53,24 @@
5153
public class CoordinatorRequestManager implements RequestManager {
5254
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
5355
private final Logger log;
56+
private final BackgroundEventHandler backgroundEventHandler;
5457
private final String groupId;
5558

5659
private final RequestState coordinatorRequestState;
5760
private long timeMarkedUnknownMs = -1L; // start logging a warning only after being unable to connect for a while
5861
private long totalDisconnectedMin = 0;
5962
private Node coordinator;
60-
// Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take
61-
// appropriate actions.
62-
// For example:
63-
// - AbstractHeartbeatRequestManager propagates the error event to the application thread.
64-
// - CommitRequestManager fail pending requests.
65-
private Optional<Throwable> fatalError = Optional.empty();
6663

6764
public CoordinatorRequestManager(
6865
final LogContext logContext,
6966
final long retryBackoffMs,
7067
final long retryBackoffMaxMs,
68+
final BackgroundEventHandler errorHandler,
7169
final String groupId
7270
) {
7371
Objects.requireNonNull(groupId);
7472
this.log = logContext.logger(this.getClass());
73+
this.backgroundEventHandler = errorHandler;
7574
this.groupId = groupId;
7675
this.coordinatorRequestState = new RequestState(
7776
logContext,
@@ -115,7 +114,6 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren
115114
);
116115

117116
return unsentRequest.whenComplete((clientResponse, throwable) -> {
118-
clearFatalError();
119117
if (clientResponse != null) {
120118
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
121119
onResponse(clientResponse.receivedTimeMs(), response);
@@ -202,12 +200,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
202200
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
203201
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
204202
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
205-
fatalError = Optional.of(groupAuthorizationException);
203+
backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
206204
return;
207205
}
208206

209207
log.warn("FindCoordinator request failed due to fatal exception", exception);
210-
fatalError = Optional.of(exception);
208+
backgroundEventHandler.add(new ErrorEvent(exception));
211209
}
212210

213211
/**
@@ -246,12 +244,4 @@ private void onResponse(
246244
public Optional<Node> coordinator() {
247245
return Optional.ofNullable(this.coordinator);
248246
}
249-
250-
private void clearFatalError() {
251-
this.fatalError = Optional.empty();
252-
}
253-
254-
public Optional<Throwable> fatalError() {
255-
return fatalError;
256-
}
257247
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java

+2
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ protected RequestManagers create() {
194194
logContext,
195195
retryBackoffMs,
196196
retryBackoffMaxMs,
197+
backgroundEventHandler,
197198
groupRebalanceConfig.groupId);
198199
commitRequestManager = new CommitRequestManager(
199200
time,
@@ -294,6 +295,7 @@ protected RequestManagers create() {
294295
logContext,
295296
retryBackoffMs,
296297
retryBackoffMaxMs,
298+
backgroundEventHandler,
297299
groupRebalanceConfig.groupId);
298300
ShareMembershipManager shareMembershipManager = new ShareMembershipManager(
299301
logContext,

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2239,7 +2239,7 @@ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws In
22392239
// by the background thread, so it can realize there is authentication fail and then
22402240
// throw the AuthenticationException
22412241
assertPollEventuallyThrows(consumer, AuthenticationException.class,
2242-
"this consumer was not able to discover metadata errors during continuous polling.");
2242+
"he consumer was not able to discover metadata errors during continuous polling.");
22432243
} else {
22442244
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
22452245
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java

-17
Original file line numberDiff line numberDiff line change
@@ -1498,23 +1498,6 @@ public void testSignalClose() {
14981498
OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data();
14991499
assertEquals("topic", data.topics().get(0).name());
15001500
}
1501-
1502-
@Test
1503-
public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
1504-
CommitRequestManager commitRequestManager = create(true, 100);
1505-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
1506-
1507-
commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200);
1508-
assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size());
1509-
1510-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
1511-
when(coordinatorRequestManager.fatalError())
1512-
.thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception")));
1513-
1514-
assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200));
1515-
1516-
assertEmptyPendingRequests(commitRequestManager);
1517-
}
15181501

15191502
private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
15201503
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java

+19-19
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import org.apache.kafka.clients.ClientResponse;
2020
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
21+
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
2122
import org.apache.kafka.common.Node;
23+
import org.apache.kafka.common.errors.GroupAuthorizationException;
2224
import org.apache.kafka.common.errors.TimeoutException;
2325
import org.apache.kafka.common.protocol.ApiKeys;
2426
import org.apache.kafka.common.protocol.Errors;
@@ -33,8 +35,6 @@
3335
import org.apache.logging.log4j.Level;
3436
import org.junit.jupiter.api.BeforeEach;
3537
import org.junit.jupiter.api.Test;
36-
import org.junit.jupiter.params.ParameterizedTest;
37-
import org.junit.jupiter.params.provider.EnumSource;
3838

3939
import java.util.Collections;
4040
import java.util.List;
@@ -49,7 +49,9 @@
4949
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
5050
import static org.junit.jupiter.api.Assertions.assertThrows;
5151
import static org.junit.jupiter.api.Assertions.assertTrue;
52+
import static org.mockito.ArgumentMatchers.argThat;
5253
import static org.mockito.Mockito.mock;
54+
import static org.mockito.Mockito.verify;
5355
import static org.mockito.Mockito.verifyNoInteractions;
5456

5557
public class CoordinatorRequestManagerTest {
@@ -189,10 +191,23 @@ public void testBackoffAfterRetriableFailure() {
189191
}
190192

191193
@Test
192-
public void testBackoffAfterFatalError() {
194+
public void testPropagateAndBackoffAfterFatalError() {
193195
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
194196
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
195197

198+
verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
199+
if (!(backgroundEvent instanceof ErrorEvent))
200+
return false;
201+
202+
RuntimeException exception = ((ErrorEvent) backgroundEvent).error();
203+
204+
if (!(exception instanceof GroupAuthorizationException))
205+
return false;
206+
207+
GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
208+
return groupAuthException.groupId().equals(GROUP_ID);
209+
}));
210+
196211
time.sleep(RETRY_BACKOFF_MS - 1);
197212
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
198213

@@ -238,22 +253,6 @@ public void testNetworkTimeout() {
238253
res2 = coordinatorManager.poll(time.milliseconds());
239254
assertEquals(1, res2.unsentRequests.size());
240255
}
241-
242-
@ParameterizedTest
243-
@EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"})
244-
public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) {
245-
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
246-
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
247-
assertTrue(coordinatorManager.fatalError().isPresent());
248-
249-
time.sleep(RETRY_BACKOFF_MS);
250-
// there are no successful responses, so the fatal error should persist
251-
assertTrue(coordinatorManager.fatalError().isPresent());
252-
253-
// receiving a successful response should clear the fatal error
254-
expectFindCoordinatorRequest(coordinatorManager, error);
255-
assertTrue(coordinatorManager.fatalError().isEmpty());
256-
}
257256

258257
private void expectFindCoordinatorRequest(
259258
CoordinatorRequestManager coordinatorManager,
@@ -274,6 +273,7 @@ private CoordinatorRequestManager setupCoordinatorManager(String groupId) {
274273
new LogContext(),
275274
RETRY_BACKOFF_MS,
276275
RETRY_BACKOFF_MS,
276+
this.backgroundEventHandler,
277277
groupId
278278
);
279279
}

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -1272,7 +1272,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
12721272
}
12731273

12741274
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
1275-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
1275+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
12761276
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
12771277
val consumer = createConsumer()
12781278
assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
@@ -1309,7 +1309,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
13091309
}
13101310

13111311
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
1312-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
1312+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
13131313
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
13141314
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
13151315
val consumer = createConsumer()
@@ -1335,7 +1335,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
13351335
}
13361336

13371337
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
1338-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
1338+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
13391339
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
13401340
createTopicWithBrokerPrincipal(topic)
13411341
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)

core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala

+4
Original file line numberDiff line numberDiff line change
@@ -441,4 +441,8 @@ object QuorumTestHarness {
441441

442442
// The following is for tests that only work with the classic group protocol because of relying on Zookeeper
443443
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))
444+
445+
// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
446+
// implementation that would otherwise cause tests to fail.
447+
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
444448
}

0 commit comments

Comments
 (0)