-
Notifications
You must be signed in to change notification settings - Fork 1.6k
support per-record observations in batch listeners #3944
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
support per-record observations in batch listeners #3944
Conversation
df42ec9
to
351e874
Compare
document recordObservationsInBatch container property describe new option in the change history add integration test for per-record observations Signed-off-by: Igor Macedo Quintanilha <[email protected]>
351e874
to
46a21b6
Compare
@igormq Thank you for the PR. We will do a detailed review soon. But, please add your name as an author to the classes you modified. Also, make the modifications in the ref docs directly and then add a short sentence about it in the |
spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
Outdated
Show resolved
Hide resolved
spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for looking into this!
Not easy task to tackle.
Hope you'll find my review as reasonable.
spring-kafka-docs/src/main/antora/modules/ROOT/pages/appendix/change-history.adoc
Outdated
Show resolved
Hide resolved
...ng-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
Outdated
Show resolved
Hide resolved
this.observationRegistry); | ||
observation.start(); | ||
observations.add(observation); | ||
scopes.add(observation.openScope()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this approach is OK.
Every next scope would override whatever was there in the ThreadLocal
.
Therefore there would be only one last observation available downstream for the service method.
So, that info would be misleading and dangerous.
My proposal was just observe()
every record here to finish their traces.
Nothing more suspicious and confusing.
The logic you propose is something what end-user would do in their service method for the batch.
And since we cannot predict what's the goal, therefore we don't provide any observation handling for batches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this approach is OK. Every next scope would override whatever was there in the
ThreadLocal
. Therefore there would be only one last observation available downstream for the service method. So, that info would be misleading and dangerous. My proposal was justobserve()
every record here to finish their traces. Nothing more suspicious and confusing.The logic you propose is something what end-user would do in their service method for the batch. And since we cannot predict what's the goal, therefore we don't provide any observation handling for batches.
Don't know if i understood everything correctly, but did all the changes that i thought were pertinent!
catch (RuntimeException e) { | ||
this.batchFailed = true; | ||
failureTimer(sample, null, e); | ||
batchInterceptAfter(records, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that we need to handle errors and timer here.
That simply can be done in the calling method as it was before.
See Observation.observe()
API to be called for each those records before we call the target listener.
We don't need to surround the target service call with any of those observations.
They are for each record, not the whole batch, so whatever you have left here still leads to confusion.
In the Observation.observe()
callback we simply can log each of those record under debug logging level.
This way, the trace and span for those records would be present in logs and reported down to the observation collector.
But again: that's my feeling how that supposed to be for the batch if we still want to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @artembilan's comments above. Explicitly starting before the listener and stopping the observations after the listener method is invoked creates semantic confusion. It gives the impression that the observations cover the listener's processing. However, they are not for the whole batch; observations are made per record. Each observation will include the listener method invocation duration, which is misleading.
In addition, managing start()
and stop()
manually, plus storing observations in a List
, adds complexity and potential memory overhead for large batches.
As Artem pointed out, we should use Observation.observe()
to create and complete each observation before the listener runs. This method handles the observation lifecycle in one shot. It starts the observation, opens any scopes, executes the callback, handles errors by marking them on the observation, stops the observation, and closes any scopes. As you can see, it automatically does a lot of heavy lifting on your behalf. Manually calling start
, stop
, etc., makes things more error-prone, so I suggest going by the observe()
approach with a callback for logging purposes, as Artem suggested above. This clarifies the semantics, as observations are completed before the listener, showing they only cover header extraction, avoiding confusion about their scope. Especially when tracing from a tool on the front-end, it will be very confusing if all the observations from the batch erroneously report the entire batch processing as part of their spans.
Also, as Artem has already pointed out, please consolidate all the error handling, timers, etc., from the calling method.
} | ||
throw e; | ||
} | ||
finally { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we switch to Observe.observe()
, then there is no need for this finally
block as observe()
will implicitly do that for you.
if (!isListenerAdapterObservationAware()) { | ||
// Stop observations | ||
for (Observation observation : observations) { | ||
observation.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to stop when using observe()
.
this.consumer); | ||
|
||
// Handle individual record tracing for batch mode if enabled | ||
if (this.containerProperties.isRecordObservationsInBatch() && this.observationEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this.observationEnabled
already covering this.containerProperties.isRecordObservationsInBatch()
?
@igormq Thank you for this PR. Implementing observability with batch listeners is not trivial, and I think you laid good foundations here. Providing this as an optional feature is very important, as we don't want to impose observability by default in all batch listeners. Please take a look at some of the feedback I provided, which largely echoes what @artembilan has already reviewed. As you make further changes, we will need to add docs and other related components, but getting the implementation right is the critical step right now. Thanks again! |
Don't know if i am doing the right approach here, but.
Related to #3872 (comment)