Skip to content

Commit 82c9dcf

Browse files
committed
Include feedback
1 parent a952822 commit 82c9dcf

File tree

1 file changed

+220
-1
lines changed

1 file changed

+220
-1
lines changed

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

+220-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static org.junit.jupiter.api.Assertions.assertEquals;
5858
import static org.junit.jupiter.api.Assertions.assertFalse;
5959
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
60+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
6061
import static org.junit.jupiter.api.Assertions.assertThrows;
6162
import static org.junit.jupiter.api.Assertions.assertTrue;
6263
import static org.mockito.ArgumentMatchers.any;
@@ -924,6 +925,200 @@ public void testReconcilingAndRevocationCallbackFails() {
924925
verifyInStateReconciling(membershipManager);
925926
}
926927

928+
@Test
929+
public void testReconcilingWhenReconciliationAbortedBeforeAssignmentDueToRejoin() {
930+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
931+
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
932+
final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new CompletableFuture<>();
933+
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new CompletableFuture<>();
934+
final Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(
935+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
936+
);
937+
final Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(
938+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
939+
);
940+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
941+
.thenReturn(onTasksAssignedCallbackExecutedSetup);
942+
when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
943+
.thenReturn(onTasksRevokedCallbackExecuted);
944+
when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
945+
.thenReturn(onAllTasksLostCallbackExecuted);
946+
when(subscriptionState.assignedPartitions())
947+
.thenReturn(Collections.emptySet())
948+
.thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
949+
joining();
950+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
951+
acknowledging(onTasksAssignedCallbackExecutedSetup);
952+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
953+
final Set<TopicPartition> partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
954+
final Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup;
955+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
956+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
957+
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
958+
expectedPartitionsToRevoke,
959+
expectedFullPartitionsToAssign,
960+
expectedNewPartitionsToAssign
961+
);
962+
membershipManager.onPollTimerExpired();
963+
membershipManager.onHeartbeatRequestGenerated();
964+
onAllTasksLostCallbackExecuted.complete(null);
965+
membershipManager.maybeRejoinStaleMember();
966+
967+
onTasksRevokedCallbackExecuted.complete(null);
968+
969+
verify(subscriptionState, never()).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
970+
verify(memberStateListener, never()).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
971+
verify(subscriptionState, never())
972+
.enablePartitionsAwaitingCallback(argThat(a -> !a.equals(partitionsToAssignAtSetup)));
973+
verify(streamsRebalanceEventsProcessor, never()).requestOnTasksAssignedCallbackInvocation(
974+
makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
975+
);
976+
verifyInStateJoining(membershipManager);
977+
}
978+
979+
@Test
980+
public void testReconcilingWhenReconciliationAbortedBeforeAssignmentDueToNotInReconciling() {
981+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
982+
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
983+
final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new CompletableFuture<>();
984+
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new CompletableFuture<>();
985+
final Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(
986+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
987+
);
988+
final Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(
989+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
990+
);
991+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
992+
.thenReturn(onTasksAssignedCallbackExecutedSetup);
993+
when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
994+
.thenReturn(onTasksRevokedCallbackExecuted);
995+
when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
996+
.thenReturn(onAllTasksLostCallbackExecuted);
997+
when(subscriptionState.assignedPartitions())
998+
.thenReturn(Collections.emptySet())
999+
.thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
1000+
joining();
1001+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1002+
acknowledging(onTasksAssignedCallbackExecutedSetup);
1003+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
1004+
final Set<TopicPartition> partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
1005+
final Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup;
1006+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
1007+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
1008+
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
1009+
expectedPartitionsToRevoke,
1010+
expectedFullPartitionsToAssign,
1011+
expectedNewPartitionsToAssign
1012+
);
1013+
membershipManager.transitionToFatal();
1014+
onAllTasksLostCallbackExecuted.complete(null);
1015+
1016+
onTasksRevokedCallbackExecuted.complete(null);
1017+
1018+
verify(subscriptionState, never()).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
1019+
verify(memberStateListener, never()).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
1020+
verify(subscriptionState, never())
1021+
.enablePartitionsAwaitingCallback(argThat(a -> !a.equals(partitionsToAssignAtSetup)));
1022+
verify(streamsRebalanceEventsProcessor, never()).requestOnTasksAssignedCallbackInvocation(
1023+
makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
1024+
);
1025+
verifyInStateFatal(membershipManager);
1026+
}
1027+
1028+
@Test
1029+
public void testReconcilingWhenReconciliationAbortedAfterAssignmentDueToRejoin() {
1030+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
1031+
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
1032+
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new CompletableFuture<>();
1033+
final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new CompletableFuture<>();
1034+
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new CompletableFuture<>();
1035+
final Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(
1036+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
1037+
);
1038+
final Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(
1039+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
1040+
);
1041+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
1042+
.thenReturn(onTasksAssignedCallbackExecutedSetup);
1043+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
1044+
.thenReturn(onTasksAssignedCallbackExecuted);
1045+
when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
1046+
.thenReturn(onTasksRevokedCallbackExecuted);
1047+
when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
1048+
.thenReturn(onAllTasksLostCallbackExecuted);
1049+
when(subscriptionState.assignedPartitions())
1050+
.thenReturn(Collections.emptySet())
1051+
.thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
1052+
joining();
1053+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1054+
acknowledging(onTasksAssignedCallbackExecutedSetup);
1055+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
1056+
final Set<TopicPartition> partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
1057+
final Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup;
1058+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
1059+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
1060+
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
1061+
expectedPartitionsToRevoke,
1062+
expectedFullPartitionsToAssign,
1063+
expectedNewPartitionsToAssign
1064+
);
1065+
onTasksRevokedCallbackExecuted.complete(null);
1066+
membershipManager.onPollTimerExpired();
1067+
membershipManager.onHeartbeatRequestGenerated();
1068+
onAllTasksLostCallbackExecuted.complete(null);
1069+
membershipManager.maybeRejoinStaleMember();
1070+
1071+
onTasksAssignedCallbackExecuted.complete(null);
1072+
1073+
assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
1074+
}
1075+
1076+
@Test
1077+
public void testReconcilingWhenReconciliationAbortedAfterAssignmentDueToNotInReconciling() {
1078+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
1079+
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
1080+
final CompletableFuture<Void> onTasksAssignedCallbackExecuted = new CompletableFuture<>();
1081+
final CompletableFuture<Void> onTasksRevokedCallbackExecuted = new CompletableFuture<>();
1082+
final CompletableFuture<Void> onAllTasksLostCallbackExecuted = new CompletableFuture<>();
1083+
final Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(
1084+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
1085+
);
1086+
final Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(
1087+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
1088+
);
1089+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
1090+
.thenReturn(onTasksAssignedCallbackExecutedSetup);
1091+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
1092+
.thenReturn(onTasksAssignedCallbackExecuted);
1093+
when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
1094+
.thenReturn(onTasksRevokedCallbackExecuted);
1095+
when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
1096+
.thenReturn(onAllTasksLostCallbackExecuted);
1097+
when(subscriptionState.assignedPartitions())
1098+
.thenReturn(Collections.emptySet())
1099+
.thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
1100+
joining();
1101+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1102+
acknowledging(onTasksAssignedCallbackExecutedSetup);
1103+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
1104+
final Set<TopicPartition> partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
1105+
final Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup;
1106+
final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
1107+
final Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
1108+
verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
1109+
expectedPartitionsToRevoke,
1110+
expectedFullPartitionsToAssign,
1111+
expectedNewPartitionsToAssign
1112+
);
1113+
onTasksRevokedCallbackExecuted.complete(null);
1114+
membershipManager.transitionToFatal();
1115+
onAllTasksLostCallbackExecuted.complete(null);
1116+
1117+
onTasksAssignedCallbackExecuted.complete(null);
1118+
1119+
assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
1120+
}
1121+
9271122
@Test
9281123
public void testLeaveGroupWhenNotInGroup() {
9291124
testLeaveGroupWhenNotInGroup(membershipManager::leaveGroup);
@@ -1681,6 +1876,31 @@ public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId)
16811876
assertEquals("Listener is already registered.", exception.getMessage());
16821877
}
16831878

1879+
@Test
1880+
public void testConsumerPollWhenNotJoining() {
1881+
setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
1882+
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
1883+
final Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(
1884+
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
1885+
);
1886+
when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
1887+
.thenReturn(onTasksAssignedCallbackExecutedSetup);
1888+
joining();
1889+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1890+
membershipManager.onSubscriptionUpdated();
1891+
1892+
membershipManager.onConsumerPoll();
1893+
1894+
verifyInStateReconciling(membershipManager);
1895+
}
1896+
1897+
@Test
1898+
public void testConsumerPollWhenSubscriptionNotUpdated() {
1899+
membershipManager.onConsumerPoll();
1900+
1901+
verifyInStateUnsubscribed(membershipManager);
1902+
}
1903+
16841904
private void verifyThatNoTasksHaveBeenRevoked() {
16851905
verify(streamsRebalanceEventsProcessor, never()).requestOnTasksRevokedCallbackInvocation(any());
16861906
verify(subscriptionState, never()).markPendingRevocation(any());
@@ -1936,7 +2156,6 @@ private void reconcile(final StreamsGroupHeartbeatResponse response) {
19362156
membershipManager.onHeartbeatSuccess(response);
19372157
membershipManager.poll(time.milliseconds());
19382158
verifyInStateReconciling(membershipManager);
1939-
19402159
}
19412160

19422161
private void acknowledging(final CompletableFuture<Void> future) {

0 commit comments

Comments
 (0)