Skip to content

Commit 057c8c5

Browse files
committed
Return ConsumerStoppedEvent with Abnormal reason when MLC stopped abnormally
Signed-off-by: AlamuriLokesh <[email protected]>
1 parent ec154a3 commit 057c8c5

File tree

3 files changed

+28
-2
lines changed

3 files changed

+28
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,11 @@ public enum Reason {
4343
*/
4444
NORMAL,
4545

46+
/**
47+
* The consumer was stopped because the container was stopped abnormally.
48+
*/
49+
ABNORMAL,
50+
4651
/**
4752
* The transactional producer was fenced and the container
4853
* {@code stopContainerWhenFenced} property is true.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,9 @@ else if (throwable instanceof AuthenticationException || throwable instanceof Au
537537
else if (throwable instanceof NoOffsetForPartitionException) {
538538
reason = Reason.NO_OFFSET;
539539
}
540+
else if (!this.isStoppedNormally()) {
541+
reason = Reason.ABNORMAL;
542+
}
540543
else {
541544
reason = Reason.NORMAL;
542545
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,11 +1090,29 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
10901090
assertThat(container.getContainers()).
10911091
doesNotContain(childContainer1);
10921092

1093-
container.getContainers().forEach(containerForEach -> containerForEach.stop());
1093+
KafkaMessageListenerContainer<Integer, String> childContainer0SecRun = container.getContainers().get(0);
1094+
KafkaMessageListenerContainer<Integer, String> childContainer1SecRun = container.getContainers().get(1);
1095+
1096+
childContainer0SecRun.stopAbnormally(() -> {
1097+
});
1098+
1099+
childContainer1SecRun.stop();
1100+
10941101
assertThat(container.getContainers()).isNotEmpty();
10951102
container.stop();
10961103
assertThat(concurrentContainerSecondStopLatch.await(30, TimeUnit.SECONDS)).isTrue();
10971104

1105+
events.stream().forEach(event -> {
1106+
if (event.getContainer(MessageListenerContainer.class).equals(childContainer0SecRun)
1107+
&& event instanceof ConsumerStoppedEvent) {
1108+
assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.ABNORMAL);
1109+
}
1110+
else if (event.getContainer(MessageListenerContainer.class).equals(childContainer1SecRun)
1111+
&& event instanceof ConsumerStoppedEvent) {
1112+
assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.NORMAL);
1113+
}
1114+
});
1115+
10981116
this.logger.info("Stop containerStartStop");
10991117
}
11001118

0 commit comments

Comments
 (0)