Skip to content

Commit e604802

Browse files
committed
propagate scope in async failures
Signed-off-by: Igor Macedo Quintanilha <[email protected]>
1 parent 6425682 commit e604802

File tree

3 files changed

+156
-15
lines changed

3 files changed

+156
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -680,16 +680,16 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
680680

681681
private final @Nullable CommonErrorHandler commonErrorHandler;
682682

683-
@Deprecated(since = "3.2", forRemoval = true)
684683
@SuppressWarnings("removal")
685684
private final @Nullable PlatformTransactionManager transactionManager =
686685
this.containerProperties.getKafkaAwareTransactionManager() != null ?
687686
this.containerProperties.getKafkaAwareTransactionManager() :
688687
this.containerProperties.getTransactionManager();
689688

690689
private final @Nullable KafkaAwareTransactionManager<?, ?> kafkaTxManager =
691-
this.transactionManager instanceof KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager ?
692-
kafkaAwareTransactionManager : null;
690+
this.transactionManager instanceof KafkaAwareTransactionManager
691+
? (KafkaAwareTransactionManager<?, ?>) this.transactionManager
692+
: null;
693693

694694
private final @Nullable TransactionTemplate transactionTemplate;
695695

@@ -1498,7 +1498,13 @@ protected void handleAsyncFailure() {
14981498
// We will give up on retrying with the remaining copied and failed Records.
14991499
for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
15001500
try {
1501-
invokeErrorHandlerBySingleRecord(copyFailedRecord);
1501+
KafkaListenerObservation.LISTENER_OBSERVATION.observation(
1502+
getContainerProperties().getObservationConvention(),
1503+
DefaultKafkaListenerObservationConvention.INSTANCE,
1504+
() -> new KafkaRecordReceiverContext(copyFailedRecord.record(), getListenerId(),
1505+
getClientId(), this.consumerGroupId, this::clusterId),
1506+
this.observationRegistry)
1507+
.observe(() -> invokeErrorHandlerBySingleRecord(copyFailedRecord));
15021508
}
15031509
catch (Exception e) {
15041510
this.logger.warn(() ->

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -736,13 +736,15 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
736736
"Async Fail", Objects.requireNonNull(source).getPayload()), cause));
737737
}
738738
catch (Throwable ex) {
739-
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
740739
acknowledge(acknowledgment);
741740
if (canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
742741
@SuppressWarnings("unchecked")
743742
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
744743
this.asyncRetryCallback.accept(record, (RuntimeException) ex);
745744
}
745+
else {
746+
this.logger.error(ex, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
747+
}
746748
}
747749
}
748750

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 143 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,16 @@
3838
import io.micrometer.observation.Observation;
3939
import io.micrometer.observation.ObservationHandler;
4040
import io.micrometer.observation.ObservationRegistry;
41-
import io.micrometer.observation.tck.TestObservationRegistry;
4241
import io.micrometer.tracing.Span;
4342
import io.micrometer.tracing.TraceContext;
4443
import io.micrometer.tracing.Tracer;
4544
import io.micrometer.tracing.handler.DefaultTracingObservationHandler;
4645
import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler;
4746
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
47+
import io.micrometer.tracing.handler.TracingAwareMeterObservationHandler;
4848
import io.micrometer.tracing.propagation.Propagator;
4949
import io.micrometer.tracing.test.simple.SimpleSpan;
50+
import io.micrometer.tracing.test.simple.SimpleTraceContext;
5051
import io.micrometer.tracing.test.simple.SimpleTracer;
5152
import org.apache.kafka.clients.admin.AdminClientConfig;
5253
import org.apache.kafka.clients.consumer.Consumer;
@@ -70,8 +71,10 @@
7071
import org.springframework.context.annotation.Configuration;
7172
import org.springframework.context.annotation.Primary;
7273
import org.springframework.kafka.KafkaException;
74+
import org.springframework.kafka.annotation.DltHandler;
7375
import org.springframework.kafka.annotation.EnableKafka;
7476
import org.springframework.kafka.annotation.KafkaListener;
77+
import org.springframework.kafka.annotation.RetryableTopic;
7578
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
7679
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
7780
import org.springframework.kafka.core.ConsumerFactory;
@@ -80,6 +83,7 @@
8083
import org.springframework.kafka.core.KafkaAdmin;
8184
import org.springframework.kafka.core.KafkaTemplate;
8285
import org.springframework.kafka.core.ProducerFactory;
86+
import org.springframework.kafka.listener.ContainerProperties;
8387
import org.springframework.kafka.listener.MessageListenerContainer;
8488
import org.springframework.kafka.listener.RecordInterceptor;
8589
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@@ -90,6 +94,9 @@
9094
import org.springframework.kafka.test.context.EmbeddedKafka;
9195
import org.springframework.kafka.test.utils.KafkaTestUtils;
9296
import org.springframework.messaging.handler.annotation.SendTo;
97+
import org.springframework.retry.annotation.Backoff;
98+
import org.springframework.scheduling.TaskScheduler;
99+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
93100
import org.springframework.test.annotation.DirtiesContext;
94101
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
95102
import org.springframework.util.StringUtils;
@@ -113,7 +120,8 @@
113120
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
114121
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
115122
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
116-
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
123+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE, ObservationTests.OBSERVATION_ASYNC_FAILURE_TEST,
124+
ObservationTests.OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST}, partitions = 1)
117125
@DirtiesContext
118126
public class ObservationTests {
119127

@@ -137,6 +145,55 @@ public class ObservationTests {
137145

138146
public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
139147

148+
public final static String OBSERVATION_ASYNC_FAILURE_TEST = "observation.async.failure.test";
149+
150+
public final static String OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST = "observation.async.failure.retry.test";
151+
152+
@Test
153+
void asyncRetryScopePropagation(@Autowired AsyncFailureListener asyncFailureListener,
154+
@Autowired KafkaTemplate<Integer, String> template,
155+
@Autowired SimpleTracer tracer,
156+
@Autowired ObservationRegistry observationRegistry) throws InterruptedException {
157+
158+
// Clear any previous spans
159+
tracer.getSpans().clear();
160+
161+
// Create an observation scope to ensure we have a proper trace context
162+
var testObservation = Observation.createNotStarted("test.message.send", observationRegistry);
163+
164+
// Send a message within the observation scope to ensure trace context is propagated
165+
testObservation.observe(() -> {
166+
try {
167+
template.send(OBSERVATION_ASYNC_FAILURE_TEST, "trigger-async-failure").get(5, TimeUnit.SECONDS);
168+
}
169+
catch (Exception e) {
170+
throw new RuntimeException("Failed to send message", e);
171+
}
172+
});
173+
174+
// Wait for the listener to process the message (initial + retry + DLT = 3 invocations)
175+
assertThat(asyncFailureListener.asyncFailureLatch.await(100000, TimeUnit.SECONDS)).isTrue();
176+
177+
// Verify that the captured spans from the listener contexts are all part of the same trace
178+
// This demonstrates that the tracing context propagates correctly through the retry mechanism
179+
Deque<SimpleSpan> spans = tracer.getSpans();
180+
assertThat(spans).hasSizeGreaterThanOrEqualTo(4); // template + listener + retry + DLT spans
181+
182+
// Verify that spans were captured for each phase and belong to the same trace
183+
assertThat(asyncFailureListener.capturedSpanInListener).isNotNull();
184+
assertThat(asyncFailureListener.capturedSpanInRetry).isNotNull();
185+
assertThat(asyncFailureListener.capturedSpanInDlt).isNotNull();
186+
187+
// All spans should have the same trace ID, demonstrating trace continuity
188+
var originalTraceId = asyncFailureListener.capturedSpanInListener.getTraceId();
189+
assertThat(originalTraceId).isNotBlank();
190+
assertThat(asyncFailureListener.capturedSpanInRetry.getTraceId()).isEqualTo(originalTraceId);
191+
assertThat(asyncFailureListener.capturedSpanInDlt.getTraceId()).isEqualTo(originalTraceId);
192+
193+
// Clear any previous spans
194+
tracer.getSpans().clear();
195+
}
196+
140197
@Test
141198
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
142199
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -628,6 +685,11 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
628685
if (container.getListenerId().equals("obs3")) {
629686
container.setKafkaAdmin(this.mockAdmin);
630687
}
688+
if (container.getListenerId().contains("asyncFailure")) {
689+
// Enable async acks to trigger async failure handling
690+
container.getContainerProperties().setAsyncAcks(true);
691+
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
692+
}
631693
if (container.getListenerId().equals("obs4")) {
632694
container.setRecordInterceptor(new RecordInterceptor<>() {
633695

@@ -662,17 +724,17 @@ MeterRegistry meterRegistry() {
662724

663725
@Bean
664726
ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, MeterRegistry meterRegistry) {
665-
TestObservationRegistry observationRegistry = TestObservationRegistry.create();
727+
var observationRegistry = ObservationRegistry.create();
666728
observationRegistry.observationConfig().observationHandler(
667729
// Composite will pick the first matching handler
668730
new ObservationHandler.FirstMatchingCompositeObservationHandler(
669-
// This is responsible for creating a child span on the sender side
670-
new PropagatingSenderTracingObservationHandler<>(tracer, propagator),
671731
// This is responsible for creating a span on the receiver side
672732
new PropagatingReceiverTracingObservationHandler<>(tracer, propagator),
733+
// This is responsible for creating a child span on the sender side
734+
new PropagatingSenderTracingObservationHandler<>(tracer, propagator),
673735
// This is responsible for creating a default span
674736
new DefaultTracingObservationHandler(tracer)))
675-
.observationHandler(new DefaultMeterObservationHandler(meterRegistry));
737+
.observationHandler(new TracingAwareMeterObservationHandler<>(new DefaultMeterObservationHandler(meterRegistry), tracer));
676738
return observationRegistry;
677739
}
678740

@@ -683,29 +745,41 @@ Propagator propagator(Tracer tracer) {
683745
// List of headers required for tracing propagation
684746
@Override
685747
public List<String> fields() {
686-
return Arrays.asList("foo", "bar");
748+
return Arrays.asList("traceId", "spanId", "foo", "bar");
687749
}
688750

689751
// This is called on the producer side when the message is being sent
690-
// Normally we would pass information from tracing context - for tests we don't need to
691752
@Override
692753
public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
693754
setter.set(carrier, "foo", "some foo value");
694755
setter.set(carrier, "bar", "some bar value");
695756

757+
setter.set(carrier, "traceId", context.traceId());
758+
setter.set(carrier, "spanId", context.spanId());
759+
696760
// Add a traceparent header to simulate W3C trace context
697761
setter.set(carrier, "traceparent", "traceparent-from-propagator");
698762
}
699763

700764
// This is called on the consumer side when the message is consumed
701-
// Normally we would use tools like Extractor from tracing but for tests we are just manually creating a span
702765
@Override
703766
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
704767
String foo = getter.get(carrier, "foo");
705768
String bar = getter.get(carrier, "bar");
706-
return tracer.spanBuilder()
769+
770+
var traceId = getter.get(carrier, "traceId");
771+
var spanId = getter.get(carrier, "spanId");
772+
773+
Span.Builder spanBuilder = tracer.spanBuilder()
707774
.tag("foo", foo)
708775
.tag("bar", bar);
776+
777+
var traceContext = new SimpleTraceContext();
778+
traceContext.setTraceId(traceId);
779+
traceContext.setSpanId(spanId);
780+
spanBuilder = spanBuilder.setParent(traceContext);
781+
782+
return spanBuilder;
709783
}
710784
};
711785
}
@@ -720,6 +794,15 @@ ExceptionListener exceptionListener() {
720794
return new ExceptionListener();
721795
}
722796

797+
@Bean
798+
AsyncFailureListener asyncFailureListener(SimpleTracer tracer) {
799+
return new AsyncFailureListener(tracer);
800+
}
801+
802+
@Bean
803+
public TaskScheduler taskExecutor() {
804+
return new ThreadPoolTaskScheduler();
805+
}
723806
}
724807

725808
public static class Listener {
@@ -801,4 +884,54 @@ Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
801884

802885
}
803886

887+
public static class AsyncFailureListener {
888+
889+
final CountDownLatch asyncFailureLatch = new CountDownLatch(3);
890+
891+
volatile @Nullable SimpleSpan capturedSpanInListener;
892+
893+
volatile @Nullable SimpleSpan capturedSpanInRetry;
894+
895+
volatile @Nullable SimpleSpan capturedSpanInDlt;
896+
897+
private final SimpleTracer tracer;
898+
899+
public AsyncFailureListener(SimpleTracer tracer) {
900+
this.tracer = tracer;
901+
}
902+
903+
@RetryableTopic(
904+
attempts = "2",
905+
backoff = @Backoff(delay = 1000)
906+
)
907+
@KafkaListener(id = "asyncFailure", topics = OBSERVATION_ASYNC_FAILURE_TEST)
908+
CompletableFuture<Void> handleAsync(ConsumerRecord<Integer, String> record) {
909+
910+
// Use topic name to distinguish between original and retry calls
911+
String topicName = record.topic();
912+
913+
if (topicName.equals(OBSERVATION_ASYNC_FAILURE_TEST)) {
914+
// This is the original call
915+
this.capturedSpanInListener = this.tracer.currentSpan();
916+
}
917+
else {
918+
// This is a retry call (topic name will be different for retry topics)
919+
this.capturedSpanInRetry = this.tracer.currentSpan();
920+
}
921+
922+
this.asyncFailureLatch.countDown();
923+
924+
// Return a failed CompletableFuture to trigger async failure handling
925+
return CompletableFuture.supplyAsync(() -> {
926+
throw new RuntimeException("Async failure for observation test");
927+
});
928+
}
929+
930+
@DltHandler
931+
void handleDlt(ConsumerRecord<Integer, String> record, Exception exception) {
932+
this.capturedSpanInDlt = this.tracer.currentSpan();
933+
this.asyncFailureLatch.countDown();
934+
}
935+
}
936+
804937
}

0 commit comments

Comments
 (0)