-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Add Kafka shared consumer container support #3945
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Kafka shared consumer container support #3945
Conversation
sobychacko
commented
Jun 5, 2025
- New AbstractShareKafkaMessageListenerContainer base class with lifecycle management
- ShareKafkaMessageListenerContainer implementation for share consumer protocol
- Integration tests for end-to-end message delivery validation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have too many concerns.
Just regular review.
Thanks
*/ | ||
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; | ||
|
||
@NonNull |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are missing a package-info.java
file over here with that our @org.jspecify.annotations.NullMarked
to avoid this annotation explicitly.
...main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java
Show resolved
Hide resolved
...fka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java
Show resolved
Hide resolved
|
||
ShareListenerConsumer(GenericMessageListener<?> listener) { | ||
this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( | ||
ShareKafkaMessageListenerContainer.this.getGroupId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just nit-pick: I prefer to have a blank line after method subject if it is multi-line.
Otherwise it is hard to read the method body.
Well, just my humble opinion.
this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); | ||
// Subscribe to topics, just like in the test | ||
ContainerProperties containerProperties = getContainerProperties(); | ||
this.consumer.subscribe(java.util.Arrays.asList(containerProperties.getTopics())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why fully-qualified class name?
Why not import for this java.util.Arrays
?
Throwable exitThrowable = null; | ||
while (isRunning()) { | ||
try { | ||
var records = this.consumer.poll(java.time.Duration.ofMillis(1000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the poll duration could be a property of this class.
...fka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java
Show resolved
Hide resolved
new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op)); | ||
try (Admin admin = AdminClient.create(adminProperties)) { | ||
admin.incrementalAlterConfigs(configs).all().get(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit awkward approach to just set that share.auto.offset.reset
property.
Cannot it be done via our brokerProperties
on the @EmbeddedKafka
?
How this suppose to happen in the target applications?
It feels like changing the broker state from the application is not OK.
Especially when we have several instances looking into the same Kafka cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in the current release of Kafka (4.0.0), this is how it is done, i.e. we need to set it at the groupId. I don't think we can set it via broker properties in EmbeddedKafka
. Maybe that will change once this feature is stabilized and production ready in 4.1. See this for details: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-Groupconfiguration
I guess time to deprecate those Reactor Kafka components in |
Ok, i will send a PR soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
> Could not resolve all files for configuration ':spring-kafka:compileClasspath'.
> Could not find io.projectreactor.kafka:reactor-kafka:.
Required by:
project :spring-kafka
I guess this PR has to be rebased to the latest main
.
Thanks
- New AbstractShareKafkaMessageListenerContainer base class with lifecycle management - ShareKafkaMessageListenerContainer implementation for share consumer protocol - Integration tests for end-to-end message delivery validation Signed-off-by: Soby Chacko <[email protected]>
619d89d
to
4c1d5e1
Compare