diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index e6dbf846eacb8..1f8ddc725b58d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -161,7 +161,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse public NetworkClientDelegate.PollResult poll(long currentTimeMs) { if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) { membershipManager().onHeartbeatRequestSkipped(); - maybePropagateCoordinatorFatalErrorEvent(); return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); @@ -264,11 +263,6 @@ public void resetPollTimer(final long pollMs) { pollTimer.reset(maxPollIntervalMs); } - private void maybePropagateCoordinatorFatalErrorEvent() { - coordinatorRequestManager.fatalError() - .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError))); - } - private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); heartbeatRequestState.onSendAttempt(currentTimeMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 1d3503886a9c4..4f0deef5bf890 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -176,11 +176,9 @@ public CommitRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - // poll when the coordinator node is known and fatal error is not present - if (coordinatorRequestManager.coordinator().isEmpty()) { - pendingRequests.maybeFailOnCoordinatorFatalError(); + // poll only when the coordinator node is known. + if (coordinatorRequestManager.coordinator().isEmpty()) return EMPTY; - } if (closing) { return drainPendingOffsetCommitRequests(); @@ -1248,16 +1246,6 @@ private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() { clearAll(); return res; } - - private void maybeFailOnCoordinatorFatalError() { - coordinatorRequestManager.fatalError().ifPresent(error -> { - log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); - unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); - unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); - clearAll(); - } - ); - } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 07c4fad45b133..4664267a0e858 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; @@ -51,27 +53,24 @@ public class CoordinatorRequestManager implements RequestManager { private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; private final Logger log; + private final BackgroundEventHandler backgroundEventHandler; private final String groupId; private final RequestState coordinatorRequestState; private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while private long totalDisconnectedMin = 0; private Node coordinator; - // Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take - // appropriate actions. - // For example: - // - AbstractHeartbeatRequestManager propagates the error event to the application thread. - // - CommitRequestManager fail pending requests. - private Optional<Throwable> fatalError = Optional.empty(); public CoordinatorRequestManager( final LogContext logContext, final long retryBackoffMs, final long retryBackoffMaxMs, + final BackgroundEventHandler errorHandler, final String groupId ) { Objects.requireNonNull(groupId); this.log = logContext.logger(this.getClass()); + this.backgroundEventHandler = errorHandler; this.groupId = groupId; this.coordinatorRequestState = new RequestState( logContext, @@ -115,7 +114,6 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren ); return unsentRequest.whenComplete((clientResponse, throwable) -> { - clearFatalError(); if (clientResponse != null) { FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); onResponse(clientResponse.receivedTimeMs(), response); @@ -202,12 +200,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId); - fatalError = Optional.of(groupAuthorizationException); + backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException)); return; } log.warn("FindCoordinator request failed due to fatal exception", exception); - fatalError = Optional.of(exception); + backgroundEventHandler.add(new ErrorEvent(exception)); } /** @@ -246,12 +244,4 @@ private void onResponse( public Optional<Node> coordinator() { return Optional.ofNullable(this.coordinator); } - - private void clearFatalError() { - this.fatalError = Optional.empty(); - } - - public Optional<Throwable> fatalError() { - return fatalError; - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 960567ac96bec..304f0fffd4ad5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -194,6 +194,7 @@ protected RequestManagers create() { logContext, retryBackoffMs, retryBackoffMaxMs, + backgroundEventHandler, groupRebalanceConfig.groupId); commitRequestManager = new CommitRequestManager( time, @@ -294,6 +295,7 @@ protected RequestManagers create() { logContext, retryBackoffMs, retryBackoffMaxMs, + backgroundEventHandler, groupRebalanceConfig.groupId); ShareMembershipManager shareMembershipManager = new ShareMembershipManager( logContext, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2749563df2742..33ca2844305c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2239,7 +2239,7 @@ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws In // by the background thread, so it can realize there is authentication fail and then // throw the AuthenticationException assertPollEventuallyThrows(consumer, AuthenticationException.class, - "this consumer was not able to discover metadata errors during continuous polling."); + "he consumer was not able to discover metadata errors during continuous polling."); } else { assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index dae0387206e3c..252d5a7ccbd08 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -1498,23 +1498,6 @@ public void testSignalClose() { OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); assertEquals("topic", data.topics().get(0).name()); } - - @Test - public void testPollWithFatalErrorShouldFailAllUnsentRequests() { - CommitRequestManager commitRequestManager = create(true, 100); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - - commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200); - assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size()); - - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - when(coordinatorRequestManager.fatalError()) - .thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception"))); - - assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200)); - - assertEmptyPendingRequests(commitRequestManager); - } private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) { assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java index 72599100a3685..7e805dc3cd3b6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java @@ -18,7 +18,9 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -33,8 +35,6 @@ import org.apache.logging.log4j.Level; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; import java.util.List; @@ -49,7 +49,9 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; public class CoordinatorRequestManagerTest { @@ -189,10 +191,23 @@ public void testBackoffAfterRetriableFailure() { } @Test - public void testBackoffAfterFatalError() { + public void testPropagateAndBackoffAfterFatalError() { CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); + verify(backgroundEventHandler).add(argThat(backgroundEvent -> { + if (!(backgroundEvent instanceof ErrorEvent)) + return false; + + RuntimeException exception = ((ErrorEvent) backgroundEvent).error(); + + if (!(exception instanceof GroupAuthorizationException)) + return false; + + GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception; + return groupAuthException.groupId().equals(GROUP_ID); + })); + time.sleep(RETRY_BACKOFF_MS - 1); assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); @@ -238,22 +253,6 @@ public void testNetworkTimeout() { res2 = coordinatorManager.poll(time.milliseconds()); assertEquals(1, res2.unsentRequests.size()); } - - @ParameterizedTest - @EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"}) - public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) { - CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); - expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); - assertTrue(coordinatorManager.fatalError().isPresent()); - - time.sleep(RETRY_BACKOFF_MS); - // there are no successful responses, so the fatal error should persist - assertTrue(coordinatorManager.fatalError().isPresent()); - - // receiving a successful response should clear the fatal error - expectFindCoordinatorRequest(coordinatorManager, error); - assertTrue(coordinatorManager.fatalError().isEmpty()); - } private void expectFindCoordinatorRequest( CoordinatorRequestManager coordinatorManager, @@ -274,6 +273,7 @@ private CoordinatorRequestManager setupCoordinatorManager(String groupId) { new LogContext(), RETRY_BACKOFF_MS, RETRY_BACKOFF_MS, + this.backgroundEventHandler, groupId ); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 8ec6b3c532776..3ee420f2c4992 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1272,7 +1272,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) @@ -1309,7 +1309,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) val consumer = createConsumer() @@ -1335,7 +1335,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { createTopicWithBrokerPrincipal(topic) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index d666bee6a41c2..a8a66dbd54f5c 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -441,4 +441,8 @@ object QuorumTestHarness { // The following is for tests that only work with the classic group protocol because of relying on Zookeeper def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT))) + + // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer + // implementation that would otherwise cause tests to fail. + def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly }