Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3fde353

Browse files
committedFeb 3, 2025·
Port recent changes from abstract membership manager
1 parent b49cb76 commit 3fde353

File tree

2 files changed

+183
-76
lines changed

2 files changed

+183
-76
lines changed
 

‎clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java

+52-14
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Map;
4141
import java.util.Objects;
4242
import java.util.Optional;
43+
import java.util.Set;
4344
import java.util.SortedSet;
4445
import java.util.TreeSet;
4546
import java.util.concurrent.CompletableFuture;
@@ -48,7 +49,7 @@
4849
import java.util.stream.Stream;
4950

5051
/**
51-
* Tracks state the state of a single member in relationship to a group:
52+
* Tracks the state of a single member in relationship to a group:
5253
* <p/>
5354
* Responsible for:
5455
* <ul>
@@ -161,7 +162,7 @@ public int hashCode() {
161162
private final SubscriptionState subscriptionState;
162163

163164
/**
164-
* Current state of this member as part of the consumer group, as defined in {@link MemberState}
165+
* Current state of this member as part of the consumer group, as defined in {@link MemberState}.
165166
*/
166167
private MemberState state;
167168

@@ -372,6 +373,15 @@ private void notifyEpochChange(Optional<Integer> epoch) {
372373
stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
373374
}
374375

376+
/**
377+
* Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(java.util.Set)} callback for each listener when the
378+
* set of assigned partitions changes. This includes on assignment changes, unsubscribe, and when leaving
379+
* the group.
380+
*/
381+
void notifyAssignmentChange(Set<TopicPartition> partitions) {
382+
stateUpdatesListeners.forEach(stateListener -> stateListener.onGroupAssignmentUpdated(partitions));
383+
}
384+
375385
/**
376386
* Transition to the {@link MemberState#JOINING} state, indicating that the member will
377387
* try to join the group on the next heartbeat request. This is expected to be invoked when
@@ -564,6 +574,7 @@ private void clearCurrentTaskAssignment() {
564574
*/
565575
private void clearTaskAndPartitionAssignment() {
566576
subscriptionState.assignFromSubscribed(Collections.emptySet());
577+
notifyAssignmentChange(Collections.emptySet());
567578
currentAssignment = LocalAssignment.NONE;
568579
targetAssignment = LocalAssignment.NONE;
569580
}
@@ -864,6 +875,7 @@ private CompletableFuture<Void> leaveGroup(final boolean isOnClose) {
864875
transitionTo(MemberState.UNSUBSCRIBED);
865876
}
866877
subscriptionState.unsubscribe();
878+
notifyAssignmentChange(Collections.emptySet());
867879
return CompletableFuture.completedFuture(null);
868880
}
869881

@@ -1032,9 +1044,9 @@ private void maybeReconcile() {
10321044
SortedSet<TopicPartition> partitionsToRevoke = new TreeSet<>(ownedTopicPartitionsFromSubscriptionState);
10331045
partitionsToRevoke.removeAll(assignedTopicPartitions);
10341046

1035-
final CompletableFuture<Void> onTasksRevokedCallbackExecuted = revokeActiveTasks(activeTasksToRevoke);
1047+
final CompletableFuture<Void> tasksRevoked = revokeActiveTasks(activeTasksToRevoke);
10361048

1037-
final CompletableFuture<Void> onTasksRevokedAndAssignedCallbacksExecuted = onTasksRevokedCallbackExecuted.thenCompose(__ -> {
1049+
final CompletableFuture<Void> tasksRevokedAndAssigned = tasksRevoked.thenCompose(__ -> {
10381050
if (!maybeAbortReconciliation()) {
10391051
return assignTasks(assignedActiveTasks, ownedActiveTasks, assignedStandbyTasks, assignedWarmupTasks);
10401052
}
@@ -1044,14 +1056,13 @@ private void maybeReconcile() {
10441056
// The current target assignment is captured to ensure that acknowledging the current assignment is done with
10451057
// the same target assignment that was used when this reconciliation was initiated.
10461058
LocalAssignment currentTargetAssignment = targetAssignment;
1047-
onTasksRevokedAndAssignedCallbacksExecuted.whenComplete((__, callbackError) -> {
1059+
tasksRevokedAndAssigned.whenComplete((__, callbackError) -> {
10481060
if (callbackError != null) {
10491061
log.error("Reconciliation failed: callback invocation failed for tasks {}",
10501062
currentTargetAssignment, callbackError);
10511063
markReconciliationCompleted();
10521064
} else {
10531065
if (reconciliationInProgress && !maybeAbortReconciliation()) {
1054-
subscriptionState.enablePartitionsAwaitingCallback(assignedTopicPartitionsNotPreviouslyOwned);
10551066
currentAssignment = currentTargetAssignment;
10561067
transitionTo(MemberState.ACKNOWLEDGING);
10571068
markReconciliationCompleted();
@@ -1073,7 +1084,19 @@ private CompletableFuture<Void> revokeActiveTasks(final SortedSet<StreamsRebalan
10731084
log.debug("Marking partitions pending for revocation: {}", partitionsToRevoke);
10741085
subscriptionState.markPendingRevocation(partitionsToRevoke);
10751086

1076-
return streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksToRevoke);
1087+
CompletableFuture<Void> tasksRevoked = new CompletableFuture<>();
1088+
CompletableFuture<Void> onTasksRevokedCallbackExecuted =
1089+
streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksToRevoke);
1090+
onTasksRevokedCallbackExecuted.whenComplete((__, callbackError) -> {
1091+
if (callbackError != null) {
1092+
log.error("onTasksRevoked callback invocation failed for tasks {}",
1093+
activeTasksToRevoke, callbackError);
1094+
tasksRevoked.completeExceptionally(callbackError);
1095+
} else {
1096+
tasksRevoked.complete(null);
1097+
}
1098+
});
1099+
return tasksRevoked;
10771100
}
10781101

10791102
private CompletableFuture<Void> assignTasks(final SortedSet<StreamsRebalanceData.TaskId> activeTasksToAssign,
@@ -1100,14 +1123,29 @@ private CompletableFuture<Void> assignTasks(final SortedSet<StreamsRebalanceData
11001123
partitionsToAssign,
11011124
partitionsToAssigneNotPreviouslyOwned
11021125
);
1126+
notifyAssignmentChange(partitionsToAssign);
1127+
1128+
CompletableFuture<Void> onTasksAssignedCallbackExecuted =
1129+
streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
1130+
new StreamsRebalanceData.Assignment(
1131+
activeTasksToAssign,
1132+
standbyTasksToAssign,
1133+
warmupTasksToAssign
1134+
)
1135+
);
1136+
onTasksAssignedCallbackExecuted.whenComplete((__, callbackError) -> {
1137+
if (callbackError == null) {
1138+
subscriptionState.enablePartitionsAwaitingCallback(partitionsToAssign);
1139+
} else {
1140+
if (!partitionsToAssigneNotPreviouslyOwned.isEmpty()) {
1141+
log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not " +
1142+
"requiring initializing positions after onTasksAssigned callback failed.",
1143+
partitionsToAssigneNotPreviouslyOwned, callbackError);
1144+
}
1145+
}
1146+
});
11031147

1104-
return streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
1105-
new StreamsRebalanceData.Assignment(
1106-
activeTasksToAssign,
1107-
standbyTasksToAssign,
1108-
warmupTasksToAssign
1109-
)
1110-
);
1148+
return onTasksAssignedCallbackExecuted;
11111149
}
11121150

11131151
private CompletableFuture<Void> releaseLostActiveTasks() {

‎clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

+131-62
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import static org.junit.jupiter.api.Assertions.assertThrows;
6161
import static org.junit.jupiter.api.Assertions.assertTrue;
6262
import static org.mockito.ArgumentMatchers.any;
63+
import static org.mockito.ArgumentMatchers.argThat;
6364
import static org.mockito.Mockito.never;
6465
import static org.mockito.Mockito.times;
6566
import static org.mockito.Mockito.verify;
@@ -185,8 +186,8 @@ public void testReconcilingEmptyToSingleActiveTask() {
185186

186187
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
187188

188-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
189-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
189+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
190+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
190191
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
191192
onTasksAssignedCallbackExecuted.complete(null);
192193
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
@@ -221,8 +222,8 @@ public void testReconcilingActiveTaskToDifferentActiveTask() {
221222
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
222223

223224
final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
224-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
225-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
225+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
226+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
226227
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
227228
expectedPartitionsToRevoke,
228229
expectedFullPartitionsToAssign,
@@ -231,7 +232,7 @@ public void testReconcilingActiveTaskToDifferentActiveTask() {
231232
onTasksRevokedCallbackExecuted.complete(null);
232233
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
233234
onTasksAssignedCallbackExecuted.complete(null);
234-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
235+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
235236
}
236237

237238
@Test
@@ -259,14 +260,14 @@ public void testReconcilingSingleActiveTaskToAdditionalActiveTask() {
259260

260261
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
261262

262-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(
263+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(
263264
new TopicPartition(TOPIC_0, PARTITION_0),
264265
new TopicPartition(TOPIC_0, PARTITION_1)
265266
);
266-
final Collection<TopicPartition> expectedNewPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
267+
final Set<TopicPartition> expectedNewPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
267268
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
268269
onTasksAssignedCallbackExecuted.complete(null);
269-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
270+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
270271
verifyThatNoTasksHaveBeenRevoked();
271272
}
272273

@@ -302,8 +303,8 @@ public void testReconcilingMultipleActiveTaskToSingleActiveTask() {
302303
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
303304

304305
final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
305-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
306-
final Collection<TopicPartition> expectedNewPartitionsToAssign = Collections.emptySet();
306+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
307+
final Set<TopicPartition> expectedNewPartitionsToAssign = Collections.emptySet();
307308
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
308309
expectedPartitionsToRevoke,
309310
expectedFullPartitionsToAssign,
@@ -312,7 +313,7 @@ public void testReconcilingMultipleActiveTaskToSingleActiveTask() {
312313
onTasksRevokedCallbackExecuted.complete(null);
313314
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
314315
onTasksAssignedCallbackExecuted.complete(null);
315-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
316+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
316317
}
317318

318319
@Test
@@ -338,14 +339,14 @@ public void testReconcilingEmptyToMultipleActiveTaskOfDifferentSubtopologies() {
338339
SUBTOPOLOGY_ID_1, List.of(PARTITION_0))
339340
);
340341

341-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(
342+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(
342343
new TopicPartition(TOPIC_0, PARTITION_0),
343344
new TopicPartition(TOPIC_1, PARTITION_0)
344345
);
345-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
346+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
346347
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
347348
onTasksAssignedCallbackExecuted.complete(null);
348-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
349+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
349350
verifyThatNoTasksHaveBeenRevoked();
350351
}
351352

@@ -382,8 +383,8 @@ public void testReconcilingActiveTaskToStandbyTask() {
382383
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
383384

384385
final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
385-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
386-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
386+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
387+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
387388
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
388389
expectedPartitionsToRevoke,
389390
expectedFullPartitionsToAssign,
@@ -392,7 +393,7 @@ public void testReconcilingActiveTaskToStandbyTask() {
392393
onTasksRevokedCallbackExecuted.complete(null);
393394
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
394395
onTasksAssignedCallbackExecuted.complete(null);
395-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
396+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
396397
}
397398

398399
@Test
@@ -428,8 +429,8 @@ public void testReconcilingActiveTaskToWarmupTask() {
428429
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
429430

430431
final Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
431-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
432-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
432+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
433+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
433434
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
434435
expectedPartitionsToRevoke,
435436
expectedFullPartitionsToAssign,
@@ -438,7 +439,7 @@ public void testReconcilingActiveTaskToWarmupTask() {
438439
onTasksRevokedCallbackExecuted.complete(null);
439440
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
440441
onTasksAssignedCallbackExecuted.complete(null);
441-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
442+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
442443
}
443444

444445
@Test
@@ -455,11 +456,11 @@ public void testReconcilingEmptyToSingleStandbyTask() {
455456

456457
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
457458

458-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
459-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
459+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
460+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
460461
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
461462
onTasksAssignedCallbackExecuted.complete(null);
462-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
463+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
463464
verifyThatNoTasksHaveBeenRevoked();
464465
}
465466

@@ -487,14 +488,15 @@ public void testReconcilingStandbyTaskToDifferentStandbyTask() {
487488
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
488489
acknowledging(onTasksAssignedCallbackExecutedSetup);
489490
Mockito.reset(subscriptionState);
491+
Mockito.reset(memberStateListener);
490492

491493
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
492494

493-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
494-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
495+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
496+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
495497
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
496498
onTasksAssignedCallbackExecuted.complete(null);
497-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
499+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
498500
verifyThatNoTasksHaveBeenRevoked();
499501
}
500502

@@ -523,14 +525,15 @@ public void testReconcilingSingleStandbyTaskToAdditionalStandbyTask() {
523525
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
524526
acknowledging(onTasksAssignedCallbackExecutedSetup);
525527
Mockito.reset(subscriptionState);
528+
Mockito.reset(memberStateListener);
526529

527530
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
528531

529-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
530-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
532+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
533+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
531534
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
532535
onTasksAssignedCallbackExecuted.complete(null);
533-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
536+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
534537
verifyThatNoTasksHaveBeenRevoked();
535538
}
536539

@@ -559,14 +562,15 @@ public void testReconcilingMultipleStandbyTaskToSingleStandbyTask() {
559562
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
560563
acknowledging(onTasksAssignedCallbackExecutedSetup);
561564
Mockito.reset(subscriptionState);
565+
Mockito.reset(memberStateListener);
562566

563567
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
564568

565-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
566-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
569+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
570+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
567571
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
568572
onTasksAssignedCallbackExecuted.complete(null);
569-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
573+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
570574
verifyThatNoTasksHaveBeenRevoked();
571575
}
572576

@@ -601,11 +605,11 @@ public void testReconcilingStandbyTaskToActiveTask() {
601605

602606
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
603607

604-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
605-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
608+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
609+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
606610
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
607611
onTasksAssignedCallbackExecuted.complete(null);
608-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
612+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
609613
verifyThatNoTasksHaveBeenRevoked();
610614
}
611615

@@ -631,14 +635,15 @@ public void testReconcilingStandbyTaskToWarmupTask() {
631635
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
632636
acknowledging(onTasksAssignedCallbackExecutedSetup);
633637
Mockito.reset(subscriptionState);
638+
Mockito.reset(memberStateListener);
634639

635640
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
636641

637-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
638-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
642+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
643+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
639644
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
640645
onTasksAssignedCallbackExecuted.complete(null);
641-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
646+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
642647
verifyThatNoTasksHaveBeenRevoked();
643648
}
644649

@@ -656,11 +661,11 @@ public void testReconcilingEmptyToSingleWarmupTask() {
656661

657662
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
658663

659-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
660-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
664+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
665+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
661666
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
662667
onTasksAssignedCallbackExecuted.complete(null);
663-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
668+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
664669
verifyThatNoTasksHaveBeenRevoked();
665670
}
666671

@@ -688,14 +693,15 @@ public void testReconcilingWarmupTaskToDifferentWarmupTask() {
688693
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
689694
acknowledging(onTasksAssignedCallbackExecutedSetup);
690695
Mockito.reset(subscriptionState);
696+
Mockito.reset(memberStateListener);
691697

692698
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
693699

694-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
695-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
700+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
701+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
696702
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
697703
onTasksAssignedCallbackExecuted.complete(null);
698-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
704+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
699705
verifyThatNoTasksHaveBeenRevoked();
700706
}
701707

@@ -724,14 +730,15 @@ public void testReconcilingSingleWarmupTaskToAdditionalWarmupTask() {
724730
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
725731
acknowledging(onTasksAssignedCallbackExecutedSetup);
726732
Mockito.reset(subscriptionState);
733+
Mockito.reset(memberStateListener);
727734

728735
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
729736

730-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
731-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
737+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
738+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
732739
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
733740
onTasksAssignedCallbackExecuted.complete(null);
734-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
741+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
735742
verifyThatNoTasksHaveBeenRevoked();
736743
}
737744

@@ -760,14 +767,15 @@ public void testReconcilingMultipleWarmupTaskToSingleWarmupTask() {
760767
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
761768
acknowledging(onTasksAssignedCallbackExecutedSetup);
762769
Mockito.reset(subscriptionState);
770+
Mockito.reset(memberStateListener);
763771

764772
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
765773

766-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
767-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
774+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
775+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
768776
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
769777
onTasksAssignedCallbackExecuted.complete(null);
770-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
778+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
771779
verifyThatNoTasksHaveBeenRevoked();
772780
}
773781

@@ -802,11 +810,11 @@ public void testReconcilingWarmupTaskToActiveTask() {
802810

803811
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
804812

805-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
806-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
813+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
814+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
807815
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
808816
onTasksAssignedCallbackExecuted.complete(null);
809-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
817+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
810818
verifyThatNoTasksHaveBeenRevoked();
811819
}
812820

@@ -834,14 +842,15 @@ public void testReconcilingWarmupTaskToStandbyTask() {
834842
reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
835843
acknowledging(onTasksAssignedCallbackExecutedSetup);
836844
Mockito.reset(subscriptionState);
845+
Mockito.reset(memberStateListener);
837846

838847
reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
839848

840-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
841-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
849+
final Set<TopicPartition> expectedFullPartitionsToAssign = Collections.emptySet();
850+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
842851
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
843852
onTasksAssignedCallbackExecuted.complete(null);
844-
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedNewPartitionsToAssign);
853+
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
845854
verifyThatNoTasksHaveBeenRevoked();
846855
}
847856

@@ -857,8 +866,8 @@ public void testReconcilingAndAssignmentCallbackFails() {
857866

858867
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
859868

860-
final Collection<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
861-
final Collection<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
869+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
870+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
862871
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
863872

864873
onTasksAssignedCallbackExecuted.completeExceptionally(new RuntimeException("KABOOM!"));
@@ -867,6 +876,54 @@ public void testReconcilingAndAssignmentCallbackFails() {
867876
verify(subscriptionState, never()).enablePartitionsAwaitingCallback(any());
868877
}
869878

879+
@Test
880+
public void testReconcilingAndRevocationCallbackFails() {
881+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
882+
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
883+
final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new CompletableFuture<>();
884+
final Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(
885+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
886+
);
887+
final Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(
888+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
889+
);
890+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
891+
.thenReturn(onTasksAssignedCallbackExecutedSetup);
892+
when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
893+
.thenReturn(onTasksRevokedCallbackExecuted);
894+
when(subscriptionState.assignedPartitions())
895+
.thenReturn(Collections.emptySet())
896+
.thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
897+
joining();
898+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
899+
acknowledging(onTasksAssignedCallbackExecutedSetup);
900+
901+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
902+
903+
final Set<TopicPartition> partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
904+
final Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup;
905+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
906+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
907+
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
908+
expectedPartitionsToRevoke,
909+
expectedFullPartitionsToAssign,
910+
expectedNewPartitionsToAssign
911+
);
912+
913+
onTasksRevokedCallbackExecuted.completeExceptionally(new RuntimeException("KABOOM!"));
914+
915+
verify(subscriptionState).markPendingRevocation(expectedPartitionsToRevoke);
916+
verify(subscriptionState, never()).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
917+
verify(memberStateListener, never()).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
918+
verify(subscriptionState, never())
919+
.enablePartitionsAwaitingCallback(argThat(a -> !a.equals(partitionsToAssignAtSetup)));
920+
verifyInStateReconciling(membershipManager);
921+
verify(streamsRebalanceEventsProcessor, never()).requestOnTasksAssignedCallbackInvocation(
922+
makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
923+
);
924+
verifyInStateReconciling(membershipManager);
925+
}
926+
870927
@Test
871928
public void testLeaveGroupWhenNotInGroup() {
872929
testLeaveGroupWhenNotInGroup(membershipManager::leaveGroup);
@@ -885,6 +942,7 @@ private void testLeaveGroupWhenNotInGroup(final Supplier<CompletableFuture<Void>
885942
assertFalse(future.isCancelled());
886943
assertFalse(future.isCompletedExceptionally());
887944
verify(subscriptionState).unsubscribe();
945+
verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
888946
verifyInStateUnsubscribed(membershipManager);
889947
}
890948

@@ -912,6 +970,7 @@ private void testLeaveGroupOnCloseWhenNotInGroupAndFenced(final Supplier<Complet
912970
assertFalse(future.isCompletedExceptionally());
913971
verify(subscriptionState).unsubscribe();
914972
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
973+
verify(memberStateListener, times(2)).onGroupAssignmentUpdated(Collections.emptySet());
915974
verifyInStateUnsubscribed(membershipManager);
916975
}
917976

@@ -940,6 +999,7 @@ public void testLeaveGroupWhenInGroupWithAssignment() {
940999
final CompletableFuture<Void> onGroupLeftOnCloseBeforeRevocationCallback = membershipManager.leaveGroupOnClose();
9411000
assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeRevocationCallback);
9421001
onTasksRevokedCallbackExecuted.complete(null);
1002+
verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
9431003
verify(subscriptionState).unsubscribe();
9441004
assertFalse(onGroupLeft.isDone());
9451005
verifyInStateLeaving(membershipManager);
@@ -970,6 +1030,7 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
9701030
assertFalse(onGroupLeft.isDone());
9711031
verifyInStateLeaving(membershipManager);
9721032
verify(subscriptionState).unsubscribe();
1033+
verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
9731034
verify(streamsRebalanceEventsProcessor, never()).requestOnTasksRevokedCallbackInvocation(any());
9741035
final CompletableFuture<Void> onGroupLeftBeforeHeartbeatRequestGenerated = membershipManager.leaveGroup();
9751036
assertEquals(onGroupLeft, onGroupLeftBeforeHeartbeatRequestGenerated);
@@ -1199,6 +1260,10 @@ public void testOnHeartbeatRequestGeneratedWhenInLeavingAndPollTimerExpired() {
11991260
membershipManager.onHeartbeatRequestGenerated();
12001261

12011262
verifyInStateStale(membershipManager);
1263+
verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet());
1264+
onAllTasksLostCallbackExecuted.complete(null);
1265+
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
1266+
verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
12021267
}
12031268

12041269
@Test
@@ -1313,6 +1378,7 @@ private void testOnFencedWhenInJoiningOrReconcilingOrAcknowledgingOrStable() {
13131378
verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet());
13141379
onAllTasksLostCallbackExecuted.complete(null);
13151380
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
1381+
verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
13161382
verifyInStateJoining(membershipManager);
13171383
}
13181384

@@ -1354,6 +1420,7 @@ public void testTransitionToFatalWhenInPrepareLeaving() {
13541420
joining();
13551421

13561422
testTransitionToFatalWhenInPrepareLeavingOrLeaving(prepareLeaving());
1423+
13571424
verify(memberStateListener).onMemberEpochUpdated(Optional.empty(), membershipManager.memberId());
13581425
}
13591426

@@ -1452,6 +1519,7 @@ private void testTransitionToFatalWhenInJoiningOrReconcilingOrAcknowledgingOrSta
14521519
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
14531520
verifyInStateFatal(membershipManager);
14541521
verify(memberStateListener).onMemberEpochUpdated(Optional.empty(), membershipManager.memberId());
1522+
verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
14551523
}
14561524

14571525
@Test
@@ -1619,16 +1687,17 @@ private void verifyThatNoTasksHaveBeenRevoked() {
16191687
}
16201688

16211689
private void verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(Set<TopicPartition> expectedPartitionsToRevoke,
1622-
Collection<TopicPartition> expectedAllPartitionsToAssign,
1623-
Collection<TopicPartition> expectedNewPartitionsToAssign) {
1690+
Set<TopicPartition> expectedAllPartitionsToAssign,
1691+
Set<TopicPartition> expectedNewPartitionsToAssign) {
16241692
verify(subscriptionState).markPendingRevocation(expectedPartitionsToRevoke);
16251693
verify(subscriptionState, never()).assignFromSubscribedAwaitingCallback(expectedAllPartitionsToAssign, expectedNewPartitionsToAssign);
16261694
verifyInStateReconciling(membershipManager);
16271695
}
16281696

1629-
private void verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(Collection<TopicPartition> expectedAllPartitionsToAssign,
1630-
Collection<TopicPartition> expectedNewPartitionsToAssign) {
1697+
private void verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(Set<TopicPartition> expectedAllPartitionsToAssign,
1698+
Set<TopicPartition> expectedNewPartitionsToAssign) {
16311699
verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAllPartitionsToAssign, expectedNewPartitionsToAssign);
1700+
verify(memberStateListener).onGroupAssignmentUpdated(expectedAllPartitionsToAssign);
16321701
verify(subscriptionState, never()).enablePartitionsAwaitingCallback(expectedNewPartitionsToAssign);
16331702
verifyInStateReconciling(membershipManager);
16341703
}

0 commit comments

Comments
 (0)
Please sign in to comment.