From e14e6c256115ed276b958cd963023570940e10a1 Mon Sep 17 00:00:00 2001 From: Alamuri Lokesh Date: Fri, 6 Jun 2025 21:04:53 +0530 Subject: [PATCH] Return ConsumerStoppedEvent with Abnormal reason when MLC stopped abnormally Signed-off-by: Alamuri Lokesh --- .../modules/ROOT/pages/kafka/events.adoc | 1 + .../kafka/event/ConsumerStoppedEvent.java | 10 +++++++++- .../KafkaMessageListenerContainer.java | 3 +++ ...ncurrentMessageListenerContainerTests.java | 20 ++++++++++++++++++- 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc index a6249286bd..81ce8244a4 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc @@ -102,6 +102,7 @@ In addition, the `ConsumerStoppedEvent` has the following additional property: * `reason`: ** `NORMAL` - the consumer stopped normally (container was stopped). +** `ABNORMAL` - the consumer stopped abnormally (container was stopped abnormally). ** `ERROR` - a `java.lang.Error` was thrown. ** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`. ** `AUTH` - an `AuthenticationException` or `AuthorizationException` was thrown and the `authExceptionRetryInterval` is not configured. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java b/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java index df425afd9c..ddb0994206 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ * to restart a container that was stopped because a transactional producer was fenced. * * @author Gary Russell + * @author Lokesh Alamuri * @since 2.2 * */ @@ -43,6 +44,13 @@ public enum Reason { */ NORMAL, + /** + * The consumer was stopped because the container was stopped abnormally. + * @since 4.0 + * + */ + ABNORMAL, + /** * The transactional producer was fenced and the container * {@code stopContainerWhenFenced} property is true. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8362cc4144..11725ca758 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -537,6 +537,9 @@ else if (throwable instanceof AuthenticationException || throwable instanceof Au else if (throwable instanceof NoOffsetForPartitionException) { reason = Reason.NO_OFFSET; } + else if (!this.isStoppedNormally()) { + reason = Reason.ABNORMAL; + } else { reason = Reason.NORMAL; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index e1fe24478f..4b47a51f25 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -1088,11 +1088,29 @@ protected Consumer createKafkaConsumer(String groupId, String c assertThat(container.getContainers()). doesNotContain(childContainer1); - container.getContainers().forEach(containerForEach -> containerForEach.stop()); + KafkaMessageListenerContainer childContainer0SecRun = container.getContainers().get(0); + KafkaMessageListenerContainer childContainer1SecRun = container.getContainers().get(1); + + childContainer0SecRun.stopAbnormally(() -> { + }); + + childContainer1SecRun.stop(); + assertThat(container.getContainers()).isNotEmpty(); container.stop(); assertThat(concurrentContainerSecondStopLatch.await(30, TimeUnit.SECONDS)).isTrue(); + events.stream().forEach(event -> { + if (event.getContainer(MessageListenerContainer.class).equals(childContainer0SecRun) + && event instanceof ConsumerStoppedEvent) { + assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.ABNORMAL); + } + else if (event.getContainer(MessageListenerContainer.class).equals(childContainer1SecRun) + && event instanceof ConsumerStoppedEvent) { + assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.NORMAL); + } + }); + this.logger.info("Stop containerStartStop"); }