Skip to content

Commit b0eaf13

Browse files
committed
include feedback
1 parent 6b66457 commit b0eaf13

File tree

2 files changed

+143
-21
lines changed

2 files changed

+143
-21
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java

+8-18
Original file line numberDiff line numberDiff line change
@@ -90,21 +90,19 @@ public LocalAssignment(final long localEpoch,
9090
this.standbyTasks = standbyTasks;
9191
this.warmupTasks = warmupTasks;
9292
if (localEpoch == NONE_EPOCH &&
93-
(!activeTasks.isEmpty() || !standbyTasks.isEmpty() || !warmupTasks.isEmpty())) {
93+
(!activeTasks.isEmpty() || !standbyTasks.isEmpty() || !warmupTasks.isEmpty())) {
9494
throw new IllegalArgumentException("Local epoch must be set if tasks are assigned.");
9595
}
9696
}
9797

9898
Optional<LocalAssignment> updateWith(final Map<String, SortedSet<Integer>> activeTasks,
9999
final Map<String, SortedSet<Integer>> standbyTasks,
100100
final Map<String, SortedSet<Integer>> warmupTasks) {
101-
if (localEpoch != NONE_EPOCH) {
102-
if (activeTasks.equals(this.activeTasks) &&
101+
if (localEpoch != NONE_EPOCH &&
102+
activeTasks.equals(this.activeTasks) &&
103103
standbyTasks.equals(this.standbyTasks) &&
104104
warmupTasks.equals(this.warmupTasks)) {
105-
106-
return Optional.empty();
107-
}
105+
return Optional.empty();
108106
}
109107

110108
long nextLocalEpoch = localEpoch + 1;
@@ -230,12 +228,10 @@ public MemberState state() {
230228
}
231229

232230
public boolean isLeavingGroup() {
233-
MemberState state = state();
234231
return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
235232
}
236233

237234
private boolean isNotInGroup() {
238-
MemberState state = state();
239235
return state == MemberState.UNSUBSCRIBED ||
240236
state == MemberState.FENCED ||
241237
state == MemberState.FATAL ||
@@ -280,7 +276,7 @@ private void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
280276
isPollTimerExpired = true;
281277
// Briefly transition through prepare leaving. The member does not have to release
282278
// any assignment before sending the leave group given that is stale. It will invoke
283-
// onTaskAssignment with empty assignment after sending the leave group on the STALE state.
279+
// onAllTasksLost after sending the leave group on the STALE state.
284280
transitionTo(MemberState.PREPARE_LEAVING);
285281
}
286282
finalizeLeaving();
@@ -317,7 +313,7 @@ public void transitionToFatal() {
317313

318314
if (previousState == MemberState.UNSUBSCRIBED) {
319315
log.debug("Member {} with epoch {} got fatal error from the broker but it already " +
320-
"left the group, so onTaskAssignment callback won't be triggered.", memberId, memberEpoch);
316+
"left the group, so onAllTasksLost callback won't be triggered.", memberId, memberEpoch);
321317
return;
322318
}
323319

@@ -332,7 +328,7 @@ public void transitionToFatal() {
332328
CompletableFuture<Void> onAllTasksLostCallbackExecuted = streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation();
333329
onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
334330
if (error != null) {
335-
log.error("onTaskAssignment callback invocation failed while releasing assignment" +
331+
log.error("onAllTasksLost callback invocation failed while releasing assignment " +
336332
"after member failed with fatal error.", error);
337333
}
338334
clearTaskAndPartitionAssignment();
@@ -406,7 +402,6 @@ public boolean shouldSkipHeartbeat() {
406402
}
407403

408404
public boolean shouldHeartbeatNow() {
409-
MemberState state = state();
410405
return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING;
411406
}
412407

@@ -421,7 +416,6 @@ public void onConsumerPoll() {
421416
}
422417

423418
public void onHeartbeatRequestGenerated() {
424-
MemberState state = state();
425419
if (state == MemberState.ACKNOWLEDGING) {
426420
if (targetAssignmentReconciled()) {
427421
transitionTo(MemberState.STABLE);
@@ -857,11 +851,7 @@ private CompletableFuture<Void> assignTasks(final SortedSet<StreamsRebalanceData
857851
final SortedSet<StreamsRebalanceData.TaskId> ownedActiveTasks,
858852
final SortedSet<StreamsRebalanceData.TaskId> standbyTasksToAssign,
859853
final SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign) {
860-
log.info("Assigning " +
861-
(activeTasksToAssign.isEmpty() ? "no active tasks, " : "active tasks {}, ") +
862-
(standbyTasksToAssign.isEmpty() ? "no standby tasks, " : "standby tasks {}, and ") +
863-
(warmupTasksToAssign.isEmpty() ? "no warm-up tasks. " : "warm-up tasks {}.") +
864-
"to the member.",
854+
log.info("Assigning active tasks {{}}, standby tasks {{}}, and warm-up tasks {{}} to the member.",
865855
activeTasksToAssign.stream()
866856
.map(StreamsRebalanceData.TaskId::toString)
867857
.collect(Collectors.joining(", ")),

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

+135-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
2020
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
21+
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
2122
import org.apache.kafka.common.KafkaException;
23+
import org.apache.kafka.common.MetricName;
2224
import org.apache.kafka.common.TopicPartition;
2325
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
2426
import org.apache.kafka.common.metrics.Metrics;
@@ -41,10 +43,15 @@
4143
import java.util.List;
4244
import java.util.Optional;
4345
import java.util.Set;
46+
import java.util.SortedSet;
47+
import java.util.TreeSet;
4448
import java.util.concurrent.CompletableFuture;
4549
import java.util.concurrent.ExecutionException;
4650
import java.util.function.Supplier;
51+
import java.util.stream.Collectors;
4752

53+
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
54+
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
4855
import static org.apache.kafka.common.utils.Utils.mkEntry;
4956
import static org.apache.kafka.common.utils.Utils.mkMap;
5057
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,8 +80,8 @@ public class StreamsMembershipManagerTest {
7380
private static final int PARTITION_0 = 0;
7481
private static final int PARTITION_1 = 1;
7582

76-
private Time time = new MockTime(0);
77-
private Metrics metrics = new Metrics(time);
83+
private final Time time = new MockTime(0);
84+
private final Metrics metrics = new Metrics(time);
7885

7986
private StreamsMembershipManager membershipManager;
8087

@@ -1075,6 +1082,54 @@ public void testOnHeartbeatRequestGeneratedWhenInLeavingAndPollTimerExpired() {
10751082
verifyInStateStale(membershipManager);
10761083
}
10771084

1085+
@Test
1086+
public void testOnHeartbeatFailureAfterLeaveRequestGenerated() {
1087+
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new CompletableFuture<>();
1088+
when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
1089+
.thenReturn(onAllTasksLostCallbackExecuted);
1090+
joining();
1091+
final CompletableFuture<Void> groupLeft = leaving(onAllTasksLostCallbackExecuted);
1092+
membershipManager.onHeartbeatRequestGenerated();
1093+
assertFalse(groupLeft.isDone());
1094+
1095+
membershipManager.onHeartbeatFailure(true);
1096+
1097+
assertTrue(groupLeft.isDone());
1098+
}
1099+
1100+
@Test
1101+
public void testOnHeartbeatFatalFailure() {
1102+
testOnHeartbeatFailure(false);
1103+
}
1104+
1105+
@Test
1106+
public void testOnHeartbeatRetriableFailure() {
1107+
testOnHeartbeatFailure(true);
1108+
}
1109+
1110+
private void testOnHeartbeatFailure(boolean retriable) {
1111+
final MetricName failedRebalanceTotalMetricName = metrics.metricName(
1112+
"failed-rebalance-total",
1113+
CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX
1114+
);
1115+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUB_TOPOLOGY_ID_0, TOPIC_0);
1116+
final Set<StreamsRebalanceData.TaskId> activeTasks =
1117+
Set.of(new StreamsRebalanceData.TaskId(SUB_TOPOLOGY_ID_0, PARTITION_0));
1118+
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new CompletableFuture<>();
1119+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
1120+
.thenReturn(onTasksAssignedCallbackExecuted);
1121+
joining();
1122+
time.sleep(1);
1123+
reconcile(makeHeartbeatResponseWithActiveTasks(SUB_TOPOLOGY_ID_0, List.of(PARTITION_0)));
1124+
final double failedRebalancesTotalBefore = (double) metrics.metric(failedRebalanceTotalMetricName).metricValue();
1125+
assertEquals(0L, failedRebalancesTotalBefore);
1126+
1127+
membershipManager.onHeartbeatFailure(retriable);
1128+
1129+
final double failedRebalancesTotalAfter = (double) metrics.metric(failedRebalanceTotalMetricName).metricValue();
1130+
assertEquals(retriable ? 0L : 1L, failedRebalancesTotalAfter);
1131+
}
1132+
10781133
@Test
10791134
public void testOnFencedWhenInJoining() {
10801135
joining();
@@ -1292,6 +1347,65 @@ public void testOnTasksAssignedCallbackCompleted() {
12921347
assertFalse(future.isCompletedExceptionally());
12931348
}
12941349

1350+
@Test
1351+
public void testOnTasksAssignedCallbackCompletedWhenCallbackFails() {
1352+
final String errorMessage = "KABOOM!";
1353+
final CompletableFuture<Void> future = new CompletableFuture<>();
1354+
final StreamsOnTasksAssignedCallbackCompletedEvent event = new StreamsOnTasksAssignedCallbackCompletedEvent(
1355+
future,
1356+
Optional.of(new KafkaException(errorMessage))
1357+
);
1358+
1359+
membershipManager.onTasksAssignedCallbackCompleted(event);
1360+
1361+
assertTrue(future.isDone());
1362+
assertFalse(future.isCancelled());
1363+
assertTrue(future.isCompletedExceptionally());
1364+
final ExecutionException executionException = assertThrows(ExecutionException.class, future::get);
1365+
assertInstanceOf(KafkaException.class, executionException.getCause());
1366+
assertEquals(errorMessage, executionException.getCause().getMessage());
1367+
1368+
final SortedSet<StreamsRebalanceData.TaskId> activeTasksToAssign = new TreeSet<>();
1369+
activeTasksToAssign.add(new StreamsRebalanceData.TaskId(SUB_TOPOLOGY_ID_0, PARTITION_0));
1370+
System.out.println(activeTasksToAssign.stream()
1371+
.map(StreamsRebalanceData.TaskId::toString)
1372+
.collect(Collectors.joining(", ")));
1373+
}
1374+
1375+
@Test
1376+
public void testOnTasksRevokedCallbackCompleted() {
1377+
final CompletableFuture<Void> future = new CompletableFuture<>();
1378+
final StreamsOnTasksRevokedCallbackCompletedEvent event = new StreamsOnTasksRevokedCallbackCompletedEvent(
1379+
future,
1380+
Optional.empty()
1381+
);
1382+
1383+
membershipManager.onTasksRevokedCallbackCompleted(event);
1384+
1385+
assertTrue(future.isDone());
1386+
assertFalse(future.isCancelled());
1387+
assertFalse(future.isCompletedExceptionally());
1388+
}
1389+
1390+
@Test
1391+
public void testOnTasksRevokedCallbackCompletedWhenCallbackFails() {
1392+
final String errorMessage = "KABOOM!";
1393+
final CompletableFuture<Void> future = new CompletableFuture<>();
1394+
final StreamsOnTasksRevokedCallbackCompletedEvent event = new StreamsOnTasksRevokedCallbackCompletedEvent(
1395+
future,
1396+
Optional.of(new KafkaException(errorMessage))
1397+
);
1398+
1399+
membershipManager.onTasksRevokedCallbackCompleted(event);
1400+
1401+
assertTrue(future.isDone());
1402+
assertFalse(future.isCancelled());
1403+
assertTrue(future.isCompletedExceptionally());
1404+
final ExecutionException executionException = assertThrows(ExecutionException.class, future::get);
1405+
assertInstanceOf(KafkaException.class, executionException.getCause());
1406+
assertEquals(errorMessage, executionException.getCause().getMessage());
1407+
}
1408+
12951409
@Test
12961410
public void testOnAllTasksLostCallbackCompleted() {
12971411
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1308,7 +1422,7 @@ public void testOnAllTasksLostCallbackCompleted() {
13081422
}
13091423

13101424
@Test
1311-
public void testOnTasksAssignedCallbackCompletedWhenCallbackFails() {
1425+
public void testOnAllTasksLostCallbackCompletedWhenCallbackFails() {
13121426
final String errorMessage = "KABOOM!";
13131427
final CompletableFuture<Void> future = new CompletableFuture<>();
13141428
final StreamsOnAllTasksLostCallbackCompletedEvent event = new StreamsOnAllTasksLostCallbackCompletedEvent(
@@ -1326,6 +1440,24 @@ public void testOnTasksAssignedCallbackCompletedWhenCallbackFails() {
13261440
assertEquals(errorMessage, executionException.getCause().getMessage());
13271441
}
13281442

1443+
@Test
1444+
public void testMaybeRejoinStaleMember() {
1445+
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new CompletableFuture<>();
1446+
when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
1447+
.thenReturn(onAllTasksLostCallbackExecuted);
1448+
joining();
1449+
membershipManager.onPollTimerExpired();
1450+
membershipManager.onHeartbeatRequestGenerated();
1451+
verifyInStateStale(membershipManager);
1452+
1453+
membershipManager.maybeRejoinStaleMember();
1454+
1455+
verifyInStateStale(membershipManager);
1456+
onAllTasksLostCallbackExecuted.complete(null);
1457+
verifyInStateJoining(membershipManager);
1458+
assertEquals(StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
1459+
}
1460+
13291461
private void verifyThatNoTasksHaveBeenRevoked() {
13301462
verify(streamsRebalanceEventsProcessor, never()).requestOnTasksRevokedCallbackInvocation(any());
13311463
verify(subscriptionState, never()).markPendingRevocation(any());

0 commit comments

Comments
 (0)