Skip to content

Commit 5b2a196

Browse files
committed
Include feedback
1 parent 050eb0b commit 5b2a196

File tree

2 files changed

+310
-155
lines changed

2 files changed

+310
-155
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@
5454
* <ul>
5555
* <li>Keeping member state</li>
5656
* <li>Keeping assignment for the member</li>
57-
* <li>Reconciling assignment, for example if tasks need to be revoked before other tasks can be assigned</li>
57+
* <li>Reconciling assignment, for example, if tasks need to be revoked before other tasks can be assigned</li>
5858
* <li>Calling the assignment and revocation callbacks on the Streams client</li>
5959
* </ul>
6060
*/
6161
public class StreamsMembershipManager implements RequestManager {
6262

6363
/**
64-
* A data structure to represent the current task assignment, and current target task assignment of a member in a
64+
* A data structure to represent the current task assignment, and target task assignment of a member in a
6565
* streams group.
6666
* <p/>
6767
* Besides the assigned tasks, it contains a local epoch that is bumped whenever the assignment changes, to ensure
@@ -590,7 +590,7 @@ public boolean shouldHeartbeatNow() {
590590
* If the member is already part of the group, this will only ensure that the updated subscription
591591
* is included in the next heartbeat request.
592592
* <p/>
593-
* Note that list of topics of the subscription is taken from the shared subscription state.
593+
* Note that the list of topics in the subscription is taken from the shared subscription state.
594594
*/
595595
public void onSubscriptionUpdated() {
596596
subscriptionUpdated.compareAndSet(false, true);
@@ -875,12 +875,12 @@ private CompletableFuture<Void> leaveGroup(final boolean isOnClose) {
875875
transitionTo(MemberState.PREPARE_LEAVING);
876876
CompletableFuture<Void> onGroupLeft = new CompletableFuture<>();
877877
leaveGroupInProgress = Optional.of(onGroupLeft);
878-
if (!isOnClose) {
878+
if (isOnClose) {
879+
leaving();
880+
} else {
879881
CompletableFuture<Void> onAllActiveTasksReleasedCallbackExecuted = releaseActiveTasks();
880882
onAllActiveTasksReleasedCallbackExecuted
881883
.whenComplete((__, callbackError) -> leavingAfterReleasingActiveTasks(callbackError));
882-
} else {
883-
leaving();
884884
}
885885

886886
return onGroupLeft;
@@ -933,9 +933,6 @@ private void processAssignmentReceived(Map<String, SortedSet<Integer>> activeTas
933933
log.debug("Target assignment {} received from the broker is equals to the member " +
934934
"current assignment {}. Nothing to reconcile.",
935935
targetAssignment, currentAssignment);
936-
// Make sure we transition the member back to STABLE if it was RECONCILING (ex.
937-
// member was RECONCILING unresolved assignments that were just removed by the
938-
// broker), or JOINING (member joining received empty assignment).
939936
if (state == MemberState.RECONCILING || state == MemberState.JOINING) {
940937
transitionTo(MemberState.STABLE);
941938
}

0 commit comments

Comments
 (0)