diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy b/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy index 095d479e2d9..ed7ce1292c7 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy @@ -3,7 +3,6 @@ import datadog.trace.api.DDTags import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.core.datastreams.StatsGroup -import datadog.trace.test.util.Flaky import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.KafkaStreams @@ -27,8 +26,8 @@ import spock.lang.Shared import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit +import java.util.regex.Pattern -@Flaky("https://github.com/DataDog/dd-trace-java/issues/3865") class KafkaStreamsTest extends AgentTestRunner { static final STREAM_PENDING = "test.pending" static final STREAM_PROCESSED = "test.processed" @@ -39,6 +38,11 @@ class KafkaStreamsTest extends AgentTestRunner { @Shared EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka + def setup() { + // Filter out additional traces for kafka.poll operation, otherwise, there will be more traces than expected. + TEST_WRITER.setFilter { trace -> trace[0].operationName.toString() != 'kafka.poll' } + } + @Override protected boolean isDataStreamsEnabled() { return true @@ -135,6 +139,7 @@ class KafkaStreamsTest extends AgentTestRunner { if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } } + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+") defaultTagsNoPeerService() } } @@ -159,7 +164,6 @@ class KafkaStreamsTest extends AgentTestRunner { "$InstrumentationTags.PARTITION" { it >= 0 } "$InstrumentationTags.OFFSET" 0 "$InstrumentationTags.PROCESSOR_NAME" "KSTREAM-SOURCE-0000000000" - "$InstrumentationTags.MESSAGING_DESTINATION_NAME" "$STREAM_PENDING" "asdf" "testing" if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } @@ -186,6 +190,7 @@ class KafkaStreamsTest extends AgentTestRunner { if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } } + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+") defaultTagsNoPeerService() } } @@ -212,6 +217,7 @@ class KafkaStreamsTest extends AgentTestRunner { if ({ isDataStreamsEnabled()}) { "$DDTags.PATHWAY_HASH" { String } } + "$InstrumentationTags.KAFKA_BOOTSTRAP_SERVERS" Pattern.compile("127.0.0.1:[0-9]+") defaultTags(true) } } @@ -226,8 +232,11 @@ class KafkaStreamsTest extends AgentTestRunner { if (isDataStreamsEnabled()) { StatsGroup originProducerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } verifyAll(originProducerPoint) { - edgeTags == ["direction:out", "topic:$STREAM_PENDING", "type:kafka"] - edgeTags.size() == 3 + edgeTags.any { it.startsWith("kafka_cluster_id:") } + for (String tag : ["direction:out", "topic:$STREAM_PENDING", "type:kafka"]) { + assert edgeTags.contains(tag) + } + edgeTags.size() == 4 } StatsGroup kafkaStreamsConsumerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == originProducerPoint.hash } @@ -243,14 +252,20 @@ class KafkaStreamsTest extends AgentTestRunner { StatsGroup kafkaStreamsProducerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == kafkaStreamsConsumerPoint.hash } verifyAll(kafkaStreamsProducerPoint) { - edgeTags == ["direction:out", "topic:$STREAM_PROCESSED", "type:kafka"] - edgeTags.size() == 3 + edgeTags.any { it.startsWith("kafka_cluster_id:") } + for (String tag : ["direction:out", "topic:$STREAM_PROCESSED", "type:kafka"]) { + assert edgeTags.contains(tag) + } + edgeTags.size() == 4 } StatsGroup finalConsumerPoint = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == kafkaStreamsProducerPoint.hash } verifyAll(finalConsumerPoint) { - edgeTags == ["direction:in", "group:sender", "topic:$STREAM_PROCESSED".toString(), "type:kafka"] - edgeTags.size() == 4 + edgeTags.any { it.startsWith("kafka_cluster_id:") } + for (String tag : ["direction:in", "group:sender", "topic:$STREAM_PROCESSED".toString(), "type:kafka"]) { + assert edgeTags.contains(tag) + } + edgeTags.size() == 5 } }