diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 90c2b3a647d7e..724472b212af2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -159,7 +159,7 @@ public abstract class AbstractMembershipManager impl
/**
* If the member is currently leaving the group after a call to {@link #leaveGroup()} or
* {@link #leaveGroupOnClose()}, this will have a future that will complete when the ongoing leave operation
- * completes (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
+ * completes (callbacks executed and heartbeat request to leave is sent out). This will be empty if the
* member is not leaving.
*/
private Optional> leaveGroupInProgress = Optional.empty();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
new file mode 100644
index 0000000000000..0d281cd82d9ca
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -0,0 +1,1282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
+import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Tracks the state of a single member in relationship to a group:
+ *
+ * Responsible for:
+ *
+ * - Keeping member state
+ * - Keeping assignment for the member
+ * - Reconciling assignment, for example, if tasks need to be revoked before other tasks can be assigned
+ * - Calling the assignment and revocation callbacks on the Streams client
+ *
+ */
+public class StreamsMembershipManager implements RequestManager {
+
+ /**
+ * A data structure to represent the current task assignment, and target task assignment of a member in a
+ * streams group.
+ *
+ * Besides the assigned tasks, it contains a local epoch that is bumped whenever the assignment changes, to ensure
+ * that two assignments with the same tasks but different local epochs are not considered equal.
+ */
+ private static class LocalAssignment {
+ public static final long NONE_EPOCH = -1;
+ public static final LocalAssignment NONE = new LocalAssignment(
+ NONE_EPOCH,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap()
+ );
+
+ public final long localEpoch;
+ public final Map> activeTasks;
+ public final Map> standbyTasks;
+ public final Map> warmupTasks;
+
+ public LocalAssignment(final long localEpoch,
+ final Map> activeTasks,
+ final Map> standbyTasks,
+ final Map> warmupTasks) {
+ this.localEpoch = localEpoch;
+ this.activeTasks = activeTasks;
+ this.standbyTasks = standbyTasks;
+ this.warmupTasks = warmupTasks;
+ if (localEpoch == NONE_EPOCH &&
+ (!activeTasks.isEmpty() || !standbyTasks.isEmpty() || !warmupTasks.isEmpty())) {
+ throw new IllegalArgumentException("Local epoch must be set if tasks are assigned.");
+ }
+ }
+
+ Optional updateWith(final Map> activeTasks,
+ final Map> standbyTasks,
+ final Map> warmupTasks) {
+ if (localEpoch != NONE_EPOCH &&
+ activeTasks.equals(this.activeTasks) &&
+ standbyTasks.equals(this.standbyTasks) &&
+ warmupTasks.equals(this.warmupTasks)) {
+ return Optional.empty();
+ }
+
+ long nextLocalEpoch = localEpoch + 1;
+ return Optional.of(new LocalAssignment(nextLocalEpoch, activeTasks, standbyTasks, warmupTasks));
+ }
+
+ @Override
+ public String toString() {
+ return "LocalAssignment{" +
+ "localEpoch=" + localEpoch +
+ ", activeTasks=" + activeTasks +
+ ", standbyTasks=" + standbyTasks +
+ ", warmupTasks=" + warmupTasks +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ LocalAssignment that = (LocalAssignment) o;
+ return localEpoch == that.localEpoch &&
+ Objects.equals(activeTasks, that.activeTasks) &&
+ Objects.equals(standbyTasks, that.standbyTasks) &&
+ Objects.equals(warmupTasks, that.warmupTasks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(localEpoch, activeTasks, standbyTasks, warmupTasks);
+ }
+ }
+
+ /**
+ * TopicPartition comparator based on topic name and partition.
+ */
+ static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();
+
+ private final Logger log;
+
+ /**
+ * The processor that handles events of the Streams rebalance protocol.
+ * For example, requests for invocation of assignment/revocation callbacks.
+ */
+ private final StreamsRebalanceEventsProcessor streamsRebalanceEventsProcessor;
+
+ /**
+ * Data needed to participate in the Streams rebalance protocol.
+ */
+ private final StreamsRebalanceData streamsRebalanceData;
+
+ /**
+ * Subscription state object holding the current assignment the member has for the topology
+ * of the Streams application.
+ */
+ private final SubscriptionState subscriptionState;
+
+ /**
+ * Current state of this member as part of the consumer group, as defined in {@link MemberState}.
+ */
+ private MemberState state;
+
+ /**
+ * Group ID of the Streams group the member will be part of, provided when creating the current
+ * membership manager.
+ */
+ private final String groupId;
+
+ /**
+ * Member ID generated by the consumer at startup, which is unique within the group and remains consistent
+ * for the entire lifetime of the process. This ID acts as an incarnation identifier for the consumer process
+ * and does not reset or change, even if the consumer leaves and rejoins the group.
+ * The Member ID remains the same until the process is completely stopped or terminated.
+ */
+ private final String memberId = Uuid.randomUuid().toString();
+
+ /**
+ * Group instance ID to be used by a static member, provided when creating the current membership manager.
+ */
+ private final Optional groupInstanceId = Optional.empty();
+
+ /**
+ * Current epoch of the member. It will be set to 0 by the member, and provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained by the server,
+ * incremented as the member reconciles and acknowledges the assignments it receives. It will
+ * be reset to 0 if the member gets fenced.
+ */
+ private int memberEpoch = 0;
+
+ /**
+ * If the member is currently leaving the group after a call to {@link #leaveGroup()} or
+ * {@link #leaveGroupOnClose()}, this will have a future that will complete when the ongoing leave operation
+ * completes (callbacks executed and heartbeat request to leave is sent out). This will be empty if the
+ * member is not leaving.
+ */
+ private Optional> leaveGroupInProgress = Optional.empty();
+
+ /**
+ * Future that will complete when a stale member completes releasing its assignment after
+ * leaving the group due to poll timer expired. Used to make sure that the member rejoins
+ * when the timer is reset, only when it completes releasing its assignment.
+ */
+ private CompletableFuture staleMemberAssignmentRelease;
+
+ /**
+ * If there is a reconciliation running (callbacks).
+ * This will be true if {@link #maybeReconcile()} has been triggered
+ * after receiving a heartbeat response, or a metadata update.
+ */
+ private boolean reconciliationInProgress;
+
+ /**
+ * True if a reconciliation is in progress and the member rejoined the group since the start
+ * of the reconciliation. Used to know that the reconciliation in progress should be
+ * interrupted and not be applied.
+ */
+ private boolean rejoinedWhileReconciliationInProgress;
+
+ /**
+ * Registered listeners that will be notified whenever the member epoch gets updated
+ * (valid values received from the broker, or values cleared due to member leaving
+ * the group, getting fenced or failing).
+ */
+ private final List stateUpdatesListeners = new ArrayList<>();
+
+ /**
+ * Tasks received in the last target assignment, together with its local epoch.
+ *
+ * This member variable is reassigned every time a new assignment is received.
+ * It is equal to LocalAssignment.NONE whenever we are not in a group.
+ */
+ private LocalAssignment targetAssignment = LocalAssignment.NONE;
+
+ /**
+ * Assignment that the member received from the server and successfully processed, together with
+ * its local epoch.
+ *
+ * This is equal to LocalAssignment.NONE when we are not in a group, or we haven't reconciled any assignment yet.
+ */
+ private LocalAssignment currentAssignment = LocalAssignment.NONE;
+
+ /**
+ * AtomicBoolean to track whether the subscription is updated.
+ * If it's true and subscription state is UNSUBSCRIBED, the next {@link #onConsumerPoll()} will change member state to JOINING.
+ */
+ private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
+
+ /**
+ * Measures successful rebalance latency and number of failed rebalances.
+ */
+ private final RebalanceMetricsManager metricsManager;
+
+ private final Time time;
+
+ /**
+ * True if the poll timer has expired, signaled by a call to
+ * {@link #transitionToSendingLeaveGroup(boolean)} with dueToExpiredPollTimer param true. This
+ * will be used to determine that the member should transition to STALE after leaving the
+ * group, to release its assignment and wait for the timer to be reset.
+ */
+ private boolean isPollTimerExpired;
+
+ /**
+ * Constructs the Streams membership manager.
+ *
+ * @param groupId The ID of the group.
+ * @param streamsRebalanceEventsProcessor The processor that handles Streams rebalance events like requests for
+ * invocation of assignment/revocation callbacks.
+ * @param streamsRebalanceData Data needed to participate in the Streams rebalance protocol.
+ * @param subscriptionState The subscription state of the member.
+ * @param logContext The log context.
+ * @param time The time.
+ * @param metrics The metrics.
+ */
+ public StreamsMembershipManager(final String groupId,
+ final StreamsRebalanceEventsProcessor streamsRebalanceEventsProcessor,
+ final StreamsRebalanceData streamsRebalanceData,
+ final SubscriptionState subscriptionState,
+ final LogContext logContext,
+ final Time time,
+ final Metrics metrics) {
+ log = logContext.logger(StreamsMembershipManager.class);
+ this.state = MemberState.UNSUBSCRIBED;
+ this.groupId = groupId;
+ this.streamsRebalanceEventsProcessor = streamsRebalanceEventsProcessor;
+ this.streamsRebalanceData = streamsRebalanceData;
+ this.subscriptionState = subscriptionState;
+ metricsManager = new ConsumerRebalanceMetricsManager(metrics);
+ this.time = time;
+ }
+
+ /**
+ * @return Group ID of the group the member is part of (or wants to be part of).
+ */
+ public String groupId() {
+ return groupId;
+ }
+
+ /**
+ * @return Member ID that is generated at startup and remains unchanged for the entire lifetime of the process.
+ */
+ public String memberId() {
+ return memberId;
+ }
+
+ /**
+ * @return Instance ID used by the member when joining the group. If non-empty, it will indicate that
+ * this is a static member.
+ */
+ public Optional groupInstanceId() {
+ return groupInstanceId;
+ }
+
+ /**
+ * @return Current epoch of the member, maintained by the server.
+ */
+ public int memberEpoch() {
+ return memberEpoch;
+ }
+
+ /**
+ * @return Current state of this member in relationship to a group, as defined in
+ * {@link MemberState}.
+ */
+ public MemberState state() {
+ return state;
+ }
+
+ /**
+ * @return True if the member is preparing to leave the group (waiting for callbacks), or
+ * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+ * the poll timer expires.
+ */
+ public boolean isLeavingGroup() {
+ return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
+ }
+
+ private boolean isNotInGroup() {
+ return state == MemberState.UNSUBSCRIBED ||
+ state == MemberState.FENCED ||
+ state == MemberState.FATAL ||
+ state == MemberState.STALE;
+ }
+
+ /**
+ * Register a new listener that will be invoked whenever the member state changes, or a new
+ * member ID or epoch is received.
+ *
+ * @param listener Listener to invoke.
+ */
+ public void registerStateListener(MemberStateListener listener) {
+ Objects.requireNonNull(listener, "State updates listener cannot be null");
+ for (MemberStateListener registeredListener : stateUpdatesListeners) {
+ if (registeredListener == listener) {
+ throw new IllegalArgumentException("Listener is already registered.");
+ }
+ }
+ stateUpdatesListeners.add(listener);
+ }
+
+ /**
+ * Call all listeners that are registered to get notified when the member epoch is updated.
+ * This also includes the member ID in the notification. If the member fails or leaves
+ * the group, this will be invoked with empty epoch.
+ */
+ private void notifyEpochChange(Optional epoch) {
+ stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
+ }
+
+ /**
+ * Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(java.util.Set)} callback for each listener when the
+ * set of assigned partitions changes. This includes on assignment changes, unsubscribe, and when leaving
+ * the group.
+ */
+ void notifyAssignmentChange(Set partitions) {
+ stateUpdatesListeners.forEach(stateListener -> stateListener.onGroupAssignmentUpdated(partitions));
+ }
+
+ /**
+ * Transition to the {@link MemberState#JOINING} state, indicating that the member will
+ * try to join the group on the next heartbeat request. This is expected to be invoked when
+ * the user calls the subscribe API, or when the member wants to rejoin after getting fenced.
+ * Visible for testing.
+ */
+ private void transitionToJoining() {
+ if (state == MemberState.FATAL) {
+ log.warn("No action taken to join the group with the updated subscription because " +
+ "the member is in FATAL state");
+ return;
+ }
+ if (reconciliationInProgress) {
+ rejoinedWhileReconciliationInProgress = true;
+ }
+ resetEpoch();
+ transitionTo(MemberState.JOINING);
+ clearCurrentTaskAssignment();
+ }
+
+ /**
+ * Reset member epoch to the value required for the leave the group heartbeat request, and
+ * transition to the {@link MemberState#LEAVING} state so that a heartbeat request is sent
+ * out with it.
+ *
+ * @param dueToExpiredPollTimer True if the leave group is due to an expired poll timer. This
+ * will indicate that the member must remain STALE after leaving,
+ * until it releases its assignment and the timer is reset.
+ */
+ private void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
+ if (state == MemberState.FATAL) {
+ log.warn("Member {} with epoch {} won't send leave group request because it is in " +
+ "FATAL state", memberId, memberEpoch);
+ return;
+ }
+ if (state == MemberState.UNSUBSCRIBED) {
+ log.warn("Member {} won't send leave group request because it is already out of the group.",
+ memberId);
+ return;
+ }
+
+ if (dueToExpiredPollTimer) {
+ isPollTimerExpired = true;
+ // Briefly transition through prepare leaving. The member does not have to release
+ // any assignment before sending the leave group given that is stale. It will invoke
+ // onAllTasksLost after sending the leave group on the STALE state.
+ transitionTo(MemberState.PREPARE_LEAVING);
+ }
+ finalizeLeaving();
+ transitionTo(MemberState.LEAVING);
+ }
+
+ private void finalizeLeaving() {
+ updateMemberEpoch(StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);
+ clearCurrentTaskAssignment();
+ }
+
+ /**
+ * Transition to STALE to release assignments because the member has left the group due to
+ * expired poll timer. This will trigger the onAllTasksLost callback. Once the callback
+ * completes, the member will remain stale until the poll timer is reset by an application
+ * poll event. See {@link #maybeRejoinStaleMember()}.
+ */
+ private void transitionToStale() {
+ transitionTo(MemberState.STALE);
+
+ final CompletableFuture onAllTasksLostCallbackExecution =
+ streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation();
+ staleMemberAssignmentRelease = onAllTasksLostCallbackExecution.whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("Task revocation callback invocation failed " +
+ "after member left group due to expired poll timer.", error);
+ }
+ clearTaskAndPartitionAssignment();
+ log.debug("Member {} sent leave group heartbeat and released its assignment. It will remain " +
+ "in {} state until the poll timer is reset, and it will then rejoin the group",
+ memberId, MemberState.STALE);
+ });
+ }
+
+ /**
+ * Transition the member to the FATAL state and update the member info as required. This is
+ * invoked when un-recoverable errors occur (ex. when the heartbeat returns a non-retriable
+ * error)
+ */
+ public void transitionToFatal() {
+ MemberState previousState = state;
+ transitionTo(MemberState.FATAL);
+ log.error("Member {} with epoch {} transitioned to fatal state", memberId, memberEpoch);
+ notifyEpochChange(Optional.empty());
+
+ if (previousState == MemberState.UNSUBSCRIBED) {
+ log.debug("Member {} with epoch {} got fatal error from the broker but it already " +
+ "left the group, so onAllTasksLost callback won't be triggered.", memberId, memberEpoch);
+ return;
+ }
+
+ if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) {
+ log.info("Member {} with epoch {} was leaving the group with state {} when it got a " +
+ "fatal error from the broker. It will discard the ongoing leave and remain in " +
+ "fatal state.", memberId, memberEpoch, previousState);
+ maybeCompleteLeaveInProgress();
+ return;
+ }
+
+ CompletableFuture onAllTasksLostCallbackExecuted = streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation();
+ onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("onAllTasksLost callback invocation failed while releasing assignment " +
+ "after member failed with fatal error.", error);
+ }
+ clearTaskAndPartitionAssignment();
+ });
+ }
+
+ /**
+ * Notify when the heartbeat request is skipped.
+ * Transition out of the {@link MemberState#LEAVING} state even if the heartbeat was not sent.
+ * This will ensure that the member is not blocked on {@link MemberState#LEAVING} (best
+ * effort to send the request, without any response handling or retry logic)
+ */
+ public void onHeartbeatRequestSkipped() {
+ if (state == MemberState.LEAVING) {
+ log.warn("Heartbeat to leave group cannot be sent (most probably due to coordinator " +
+ "not known/available). Member {} with epoch {} will transition to {}.",
+ memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+ transitionTo(MemberState.UNSUBSCRIBED);
+ maybeCompleteLeaveInProgress();
+ }
+ }
+
+ /**
+ * Update the member state, setting it to the nextState only if it is a valid transition.
+ *
+ * @throws IllegalStateException If transitioning from the member {@link #state} to the
+ * nextState is not allowed as defined in {@link MemberState}.
+ */
+ private void transitionTo(MemberState nextState) {
+ if (!state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) {
+ throw new IllegalStateException(String.format("Invalid state transition from %s to %s",
+ state, nextState));
+ }
+
+ if (isCompletingRebalance(state, nextState)) {
+ metricsManager.recordRebalanceEnded(time.milliseconds());
+ }
+ if (isStartingRebalance(state, nextState)) {
+ metricsManager.recordRebalanceStarted(time.milliseconds());
+ }
+
+ log.info("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState);
+ this.state = nextState;
+ }
+
+ private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) {
+ return currentState == MemberState.RECONCILING &&
+ (nextState == MemberState.STABLE || nextState == MemberState.ACKNOWLEDGING);
+ }
+
+ private static boolean isStartingRebalance(MemberState currentState, MemberState nextState) {
+ return currentState != MemberState.RECONCILING && nextState == MemberState.RECONCILING;
+ }
+
+ private void resetEpoch() {
+ updateMemberEpoch(StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH);
+ }
+
+ private void updateMemberEpoch(int newEpoch) {
+ boolean newEpochReceived = this.memberEpoch != newEpoch;
+ this.memberEpoch = newEpoch;
+ if (newEpochReceived) {
+ if (memberEpoch > 0) {
+ notifyEpochChange(Optional.of(memberEpoch));
+ } else {
+ notifyEpochChange(Optional.empty());
+ }
+ }
+ }
+
+ /**
+ * Discard assignments received that have not been reconciled yet (waiting for metadata
+ * or the next reconciliation loop).
+ */
+ private void clearCurrentTaskAssignment() {
+ currentAssignment = LocalAssignment.NONE;
+ }
+
+ /**
+ * Clear the assigned partitions in the member subscription, pending assignments and metadata cache.
+ */
+ private void clearTaskAndPartitionAssignment() {
+ subscriptionState.assignFromSubscribed(Collections.emptySet());
+ notifyAssignmentChange(Collections.emptySet());
+ currentAssignment = LocalAssignment.NONE;
+ targetAssignment = LocalAssignment.NONE;
+ }
+
+ /**
+ * @return True if the member should not send heartbeats, which is the case when it is in a
+ * state where it is not an active member of the group.
+ */
+ public boolean shouldSkipHeartbeat() {
+ return isNotInGroup();
+ }
+
+ /**
+ * @return True if the member should send heartbeat to the coordinator without waiting for
+ * the interval.
+ */
+ public boolean shouldHeartbeatNow() {
+ return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING;
+ }
+
+ /**
+ * Set {@link #subscriptionUpdated} to true to indicate that the subscription has been updated.
+ * The next {@link #onConsumerPoll()} will join the group with the updated subscription, if the member is not part of it yet.
+ * If the member is already part of the group, this will only ensure that the updated subscription
+ * is included in the next heartbeat request.
+ *
+ * Note that the list of topics in the subscription is taken from the shared subscription state.
+ */
+ public void onSubscriptionUpdated() {
+ subscriptionUpdated.compareAndSet(false, true);
+ }
+
+ /**
+ * Join the group if the member is not part of it yet. This function separates {@link #transitionToJoining}
+ * from the {@link #onSubscriptionUpdated} to fulfill the requirement of the "rebalances will only occur during an
+ * active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}"
+ */
+ public void onConsumerPoll() {
+ if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) {
+ transitionToJoining();
+ }
+ }
+
+ /**
+ * Update state when a heartbeat is generated. This will transition out of the states that end
+ * when a heartbeat request is sent, without waiting for a response (ex.
+ * {@link MemberState#ACKNOWLEDGING} and {@link MemberState#LEAVING}).
+ */
+ public void onHeartbeatRequestGenerated() {
+ if (state == MemberState.ACKNOWLEDGING) {
+ if (targetAssignmentReconciled()) {
+ transitionTo(MemberState.STABLE);
+ } else {
+ log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent " +
+ "to ack a previous reconciliation. New assignments are ready to " +
+ "be reconciled.", memberId, memberEpoch, MemberState.RECONCILING);
+ transitionTo(MemberState.RECONCILING);
+ }
+ } else if (state == MemberState.LEAVING) {
+ if (isPollTimerExpired) {
+ log.debug("Member {} with epoch {} generated the heartbeat to leave due to expired poll timer. It will " +
+ "remain stale (no heartbeat) until it rejoins the group on the next consumer " +
+ "poll.", memberId, memberEpoch);
+ transitionToStale();
+ } else {
+ log.debug("Member {} with epoch {} generated the heartbeat to leave the group.", memberId, memberEpoch);
+ transitionTo(MemberState.UNSUBSCRIBED);
+ }
+ }
+ }
+
+ /**
+ * Notify about a successful heartbeat response.
+ *
+ * @param response Heartbeat response to extract member info and errors from.
+ */
+ public void onHeartbeatSuccess(StreamsGroupHeartbeatResponse response) {
+ StreamsGroupHeartbeatResponseData responseData = response.data();
+ throwIfUnexpectedError(responseData);
+ if (state == MemberState.LEAVING) {
+ log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} is " +
+ "already leaving the group.", memberId, memberEpoch);
+ return;
+ }
+ if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
+ log.debug("Member {} with epoch {} received a successful response to the heartbeat " +
+ "to leave the group and completed the leave operation. ", memberId, memberEpoch);
+ return;
+ }
+ if (isNotInGroup()) {
+ log.debug("Ignoring heartbeat response received from broker. Member {} is in {} state" +
+ " so it's not a member of the group. ", memberId, state);
+ return;
+ }
+
+ updateMemberEpoch(responseData.memberEpoch());
+
+ final List activeTasks = responseData.activeTasks();
+ final List standbyTasks = responseData.standbyTasks();
+ final List warmupTasks = responseData.warmupTasks();
+
+ if (activeTasks != null && standbyTasks != null && warmupTasks != null) {
+
+ if (!state.canHandleNewAssignment()) {
+ log.debug("Ignoring new assignment: active tasks {}, standby tasks {}, and warm-up tasks {} received " +
+ "from server because member is in {} state.",
+ activeTasks, standbyTasks, warmupTasks, state);
+ return;
+ }
+
+ processAssignmentReceived(
+ toTasksAssignment(activeTasks),
+ toTasksAssignment(standbyTasks),
+ toTasksAssignment(warmupTasks)
+ );
+ } else {
+ if (responseData.activeTasks() != null ||
+ responseData.standbyTasks() != null ||
+ responseData.warmupTasks() != null) {
+
+ throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: "
+ + responseData);
+ }
+ }
+ }
+
+ /**
+ * Notify the member that an error heartbeat response was received.
+ *
+ * @param retriable True if the request failed with a retriable error.
+ */
+ public void onHeartbeatFailure(boolean retriable) {
+ if (!retriable) {
+ metricsManager.maybeRecordRebalanceFailed();
+ }
+ // The leave group request is sent out once (not retried), so we should complete the leave
+ // operation once the request completes, regardless of the response.
+ if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
+ log.warn("Member {} with epoch {} received a failed response to the heartbeat to " +
+ "leave the group and completed the leave operation. ", memberId, memberEpoch);
+ }
+ }
+
+ /**
+ * Notify when the poll timer expired.
+ */
+ public void onPollTimerExpired() {
+ transitionToSendingLeaveGroup(true);
+ }
+
+ /**
+ * Notify when member is fenced.
+ */
+ public void onFenced() {
+ if (state == MemberState.PREPARE_LEAVING) {
+ log.debug("Member {} with epoch {} got fenced but it is already preparing to leave " +
+ "the group, so it will stop sending heartbeat and won't attempt to send the " +
+ "leave request or rejoin.", memberId, memberEpoch);
+ finalizeLeaving();
+ transitionTo(MemberState.UNSUBSCRIBED);
+ maybeCompleteLeaveInProgress();
+ return;
+ }
+
+ if (state == MemberState.LEAVING) {
+ log.debug("Member {} with epoch {} got fenced before sending leave group heartbeat. " +
+ "It will not send the leave request and won't attempt to rejoin.", memberId, memberEpoch);
+ transitionTo(MemberState.UNSUBSCRIBED);
+ maybeCompleteLeaveInProgress();
+ return;
+ }
+ if (state == MemberState.UNSUBSCRIBED) {
+ log.debug("Member {} with epoch {} got fenced but it already left the group, so it " +
+ "won't attempt to rejoin.", memberId, memberEpoch);
+ return;
+ }
+ transitionTo(MemberState.FENCED);
+ resetEpoch();
+ log.debug("Member {} with epoch {} transitioned to {} state. It will release its " +
+ "assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED);
+
+ CompletableFuture callbackResult = streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation();
+ callbackResult.whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("onAllTasksLost callback invocation failed while releasing assignment" +
+ " after member got fenced. Member will rejoin the group anyways.", error);
+ }
+ clearTaskAndPartitionAssignment();
+ if (state == MemberState.FENCED) {
+ transitionToJoining();
+ } else {
+ log.debug("Fenced member onAllTasksLost callback completed but the state has " +
+ "already changed to {}, so the member won't rejoin the group", state);
+ }
+ });
+ }
+
+ private void throwIfUnexpectedError(StreamsGroupHeartbeatResponseData responseData) {
+ if (responseData.errorCode() != Errors.NONE.code()) {
+ String errorMessage = String.format(
+ "Unexpected error in Heartbeat response. Expected no error, but received: %s with message: '%s'",
+ Errors.forCode(responseData.errorCode()), responseData.errorMessage()
+ );
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
+
+ /**
+ * Transition a {@link MemberState#STALE} member to {@link MemberState#JOINING} when it completes
+ * releasing its assignment. This is expected to be used when the poll timer is reset.
+ */
+ public void maybeRejoinStaleMember() {
+ isPollTimerExpired = false;
+ if (state == MemberState.STALE) {
+ log.debug("Expired poll timer has been reset so stale member {} will rejoin the group " +
+ "when it completes releasing its previous assignment.", memberId);
+ staleMemberAssignmentRelease.whenComplete((__, error) -> transitionToJoining());
+ }
+ }
+
+ /**
+ * Complete the leave in progress (if any). This is expected to be used to complete the leave
+ * in progress when a member receives the response to the leave heartbeat.
+ */
+ private boolean maybeCompleteLeaveInProgress() {
+ if (leaveGroupInProgress.isPresent()) {
+ leaveGroupInProgress.get().complete(null);
+ leaveGroupInProgress = Optional.empty();
+ return true;
+ }
+ return false;
+ }
+
+ private static SortedSet toTaskIdSet(final Map> tasks) {
+ SortedSet taskIdSet = new TreeSet<>();
+ for (final Map.Entry> task : tasks.entrySet()) {
+ final String subtopologyId = task.getKey();
+ final SortedSet partitions = task.getValue();
+ for (final int partition : partitions) {
+ taskIdSet.add(new StreamsRebalanceData.TaskId(subtopologyId, partition));
+ }
+ }
+ return taskIdSet;
+ }
+
+ private static Map> toTasksAssignment(final List taskIds) {
+ return taskIds.stream()
+ .collect(Collectors.toMap(StreamsGroupHeartbeatResponseData.TaskIds::subtopologyId, taskId -> new TreeSet<>(taskId.partitions())));
+ }
+
+ /**
+ * Leaves the group when the member closes.
+ *
+ *
+ * This method does the following:
+ *
+ * - Transitions member state to {@link MemberState#PREPARE_LEAVING}.
+ * - Skips the invocation of the revocation callback or lost callback.
+ * - Clears the current and target assignment, unsubscribes from all topics and
+ * transitions the member state to {@link MemberState#LEAVING}.
+ *
+ * States {@link MemberState#PREPARE_LEAVING} and {@link MemberState#LEAVING} cause the heartbeat request manager
+ * to send a leave group heartbeat.
+ *
+ *
+ * @return future that will complete when the heartbeat to leave the group has been sent out.
+ */
+ public CompletableFuture leaveGroupOnClose() {
+ return leaveGroup(true);
+ }
+
+ /**
+ * Leaves the group.
+ *
+ *
+ * This method does the following:
+ *
+ * - Transitions member state to {@link MemberState#PREPARE_LEAVING}.
+ * - Requests the invocation of the revocation callback or lost callback.
+ * - Once the callback completes, it clears the current and target assignment, unsubscribes from
+ * all topics and transitions the member state to {@link MemberState#LEAVING}.
+ *
+ * States {@link MemberState#PREPARE_LEAVING} and {@link MemberState#LEAVING} cause the heartbeat request manager
+ * to send a leave group heartbeat.
+ *
+ *
+ * @return future that will complete when the revocation callback execution completes and the heartbeat
+ * to leave the group has been sent out.
+ */
+ public CompletableFuture leaveGroup() {
+ return leaveGroup(false);
+ }
+
+ private CompletableFuture leaveGroup(final boolean isOnClose) {
+ if (isNotInGroup()) {
+ if (state == MemberState.FENCED) {
+ clearTaskAndPartitionAssignment();
+ transitionTo(MemberState.UNSUBSCRIBED);
+ }
+ subscriptionState.unsubscribe();
+ notifyAssignmentChange(Collections.emptySet());
+ return CompletableFuture.completedFuture(null);
+ }
+
+ if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
+ log.debug("Leave group operation already in progress for member {}", memberId);
+ return leaveGroupInProgress.get();
+ }
+
+ transitionTo(MemberState.PREPARE_LEAVING);
+ CompletableFuture onGroupLeft = new CompletableFuture<>();
+ leaveGroupInProgress = Optional.of(onGroupLeft);
+ if (isOnClose) {
+ leaving();
+ } else {
+ CompletableFuture onAllActiveTasksReleasedCallbackExecuted = releaseActiveTasks();
+ onAllActiveTasksReleasedCallbackExecuted
+ .whenComplete((__, callbackError) -> leavingAfterReleasingActiveTasks(callbackError));
+ }
+
+ return onGroupLeft;
+ }
+
+ private CompletableFuture releaseActiveTasks() {
+ if (memberEpoch > 0) {
+ return revokeActiveTasks(toTaskIdSet(currentAssignment.activeTasks));
+ } else {
+ return releaseLostActiveTasks();
+ }
+ }
+
+ private void leavingAfterReleasingActiveTasks(Throwable callbackError) {
+ if (callbackError != null) {
+ log.error("Member {} callback to revoke task assignment failed. It will proceed " +
+ "to clear its assignment and send a leave group heartbeat",
+ memberId, callbackError);
+ } else {
+ log.info("Member {} completed callback to revoke task assignment. It will proceed " +
+ "to clear its assignment and send a leave group heartbeat",
+ memberId);
+ }
+ leaving();
+ }
+
+ private void leaving() {
+ clearTaskAndPartitionAssignment();
+ subscriptionState.unsubscribe();
+ transitionToSendingLeaveGroup(false);
+ }
+
+ /**
+ * This will process the assignment received if it is different from the member's current
+ * assignment. If a new assignment is received, this will make sure reconciliation is attempted
+ * on the next call of `poll`. If another reconciliation is currently in process, the first `poll`
+ * after that reconciliation will trigger the new reconciliation.
+ *
+ * @param activeTasks Target active tasks assignment received from the broker.
+ * @param standbyTasks Target standby tasks assignment received from the broker.
+ * @param warmupTasks Target warm-up tasks assignment received from the broker.
+ */
+ private void processAssignmentReceived(Map> activeTasks,
+ Map> standbyTasks,
+ Map> warmupTasks) {
+ replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks, warmupTasks);
+ if (!targetAssignmentReconciled()) {
+ transitionTo(MemberState.RECONCILING);
+ } else {
+ log.debug("Target assignment {} received from the broker is equals to the member " +
+ "current assignment {}. Nothing to reconcile.",
+ targetAssignment, currentAssignment);
+ if (state == MemberState.RECONCILING || state == MemberState.JOINING) {
+ transitionTo(MemberState.STABLE);
+ }
+ }
+ }
+
+ private boolean targetAssignmentReconciled() {
+ return currentAssignment.equals(targetAssignment);
+ }
+
+ private void replaceTargetAssignmentWithNewAssignment(Map> activeTasks,
+ Map> standbyTasks,
+ Map> warmupTasks) {
+ targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks)
+ .ifPresent(updatedAssignment -> {
+ log.debug("Target assignment updated from {} to {}. Member will reconcile it on the next poll.",
+ targetAssignment, updatedAssignment);
+ targetAssignment = updatedAssignment;
+ });
+ }
+
+ /**
+ * Called by the network thread to reconcile the current and target assignment.
+ */
+ @Override
+ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+ if (state == MemberState.RECONCILING) {
+ maybeReconcile();
+ }
+ return NetworkClientDelegate.PollResult.EMPTY;
+ }
+
+ /**
+ * Reconcile the assignment that has been received from the server. Reconciliation will trigger the
+ * callbacks and update the subscription state.
+ *
+ * There are two conditions under which no reconciliation will be triggered:
+ * - We have already reconciled the assignment (the target assignment is the same as the current assignment).
+ * - Another reconciliation is already in progress.
+ */
+ private void maybeReconcile() {
+ if (targetAssignmentReconciled()) {
+ log.trace("Ignoring reconciliation attempt. Target assignment is equal to the " +
+ "current assignment.");
+ return;
+ }
+ if (reconciliationInProgress) {
+ log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
+ targetAssignment + " will be handled in the next reconciliation loop.");
+ return;
+ }
+
+ markReconciliationInProgress();
+
+ SortedSet assignedActiveTasks = toTaskIdSet(targetAssignment.activeTasks);
+ SortedSet ownedActiveTasks = toTaskIdSet(currentAssignment.activeTasks);
+ SortedSet activeTasksToRevoke = new TreeSet<>(ownedActiveTasks);
+ activeTasksToRevoke.removeAll(assignedActiveTasks);
+ SortedSet assignedStandbyTasks = toTaskIdSet(targetAssignment.standbyTasks);
+ SortedSet ownedStandbyTasks = toTaskIdSet(currentAssignment.standbyTasks);
+ SortedSet assignedWarmupTasks = toTaskIdSet(targetAssignment.warmupTasks);
+ SortedSet ownedWarmupTasks = toTaskIdSet(currentAssignment.warmupTasks);
+
+ log.info("Assigned tasks with local epoch {}\n" +
+ "\tMember: {}\n" +
+ "\tAssigned active tasks: {}\n" +
+ "\tOwned active tasks: {}\n" +
+ "\tActive tasks to revoke: {}\n" +
+ "\tAssigned standby tasks: {}\n" +
+ "\tOwned standby tasks: {}\n" +
+ "\tAssigned warm-up tasks: {}\n" +
+ "\tOwned warm-up tasks: {}\n",
+ targetAssignment.localEpoch,
+ memberId,
+ assignedActiveTasks,
+ ownedActiveTasks,
+ activeTasksToRevoke,
+ assignedStandbyTasks,
+ ownedStandbyTasks,
+ assignedWarmupTasks,
+ ownedWarmupTasks
+ );
+
+ SortedSet ownedTopicPartitionsFromSubscriptionState = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ ownedTopicPartitionsFromSubscriptionState.addAll(subscriptionState.assignedPartitions());
+ SortedSet ownedTopicPartitionsFromAssignedTasks =
+ topicPartitionsForActiveTasks(currentAssignment.activeTasks);
+ if (!ownedTopicPartitionsFromAssignedTasks.equals(ownedTopicPartitionsFromSubscriptionState)) {
+ throw new IllegalStateException("Owned partitions from subscription state and owned partitions from " +
+ "assigned active tasks are not equal. " +
+ "Owned partitions from subscription state: " + ownedTopicPartitionsFromSubscriptionState + ", " +
+ "Owned partitions from assigned active tasks: " + ownedTopicPartitionsFromAssignedTasks);
+ }
+ SortedSet assignedTopicPartitions = topicPartitionsForActiveTasks(targetAssignment.activeTasks);
+ SortedSet partitionsToRevoke = new TreeSet<>(ownedTopicPartitionsFromSubscriptionState);
+ partitionsToRevoke.removeAll(assignedTopicPartitions);
+
+ final CompletableFuture tasksRevoked = revokeActiveTasks(activeTasksToRevoke);
+
+ final CompletableFuture tasksRevokedAndAssigned = tasksRevoked.thenCompose(__ -> {
+ if (!maybeAbortReconciliation()) {
+ return assignTasks(assignedActiveTasks, ownedActiveTasks, assignedStandbyTasks, assignedWarmupTasks);
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+
+ // The current target assignment is captured to ensure that acknowledging the current assignment is done with
+ // the same target assignment that was used when this reconciliation was initiated.
+ LocalAssignment currentTargetAssignment = targetAssignment;
+ tasksRevokedAndAssigned.whenComplete((__, callbackError) -> {
+ if (callbackError != null) {
+ log.error("Reconciliation failed: callback invocation failed for tasks {}",
+ currentTargetAssignment, callbackError);
+ markReconciliationCompleted();
+ } else {
+ if (reconciliationInProgress && !maybeAbortReconciliation()) {
+ currentAssignment = currentTargetAssignment;
+ transitionTo(MemberState.ACKNOWLEDGING);
+ markReconciliationCompleted();
+ }
+ }
+ });
+ }
+
+ private CompletableFuture revokeActiveTasks(final SortedSet activeTasksToRevoke) {
+ if (activeTasksToRevoke.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ log.info("Revoking previously assigned active tasks {}", activeTasksToRevoke.stream()
+ .map(StreamsRebalanceData.TaskId::toString)
+ .collect(Collectors.joining(", ")));
+
+ final SortedSet partitionsToRevoke = topicPartitionsForActiveTasks(activeTasksToRevoke);
+ log.debug("Marking partitions pending for revocation: {}", partitionsToRevoke);
+ subscriptionState.markPendingRevocation(partitionsToRevoke);
+
+ CompletableFuture tasksRevoked = new CompletableFuture<>();
+ CompletableFuture onTasksRevokedCallbackExecuted =
+ streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksToRevoke);
+ onTasksRevokedCallbackExecuted.whenComplete((__, callbackError) -> {
+ if (callbackError != null) {
+ log.error("onTasksRevoked callback invocation failed for tasks {}",
+ activeTasksToRevoke, callbackError);
+ tasksRevoked.completeExceptionally(callbackError);
+ } else {
+ tasksRevoked.complete(null);
+ }
+ });
+ return tasksRevoked;
+ }
+
+ private CompletableFuture assignTasks(final SortedSet activeTasksToAssign,
+ final SortedSet ownedActiveTasks,
+ final SortedSet standbyTasksToAssign,
+ final SortedSet warmupTasksToAssign) {
+ log.info("Assigning active tasks {{}}, standby tasks {{}}, and warm-up tasks {{}} to the member.",
+ activeTasksToAssign.stream()
+ .map(StreamsRebalanceData.TaskId::toString)
+ .collect(Collectors.joining(", ")),
+ standbyTasksToAssign.stream()
+ .map(StreamsRebalanceData.TaskId::toString)
+ .collect(Collectors.joining(", ")),
+ warmupTasksToAssign.stream()
+ .map(StreamsRebalanceData.TaskId::toString)
+ .collect(Collectors.joining(", "))
+ );
+
+ final SortedSet partitionsToAssign = topicPartitionsForActiveTasks(activeTasksToAssign);
+ final SortedSet partitionsToAssigneNotPreviouslyOwned =
+ partitionsToAssignNotPreviouslyOwned(partitionsToAssign, topicPartitionsForActiveTasks(ownedActiveTasks));
+
+ subscriptionState.assignFromSubscribedAwaitingCallback(
+ partitionsToAssign,
+ partitionsToAssigneNotPreviouslyOwned
+ );
+ notifyAssignmentChange(partitionsToAssign);
+
+ CompletableFuture onTasksAssignedCallbackExecuted =
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ new StreamsRebalanceData.Assignment(
+ activeTasksToAssign,
+ standbyTasksToAssign,
+ warmupTasksToAssign
+ )
+ );
+ onTasksAssignedCallbackExecuted.whenComplete((__, callbackError) -> {
+ if (callbackError == null) {
+ subscriptionState.enablePartitionsAwaitingCallback(partitionsToAssign);
+ } else {
+ if (!partitionsToAssigneNotPreviouslyOwned.isEmpty()) {
+ log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not " +
+ "requiring initializing positions after onTasksAssigned callback failed.",
+ partitionsToAssigneNotPreviouslyOwned, callbackError);
+ }
+ }
+ });
+
+ return onTasksAssignedCallbackExecuted;
+ }
+
+ private CompletableFuture releaseLostActiveTasks() {
+ final SortedSet activeTasksToRelease = toTaskIdSet(currentAssignment.activeTasks);
+ log.info("Revoking previously assigned and now lost active tasks {}", activeTasksToRelease.stream()
+ .map(StreamsRebalanceData.TaskId::toString)
+ .collect(Collectors.joining(", ")));
+
+ final SortedSet partitionsToRelease = topicPartitionsForActiveTasks(activeTasksToRelease);
+ log.debug("Marking lost partitions pending for revocation: {}", partitionsToRelease);
+ subscriptionState.markPendingRevocation(partitionsToRelease);
+
+ return streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation();
+ }
+
+ private SortedSet partitionsToAssignNotPreviouslyOwned(final SortedSet assignedTopicPartitions,
+ final SortedSet ownedTopicPartitions) {
+ SortedSet assignedPartitionsNotPreviouslyOwned = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ assignedPartitionsNotPreviouslyOwned.addAll(assignedTopicPartitions);
+ assignedPartitionsNotPreviouslyOwned.removeAll(ownedTopicPartitions);
+ return assignedPartitionsNotPreviouslyOwned;
+ }
+
+ private SortedSet topicPartitionsForActiveTasks(final Map> activeTasks) {
+ final SortedSet topicPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ activeTasks.forEach((subtopologyId, partitionIds) ->
+ Stream.concat(
+ streamsRebalanceData.subtopologies().get(subtopologyId).sourceTopics().stream(),
+ streamsRebalanceData.subtopologies().get(subtopologyId).repartitionSourceTopics().keySet().stream()
+ ).forEach(topic -> {
+ for (final int partitionId : partitionIds) {
+ topicPartitions.add(new TopicPartition(topic, partitionId));
+ }
+ })
+ );
+ return topicPartitions;
+ }
+
+ private SortedSet topicPartitionsForActiveTasks(final SortedSet activeTasks) {
+ final SortedSet topicPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ activeTasks.forEach(task ->
+ Stream.concat(
+ streamsRebalanceData.subtopologies().get(task.subtopologyId()).sourceTopics().stream(),
+ streamsRebalanceData.subtopologies().get(task.subtopologyId()).repartitionSourceTopics().keySet().stream()
+ ).forEach(topic -> {
+ topicPartitions.add(new TopicPartition(topic, task.partitionId()));
+ })
+ );
+ return topicPartitions;
+ }
+
+ private void markReconciliationCompleted() {
+ reconciliationInProgress = false;
+ rejoinedWhileReconciliationInProgress = false;
+ }
+
+ private boolean maybeAbortReconciliation() {
+ boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
+ if (shouldAbort) {
+ String reason = rejoinedWhileReconciliationInProgress ?
+ "the member has re-joined the group" :
+ "the member already transitioned out of the reconciling state into " + state;
+ log.info("Interrupting reconciliation that is not relevant anymore because " + reason);
+ markReconciliationCompleted();
+ }
+ return shouldAbort;
+ }
+
+ private void markReconciliationInProgress() {
+ reconciliationInProgress = true;
+ rejoinedWhileReconciliationInProgress = false;
+ }
+
+ /**
+ * Completes the future that marks the completed execution of the onTasksRevoked callback.
+
+ * @param event The event containing the future sent from the application thread to the network thread to
+ * confirm the execution of the callback.
+ */
+ public void onTasksRevokedCallbackCompleted(final StreamsOnTasksRevokedCallbackCompletedEvent event) {
+ Optional error = event.error();
+ CompletableFuture future = event.future();
+
+ if (error.isPresent()) {
+ Exception e = error.get();
+ log.warn("The onTasksRevoked callback completed with an error ({}); " +
+ "signaling to continue to the next phase of rebalance", e.getMessage());
+ future.completeExceptionally(e);
+ } else {
+ log.debug("The onTasksRevoked callback completed successfully; signaling to continue to the next phase of rebalance");
+ future.complete(null);
+ }
+ }
+
+ /**
+ * Completes the future that marks the completed execution of the onTasksAssigned callback.
+
+ * @param event The event containing the future sent from the application thread to the network thread to
+ * confirm the execution of the callback.
+ */
+ public void onTasksAssignedCallbackCompleted(final StreamsOnTasksAssignedCallbackCompletedEvent event) {
+ Optional error = event.error();
+ CompletableFuture future = event.future();
+
+ if (error.isPresent()) {
+ Exception e = error.get();
+ log.warn("The onTasksAssigned callback completed with an error ({}); " +
+ "signaling to continue to the next phase of rebalance", e.getMessage());
+ future.completeExceptionally(e);
+ } else {
+ log.debug("The onTasksAssigned callback completed successfully; signaling to continue to the next phase of rebalance");
+ future.complete(null);
+ }
+ }
+
+ /**
+ * Completes the future that marks the completed execution of the onAllTasksLost callback.
+
+ * @param event The event containing the future sent from the application thread to the network thread to
+ * confirm the execution of the callback.
+ */
+ public void onAllTasksLostCallbackCompleted(final StreamsOnAllTasksLostCallbackCompletedEvent event) {
+ Optional error = event.error();
+ CompletableFuture future = event.future();
+
+ if (error.isPresent()) {
+ Exception e = error.get();
+ log.warn("The onAllTasksLost callback completed with an error ({}); " +
+ "signaling to continue to the next phase of rebalance", e.getMessage());
+ future.completeExceptionally(e);
+ } else {
+ log.debug("The onAllTasksLost callback completed successfully; signaling to continue to the next phase of rebalance");
+ future.complete(null);
+ }
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
new file mode 100644
index 0000000000000..b9c28406706d0
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -0,0 +1,2186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
+import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class StreamsMembershipManagerTest {
+
+ private static final String GROUP_ID = "test-group";
+ private static final int MEMBER_EPOCH = 1;
+
+ private static final String SUBTOPOLOGY_ID_0 = "subtopology-0";
+ private static final String SUBTOPOLOGY_ID_1 = "subtopology-1";
+
+ private static final String TOPIC_0 = "topic-0";
+ private static final String TOPIC_1 = "topic-1";
+
+ private static final int PARTITION_0 = 0;
+ private static final int PARTITION_1 = 1;
+
+ private final Time time = new MockTime(0);
+ private final Metrics metrics = new Metrics(time);
+
+ private StreamsMembershipManager membershipManager;
+
+ @Mock
+ private SubscriptionState subscriptionState;
+
+ @Mock
+ private StreamsRebalanceEventsProcessor streamsRebalanceEventsProcessor;
+
+ @Mock
+ private StreamsRebalanceData streamsRebalanceData;
+
+ @Mock
+ private MemberStateListener memberStateListener;
+
+ @BeforeEach
+ public void setup() {
+ membershipManager = new StreamsMembershipManager(
+ GROUP_ID,
+ streamsRebalanceEventsProcessor,
+ streamsRebalanceData,
+ subscriptionState,
+ new LogContext("test"),
+ time,
+ metrics
+ );
+ membershipManager.registerStateListener(memberStateListener);
+ verifyInStateUnsubscribed(membershipManager);
+ }
+
+ @Test
+ public void testUnexpectedErrorInHeartbeatResponse() {
+ final String errorMessage = "Nobody expects the Spanish Inquisition!";
+ final StreamsGroupHeartbeatResponseData responseData = new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
+ .setErrorMessage(errorMessage);
+ final StreamsGroupHeartbeatResponse response = new StreamsGroupHeartbeatResponse(responseData);
+
+ final IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> membershipManager.onHeartbeatSuccess(response)
+ );
+
+ assertEquals(
+ "Unexpected error in Heartbeat response. Expected no error, but received: "
+ + Errors.GROUP_AUTHORIZATION_FAILED.name()
+ + " with message: '" + errorMessage + "'",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void testActiveTasksAreNullInHeartbeatResponse() {
+ testTasksAreNullInHeartbeatResponse(null, Collections.emptyList(), Collections.emptyList());
+ }
+
+ @Test
+ public void testStandbyTasksAreNullInHeartbeatResponse() {
+ testTasksAreNullInHeartbeatResponse(Collections.emptyList(), null, Collections.emptyList());
+ }
+
+ @Test
+ public void testWarmupTasksAreNullInHeartbeatResponse() {
+ testTasksAreNullInHeartbeatResponse(Collections.emptyList(), Collections.emptyList(), null);
+ }
+
+ private void testTasksAreNullInHeartbeatResponse(final List activeTasks,
+ final List standbyTasks,
+ final List warmupTasks) {
+ joining();
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(activeTasks, standbyTasks, warmupTasks);
+
+ final IllegalStateException exception = assertThrows(
+ IllegalStateException.class,
+ () -> membershipManager.onHeartbeatSuccess(response)
+ );
+
+ assertEquals(
+ "Invalid response data, task collections must be all null or all non-null: " + response.data(),
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void testJoining() {
+ joining();
+
+ verifyInStateJoining(membershipManager);
+ assertEquals(StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
+ }
+
+ @Test
+ public void testReconcilingEmptyToSingleActiveTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final Set activeTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingActiveTaskToDifferentActiveTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ }
+
+ @Test
+ public void testReconcilingSingleActiveTaskToAdditionalActiveTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Set.of(
+ new TopicPartition(TOPIC_0, PARTITION_0),
+ new TopicPartition(TOPIC_0, PARTITION_1)
+ );
+ final Set expectedNewPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingMultipleActiveTaskToSingleActiveTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ final Set activeTasksToRevoke = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksToRevoke))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0), new TopicPartition(TOPIC_0, PARTITION_1)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = Collections.emptySet();
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ }
+
+ @Test
+ public void testReconcilingEmptyToMultipleActiveTaskOfDifferentSubtopologies() {
+ setupStreamsAssignmentInterfaceWithTwoSubtopologies(
+ SUBTOPOLOGY_ID_0, TOPIC_0,
+ SUBTOPOLOGY_ID_1, TOPIC_1
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_1, PARTITION_0)
+ );
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(
+ SUBTOPOLOGY_ID_0, List.of(PARTITION_0),
+ SUBTOPOLOGY_ID_1, List.of(PARTITION_0))
+ );
+
+ final Set expectedFullPartitionsToAssign = Set.of(
+ new TopicPartition(TOPIC_0, PARTITION_0),
+ new TopicPartition(TOPIC_1, PARTITION_0)
+ );
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingActiveTaskToStandbyTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set standbyTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks, Collections.emptySet()))
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
+ .thenReturn(Collections.emptySet());
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ }
+
+ @Test
+ public void testReconcilingActiveTaskToWarmupTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set warmupTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasks))
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)))
+ .thenReturn(Collections.emptySet());
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ }
+
+ @Test
+ public void testReconcilingEmptyToSingleStandbyTask() {
+ final Set standbyTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingStandbyTaskToDifferentStandbyTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set standbyTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set standbyTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingSingleStandbyTaskToAdditionalStandbyTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set standbyTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set standbyTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingMultipleStandbyTaskToSingleStandbyTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set standbyTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ final Set standbyTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingStandbyTaskToActiveTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set standbyTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingStandbyTaskToWarmupTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set standbyTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set warmupTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasksSetup, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasks))
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingEmptyToSingleWarmupTask() {
+ final Set warmupTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingWarmupTaskToDifferentWarmupTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set warmupTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set warmupTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingSingleWarmupTaskToAdditionalWarmupTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set warmupTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set warmupTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingMultipleWarmupTaskToSingleWarmupTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set warmupTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ final Set warmupTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasks)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0, PARTITION_1)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingWarmupTaskToActiveTask() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set warmupTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_1)));
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingWarmupTaskToStandbyTask() {
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final Set warmupTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set standbyTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), Collections.emptySet(), warmupTasksSetup)
+ )
+ ).thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(
+ streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(Collections.emptySet(), standbyTasks, Collections.emptySet())
+ )
+ ).thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ Mockito.reset(subscriptionState);
+ Mockito.reset(memberStateListener);
+
+ reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set expectedFullPartitionsToAssign = Collections.emptySet();
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ onTasksAssignedCallbackExecuted.complete(null);
+ verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
+ verifyThatNoTasksHaveBeenRevoked();
+ }
+
+ @Test
+ public void testReconcilingAndAssignmentCallbackFails() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final Set activeTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+
+ onTasksAssignedCallbackExecuted.completeExceptionally(new RuntimeException("KABOOM!"));
+
+ verifyInStateReconciling(membershipManager);
+ verify(subscriptionState, never()).enablePartitionsAwaitingCallback(any());
+ }
+
+ @Test
+ public void testReconcilingAndRevocationCallbackFails() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+
+ final Set partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedPartitionsToRevoke = partitionsToAssignAtSetup;
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+
+ onTasksRevokedCallbackExecuted.completeExceptionally(new RuntimeException("KABOOM!"));
+
+ verify(subscriptionState).markPendingRevocation(expectedPartitionsToRevoke);
+ verify(subscriptionState, never()).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ verify(memberStateListener, never()).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
+ verify(subscriptionState, never())
+ .enablePartitionsAwaitingCallback(argThat(a -> !a.equals(partitionsToAssignAtSetup)));
+ verifyInStateReconciling(membershipManager);
+ verify(streamsRebalanceEventsProcessor, never()).requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
+ );
+ verifyInStateReconciling(membershipManager);
+ }
+
+ @Test
+ public void testReconcilingWhenReconciliationAbortedBeforeAssignmentDueToRejoin() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onAllTasksLostCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
+ .thenReturn(onAllTasksLostCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+ final Set partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedPartitionsToRevoke = partitionsToAssignAtSetup;
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ membershipManager.onPollTimerExpired();
+ membershipManager.onHeartbeatRequestGenerated();
+ onAllTasksLostCallbackExecuted.complete(null);
+ membershipManager.maybeRejoinStaleMember();
+
+ onTasksRevokedCallbackExecuted.complete(null);
+
+ verify(subscriptionState, never()).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ verify(memberStateListener, never()).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
+ verify(subscriptionState, never())
+ .enablePartitionsAwaitingCallback(argThat(a -> !a.equals(partitionsToAssignAtSetup)));
+ verify(streamsRebalanceEventsProcessor, never()).requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
+ );
+ verifyInStateJoining(membershipManager);
+ }
+
+ @Test
+ public void testReconcilingWhenReconciliationAbortedBeforeAssignmentDueToNotInReconciling() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onAllTasksLostCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
+ .thenReturn(onAllTasksLostCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+ final Set partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedPartitionsToRevoke = partitionsToAssignAtSetup;
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ membershipManager.transitionToFatal();
+ onAllTasksLostCallbackExecuted.complete(null);
+
+ onTasksRevokedCallbackExecuted.complete(null);
+
+ verify(subscriptionState, never()).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
+ verify(memberStateListener, never()).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
+ verify(subscriptionState, never())
+ .enablePartitionsAwaitingCallback(argThat(a -> !a.equals(partitionsToAssignAtSetup)));
+ verify(streamsRebalanceEventsProcessor, never()).requestOnTasksAssignedCallbackInvocation(
+ makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())
+ );
+ verifyInStateFatal(membershipManager);
+ }
+
+ @Test
+ public void testReconcilingWhenReconciliationAbortedAfterAssignmentDueToRejoin() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onAllTasksLostCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
+ .thenReturn(onAllTasksLostCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+ final Set partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedPartitionsToRevoke = partitionsToAssignAtSetup;
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+ membershipManager.onPollTimerExpired();
+ membershipManager.onHeartbeatRequestGenerated();
+ onAllTasksLostCallbackExecuted.complete(null);
+ membershipManager.maybeRejoinStaleMember();
+
+ onTasksAssignedCallbackExecuted.complete(null);
+
+ assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+ }
+
+ @Test
+ public void testReconcilingWhenReconciliationAbortedAfterAssignmentDueToNotInReconciling() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ final CompletableFuture onAllTasksLostCallbackExecuted = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ final Set activeTasks = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_1)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
+ .thenReturn(onAllTasksLostCallbackExecuted);
+ when(subscriptionState.assignedPartitions())
+ .thenReturn(Collections.emptySet())
+ .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_1)));
+ final Set partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, PARTITION_0));
+ final Set expectedPartitionsToRevoke = partitionsToAssignAtSetup;
+ final Set expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, PARTITION_1));
+ final Set expectedNewPartitionsToAssign = expectedFullPartitionsToAssign;
+ verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(
+ expectedPartitionsToRevoke,
+ expectedFullPartitionsToAssign,
+ expectedNewPartitionsToAssign
+ );
+ onTasksRevokedCallbackExecuted.complete(null);
+ membershipManager.transitionToFatal();
+ onAllTasksLostCallbackExecuted.complete(null);
+
+ onTasksAssignedCallbackExecuted.complete(null);
+
+ assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+ }
+
+ @Test
+ public void testLeaveGroupWhenNotInGroup() {
+ testLeaveGroupWhenNotInGroup(membershipManager::leaveGroup);
+ }
+
+ @Test
+ public void testLeaveGroupOnCloseWhenNotInGroup() {
+ testLeaveGroupWhenNotInGroup(membershipManager::leaveGroupOnClose);
+ }
+
+ private void testLeaveGroupWhenNotInGroup(final Supplier> leaveGroup) {
+ final CompletableFuture future = leaveGroup.get();
+
+ assertFalse(membershipManager.isLeavingGroup());
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ assertFalse(future.isCompletedExceptionally());
+ verify(subscriptionState).unsubscribe();
+ verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
+ verifyInStateUnsubscribed(membershipManager);
+ }
+
+ @Test
+ public void testLeaveGroupWhenNotInGroupAndFenced() {
+ testLeaveGroupOnCloseWhenNotInGroupAndFenced(membershipManager::leaveGroup);
+ }
+
+ @Test
+ public void testLeaveGroupOnCloseWhenNotInGroupAndFenced() {
+ testLeaveGroupOnCloseWhenNotInGroupAndFenced(membershipManager::leaveGroupOnClose);
+ }
+
+ private void testLeaveGroupOnCloseWhenNotInGroupAndFenced(final Supplier> leaveGroup) {
+ final CompletableFuture onAllTasksLostCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
+ .thenReturn(onAllTasksLostCallbackExecuted);
+ joining();
+ fenced();
+ final CompletableFuture future = leaveGroup.get();
+
+ assertFalse(membershipManager.isLeavingGroup());
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ assertFalse(future.isCompletedExceptionally());
+ verify(subscriptionState).unsubscribe();
+ verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(memberStateListener, times(2)).onGroupAssignmentUpdated(Collections.emptySet());
+ verifyInStateUnsubscribed(membershipManager);
+ }
+
+ @Test
+ public void testLeaveGroupWhenInGroupWithAssignment() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final Set activeTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasks))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ final CompletableFuture onGroupLeft = membershipManager.leaveGroup();
+
+ assertFalse(onGroupLeft.isDone());
+ verify(subscriptionState, never()).unsubscribe();
+ verifyInStatePrepareLeaving(membershipManager);
+ final CompletableFuture onGroupLeftBeforeRevocationCallback = membershipManager.leaveGroup();
+ assertEquals(onGroupLeft, onGroupLeftBeforeRevocationCallback);
+ final CompletableFuture onGroupLeftOnCloseBeforeRevocationCallback = membershipManager.leaveGroupOnClose();
+ assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeRevocationCallback);
+ onTasksRevokedCallbackExecuted.complete(null);
+ verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
+ verify(subscriptionState).unsubscribe();
+ assertFalse(onGroupLeft.isDone());
+ verifyInStateLeaving(membershipManager);
+ final CompletableFuture onGroupLeftAfterRevocationCallback = membershipManager.leaveGroup();
+ assertEquals(onGroupLeft, onGroupLeftAfterRevocationCallback);
+ membershipManager.onHeartbeatRequestGenerated();
+ verifyInStateUnsubscribed(membershipManager);
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0), MEMBER_EPOCH + 1));
+ assertTrue(onGroupLeft.isDone());
+ assertFalse(onGroupLeft.isCompletedExceptionally());
+ verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());
+ }
+
+ @Test
+ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final Set activeTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasks, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+
+ final CompletableFuture onGroupLeft = membershipManager.leaveGroupOnClose();
+
+ assertFalse(onGroupLeft.isDone());
+ verifyInStateLeaving(membershipManager);
+ verify(subscriptionState).unsubscribe();
+ verify(memberStateListener).onGroupAssignmentUpdated(Collections.emptySet());
+ verify(streamsRebalanceEventsProcessor, never()).requestOnTasksRevokedCallbackInvocation(any());
+ final CompletableFuture onGroupLeftBeforeHeartbeatRequestGenerated = membershipManager.leaveGroup();
+ assertEquals(onGroupLeft, onGroupLeftBeforeHeartbeatRequestGenerated);
+ final CompletableFuture onGroupLeftOnCloseBeforeHeartbeatRequestGenerated = membershipManager.leaveGroupOnClose();
+ assertEquals(onGroupLeft, onGroupLeftOnCloseBeforeHeartbeatRequestGenerated);
+ assertFalse(onGroupLeft.isDone());
+ membershipManager.onHeartbeatRequestGenerated();
+ verifyInStateUnsubscribed(membershipManager);
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0), MEMBER_EPOCH + 1));
+ assertTrue(onGroupLeft.isDone());
+ assertFalse(onGroupLeft.isCompletedExceptionally());
+ verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());
+ }
+
+ @Test
+ public void testOnHeartbeatRequestSkippedWhenInLeaving() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ final CompletableFuture onAllTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onAllTasksRevokedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ CompletableFuture future = leaving(onAllTasksRevokedCallbackExecuted);
+
+ membershipManager.onHeartbeatRequestSkipped();
+
+ verifyInStateUnsubscribed(membershipManager);
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ assertFalse(future.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testOnHeartbeatSuccessWhenInLeaving() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");
+ final CompletableFuture onTasksAssignedCallbackExecutedSetup = new CompletableFuture<>();
+ final Set activeTasksSetup = Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0)
+ );
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(activeTasksSetup, Collections.emptySet(), Collections.emptySet())))
+ .thenReturn(onTasksAssignedCallbackExecutedSetup);
+ final CompletableFuture onTasksRevokedCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksRevokedCallbackInvocation(activeTasksSetup))
+ .thenReturn(onTasksRevokedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
+ acknowledging(onTasksAssignedCallbackExecutedSetup);
+ CompletableFuture future = leaving(onTasksRevokedCallbackExecuted);
+
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(
+ SUBTOPOLOGY_ID_0, List.of(PARTITION_0),
+ membershipManager.memberEpoch() + 1
+ ));
+
+ verifyInStateLeaving(membershipManager);
+ assertFalse(future.isDone());
+ assertFalse(future.isCancelled());
+ assertFalse(future.isCompletedExceptionally());
+ verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());
+ }
+
+ @Test
+ public void testOnHeartbeatSuccessWhenInUnsubscribeLeaveNotInProgress() {
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(
+ SUBTOPOLOGY_ID_0, List.of(PARTITION_0),
+ MEMBER_EPOCH
+ ));
+
+ verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId());
+ }
+
+ @Test
+ public void testOnHeartbeatSuccessWhenInFenced() {
+ final CompletableFuture onAllTasksLostCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
+ .thenReturn(onAllTasksLostCallbackExecuted);
+ joining();
+ fenced();
+
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(
+ SUBTOPOLOGY_ID_0, List.of(PARTITION_0),
+ MEMBER_EPOCH
+ ));
+
+ verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId());
+ }
+
+ @Test
+ public void testOnHeartbeatSuccessWhenInFatal() {
+ membershipManager.transitionToFatal();
+
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(
+ SUBTOPOLOGY_ID_0, List.of(PARTITION_0),
+ MEMBER_EPOCH
+ ));
+
+ verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId());
+ }
+
+ @Test
+ public void testOnHeartbeatSuccessWhenInStale() {
+ final CompletableFuture onAllTasksLostCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnAllTasksLostCallbackInvocation())
+ .thenReturn(onAllTasksLostCallbackExecuted);
+ joining();
+ membershipManager.onPollTimerExpired();
+ membershipManager.onHeartbeatRequestGenerated();
+
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0), MEMBER_EPOCH + 1));
+
+ verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());
+ }
+
+ @Test
+ public void testOnHeartbeatSuccessWhenInReconciling() {
+ final CompletableFuture onTasksAssignedCallbackExecuted = new CompletableFuture<>();
+ when(streamsRebalanceEventsProcessor.requestOnTasksAssignedCallbackInvocation(makeTaskAssignment(Set.of(), Set.of(), Set.of())))
+ .thenReturn(onTasksAssignedCallbackExecuted);
+ joining();
+ reconcile(makeHeartbeatResponseWithActiveTasks(List.of(), MEMBER_EPOCH));
+ onTasksAssignedCallbackExecuted.complete(null);
+ membershipManager.onHeartbeatRequestGenerated();
+
+ membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(List.of(), MEMBER_EPOCH));
+
+ verify(memberStateListener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), membershipManager.memberId());
+ verifyInStateStable(membershipManager);
+ }
+
+ @Test
+ public void testOnPollTimerExpired() {
+ setupStreamsAssignmentInterfaceWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
+ final Set