-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Return ConsumerStoppedEvent with Abnormal reason when MLC stopped abn… #3949
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2018-2020 the original author or authors. | ||
* Copyright 2018-2025 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -24,6 +24,7 @@ | |
* to restart a container that was stopped because a transactional producer was fenced. | ||
* | ||
* @author Gary Russell | ||
* @author Lokesh Alamuri | ||
* @since 2.2 | ||
* | ||
*/ | ||
|
@@ -43,6 +44,13 @@ public enum Reason { | |
*/ | ||
NORMAL, | ||
|
||
/** | ||
* The consumer was stopped because the container was stopped abnormally. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now this makes sense to me. Please, add here |
||
* @since 4.0 | ||
* | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
ABNORMAL, | ||
|
||
/** | ||
* The transactional producer was fenced and the container | ||
* {@code stopContainerWhenFenced} property is true. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -537,6 +537,9 @@ else if (throwable instanceof AuthenticationException || throwable instanceof Au | |
else if (throwable instanceof NoOffsetForPartitionException) { | ||
reason = Reason.NO_OFFSET; | ||
} | ||
else if (!this.isStoppedNormally()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add your name as an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have already contributed to this file using my personal email Id. Author name is |
||
reason = Reason.ABNORMAL; | ||
} | ||
else { | ||
reason = Reason.NORMAL; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1088,11 +1088,29 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c | |
assertThat(container.getContainers()). | ||
doesNotContain(childContainer1); | ||
|
||
container.getContainers().forEach(containerForEach -> containerForEach.stop()); | ||
KafkaMessageListenerContainer<Integer, String> childContainer0SecRun = container.getContainers().get(0); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add your name as an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have already contributed to this file using my personal email Id. Author name is Lokesh Alamuri. Please let me know if some changes are required. |
||
KafkaMessageListenerContainer<Integer, String> childContainer1SecRun = container.getContainers().get(1); | ||
|
||
childContainer0SecRun.stopAbnormally(() -> { | ||
}); | ||
|
||
childContainer1SecRun.stop(); | ||
|
||
assertThat(container.getContainers()).isNotEmpty(); | ||
container.stop(); | ||
assertThat(concurrentContainerSecondStopLatch.await(30, TimeUnit.SECONDS)).isTrue(); | ||
|
||
events.stream().forEach(event -> { | ||
if (event.getContainer(MessageListenerContainer.class).equals(childContainer0SecRun) | ||
&& event instanceof ConsumerStoppedEvent) { | ||
assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.ABNORMAL); | ||
} | ||
else if (event.getContainer(MessageListenerContainer.class).equals(childContainer1SecRun) | ||
&& event instanceof ConsumerStoppedEvent) { | ||
assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.NORMAL); | ||
} | ||
}); | ||
|
||
this.logger.info("Stop containerStartStop"); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add your name as an
@author
.