From 281251ce9d7637e1105d3ef786c2e0666a8d10bc Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Thu, 17 Apr 2025 13:46:42 +0200 Subject: [PATCH] Add process tags to client stats payload --- .../common/metrics/SerializingMetricWriter.java | 13 ++++++++++++- .../metrics/SerializingMetricWriterTest.groovy | 16 ++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index ce0d9f1de68..485b3e90bbc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -5,7 +5,9 @@ import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.WritableFormatter; import datadog.communication.serialization.msgpack.MsgPackWriter; +import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; public final class SerializingMetricWriter implements MetricWriter { @@ -28,6 +30,7 @@ public final class SerializingMetricWriter implements MetricWriter { private static final byte[] STATS = "Stats".getBytes(ISO_8859_1); private static final byte[] OK_SUMMARY = "OkSummary".getBytes(ISO_8859_1); private static final byte[] ERROR_SUMMARY = "ErrorSummary".getBytes(ISO_8859_1); + private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1); private final WellKnownTags wellKnownTags; private final WritableFormatter writer; @@ -48,7 +51,9 @@ public SerializingMetricWriter(WellKnownTags wellKnownTags, Sink sink, int initi @Override public void startBucket(int metricCount, long start, long duration) { - writer.startMap(6); + final UTF8BytesString processTags = ProcessTags.getTagsForSerialization(); + final boolean writeProcessTags = processTags != null; + writer.startMap(6 + (writeProcessTags ? 1 : 0)); writer.writeUTF8(RUNTIME_ID); writer.writeUTF8(wellKnownTags.getRuntimeId()); @@ -65,7 +70,13 @@ public void startBucket(int metricCount, long start, long duration) { writer.writeUTF8(VERSION); writer.writeUTF8(wellKnownTags.getVersion()); + if (writeProcessTags) { + writer.writeUTF8(PROCESS_TAGS); + writer.writeUTF8(processTags); + } + writer.writeUTF8(STATS); + writer.startArray(1); writer.startMap(3); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 5b6ce8bee3d..e1d6f8f24b9 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -1,5 +1,7 @@ package datadog.trace.common.metrics +import datadog.trace.api.Config +import datadog.trace.api.ProcessTags import datadog.trace.api.WellKnownTags import datadog.trace.api.Pair import datadog.trace.test.util.DDSpecification @@ -9,13 +11,18 @@ import org.msgpack.core.MessageUnpacker import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLongArray +import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_COLLECT_PROCESS_TAGS_ENABLED import static java.util.concurrent.TimeUnit.MILLISECONDS import static java.util.concurrent.TimeUnit.SECONDS class SerializingMetricWriterTest extends DDSpecification { - def "should produce correct message #iterationIndex" () { + def "should produce correct message #iterationIndex with process tags enabled #withProcessTags" () { setup: + if (withProcessTags) { + injectSysConfig(EXPERIMENTAL_COLLECT_PROCESS_TAGS_ENABLED, "true") + } + ProcessTags.reset() long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version","language") @@ -43,6 +50,7 @@ class SerializingMetricWriterTest extends DDSpecification { Pair.of(new MetricKey("resource" + i, "service" + i, "operation" + i, "type", 0, false), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))) }) ] + withProcessTags << [true, false] } @@ -70,7 +78,7 @@ class SerializingMetricWriterTest extends DDSpecification { void accept(int messageCount, ByteBuffer buffer) { MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(buffer) int mapSize = unpacker.unpackMapHeader() - assert mapSize == 6 + assert mapSize == (6 + (Config.get().isExperimentalCollectProcessTagsEnabled() ? 1 : 0)) assert unpacker.unpackString() == "RuntimeId" assert unpacker.unpackString() == wellKnownTags.getRuntimeId() as String assert unpacker.unpackString() == "Seq" @@ -81,6 +89,10 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpacker.unpackString() == wellKnownTags.getEnv() as String assert unpacker.unpackString() == "Version" assert unpacker.unpackString() == wellKnownTags.getVersion() as String + if (Config.get().isExperimentalCollectProcessTagsEnabled()) { + assert unpacker.unpackString() == "ProcessTags" + assert unpacker.unpackString() == ProcessTags.tagsForSerialization as String + } assert unpacker.unpackString() == "Stats" int outerLength = unpacker.unpackArrayHeader() assert outerLength == 1