Commit 3a4e45b

GH-3786: Remove duplicated trace header
Fixes: #3786
Issue link: #3786

When tracing is enabled, `KafkaRecordSenderContext` added a new trace header without removing any existing one, resulting in multiple headers with the same key on the same record. This commit fixes the issue by updating `KafkaRecordSenderContext` to remove existing trace headers before adding the new one.

**Auto-cherry-pick to `3.3.x` & `3.2.x`**

Signed-off-by: Soby Chacko <[email protected]>
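For context, a minimal sketch of the underlying behavior (illustrative only, not part of this commit; the class name and header values are made up): Kafka's `Headers.add(...)` appends rather than replaces, so injecting a trace header into a record that already carries one duplicates it unless the old entry is removed first.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class TraceHeaderSketch {

	public static void main(String[] args) {
		RecordHeaders headers = new RecordHeaders();
		// A record that already carries a traceparent, e.g. from a previous send attempt.
		headers.add("traceparent", "old-value".getBytes(StandardCharsets.UTF_8));

		// Previous behavior: add() appends, so the record now has two traceparent headers.
		headers.add("traceparent", "new-value".getBytes(StandardCharsets.UTF_8));
		System.out.println(count(headers.headers("traceparent"))); // 2

		// Fixed behavior: remove the key first, then add the current value.
		headers.remove("traceparent");
		headers.add("traceparent", "new-value".getBytes(StandardCharsets.UTF_8));
		System.out.println(count(headers.headers("traceparent"))); // 1
	}

	private static int count(Iterable<Header> headers) {
		int n = 0;
		for (Header ignored : headers) {
			n++;
		}
		return n;
	}
}
```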
1 parent 01fb025 commit 3a4e45b

2 files changed (+78, -5 lines)

spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java (+9 -3)

@@ -1,5 +1,5 @@
 /*
- * Copyright 2022-2024 the original author or authors.
+ * Copyright 2022-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,13 +21,15 @@

 import io.micrometer.observation.transport.SenderContext;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;

 /**
  * {@link SenderContext} for {@link ProducerRecord}s.
  *
  * @author Gary Russell
  * @author Christian Mergenthaler
  * @author Wang Zhiyang
+ * @author Soby Chacko
  *
  * @since 3.0
  *
@@ -39,8 +41,12 @@ public class KafkaRecordSenderContext extends SenderContext<ProducerRecord<?, ?>
 	private final ProducerRecord<?, ?> record;

 	public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName, Supplier<String> clusterId) {
-		super((carrier, key, value) -> record.headers().add(key,
-				value == null ? null : value.getBytes(StandardCharsets.UTF_8)));
+		super((carrier, key, value) -> {
+			Headers headers = record.headers();
+			headers.remove(key);
+			headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8));
+		});
+
 		setCarrier(record);
 		this.beanName = beanName;
 		this.record = record;
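To illustrate the effect of the change above (a minimal sketch, not part of the diff; the topic, bean name, and cluster-id supplier are placeholder values), writing a key through the context's setter now replaces any existing header of the same name instead of appending a second one. The setter here is the same one the tracing propagator uses during an observed send, which the tests below exercise end to end.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;

public class SenderContextSketch {

	public static void main(String[] args) {
		// A record that already carries a traceparent header, e.g. left over from an earlier send.
		ProducerRecord<Integer, String> record = new ProducerRecord<>("demo-topic", "payload");
		record.headers().add("traceparent", "stale-value".getBytes(StandardCharsets.UTF_8));

		// The tracing propagator writes through the context's setter during observation.
		KafkaRecordSenderContext context =
				new KafkaRecordSenderContext(record, "demoTemplate", () -> "demo-cluster");
		context.getSetter().set(record, "traceparent", "current-value");

		// With this commit the record ends up with exactly one traceparent header, "current-value";
		// previously both "stale-value" and "current-value" would have been present.
		record.headers().headers("traceparent")
				.forEach(h -> System.out.println(new String(h.value(), StandardCharsets.UTF_8)));
	}
}
```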

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java (+69 -2)

@@ -16,6 +16,7 @@

 package org.springframework.kafka.support.micrometer;

+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.List;
@@ -26,6 +27,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.StreamSupport;

 import io.micrometer.common.KeyValues;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -56,6 +58,7 @@
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.jspecify.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Mono;
@@ -78,6 +81,7 @@
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.MessageListenerContainer;
 import org.springframework.kafka.listener.RecordInterceptor;
+import org.springframework.kafka.support.ProducerListener;
 import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
 import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -104,7 +108,7 @@
 @SpringJUnitConfig
 @EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
 		ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION,
-		ObservationTests.OBSERVATION_ERROR }, partitions = 1)
+		ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1)
 @DirtiesContext
 public class ObservationTests {

@@ -122,6 +126,8 @@ public class ObservationTests {

 	public final static String OBSERVATION_ERROR_MONO = "observation.error.mono";

+	public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
+
 	@Test
 	void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
 			@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -449,6 +455,62 @@ void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
 		assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin);
 	}

+	@Test
+	void verifyKafkaRecordSenderContextTraceParentHandling() {
+		String initialTraceParent = "traceparent-from-previous";
+		String updatedTraceParent = "traceparent-current";
+		ProducerRecord<Integer, String> record = new ProducerRecord<>("test-topic", "test-value");
+		record.headers().add("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
+
+		// Create the context and update the traceparent
+		KafkaRecordSenderContext context = new KafkaRecordSenderContext(
+				record,
+				"test-bean",
+				() -> "test-cluster"
+		);
+		context.getSetter().set(record, "traceparent", updatedTraceParent);
+
+		Iterable<Header> traceparentHeaders = record.headers().headers("traceparent");
+
+		List<String> headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false)
+				.map(header -> new String(header.value(), StandardCharsets.UTF_8))
+				.toList();
+
+		// Verify there's only one traceparent header and it contains the updated value
+		assertThat(headerValues).containsExactly(updatedTraceParent);
+	}
+
+	@Test
+	void verifyTraceParentHeader(@Autowired KafkaTemplate<Integer, String> template,
+			@Autowired SimpleTracer tracer) throws Exception {
+		CompletableFuture<ProducerRecord<Integer, String>> producerRecordFuture = new CompletableFuture<>();
+		template.setProducerListener(new ProducerListener<>() {
+			@Override
+			public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMetadata recordMetadata) {
+				producerRecordFuture.complete(producerRecord);
+			}
+		});
+		String initialTraceParent = "traceparent-from-previous";
+		Header header = new RecordHeader("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8));
+		ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(
+				OBSERVATION_TRACEPARENT_DUPLICATE,
+				null, null, null,
+				"test-value",
+				List.of(header)
+		);
+
+		template.send(producerRecord).get(10, TimeUnit.SECONDS);
+		ProducerRecord<Integer, String> recordResult = producerRecordFuture.get(10, TimeUnit.SECONDS);
+
+		Iterable<Header> traceparentHeaders = recordResult.headers().headers("traceparent");
+		assertThat(traceparentHeaders).hasSize(1);
+
+		String traceparentValue = new String(traceparentHeaders.iterator().next().value(), StandardCharsets.UTF_8);
+		assertThat(traceparentValue).isEqualTo("traceparent-from-propagator");
+
+		tracer.getSpans().clear();
+	}
+
 	@Configuration
 	@EnableKafka
 	public static class Config {
@@ -598,6 +660,9 @@ public List<String> fields() {
 		public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
 			setter.set(carrier, "foo", "some foo value");
 			setter.set(carrier, "bar", "some bar value");
+
+			// Add a traceparent header to simulate W3C trace context
+			setter.set(carrier, "traceparent", "traceparent-from-propagator");
 		}

 		// This is called on the consumer side when the message is consumed
@@ -606,7 +671,9 @@ public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> sett
 		public <C> Span.Builder extract(C carrier, Getter<C> getter) {
 			String foo = getter.get(carrier, "foo");
 			String bar = getter.get(carrier, "bar");
-			return tracer.spanBuilder().tag("foo", foo).tag("bar", bar);
+			return tracer.spanBuilder()
+					.tag("foo", foo)
+					.tag("bar", bar);
 		}
 	};
 }
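For completeness, a hedged sketch of the application-side wiring that drives this code path (not part of this commit; the broker address, serializers, and method name are illustrative assumptions): a `KafkaTemplate` with observation enabled delegates header injection to `KafkaRecordSenderContext` on every send, so with this fix a re-sent record that already carries a `traceparent` header keeps a single, current value.

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class ObservedTemplateSketch {

	public static KafkaTemplate<Integer, String> observedTemplate() {
		Map<String, Object> props = Map.of(
				ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // placeholder broker
				ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class,
				ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		KafkaTemplate<Integer, String> template =
				new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
		// Enables the Micrometer observation that drives KafkaRecordSenderContext;
		// an ObservationRegistry must also be available to the template (e.g. via the application context).
		template.setObservationEnabled(true);
		return template;
	}
}
```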
