Skip to content

Commit 295686b

Browse files
committed
Check for duplicate state listener
1 parent 8dab352 commit 295686b

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,13 @@ private boolean isNotInGroup() {
354354
* @param listener Listener to invoke.
355355
*/
356356
public void registerStateListener(MemberStateListener listener) {
357-
stateUpdatesListeners.add(Objects.requireNonNull(listener, "State updates listener cannot be null"));
357+
Objects.requireNonNull(listener, "State updates listener cannot be null");
358+
for (MemberStateListener registeredListener : stateUpdatesListeners) {
359+
if (registeredListener == listener) {
360+
throw new IllegalArgumentException("Listener is already registered.");
361+
}
362+
}
363+
stateUpdatesListeners.add(listener);
358364
}
359365

360366
/**

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -1458,6 +1458,28 @@ public void testMaybeRejoinStaleMember() {
14581458
assertEquals(StreamsGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
14591459
}
14601460

1461+
@Test
1462+
public void testForDuplicateRegistrationOfSameStateListener() {
1463+
final MemberStateListener listener1 = new MemberStateListener() {
1464+
1465+
@Override
1466+
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
1467+
}
1468+
};
1469+
final MemberStateListener listener2 = new MemberStateListener() {
1470+
1471+
@Override
1472+
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
1473+
}
1474+
};
1475+
1476+
membershipManager.registerStateListener(listener1);
1477+
membershipManager.registerStateListener(listener2);
1478+
final Exception exception =
1479+
assertThrows(IllegalArgumentException.class, () -> membershipManager.registerStateListener(listener1));
1480+
assertEquals("Listener is already registered.", exception.getMessage());
1481+
}
1482+
14611483
private void verifyThatNoTasksHaveBeenRevoked() {
14621484
verify(streamsRebalanceEventsProcessor, never()).requestOnTasksRevokedCallbackInvocation(any());
14631485
verify(subscriptionState, never()).markPendingRevocation(any());

0 commit comments

Comments
 (0)