Skip to content

Commit 09fcaa6

Browse files
authored
Fix check for parent context when extract propagation context fails (#319)
* Fix context check in trace subscription method * Extract propagation context from Event instead of OriginalEvent
1 parent c736189 commit 09fcaa6

File tree

7 files changed

+138
-114
lines changed

7 files changed

+138
-114
lines changed

src/EventStore.Client.Streams/EventStoreClient.Append.cs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,9 @@ await call.RequestStream
132132
foreach (var e in eventData) {
133133
var appendReq = new AppendReq {
134134
ProposedMessage = new() {
135-
Id = e.EventId.ToDto(),
136-
Data = ByteString.CopyFrom(e.Data.Span),
137-
CustomMetadata = ByteString.CopyFrom(
138-
e.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
139-
? e.Metadata.InjectTracingContext(Activity.Current)
140-
: e.Metadata.Span
141-
),
135+
Id = e.EventId.ToDto(),
136+
Data = ByteString.CopyFrom(e.Data.Span),
137+
CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingContext(Activity.Current)),
142138
Metadata = {
143139
{ Constants.Metadata.Type, e.Type },
144140
{ Constants.Metadata.ContentType, e.ContentType }
@@ -392,13 +388,9 @@ IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppe
392388

393389
foreach (var eventData in events) {
394390
var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
395-
Data = ByteString.CopyFrom(eventData.Data.Span),
396-
CustomMetadata = ByteString.CopyFrom(
397-
eventData.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
398-
? eventData.Metadata.InjectTracingContext(Activity.Current)
399-
: eventData.Metadata.Span
400-
),
401-
Id = eventData.EventId.ToDto(),
391+
Data = ByteString.CopyFrom(eventData.Data.Span),
392+
CustomMetadata = ByteString.CopyFrom(eventData.Metadata.InjectTracingContext(Activity.Current)),
393+
Id = eventData.EventId.ToDto(),
402394
Metadata = {
403395
{ Constants.Metadata.Type, eventData.Type },
404396
{ Constants.Metadata.ContentType, eventData.ContentType }
@@ -435,4 +427,4 @@ public void Dispose() {
435427
}
436428
}
437429
}
438-
}
430+
}

src/EventStore.Client/Common/Diagnostics/ActivitySourceExtensions.cs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ public static async ValueTask<T> TraceClientOperation<T>(
1818
var res = await tracedOperation().ConfigureAwait(false);
1919
activity?.StatusOk();
2020
return res;
21-
}
22-
catch (Exception ex) {
21+
} catch (Exception ex) {
2322
activity?.StatusError(ex);
2423
throw;
2524
}
@@ -33,15 +32,12 @@ public static void TraceSubscriptionEvent(
3332
EventStoreClientSettings settings,
3433
UserCredentials? userCredentials
3534
) {
36-
if (resolvedEvent.OriginalEvent.ContentType != Constants.Metadata.ContentTypes.ApplicationJson)
37-
return;
38-
3935
if (source.HasNoActiveListeners())
4036
return;
4137

42-
var parentContext = resolvedEvent.OriginalEvent.Metadata.ExtractPropagationContext();
38+
var parentContext = resolvedEvent.Event.Metadata.ExtractPropagationContext();
4339

44-
if (parentContext is null) return;
40+
if (parentContext == default(ActivityContext)) return;
4541

4642
var tags = new ActivityTagsCollection()
4743
.WithRequiredTag(TelemetryTags.EventStore.Stream, resolvedEvent.OriginalEvent.EventStreamId)
@@ -51,14 +47,19 @@ public static void TraceSubscriptionEvent(
5147
// Ensure consistent server.address attribute when connecting to cluster via dns discovery
5248
.WithGrpcChannelServerTags(channelInfo)
5349
.WithClientSettingsServerTags(settings)
54-
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? settings.DefaultCredentials?.Username);
50+
.WithOptionalTag(
51+
TelemetryTags.Database.User,
52+
userCredentials?.Username ?? settings.DefaultCredentials?.Username
53+
);
5554

56-
StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)?.Dispose();
55+
StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)
56+
?.Dispose();
5757
}
5858

5959
static Activity? StartActivity(
6060
this ActivitySource source,
61-
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null, ActivityContext? parentContext = null
61+
string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null,
62+
ActivityContext? parentContext = null
6263
) {
6364
if (source.HasNoActiveListeners())
6465
return null;
@@ -79,4 +80,4 @@ public static void TraceSubscriptionEvent(
7980
}
8081

8182
static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners();
82-
}
83+
}

src/EventStore.Client/Common/Diagnostics/Core/Tracing/TracingMetadata.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@ readonly record struct TracingMetadata(
2525
isRemote: isRemote
2626
)
2727
: default;
28-
}
29-
catch (Exception) {
28+
} catch (Exception) {
3029
return default;
3130
}
3231
}
33-
}
32+
}

src/EventStore.Client/Common/Diagnostics/EventMetadataExtensions.cs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ namespace EventStore.Client.Diagnostics;
88

99
static class EventMetadataExtensions {
1010
[MethodImpl(MethodImplOptions.AggressiveInlining)]
11-
public static ReadOnlySpan<byte> InjectTracingContext(this ReadOnlyMemory<byte> eventMetadata, Activity? activity) =>
11+
public static ReadOnlySpan<byte> InjectTracingContext(
12+
this ReadOnlyMemory<byte> eventMetadata, Activity? activity
13+
) =>
1214
eventMetadata.InjectTracingMetadata(activity?.GetTracingMetadata() ?? TracingMetadata.None);
1315

1416
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -32,14 +34,15 @@ public static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory<byte> e
3234

3335
return new TracingMetadata(traceId.GetString(), spanId.GetString());
3436
}
35-
}
36-
catch (Exception) {
37+
} catch (Exception) {
3738
return TracingMetadata.None;
3839
}
3940
}
4041

4142
[MethodImpl(MethodImplOptions.AggressiveInlining)]
42-
static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata) {
43+
static ReadOnlySpan<byte> InjectTracingMetadata(
44+
this ReadOnlyMemory<byte> eventMetadata, TracingMetadata tracingMetadata
45+
) {
4346
if (tracingMetadata == TracingMetadata.None || !tracingMetadata.IsValid)
4447
return eventMetadata.Span;
4548

@@ -49,7 +52,9 @@ static ReadOnlySpan<byte> InjectTracingMetadata(this ReadOnlyMemory<byte> eventM
4952
}
5053

5154
[MethodImpl(MethodImplOptions.AggressiveInlining)]
52-
static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata) {
55+
static ReadOnlyMemory<byte> TryInjectTracingMetadata(
56+
this ReadOnlyMemory<byte> utf8Json, TracingMetadata tracingMetadata
57+
) {
5358
try {
5459
using var doc = JsonDocument.Parse(utf8Json);
5560
using var stream = new MemoryStream();
@@ -72,9 +77,8 @@ static ReadOnlyMemory<byte> TryInjectTracingMetadata(this ReadOnlyMemory<byte> u
7277
writer.Flush();
7378

7479
return stream.ToArray();
75-
}
76-
catch (Exception) {
80+
} catch (Exception) {
7781
return utf8Json;
7882
}
7983
}
80-
}
84+
}

test/EventStore.Client.Streams.Tests/Diagnostics/StreamsTracingInstrumentationTests.cs

Lines changed: 68 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
using System.Text.Json;
21
using EventStore.Client.Diagnostics;
32
using EventStore.Diagnostics.Tracing;
43

54
namespace EventStore.Client.Streams.Tests.Diagnostics;
65

76
[Trait("Category", "Diagnostics:Tracing")]
8-
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture) : EventStoreTests<DiagnosticsFixture>(output, fixture) {
7+
public class StreamsTracingInstrumentationTests(ITestOutputHelper output, DiagnosticsFixture fixture)
8+
: EventStoreTests<DiagnosticsFixture>(output, fixture) {
99
[Fact]
1010
public async Task AppendIsInstrumentedWithTracingAsExpected() {
1111
var stream = Fixture.GetStreamName();
@@ -42,66 +42,6 @@ public async Task AppendTraceIsTaggedWithErrorStatusOnException() {
4242
Fixture.AssertErroneousAppendActivityHasExpectedTags(activity, actualException);
4343
}
4444

45-
[Fact]
46-
public async Task CatchupSubscriptionIsInstrumentedWithTracingAndRestoresRemoteAppendContextAsExpected() {
47-
var stream = Fixture.GetStreamName();
48-
var events = Fixture.CreateTestEvents(2, metadata: Fixture.CreateTestJsonMetadata()).ToArray();
49-
50-
await Fixture.Streams.AppendToStreamAsync(
51-
stream,
52-
StreamState.NoStream,
53-
events
54-
);
55-
56-
string? subscriptionId = null;
57-
await Subscribe().WithTimeout();
58-
59-
var appendActivity = Fixture
60-
.GetActivitiesForOperation(TracingConstants.Operations.Append, stream)
61-
.SingleOrDefault()
62-
.ShouldNotBeNull();
63-
64-
var subscribeActivities = Fixture
65-
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, stream)
66-
.ToArray();
67-
68-
subscriptionId.ShouldNotBeNull();
69-
subscribeActivities.Length.ShouldBe(events.Length);
70-
71-
for (var i = 0; i < subscribeActivities.Length; i++) {
72-
subscribeActivities[i].TraceId.ShouldBe(appendActivity.Context.TraceId);
73-
subscribeActivities[i].ParentSpanId.ShouldBe(appendActivity.Context.SpanId);
74-
subscribeActivities[i].HasRemoteParent.ShouldBeTrue();
75-
76-
Fixture.AssertSubscriptionActivityHasExpectedTags(
77-
subscribeActivities[i],
78-
stream,
79-
events[i].EventId.ToString(),
80-
subscriptionId
81-
);
82-
}
83-
84-
return;
85-
86-
async Task Subscribe() {
87-
await using var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start);
88-
await using var enumerator = subscription.Messages.GetAsyncEnumerator();
89-
90-
var eventsAppeared = 0;
91-
while (await enumerator.MoveNextAsync()) {
92-
if (enumerator.Current is StreamMessage.SubscriptionConfirmation(var sid))
93-
subscriptionId = sid;
94-
95-
if (enumerator.Current is not StreamMessage.Event(_))
96-
continue;
97-
98-
eventsAppeared++;
99-
if (eventsAppeared >= events.Length)
100-
return;
101-
}
102-
}
103-
}
104-
10545
[Fact]
10646
public async Task TracingContextIsInjectedWhenUserMetadataIsValidJsonObject() {
10747
var stream = Fixture.GetStreamName();
@@ -148,7 +88,7 @@ await Fixture.Streams.AppendToStreamAsync(
14888
}
14989

15090
[Fact]
151-
public async Task TracingContextIsNotInjectedWhenEventIsNotJsonButHasJsonMetadata() {
91+
public async Task TracingContextIsInjectedWhenEventIsNotJsonButHasJsonMetadata() {
15292
var stream = Fixture.GetStreamName();
15393

15494
var inputMetadata = Fixture.CreateTestJsonMetadata().ToArray();
@@ -166,7 +106,69 @@ await Fixture.Streams.AppendToStreamAsync(
166106
.ToListAsync();
167107

168108
var outputMetadata = readResult[0].OriginalEvent.Metadata.ToArray();
169-
var test = JsonSerializer.Deserialize<object>(outputMetadata);
170-
outputMetadata.ShouldBe(inputMetadata);
109+
outputMetadata.ShouldNotBe(inputMetadata);
110+
111+
var appendActivities = Fixture.GetActivitiesForOperation(TracingConstants.Operations.Append, stream);
112+
113+
appendActivities.ShouldNotBeEmpty();
114+
}
115+
116+
[Fact]
117+
public async Task json_metadata_event_is_traced_and_non_json_metadata_event_is_not_traced() {
118+
var streamName = Fixture.GetStreamName();
119+
120+
var seedEvents = new[] {
121+
Fixture.CreateTestEvent(metadata: Fixture.CreateTestJsonMetadata()),
122+
Fixture.CreateTestEvent(metadata: Fixture.CreateTestNonJsonMetadata())
123+
};
124+
125+
var availableEvents = new HashSet<Uuid>(seedEvents.Select(x => x.EventId));
126+
127+
await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents);
128+
129+
await using var subscription = Fixture.Streams.SubscribeToStream(streamName, FromStream.Start);
130+
await using var enumerator = subscription.Messages.GetAsyncEnumerator();
131+
132+
var appendActivities = Fixture
133+
.GetActivitiesForOperation(TracingConstants.Operations.Append, streamName)
134+
.ShouldNotBeNull();
135+
136+
Assert.True(await enumerator.MoveNextAsync());
137+
138+
Assert.IsType<StreamMessage.SubscriptionConfirmation>(enumerator.Current);
139+
140+
await Subscribe(enumerator).WithTimeout();
141+
142+
var subscribeActivities = Fixture
143+
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, streamName)
144+
.ToArray();
145+
146+
appendActivities.ShouldHaveSingleItem();
147+
148+
subscribeActivities.ShouldHaveSingleItem();
149+
150+
subscribeActivities.First().ParentId.ShouldBe(appendActivities.First().Id);
151+
152+
var jsonMetadataEvent = seedEvents.First();
153+
154+
Fixture.AssertSubscriptionActivityHasExpectedTags(
155+
subscribeActivities.First(),
156+
streamName,
157+
jsonMetadataEvent.EventId.ToString()
158+
);
159+
160+
return;
161+
162+
async Task Subscribe(IAsyncEnumerator<StreamMessage> internalEnumerator) {
163+
while (await internalEnumerator.MoveNextAsync()) {
164+
if (internalEnumerator.Current is not StreamMessage.Event(var resolvedEvent))
165+
continue;
166+
167+
availableEvents.Remove(resolvedEvent.Event.EventId);
168+
169+
if (availableEvents.Count == 0)
170+
return;
171+
}
172+
}
171173
}
172-
}
174+
}

test/EventStore.Client.Tests.Common/Fixtures/DiagnosticsFixture.cs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class DiagnosticsFixture : EventStoreFixture {
1414
public DiagnosticsFixture() {
1515
var diagnosticActivityListener = new ActivityListener {
1616
ShouldListenTo = source => source.Name == EventStoreClientDiagnostics.InstrumentationName,
17-
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
17+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
1818
ActivityStopped = activity => {
1919
var operation = (string?)activity.GetTagItem(TelemetryTags.Database.Operation);
2020
var stream = (string?)activity.GetTagItem(TelemetryTags.EventStore.Stream);
@@ -71,23 +71,37 @@ public void AssertErroneousAppendActivityHasExpectedTags(Activity activity, Exce
7171
var actualEvent = activity.Events.ShouldHaveSingleItem();
7272

7373
actualEvent.Name.ShouldBe(TelemetryTags.Exception.EventName);
74-
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName));
75-
actualEvent.Tags.ShouldContain(new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message));
74+
actualEvent.Tags.ShouldContain(
75+
new KeyValuePair<string, object?>(TelemetryTags.Exception.Type, actualException.GetType().FullName)
76+
);
77+
78+
actualEvent.Tags.ShouldContain(
79+
new KeyValuePair<string, object?>(TelemetryTags.Exception.Message, actualException.Message)
80+
);
81+
7682
actualEvent.Tags.Any(x => x.Key == TelemetryTags.Exception.Stacktrace).ShouldBeTrue();
7783
}
7884

79-
public void AssertSubscriptionActivityHasExpectedTags(Activity activity, string stream, string eventId, string? subscriptionId) {
85+
public void AssertSubscriptionActivityHasExpectedTags(
86+
Activity activity,
87+
string stream,
88+
string eventId,
89+
string? subscriptionId = null
90+
) {
8091
var expectedTags = new Dictionary<string, string?> {
8192
{ TelemetryTags.Database.System, EventStoreClientDiagnostics.InstrumentationName },
8293
{ TelemetryTags.Database.Operation, TracingConstants.Operations.Subscribe },
8394
{ TelemetryTags.EventStore.Stream, stream },
8495
{ TelemetryTags.EventStore.EventId, eventId },
8596
{ TelemetryTags.EventStore.EventType, TestEventType },
86-
{ TelemetryTags.EventStore.SubscriptionId, subscriptionId },
8797
{ TelemetryTags.Database.User, TestCredentials.Root.Username }
8898
};
8999

90-
foreach (var tag in expectedTags)
100+
if (subscriptionId != null)
101+
expectedTags[TelemetryTags.EventStore.SubscriptionId] = subscriptionId;
102+
103+
foreach (var tag in expectedTags) {
91104
activity.Tags.ShouldContain(tag);
105+
}
92106
}
93-
}
107+
}

0 commit comments

Comments
 (0)