@@ -154,6 +154,13 @@ private void consumeMessage(KafkaConsumer<String, String> consumer) {
private void processRecord(ConsumerRecord<String, String> record) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.CUSTOM_HIGH,
true, "kafka", "processRecord");

final Iterator<Header> traceparentIterator = record.headers().headers("traceparent").iterator();
Assert.assertTrue("W3C traceparent header should be present", traceparentIterator.hasNext());

final Iterator<Header> tracestateIterator = record.headers().headers("tracestate").iterator();
Assert.assertTrue("W3C tracestate header should be present", tracestateIterator.hasNext());

final Iterator<Header> nrIterator = record.headers().headers("newrelic").iterator();
if (nrIterator.hasNext()) {
final Header nrHeader = nrIterator.next();
@@ -164,6 +171,139 @@ private void processRecord(ConsumerRecord<String, String> record) {
}
}

@Test
public void produceConsumeTestExcludeNewRelicHeader() throws Exception {
EnvironmentHolderSettingsGenerator envHolderSettings = new EnvironmentHolderSettingsGenerator(CONFIG_FILE, "exclude_newrelic_header_test", CLASS_LOADER);
EnvironmentHolder holder = new EnvironmentHolder(envHolderSettings);
holder.setupEnvironment();
kafkaUnitRule.getKafkaUnit().createTopic(testTopic, 1);
final KafkaConsumer<String, String> consumer = setupConsumer();

final CountDownLatch latch = new CountDownLatch(2);
final ConcurrentLinkedQueue<TransactionData> finishedTransactions = new ConcurrentLinkedQueue<>();
TransactionListener transactionListener = (transactionData, transactionStats) -> {
finishedTransactions.add(transactionData);
latch.countDown();
};
ServiceFactory.getTransactionService().addTransactionListener(transactionListener);

try {
produceMessage();
final Future<?> submit = executorService.submit(() -> consumeMessageExcludeNewRelicHeader(consumer));
submit.get(30, TimeUnit.SECONDS);
Assert.assertTrue("Timed out waiting for finished transactions", latch.await(30, TimeUnit.SECONDS));

Assert.assertEquals(2, finishedTransactions.size());

TransactionData firstTransaction = finishedTransactions.poll();
TransactionData secondTransaction = finishedTransactions.poll();

TransactionData conTxn = null;
Assert.assertNotNull(firstTransaction);
if (firstTransaction.getInboundDistributedTracePayload() != null) {
conTxn = firstTransaction;
} else {
Assert.assertNotNull(secondTransaction);
if (secondTransaction.getInboundDistributedTracePayload() != null) {
conTxn = secondTransaction;
}
}

Assert.assertNotNull("Consumer transaction should have an inbound distributed trace payload", conTxn);
Assert.assertNotNull("Inbound distributed trace payload should not be null", conTxn.getInboundDistributedTracePayload());
} finally {
ServiceFactory.getTransactionService().removeTransactionListener(transactionListener);
consumer.close();
}
}

@Trace(dispatcher = true)
private void consumeMessageExcludeNewRelicHeader(KafkaConsumer<String, String> consumer) {
final ConsumerRecords<String, String> records = consumer.poll(1000);
Assert.assertEquals(1, records.count());

for (ConsumerRecord<String, String> record : records) {
processRecordExcludeNewRelicHeader(record);
}
}

private void processRecordExcludeNewRelicHeader(ConsumerRecord<String, String> record) {
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.CUSTOM_HIGH,
true, "kafka", "processRecord");

// Verify W3C Trace Context headers are present
final Iterator<Header> traceparentIterator = record.headers().headers("traceparent").iterator();
Assert.assertTrue("W3C traceparent header should be present", traceparentIterator.hasNext());

final Iterator<Header> tracestateIterator = record.headers().headers("tracestate").iterator();
Assert.assertTrue("W3C tracestate header should be present", tracestateIterator.hasNext());

// Verify legacy newrelic header is NOT present when exclude_newrelic_header is true
final Iterator<Header> nrIterator = record.headers().headers("newrelic").iterator();
if (nrIterator.hasNext()) {
Assert.fail("newrelic header should NOT be present when exclude_newrelic_header is true");
}

// Accept W3C distributed trace headers (traceparent + tracestate)
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(
com.newrelic.api.agent.TransportType.Kafka,
new KafkaHeadersAdapter(record.headers()));
}

// Adapter to expose Kafka headers as NewRelic Headers for DT propagation
private static class KafkaHeadersAdapter implements com.newrelic.api.agent.Headers {
private final org.apache.kafka.common.header.Headers headers;

KafkaHeadersAdapter(org.apache.kafka.common.header.Headers headers) {
this.headers = headers;
}

@Override
public com.newrelic.api.agent.HeaderType getHeaderType() {
return com.newrelic.api.agent.HeaderType.MESSAGE;
}

@Override
public String getHeader(String name) {
Iterator<Header> it = headers.headers(name).iterator();
return it.hasNext() ? new String(it.next().value(), StandardCharsets.UTF_8) : null;
}

@Override
public Collection<String> getHeaders(String name) {
Collection<String> result = new java.util.ArrayList<>();
for (Header h : headers.headers(name)) {
result.add(new String(h.value(), StandardCharsets.UTF_8));
}
return result;
}

@Override
public void setHeader(String name, String value) {
headers.remove(name);
headers.add(name, value.getBytes(StandardCharsets.UTF_8));
}

@Override
public void addHeader(String name, String value) {
headers.add(name, value.getBytes(StandardCharsets.UTF_8));
}

@Override
public Collection<String> getHeaderNames() {
Collection<String> names = new java.util.HashSet<>();
for (Header h : headers) {
names.add(h.key());
}
return names;
}

@Override
public boolean containsHeader(String name) {
return headers.headers(name).iterator().hasNext();
}
}

private KafkaConsumer<String, String> setupConsumer() {
final Properties props = new Properties();
props.put("bootstrap.servers", kafkaUnitRule.getKafkaUnit().getKafkaConnect());
11 changes: 10 additions & 1 deletion functional_test/src/test/resources/configs/span_events_test.yml
@@ -78,4 +78,13 @@ transaction_events_disabled_attribute_filtering:

span_events.attributes.enabled: true

span_events.attributes.exclude: txAttrib5,spanAttrib1

exclude_newrelic_header_test:

distributed_tracing:
enabled: true
exclude_newrelic_header: true

span_events:
enabled: true
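
Note: the test environment above mirrors what the same switch would look like in a regular agent newrelic.yml. A minimal sketch, assuming the documented distributed_tracing settings (illustrative only, not part of this change):

common: &default_settings
  distributed_tracing:
    enabled: true
    # Propagate only the W3C traceparent/tracestate headers and omit the
    # legacy newrelic header.
    exclude_newrelic_header: true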
HeadersWrapper.java (new file)
@@ -0,0 +1,88 @@
/*
*
* * Copyright 2025 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import com.newrelic.api.agent.HeaderType;
import com.newrelic.api.agent.Headers;
import org.apache.kafka.common.header.Header;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;

public class HeadersWrapper implements Headers {

private final org.apache.kafka.common.header.Headers delegate;

public HeadersWrapper(org.apache.kafka.common.header.Headers headers) {
this.delegate = headers;
}

@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}

@Override
public String getHeader(String name) {
String value = null;
Iterator<Header> iterator = delegate.headers(name).iterator();
if (iterator.hasNext()) {
byte[] bytes = iterator.next().value();
if (bytes != null) {
value = new String(bytes);
}
}
return value;
}

@Override
public Collection<String> getHeaders(String name) {
Collection<String> headers = new ArrayList<>();
Iterator<Header> iterator = delegate.headers(name).iterator();
while (iterator.hasNext()) {
byte[] bytes = iterator.next().value();
if (bytes != null) {
headers.add(new String(bytes));
}
}
return headers;
}

@Override
public void setHeader(String name, String value) {
delegate.remove(name);
delegate.add(name, value.getBytes());
}

@Override
public void addHeader(String name, String value) {
delegate.add(name, value.getBytes());
}

@Override
public Collection<String> getHeaderNames() {
Collection<String> headerNames = new HashSet<>();
for(Header header : delegate) {
headerNames.add(header.key());
}
return headerNames;
}

@Override
public boolean containsHeader(String name) {
for(Header header : delegate) {
if (Objects.equals(name,header.key())) {
return true;
}
}
return false;
}
}
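
Note: because HeadersWrapper is written against the generic Kafka Headers interface, the same class can be used symmetrically on the consume path. A minimal sketch of consumer-side acceptance (hypothetical application code, not part of this PR; it assumes a transaction is already in progress, e.g. started by @Trace(dispatcher = true)):

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransportType;
import com.nr.instrumentation.kafka.HeadersWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class RecordProcessor {

    @Trace(dispatcher = true)
    public void process(ConsumerRecord<String, String> record) {
        // Link this transaction to the producer's trace using whichever headers
        // are present on the record (traceparent/tracestate and/or newrelic).
        NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(
                TransportType.Kafka, new HeadersWrapper(record.headers()));
        // ... application-specific handling of record.value() ...
    }
}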
KafkaProducer_Instrumentation.java
@@ -8,14 +8,14 @@
package org.apache.kafka.clients.producer;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.NoOpDistributedTracePayload;
import com.newrelic.agent.bridge.Transaction;
import com.newrelic.api.agent.DistributedTracePayload;
import com.newrelic.api.agent.Headers;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.HeadersWrapper;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

@Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer")
@@ -25,10 +25,8 @@ public class KafkaProducer_Instrumentation<K, V> {
private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {
final Transaction transaction = AgentBridge.getAgent().getTransaction(false);
if (transaction != null) {
DistributedTracePayload payload = transaction.createDistributedTracePayload();
if (!(payload instanceof NoOpDistributedTracePayload)) {
record.headers().add("newrelic", payload.text().getBytes(StandardCharsets.UTF_8));
}
Headers dtHeaders = new HeadersWrapper(record.headers());
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(dtHeaders);
}
return Weaver.callOriginal();
}
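
Note: expressed through the public API instead of the weaver, the new injection path amounts to the following. A minimal sketch of equivalent manual instrumentation (producer, someTopic, key, and value are placeholders; it assumes a transaction is active around the send, which the weaved doSend checks via the agent bridge):

import com.newrelic.api.agent.NewRelic;
import com.nr.instrumentation.kafka.HeadersWrapper;
import org.apache.kafka.clients.producer.ProducerRecord;

// Inside a transaction, e.g. a @Trace(dispatcher = true) method:
ProducerRecord<String, String> record = new ProducerRecord<>("someTopic", "key", "value");
// Writes traceparent/tracestate onto the record's headers, plus the legacy
// newrelic header unless distributed_tracing.exclude_newrelic_header is true.
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(new HeadersWrapper(record.headers()));
producer.send(record);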