Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Revert "KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18050)" #18544

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped();
maybePropagateCoordinatorFatalErrorEvent();
return NetworkClientDelegate.PollResult.EMPTY;
}
pollTimer.update(currentTimeMs);
Expand Down Expand Up @@ -264,11 +263,6 @@ public void resetPollTimer(final long pollMs) {
pollTimer.reset(maxPollIntervalMs);
}

private void maybePropagateCoordinatorFatalErrorEvent() {
coordinatorRequestManager.fatalError()
.ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
}

private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,9 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
// poll when the coordinator node is known and fatal error is not present
if (coordinatorRequestManager.coordinator().isEmpty()) {
pendingRequests.maybeFailOnCoordinatorFatalError();
// poll only when the coordinator node is known.
if (coordinatorRequestManager.coordinator().isEmpty())
return EMPTY;
}

if (closing) {
return drainPendingOffsetCommitRequests();
Expand Down Expand Up @@ -1248,16 +1246,6 @@ private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
clearAll();
return res;
}

private void maybeFailOnCoordinatorFatalError() {
coordinatorRequestManager.fatalError().ifPresent(error -> {
log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error);
unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error));
unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error));
clearAll();
}
);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
Expand Down Expand Up @@ -51,27 +53,24 @@
public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
private final Logger log;
private final BackgroundEventHandler backgroundEventHandler;
private final String groupId;

private final RequestState coordinatorRequestState;
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
private long totalDisconnectedMin = 0;
private Node coordinator;
// Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take
// appropriate actions.
// For example:
// - AbstractHeartbeatRequestManager propagates the error event to the application thread.
// - CommitRequestManager fail pending requests.
private Optional<Throwable> fatalError = Optional.empty();

public CoordinatorRequestManager(
final LogContext logContext,
final long retryBackoffMs,
final long retryBackoffMaxMs,
final BackgroundEventHandler errorHandler,
final String groupId
) {
Objects.requireNonNull(groupId);
this.log = logContext.logger(this.getClass());
this.backgroundEventHandler = errorHandler;
this.groupId = groupId;
this.coordinatorRequestState = new RequestState(
logContext,
Expand Down Expand Up @@ -115,7 +114,6 @@ NetworkClientDelegate.UnsentRequest makeFindCoordinatorRequest(final long curren
);

return unsentRequest.whenComplete((clientResponse, throwable) -> {
clearFatalError();
if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
onResponse(clientResponse.receivedTimeMs(), response);
Expand Down Expand Up @@ -202,12 +200,12 @@ private void onFailedResponse(final long currentTimeMs, final Throwable exceptio
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
fatalError = Optional.of(groupAuthorizationException);
backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
return;
}

log.warn("FindCoordinator request failed due to fatal exception", exception);
fatalError = Optional.of(exception);
backgroundEventHandler.add(new ErrorEvent(exception));
}

/**
Expand Down Expand Up @@ -246,12 +244,4 @@ private void onResponse(
public Optional<Node> coordinator() {
return Optional.ofNullable(this.coordinator);
}

private void clearFatalError() {
this.fatalError = Optional.empty();
}

public Optional<Throwable> fatalError() {
return fatalError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ protected RequestManagers create() {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId);
commitRequestManager = new CommitRequestManager(
time,
Expand Down Expand Up @@ -294,6 +295,7 @@ protected RequestManagers create() {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId);
ShareMembershipManager shareMembershipManager = new ShareMembershipManager(
logContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2239,7 +2239,7 @@ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws In
// by the background thread, so it can realize there is authentication fail and then
// throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class,
"this consumer was not able to discover metadata errors during continuous polling.");
"he consumer was not able to discover metadata errors during continuous polling.");
} else {
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1498,23 +1498,6 @@ public void testSignalClose() {
OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data();
assertEquals("topic", data.topics().get(0).name());
}

@Test
public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
CommitRequestManager commitRequestManager = create(true, 100);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));

commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200);
assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size());

when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
when(coordinatorRequestManager.fatalError())
.thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception")));

assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200));

assertEmptyPendingRequests(commitRequestManager);
}

private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
Expand All @@ -33,8 +35,6 @@
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.Collections;
import java.util.List;
Expand All @@ -49,7 +49,9 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

public class CoordinatorRequestManagerTest {
Expand Down Expand Up @@ -189,10 +191,23 @@ public void testBackoffAfterRetriableFailure() {
}

@Test
public void testBackoffAfterFatalError() {
public void testPropagateAndBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);

verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
if (!(backgroundEvent instanceof ErrorEvent))
return false;

RuntimeException exception = ((ErrorEvent) backgroundEvent).error();

if (!(exception instanceof GroupAuthorizationException))
return false;

GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
return groupAuthException.groupId().equals(GROUP_ID);
}));

time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);

Expand Down Expand Up @@ -238,22 +253,6 @@ public void testNetworkTimeout() {
res2 = coordinatorManager.poll(time.milliseconds());
assertEquals(1, res2.unsentRequests.size());
}

@ParameterizedTest
@EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"})
public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
assertTrue(coordinatorManager.fatalError().isPresent());

time.sleep(RETRY_BACKOFF_MS);
// there are no successful responses, so the fatal error should persist
assertTrue(coordinatorManager.fatalError().isPresent());

// receiving a successful response should clear the fatal error
expectFindCoordinatorRequest(coordinatorManager, error);
assertTrue(coordinatorManager.fatalError().isEmpty());
}

private void expectFindCoordinatorRequest(
CoordinatorRequestManager coordinatorManager,
Expand All @@ -274,6 +273,7 @@ private CoordinatorRequestManager setupCoordinatorManager(String groupId) {
new LogContext(),
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MS,
this.backgroundEventHandler,
groupId
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
Expand Down Expand Up @@ -1309,7 +1309,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
val consumer = createConsumer()
Expand All @@ -1335,7 +1335,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,8 @@ object QuorumTestHarness {

// The following is for tests that only work with the classic group protocol because of relying on Zookeeper
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))

// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
}
Loading