Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18736: Add pollOnClose() and maximumTimeToWait() #19233

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;

/**
* <p>Manages the request creation and response handling for the Streams group heartbeat. The class creates a
* heartbeat request using the state stored in the membership manager. The requests can be retrieved
Expand Down Expand Up @@ -374,6 +376,55 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
}
}

/**
* Generate a heartbeat request to leave the group if the state is still LEAVING when this is
* called to close the consumer.
* <p/>
* Note that when closing the consumer, even though an event to Unsubscribe is generated
* (triggers callbacks and sends leave group), it could be the case that the Unsubscribe event
* processing does not complete in time and moves on to close the managers (ex. calls to
* close with zero timeout). So we could end up on this pollOnClose with the member in
* {@link MemberState#PREPARE_LEAVING} (ex. app thread did not have the time to process the
* event to execute callbacks), or {@link MemberState#LEAVING} (ex. the leave request could
* not be sent due to coordinator not available at that time). In all cases, the pollOnClose
* will be triggered right before sending the final requests, so we ensure that we generate
* the request to leave if needed.
*
* @param currentTimeMs The current system time in milliseconds at which the method was called
* @return PollResult containing the request to send
*/
@Override
public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
if (membershipManager.isLeavingGroup()) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequestAndLogResponse(currentTimeMs);
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), List.of(request));
}
return EMPTY;
}

/**
* Returns the delay for which the application thread can safely wait before it should be responsive
* to results from the request managers. For example, the subscription state can change when heartbeats
* are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
* responsive to changes.
*
* <p>Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
* our poll timer will not expire while we are polling.
*
* <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
* delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
*/
@Override
public long maximumTimeToWait(long currentTimeMs) {
pollTimer.update(currentTimeMs);
if (pollTimer.isExpired() ||
membershipManager.shouldNotWaitForHeartbeatInterval() && !heartbeatRequestState.requestInFlight()) {

return 0L;
}
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}

/**
* A heartbeat should be sent without waiting for the heartbeat interval to expire if:
* - the member is leaving the group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -1382,6 +1383,132 @@ private static Stream<Arguments> provideOtherErrors() {
.map(Arguments::of);
}

@Test
public void testPollOnCloseWhenIsNotLeaving() {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();

NetworkClientDelegate.PollResult result = heartbeatRequestManager.pollOnClose(time.milliseconds());

assertEquals(NetworkClientDelegate.PollResult.EMPTY, result);
}

@Test
public void testPollOnCloseWhenIsLeaving() {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
when(membershipManager.isLeavingGroup()).thenReturn(true);
when(membershipManager.groupId()).thenReturn(GROUP_ID);
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_MEMBER_EPOCH);

NetworkClientDelegate.PollResult result = heartbeatRequestManager.pollOnClose(time.milliseconds());

assertEquals(1, result.unsentRequests.size());
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
assertEquals(GROUP_ID, streamsRequest.data().groupId());
assertEquals(MEMBER_ID, streamsRequest.data().memberId());
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch());
}

@Test
public void testMaximumTimeToWaitPollTimerExpired() {
try (
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
when(mock.isExpired()).thenReturn(true);
});
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
when(mock.requestInFlight()).thenReturn(false);
})
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
time.sleep(1234);

final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());

assertEquals(0, maximumTimeToWait);
verify(pollTimer).update(time.milliseconds());
}
}

@Test
public void testMaximumTimeToWaitWhenHeartbeatShouldBeSentImmediately() {
try (
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class);
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
when(mock.requestInFlight()).thenReturn(false);
})
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
when(membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(true);
time.sleep(1234);

final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());

assertEquals(0, maximumTimeToWait);
verify(pollTimer).update(time.milliseconds());
}
}

@ParameterizedTest
@CsvSource({"true, false", "false, false", "true, true"})
public void testMaximumTimeToWaitWhenHeartbeatShouldBeNotSentImmediately(final boolean isRequestInFlight,
final boolean shouldNotWaitForHeartbeatInterval) {
final long remainingMs = 12L;
final long timeToNextHeartbeatMs = 6L;
try (
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
when(mock.remainingMs()).thenReturn(remainingMs);
});
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
when(mock.requestInFlight()).thenReturn(isRequestInFlight);
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
})
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
when(membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(shouldNotWaitForHeartbeatInterval);
time.sleep(1234);

final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());

assertEquals(timeToNextHeartbeatMs, maximumTimeToWait);
verify(pollTimer).update(time.milliseconds());
}
}

@ParameterizedTest
@CsvSource({"12, 5", "10, 6"})
public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long remainingMs,
final long timeToNextHeartbeatMs) {
try (
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
when(mock.remainingMs()).thenReturn(remainingMs);
});
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
HeartbeatRequestState.class,
(mock, context) -> {
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
})
) {
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
time.sleep(1234);

final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());

assertEquals(5, maximumTimeToWait);
verify(pollTimer).update(time.milliseconds());
}
}

private static ConsumerConfig config() {
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Expand Down