Skip to content

DSM optimizations - major refactoring to get rid of LinkedHashMap #9151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
4f895c7
DSM optimizations - major refactoring to get rid of LinkedHashMap
kr-igor Jul 10, 2025
1a1e77e
Refactored DSM tags for all integrations
kr-igor Jul 11, 2025
018b9d5
Removed tests which no longer needed
kr-igor Jul 11, 2025
9c69c1c
Merge branch 'master' into kr-igor/dsm-optimization-v2
kr-igor Jul 11, 2025
dba7d11
Fixed some more tests
kr-igor Jul 11, 2025
0b96dd0
Fixed payload writer tests
kr-igor Jul 11, 2025
4c28cbb
Fixed http tests
kr-igor Jul 11, 2025
6836f49
Removed DataStreamsTagsBuilder, updated all integraions and tests
kr-igor Jul 14, 2025
ea3add7
Spotless apply
kr-igor Jul 14, 2025
47b6142
Merge branch 'master' into kr-igor/dsm-optimization-v2
kr-igor Jul 14, 2025
b9d8818
Merge branch 'master' into kr-igor/dsm-optimization-v2
kr-igor Jul 15, 2025
5e197d1
Fixed more tests
kr-igor Jul 15, 2025
1c32c8d
Add base hash support and service name overrides
kr-igor Jul 15, 2025
5ec63df
Fixed more tests
kr-igor Jul 15, 2025
35a2156
Spotless apply
kr-igor Jul 15, 2025
fac8b25
Merge branch 'master' into kr-igor/dsm-optimization-v2
kr-igor Jul 16, 2025
9ca0f51
Added tests for tags
kr-igor Jul 16, 2025
6faef84
Improved coverage
kr-igor Jul 16, 2025
85846c7
Removed unused imports
kr-igor Jul 16, 2025
ddebf44
Fixed even more tests
kr-igor Jul 16, 2025
9466113
Updated multiple tests
kr-igor Jul 16, 2025
0def203
Fixed more tests
kr-igor Jul 16, 2025
3433f48
Spotless apply
kr-igor Jul 16, 2025
8718b22
Fixed even more tests
kr-igor Jul 16, 2025
6800ca8
Fixed http tests, improved test coverage
kr-igor Jul 17, 2025
1205937
Spotless apply
kr-igor Jul 17, 2025
18258b0
One more refactoring
kr-igor Jul 17, 2025
ddf3008
Some fixes in tests
kr-igor Jul 17, 2025
b559bdf
Improved test coverage
kr-igor Jul 17, 2025
cd5073f
Spotless apply
kr-igor Jul 17, 2025
30b796e
Removed unused import
kr-igor Jul 17, 2025
82431fe
Fixed all tags
kr-igor Jul 17, 2025
68373af
Fixed kafka tests
kr-igor Jul 17, 2025
3de3d08
Merge branch 'master' into kr-igor/dsm-optimization-v2
kr-igor Jul 17, 2025
9297499
Merge branch 'master' into kr-igor/dsm-optimization-v2
kr-igor Jul 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;

import datadog.context.Context;
import datadog.context.propagation.CarrierSetter;
Expand All @@ -14,6 +11,7 @@
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
Expand All @@ -23,7 +21,6 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.function.Function;

Expand All @@ -35,10 +32,8 @@ public class GrpcClientDecorator extends ClientDecorator {
public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message");

private static DataStreamsContext createDsmContext() {
LinkedHashMap<String, String> result = new LinkedHashMap<>();
result.put(DIRECTION_TAG, DIRECTION_OUT);
result.put(TYPE_TAG, "grpc");
return DataStreamsContext.fromTags(result);
return DataStreamsContext.fromTags(
Copy link
Contributor

@dougqh dougqh Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid builder idioms in the critical path. That's still extra allocation that we really don't need.

DataStreamsTags.create("grpc", DataStreamsTags.Direction.Outbound));
}

public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package datadog.trace.instrumentation.armeria.grpc.server;

import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;

import datadog.trace.api.Config;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.ErrorPriorities;
Expand All @@ -18,7 +15,6 @@
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.function.Function;

public class GrpcServerDecorator extends ServerDecorator {
Expand All @@ -33,15 +29,11 @@ public class GrpcServerDecorator extends ServerDecorator {
public static final CharSequence COMPONENT_NAME = UTF8BytesString.create("armeria-grpc-server");
public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message");

private static final LinkedHashMap<String, String> createServerPathwaySortedTags() {
LinkedHashMap<String, String> result = new LinkedHashMap<>();
result.put(DIRECTION_TAG, DIRECTION_IN);
result.put(TYPE_TAG, "grpc");
return result;
private static DataStreamsTags createServerPathwaySortedTags() {
return DataStreamsTags.create("grpc", DataStreamsTags.Direction.Inbound);
}

public static final LinkedHashMap<String, String> SERVER_PATHWAY_EDGE_TAGS =
createServerPathwaySortedTags();
public static final DataStreamsTags SERVER_PATHWAY_EDGE_TAGS = createServerPathwaySortedTags();
public static final GrpcServerDecorator DECORATE = new GrpcServerDecorator();

private static final Function<String, String> NORMALIZE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,12 @@ abstract class ArmeriaGrpcTest extends VersionedNamingTestBase {
if (isDataStreamsEnabled()) {
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
verifyAll(first) {
edgeTags.containsAll(["direction:out", "type:grpc"])
edgeTags.size() == 2
tags.hasAllTags("direction:out", "type:grpc")
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
verifyAll(second) {
edgeTags.containsAll(["direction:in", "type:grpc"])
edgeTags.size() == 2
tags.hasAllTags("direction:in", "type:grpc")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.BUS_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER;

import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,7 +85,9 @@ private String getTraceContextToInject(
// Inject context
datadog.context.Context context = span;
if (traceConfig().isDataStreamsEnabled()) {
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(eventBusName));
DataStreamsTags tags =
DataStreamsTags.createWithBus(DataStreamsTags.Direction.Outbound, eventBusName);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
context = context.with(dsmContext);
}
defaultPropagator().inject(context, jsonBuilder, SETTER);
Expand All @@ -111,13 +109,4 @@ private String getTraceContextToInject(
jsonBuilder.append('}');
return jsonBuilder.toString();
}

private LinkedHashMap<String, String> getTags(String eventBusName) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(BUS_TAG, eventBusName);
sortedTags.put(TYPE_TAG, "bus");

return sortedTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ abstract class AWS1KinesisClientTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:" + streamArn, "type:kinesis"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:" + streamArn, "type:kinesis")
}
}
verifyAll {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ abstract class AWS1SnsClientTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:" + topicName, "type:sns"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:" + topicName, "type:sns")
}
}
verifyAll {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.AmazonWebServiceResponse;
Expand All @@ -15,6 +14,7 @@
import datadog.trace.api.DDTags;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand All @@ -23,9 +23,7 @@
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import datadog.trace.core.datastreams.TagsProcessor;
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -255,17 +253,12 @@ && traceConfig().isDataStreamsEnabled()) {
if (HttpMethodName.GET.name().equals(span.getTag(Tags.HTTP_METHOD))
&& ("GetObjectMetadataRequest".equalsIgnoreCase(awsOperation)
|| "GetObjectRequest".equalsIgnoreCase(awsOperation))) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();

sortedTags.put(TagsProcessor.DIRECTION_TAG, TagsProcessor.DIRECTION_IN);
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");

DataStreamsTags tags =
DataStreamsTags.createWithDataset(
"s3", DataStreamsTags.Direction.Inbound, bucket, key, bucket);
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, create(sortedTags, 0, responseSize));
.setCheckpoint(span, create(tags, 0, responseSize));
}

if ("PutObjectRequest".equalsIgnoreCase(awsOperation)
Expand All @@ -275,18 +268,12 @@ && traceConfig().isDataStreamsEnabled()) {
if (requestSize != null) {
payloadSize = (long) requestSize;
}

LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();

sortedTags.put(TagsProcessor.DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");

DataStreamsTags tags =
DataStreamsTags.createWithDataset(
"s3", DataStreamsTags.Direction.Outbound, bucket, key, bucket);
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
.setCheckpoint(span, create(tags, 0, payloadSize));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.aws.v0.AwsSdkClientDecorator.AWS_LEGACY_TRACING;
import static datadog.trace.instrumentation.aws.v0.AwsSdkClientDecorator.DECORATE;

Expand All @@ -20,14 +16,11 @@
import com.amazonaws.handlers.RequestHandler2;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.api.datastreams.*;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -116,16 +109,14 @@ && traceConfig().isDataStreamsEnabled()
List<?> records =
GetterAccess.of(response.getAwsResponse()).getRecords(response.getAwsResponse());
if (null != records) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, streamArn);
sortedTags.put(TYPE_TAG, "kinesis");
DataStreamsTags tags =
DataStreamsTags.create("kinesis", DataStreamsTags.Direction.Inbound, streamArn);
for (Object record : records) {
Date arrivalTime = GetterAccess.of(record).getApproximateArrivalTimestamp(record);
AgentDataStreamsMonitoring dataStreamsMonitoring =
AgentTracer.get().getDataStreamsMonitoring();
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
DataStreamsContext context = create(sortedTags, arrivalTime.getTime(), 0);
DataStreamsContext context = create(tags, arrivalTime.getTime(), 0);
pathwayContext.setCheckpoint(context, dataStreamsMonitoring::add);
if (!span.context().getPathwayContext().isStarted()) {
span.context().mergePathwayContext(pathwayContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ abstract class Aws2KinesisDataStreamsTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:arnprefix:stream/somestream", "type:kinesis"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:arnprefix:stream/somestream", "type:kinesis")
}
}
verifyAll {
Expand Down Expand Up @@ -278,8 +277,7 @@ abstract class Aws2KinesisDataStreamsTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:arnprefix:stream/somestream", "type:kinesis"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:arnprefix:stream/somestream", "type:kinesis")
}
}
verifyAll {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ abstract class Aws2SnsDataStreamsTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:mytopic", "type:sns"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:mytopic", "type:sns")
}
}
verifyAll {
Expand Down Expand Up @@ -243,8 +242,7 @@ abstract class Aws2SnsDataStreamsTest extends VersionedNamingTestBase {
pathwayLatencyCount += group.pathwayLatency.count
edgeLatencyCount += group.edgeLatency.count
verifyAll(group) {
edgeTags.containsAll(["direction:" + dsmDirection, "topic:mytopic", "type:sns"])
edgeTags.size() == 3
tags.hasAllTags("direction:" + dsmDirection, "topic:mytopic", "type:sns")
}
}
verifyAll {
Expand Down
Loading