Skip to content

Commit 0a9570c

Browse files
committed
KAFKA-18736: Add pollOnClose() and maximumTimeToWait()
Adds pollOnClose() and maximumTimeToWait() to the Streams group heartbeat request manager.
1 parent 0b4fdc5 commit 0a9570c

File tree

2 files changed

+176
-0
lines changed

2 files changed

+176
-0
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import java.util.stream.Collectors;
5050
import java.util.stream.IntStream;
5151

52+
import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
53+
5254
/**
5355
* <p>Manages the request creation and response handling for the Streams group heartbeat. The class creates a
5456
* heartbeat request using the state stored in the membership manager. The requests can be retrieved
@@ -374,6 +376,53 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
374376
}
375377
}
376378

379+
/**
380+
* Generate a heartbeat request to leave the group if the state is still LEAVING when this is
381+
* called to close the consumer.
382+
* <p/>
383+
* Note that when closing the consumer, even though an event to Unsubscribe is generated
384+
* (triggers callbacks and sends leave group), it could be the case that the Unsubscribe event
385+
* processing does not complete in time and moves on to close the managers (ex. calls to
386+
* close with zero timeout). So we could end up on this pollOnClose with the member in
387+
* {@link MemberState#PREPARE_LEAVING} (ex. app thread did not have the time to process the
388+
* event to execute callbacks), or {@link MemberState#LEAVING} (ex. the leave request could
389+
* not be sent due to coordinator not available at that time). In all cases, the pollOnClose
390+
* will be triggered right before sending the final requests, so we ensure that we generate
391+
* the request to leave if needed.
392+
*
393+
* @param currentTimeMs The current system time in milliseconds at which the method was called
394+
* @return PollResult containing the request to send
395+
*/
396+
@Override
397+
public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
398+
if (membershipManager.isLeavingGroup()) {
399+
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequestAndLogResponse(currentTimeMs);
400+
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), List.of(request));
401+
}
402+
return EMPTY;
403+
}
404+
405+
/**
406+
* Returns the delay for which the application thread can safely wait before it should be responsive
407+
* to results from the request managers. For example, the subscription state can change when heartbeats
408+
* are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
409+
* responsive to changes.
410+
*
411+
* <p>Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
412+
* our poll timer will not expire while we are polling.
413+
*
414+
* <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
415+
* delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
416+
*/
417+
@Override
418+
public long maximumTimeToWait(long currentTimeMs) {
419+
pollTimer.update(currentTimeMs);
420+
if (pollTimer.isExpired() || (membershipManager.shouldNotWaitForHeartbeatInterval() && !heartbeatRequestState.requestInFlight())) {
421+
return 0L;
422+
}
423+
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
424+
}
425+
377426
/**
378427
* A heartbeat should be sent without waiting for the heartbeat interval to expire if:
379428
* - the member is leaving the group

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.junit.jupiter.api.extension.ExtendWith;
4646
import org.junit.jupiter.params.ParameterizedTest;
4747
import org.junit.jupiter.params.provider.Arguments;
48+
import org.junit.jupiter.params.provider.CsvSource;
4849
import org.junit.jupiter.params.provider.EnumSource;
4950
import org.junit.jupiter.params.provider.MethodSource;
5051
import org.junit.jupiter.params.provider.ValueSource;
@@ -1382,6 +1383,132 @@ private static Stream<Arguments> provideOtherErrors() {
13821383
.map(Arguments::of);
13831384
}
13841385

1386+
@Test
1387+
public void testPollOnCloseWhenIsNotLeaving() {
1388+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1389+
1390+
NetworkClientDelegate.PollResult result = heartbeatRequestManager.pollOnClose(time.milliseconds());
1391+
1392+
assertEquals(NetworkClientDelegate.PollResult.EMPTY, result);
1393+
}
1394+
1395+
@Test
1396+
public void testPollOnCloseWhenIsLeaving() {
1397+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1398+
when(membershipManager.isLeavingGroup()).thenReturn(true);
1399+
when(membershipManager.groupId()).thenReturn(GROUP_ID);
1400+
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
1401+
when(membershipManager.memberEpoch()).thenReturn(LEAVE_GROUP_MEMBER_EPOCH);
1402+
1403+
NetworkClientDelegate.PollResult result = heartbeatRequestManager.pollOnClose(time.milliseconds());
1404+
1405+
assertEquals(1, result.unsentRequests.size());
1406+
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
1407+
StreamsGroupHeartbeatRequest streamsRequest = (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
1408+
assertEquals(GROUP_ID, streamsRequest.data().groupId());
1409+
assertEquals(MEMBER_ID, streamsRequest.data().memberId());
1410+
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, streamsRequest.data().memberEpoch());
1411+
}
1412+
1413+
@Test
1414+
public void testMaximumTimeToWaitPollTimerExpired() {
1415+
try (
1416+
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
1417+
when(mock.isExpired()).thenReturn(true);
1418+
});
1419+
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
1420+
HeartbeatRequestState.class,
1421+
(mock, context) -> {
1422+
when(mock.requestInFlight()).thenReturn(false);
1423+
})
1424+
) {
1425+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1426+
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
1427+
time.sleep(1234);
1428+
1429+
final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
1430+
1431+
assertEquals(0, maximumTimeToWait);
1432+
verify(pollTimer).update(time.milliseconds());
1433+
}
1434+
}
1435+
1436+
@Test
1437+
public void testMaximumTimeToWaitWhenHeartbeatShouldBeSentImmediately() {
1438+
try (
1439+
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class);
1440+
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
1441+
HeartbeatRequestState.class,
1442+
(mock, context) -> {
1443+
when(mock.requestInFlight()).thenReturn(false);
1444+
})
1445+
) {
1446+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1447+
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
1448+
when(membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(true);
1449+
time.sleep(1234);
1450+
1451+
final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
1452+
1453+
assertEquals(0, maximumTimeToWait);
1454+
verify(pollTimer).update(time.milliseconds());
1455+
}
1456+
}
1457+
1458+
@ParameterizedTest
1459+
@CsvSource({"true, false", "false, false", "true, true"})
1460+
public void testMaximumTimeToWaitWhenHeartbeatShouldBeNotSentImmediately(final boolean isRequestInFlight,
1461+
final boolean shouldNotWaitForHeartbeatInterval) {
1462+
final long remainingMs = 12L;
1463+
final long timeToNextHeartbeatMs = 6L;
1464+
try (
1465+
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
1466+
when(mock.remainingMs()).thenReturn(remainingMs);
1467+
});
1468+
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
1469+
HeartbeatRequestState.class,
1470+
(mock, context) -> {
1471+
when(mock.requestInFlight()).thenReturn(isRequestInFlight);
1472+
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
1473+
})
1474+
) {
1475+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1476+
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
1477+
when(membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(shouldNotWaitForHeartbeatInterval);
1478+
time.sleep(1234);
1479+
1480+
final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
1481+
1482+
assertEquals(timeToNextHeartbeatMs, maximumTimeToWait);
1483+
verify(pollTimer).update(time.milliseconds());
1484+
}
1485+
}
1486+
1487+
@ParameterizedTest
1488+
@CsvSource({"12, 5", "10, 6"})
1489+
public void testMaximumTimeToWaitSelectingMinimumWaitTime(final long remainingMs,
1490+
final long timeToNextHeartbeatMs) {
1491+
try (
1492+
final MockedConstruction<Timer> timerMockedConstruction = mockConstruction(Timer.class, (mock, context) -> {
1493+
when(mock.remainingMs()).thenReturn(remainingMs);
1494+
});
1495+
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
1496+
HeartbeatRequestState.class,
1497+
(mock, context) -> {
1498+
when(mock.timeToNextHeartbeatMs(anyLong())).thenReturn(timeToNextHeartbeatMs);
1499+
})
1500+
) {
1501+
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
1502+
final Timer pollTimer = timerMockedConstruction.constructed().get(0);
1503+
time.sleep(1234);
1504+
1505+
final long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
1506+
1507+
assertEquals(5, maximumTimeToWait);
1508+
verify(pollTimer).update(time.milliseconds());
1509+
}
1510+
}
1511+
13851512
private static ConsumerConfig config() {
13861513
Properties prop = new Properties();
13871514
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

0 commit comments

Comments
 (0)