diff --git a/functional_test/src/test/java/com/newrelic/agent/instrumentation/kafka/KafkaTest.java b/functional_test/src/test/java/com/newrelic/agent/instrumentation/kafka/KafkaTest.java
index c51972bc81..b3478e2deb 100644
--- a/functional_test/src/test/java/com/newrelic/agent/instrumentation/kafka/KafkaTest.java
+++ b/functional_test/src/test/java/com/newrelic/agent/instrumentation/kafka/KafkaTest.java
@@ -154,6 +154,13 @@ private void consumeMessage(KafkaConsumer consumer) {
     private void processRecord(ConsumerRecord record) {
         NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.CUSTOM_HIGH,
                 true, "kafka", "processRecord");
+
+        final Iterator<Header> traceparentIterator = record.headers().headers("traceparent").iterator();
+        Assert.assertTrue("W3C traceparent header should be present", traceparentIterator.hasNext());
+
+        final Iterator<Header> tracestateIterator = record.headers().headers("tracestate").iterator();
+        Assert.assertTrue("W3C tracestate header should be present", tracestateIterator.hasNext());
+
         final Iterator<Header> nrIterator = record.headers().headers("newrelic").iterator();
         if (nrIterator.hasNext()) {
             final Header nrHeader = nrIterator.next();
@@ -164,6 +171,139 @@ private void processRecord(ConsumerRecord record) {
         }
     }
 
+    @Test
+    public void produceConsumeTestExcludeNewRelicHeader() throws Exception {
+        EnvironmentHolderSettingsGenerator envHolderSettings = new EnvironmentHolderSettingsGenerator(CONFIG_FILE, "exclude_newrelic_header_test", CLASS_LOADER);
+        EnvironmentHolder holder = new EnvironmentHolder(envHolderSettings);
+        holder.setupEnvironment();
+        kafkaUnitRule.getKafkaUnit().createTopic(testTopic, 1);
+        final KafkaConsumer consumer = setupConsumer();
+
+        final CountDownLatch latch = new CountDownLatch(2);
+        final ConcurrentLinkedQueue<TransactionData> finishedTransactions = new ConcurrentLinkedQueue<>();
+        TransactionListener transactionListener = (transactionData, transactionStats) -> {
+            finishedTransactions.add(transactionData);
+            latch.countDown();
+        };
+        ServiceFactory.getTransactionService().addTransactionListener(transactionListener);
+
+        try {
+            produceMessage();
+            final Future<?> submit = executorService.submit(() -> consumeMessageExcludeNewRelicHeader(consumer));
+            submit.get(30, TimeUnit.SECONDS);
+            latch.await(30, TimeUnit.SECONDS);
+
+            Assert.assertEquals(2, finishedTransactions.size());
+
+            TransactionData firstTransaction = finishedTransactions.poll();
+            TransactionData secondTransaction = finishedTransactions.poll();
+
+            TransactionData conTxn = null;
+            Assert.assertNotNull(firstTransaction);
+            if (firstTransaction.getInboundDistributedTracePayload() != null) {
+                conTxn = firstTransaction;
+            } else {
+                Assert.assertNotNull(secondTransaction);
+                if (secondTransaction.getInboundDistributedTracePayload() != null) {
+                    conTxn = secondTransaction;
+                }
+            }
+
+            Assert.assertNotNull("Consumer transaction should have an inbound distributed trace payload", conTxn);
+            Assert.assertNotNull("Inbound distributed trace payload should not be null", conTxn.getInboundDistributedTracePayload());
+        } finally {
+            ServiceFactory.getTransactionService().removeTransactionListener(transactionListener);
+            consumer.close();
+        }
+    }
+
+    @Trace(dispatcher = true)
+    private void consumeMessageExcludeNewRelicHeader(KafkaConsumer consumer) {
+        final ConsumerRecords records = consumer.poll(1000);
+        Assert.assertEquals(1, records.count());
+
+        for (ConsumerRecord record : records) {
+            processRecordExcludeNewRelicHeader(record);
+        }
+    }
+
+    private void processRecordExcludeNewRelicHeader(ConsumerRecord record) {
+        NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.CUSTOM_HIGH,
+                true, "kafka", "processRecord");
+
+        // Verify W3C Trace Context headers are present
+        final Iterator<Header> traceparentIterator = record.headers().headers("traceparent").iterator();
+        Assert.assertTrue("W3C traceparent header should be present", traceparentIterator.hasNext());
+
+        final Iterator<Header> tracestateIterator = record.headers().headers("tracestate").iterator();
+        Assert.assertTrue("W3C tracestate header should be present", tracestateIterator.hasNext());
+
+        // Verify legacy newrelic header is NOT present when exclude_newrelic_header is true
+        final Iterator<Header> nrIterator = record.headers().headers("newrelic").iterator();
+        if (nrIterator.hasNext()) {
+            Assert.fail("newrelic header should NOT be present when exclude_newrelic_header is true");
+        }
+
+        // Accept W3C distributed trace headers (traceparent + tracestate)
+        NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(
+                com.newrelic.api.agent.TransportType.Kafka,
+                new KafkaHeadersAdapter(record.headers()));
+    }
+
+    // Adapter to expose Kafka headers as NewRelic Headers for DT propagation
+    private static class KafkaHeadersAdapter implements com.newrelic.api.agent.Headers {
+        private final org.apache.kafka.common.header.Headers headers;
+
+        KafkaHeadersAdapter(org.apache.kafka.common.header.Headers headers) {
+            this.headers = headers;
+        }
+
+        @Override
+        public com.newrelic.api.agent.HeaderType getHeaderType() {
+            return com.newrelic.api.agent.HeaderType.MESSAGE;
+        }
+
+        @Override
+        public String getHeader(String name) {
+            Iterator<Header> it = headers.headers(name).iterator();
+            return it.hasNext() ? new String(it.next().value(), StandardCharsets.UTF_8) : null;
+        }
+
+        @Override
+        public Collection<String> getHeaders(String name) {
+            Collection<String> result = new java.util.ArrayList<>();
+            for (Header h : headers.headers(name)) {
+                result.add(new String(h.value(), StandardCharsets.UTF_8));
+            }
+            return result;
+        }
+
+        @Override
+        public void setHeader(String name, String value) {
+            headers.remove(name);
+            headers.add(name, value.getBytes(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public void addHeader(String name, String value) {
+            headers.add(name, value.getBytes(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public Collection<String> getHeaderNames() {
+            Collection<String> names = new java.util.HashSet<>();
+            for (Header h : headers) {
+                names.add(h.key());
+            }
+            return names;
+        }
+
+        @Override
+        public boolean containsHeader(String name) {
+            return headers.headers(name).iterator().hasNext();
+        }
+    }
+
     private KafkaConsumer setupConsumer() {
         final Properties props = new Properties();
         props.put("bootstrap.servers", kafkaUnitRule.getKafkaUnit().getKafkaConnect());
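Note on the assertions above: the test checks only that the headers are present, not their contents. For reference, the W3C Trace Context values have roughly this shape (example values from the W3C spec and New Relic docs, not output captured from this change):

    traceparent: 00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01
    tracestate:  190@nr=0-0-709288-8599547-f85f42fd82a4cf1d-164d3b4b0d09cb05-1-0.789-1563574856827

traceparent is version-traceId-parentId-flags per the W3C Trace Context spec; the tracestate entry shown is the New Relic vendor entry, keyed by the account's trust key.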
diff --git a/functional_test/src/test/resources/configs/span_events_test.yml b/functional_test/src/test/resources/configs/span_events_test.yml
index 7512cf5789..8ba356cde9 100644
--- a/functional_test/src/test/resources/configs/span_events_test.yml
+++ b/functional_test/src/test/resources/configs/span_events_test.yml
@@ -78,4 +78,13 @@ transaction_events_disabled_attribute_filtering:
 
   span_events.attributes.enabled: true
 
-  span_events.attributes.exclude: txAttrib5,spanAttrib1
\ No newline at end of file
+  span_events.attributes.exclude: txAttrib5,spanAttrib1
+
+exclude_newrelic_header_test:
+
+  distributed_tracing:
+    enabled: true
+    exclude_newrelic_header: true
+
+  span_events:
+    enabled: true
\ No newline at end of file
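The exclude_newrelic_header_test section above drives the new functional test. Outside the test harness, the same behavior comes from the agent's standard distributed tracing settings in newrelic.yml; a minimal sketch, assuming agent defaults for everything else:

    common: &default_settings
      distributed_tracing:
        enabled: true
        # Suppress the legacy base64 "newrelic" header; the W3C
        # traceparent/tracestate headers are still inserted.
        exclude_newrelic_header: true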
diff --git a/instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/com/nr/instrumentation/kafka/HeadersWrapper.java b/instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/com/nr/instrumentation/kafka/HeadersWrapper.java
new file mode 100644
index 0000000000..28383f68b0
--- /dev/null
+++ b/instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/com/nr/instrumentation/kafka/HeadersWrapper.java
@@ -0,0 +1,88 @@
+/*
+ *
+ *  * Copyright 2025 New Relic Corporation. All rights reserved.
+ *  * SPDX-License-Identifier: Apache-2.0
+ *
+ */
+
+package com.nr.instrumentation.kafka;
+
+import com.newrelic.api.agent.HeaderType;
+import com.newrelic.api.agent.Headers;
+import org.apache.kafka.common.header.Header;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Objects;
+
+public class HeadersWrapper implements Headers {
+
+    private final org.apache.kafka.common.header.Headers delegate;
+
+    public HeadersWrapper(org.apache.kafka.common.header.Headers headers) {
+        this.delegate = headers;
+    }
+
+    @Override
+    public HeaderType getHeaderType() {
+        return HeaderType.MESSAGE;
+    }
+
+    @Override
+    public String getHeader(String name) {
+        String value = null;
+        Iterator<Header> iterator = delegate.headers(name).iterator();
+        if (iterator.hasNext()) {
+            byte[] bytes = iterator.next().value();
+            if (bytes != null) {
+                value = new String(bytes);
+            }
+        }
+        return value;
+    }
+
+    @Override
+    public Collection<String> getHeaders(String name) {
+        Collection<String> headers = new ArrayList<>();
+        Iterator<Header> iterator = delegate.headers(name).iterator();
+        while (iterator.hasNext()) {
+            byte[] bytes = iterator.next().value();
+            if (bytes != null) {
+                headers.add(new String(bytes));
+            }
+        }
+        return headers;
+    }
+
+    @Override
+    public void setHeader(String name, String value) {
+        delegate.remove(name);
+        delegate.add(name, value.getBytes());
+    }
+
+    @Override
+    public void addHeader(String name, String value) {
+        delegate.add(name, value.getBytes());
+    }
+
+    @Override
+    public Collection<String> getHeaderNames() {
+        Collection<String> headerNames = new HashSet<>();
+        for (Header header : delegate) {
+            headerNames.add(header.key());
+        }
+        return headerNames;
+    }
+
+    @Override
+    public boolean containsHeader(String name) {
+        for (Header header : delegate) {
+            if (Objects.equals(name, header.key())) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
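HeadersWrapper adapts Kafka's mutable header collection to the agent-api Headers interface so the agent's generic distributed-tracing code can read and write message headers. A minimal standalone sketch of its behavior, assuming kafka-clients on the classpath and using Kafka's RecordHeaders implementation (the header values are illustrative):

    import com.newrelic.api.agent.Headers;
    import com.nr.instrumentation.kafka.HeadersWrapper;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class HeadersWrapperSketch {
        public static void main(String[] args) {
            RecordHeaders kafkaHeaders = new RecordHeaders();
            Headers wrapped = new HeadersWrapper(kafkaHeaders);

            // setHeader removes existing values for the key before adding
            wrapped.setHeader("traceparent", "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01");
            wrapped.setHeader("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
            System.out.println(wrapped.getHeaders("traceparent").size()); // 1

            // addHeader appends, so repeated keys accumulate
            wrapped.addHeader("tracestate", "vendorA=foo");
            wrapped.addHeader("tracestate", "vendorB=bar");
            System.out.println(wrapped.getHeaders("tracestate").size()); // 2

            System.out.println(wrapped.containsHeader("newrelic")); // false
        }
    }

One design note: getHeaderType() returns HeaderType.MESSAGE, which tells the agent to apply message-queue header semantics rather than HTTP semantics when inserting or accepting trace context.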
diff --git a/instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java b/instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java
index 8b6314aab1..bc7c8b817c 100644
--- a/instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java
+++ b/instrumentation/kafka-clients-spans-0.11.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java
@@ -8,14 +8,14 @@
 package org.apache.kafka.clients.producer;
 
 import com.newrelic.agent.bridge.AgentBridge;
-import com.newrelic.agent.bridge.NoOpDistributedTracePayload;
 import com.newrelic.agent.bridge.Transaction;
-import com.newrelic.api.agent.DistributedTracePayload;
+import com.newrelic.api.agent.Headers;
+import com.newrelic.api.agent.NewRelic;
 import com.newrelic.api.agent.Trace;
 import com.newrelic.api.agent.weaver.Weave;
 import com.newrelic.api.agent.weaver.Weaver;
+import com.nr.instrumentation.kafka.HeadersWrapper;
 
-import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
 @Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer")
@@ -25,10 +25,8 @@ public class KafkaProducer_Instrumentation {
     private Future doSend(ProducerRecord record, Callback callback) {
         final Transaction transaction = AgentBridge.getAgent().getTransaction(false);
         if (transaction != null) {
-            DistributedTracePayload payload = transaction.createDistributedTracePayload();
-            if (!(payload instanceof NoOpDistributedTracePayload)) {
-                record.headers().add("newrelic", payload.text().getBytes(StandardCharsets.UTF_8));
-            }
+            Headers dtHeaders = new HeadersWrapper(record.headers());
+            NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(dtHeaders);
        }
         return Weaver.callOriginal();
     }
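With this change the producer path delegates header injection entirely to insertDistributedTraceHeaders, which writes traceparent and tracestate and adds the legacy newrelic header only when exclude_newrelic_header is false (the default), instead of hand-building a single newrelic payload. The consumer side is the mirror image; a hedged sketch of application code accepting the propagated context, reusing the module's HeadersWrapper for illustration (an application would normally supply its own Headers adapter, and the transaction boundary shown is illustrative, not part of this diff):

    import com.newrelic.api.agent.NewRelic;
    import com.newrelic.api.agent.Trace;
    import com.newrelic.api.agent.TransportType;
    import com.nr.instrumentation.kafka.HeadersWrapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class ConsumerSide {
        @Trace(dispatcher = true)
        public void processRecord(ConsumerRecord<String, String> record) {
            // Link this transaction to the producer's trace; W3C headers are
            // preferred when both W3C and newrelic headers are present.
            NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(
                    TransportType.Kafka, new HeadersWrapper(record.headers()));
            // ... process the record inside the linked transaction ...
        }
    }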