Skip to content

Commit c90fa91

Browse files
authored
Fix issue where user-supplied enrichments were lost during the startu… (#133)
* Fix issue where user-supplied enrichments were lost during the startup phase * bug fix * add unit tests to simulate startup queue replay * fix unit test
1 parent 4fe89cb commit c90fa91

File tree

5 files changed

+268
-8
lines changed

5 files changed

+268
-8
lines changed

Analytics-CSharp/Segment/Analytics/Analytics.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ public void Process(RawEvent incomingEvent, Func<RawEvent, RawEvent> enrichment
8686
{
8787
if (!Enable) return;
8888

89-
incomingEvent.ApplyRawEventData(_userInfo);
89+
incomingEvent.ApplyRawEventData(_userInfo, enrichment);
9090
AnalyticsScope.Launch(AnalyticsDispatcher, () =>
9191
{
92-
Timeline.Process(incomingEvent, enrichment);
92+
Timeline.Process(incomingEvent);
9393
});
9494
}
9595

Analytics-CSharp/Segment/Analytics/Plugins/StartupQueue.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private void ReplayEvents()
5858
{
5959
if (_queuedEvents.TryDequeue(out RawEvent e))
6060
{
61-
Analytics.Process(e);
61+
Analytics.Process(e, e.Enrichment);
6262
}
6363
}
6464
}

Analytics-CSharp/Segment/Analytics/Timeline.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ public class Timeline
3030
/// <param name="incomingEvent">event to be processed</param>
3131
/// <param name="enrichment">a closure that enables enrichment on the generated event</param>
3232
/// <returns>event after processing</returns>
33-
internal RawEvent Process(RawEvent incomingEvent, Func<RawEvent, RawEvent> enrichment = default)
33+
internal RawEvent Process(RawEvent incomingEvent)
3434
{
3535
// Apply before and enrichment types first to start the timeline processing.
3636
RawEvent beforeResult = ApplyPlugins(PluginType.Before, incomingEvent);
3737
// Enrichment is like middleware, a chance to update the event across the board before going to destinations.
3838
RawEvent enrichmentResult = ApplyPlugins(PluginType.Enrichment, beforeResult);
39-
if (enrichment != null)
39+
if (enrichmentResult != null && enrichmentResult.Enrichment != null)
4040
{
41-
enrichmentResult = enrichment(enrichmentResult);
41+
enrichmentResult = enrichmentResult.Enrichment(enrichmentResult);
4242
}
4343

4444
// Make sure not to update the events during this next cycle. Since each destination may want different

Analytics-CSharp/Segment/Analytics/Types.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public abstract class RawEvent
1818
public virtual string UserId { get; set; }
1919
public virtual string Timestamp { get; set; }
2020

21+
public Func<RawEvent, RawEvent> Enrichment { get; set; }
22+
2123
// JSON types
2224
public JsonObject Context { get; set; }
2325
public JsonObject Integrations { get; set; }
@@ -36,8 +38,9 @@ internal void ApplyRawEventData(RawEvent rawEvent)
3638
Integrations = rawEvent.Integrations;
3739
}
3840

39-
internal void ApplyRawEventData(UserInfo userInfo)
41+
internal void ApplyRawEventData(UserInfo userInfo, Func<RawEvent, RawEvent> enrichment)
4042
{
43+
Enrichment = enrichment;
4144
MessageId = Guid.NewGuid().ToString();
4245
Context = new JsonObject();
4346
Timestamp = DateTime.UtcNow.ToString("o"); // iso8601

Tests/EventsTest.cs

+258-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Generic;
2-
using System.Threading.Tasks;
2+
using System.Diagnostics;
3+
using System.Threading;
34
using Moq;
45
using Segment.Analytics;
56
using Segment.Analytics.Utilities;
@@ -690,4 +691,260 @@ public void TestAliasEnrichment()
690691
Assert.Equal("test", actual[0].AnonymousId);
691692
}
692693
}
694+
695+
public class DelayedEventsTest
696+
{
697+
private readonly Analytics _analytics;
698+
699+
private Settings? _settings;
700+
701+
private readonly Mock<StubEventPlugin> _plugin;
702+
703+
private readonly Mock<StubAfterEventPlugin> _afterPlugin;
704+
705+
private readonly SemaphoreSlim _httpSemaphore;
706+
private readonly SemaphoreSlim _assertSemaphore;
707+
private readonly List<RawEvent> _actual;
708+
709+
public DelayedEventsTest()
710+
{
711+
_httpSemaphore = new SemaphoreSlim(0);
712+
_assertSemaphore = new SemaphoreSlim(0);
713+
_settings = JsonUtility.FromJson<Settings?>(
714+
"{\"integrations\":{\"Segment.io\":{\"apiKey\":\"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ\"}},\"plan\":{},\"edgeFunction\":{}}");
715+
716+
var mockHttpClient = new Mock<HTTPClient>(null, null, null);
717+
mockHttpClient
718+
.Setup(httpClient => httpClient.Settings())
719+
.Returns(async () =>
720+
{
721+
// suspend http calls until we tracked events
722+
// this will force events get into startup queue
723+
await _httpSemaphore.WaitAsync();
724+
return _settings;
725+
});
726+
727+
_plugin = new Mock<StubEventPlugin>
728+
{
729+
CallBase = true
730+
};
731+
732+
_afterPlugin = new Mock<StubAfterEventPlugin> { CallBase = true };
733+
_actual = new List<RawEvent>();
734+
_afterPlugin.Setup(o => o.Execute(Capture.In(_actual)))
735+
.Returns((RawEvent e) =>
736+
{
737+
// since this is an after plugin, when its execute function is called,
738+
// it is guaranteed that the enrichment closure has been called.
739+
// so we can release the semaphore on assertions.
740+
_assertSemaphore.Release();
741+
return e;
742+
});
743+
744+
var config = new Configuration(
745+
writeKey: "123",
746+
storageProvider: new DefaultStorageProvider("tests"),
747+
autoAddSegmentDestination: false,
748+
useSynchronizeDispatcher: false, // we need async analytics to buildup events on start queue
749+
httpClientProvider: new MockHttpClientProvider(mockHttpClient)
750+
);
751+
_analytics = new Analytics(config);
752+
}
753+
754+
[Fact]
755+
public void TestTrackEnrichment()
756+
{
757+
string expectedEvent = "foo";
758+
string expectedAnonymousId = "bar";
759+
760+
_analytics.Add(_afterPlugin.Object);
761+
_analytics.Track(expectedEvent, enrichment: @event =>
762+
{
763+
@event.AnonymousId = expectedAnonymousId;
764+
return @event;
765+
});
766+
767+
// now we have tracked event, i.e. event added to startup queue
768+
// release the semaphore put on http client, so we startup queue will replay the events
769+
_httpSemaphore.Release();
770+
// now we need to wait for events being fully replayed before making assertions
771+
_assertSemaphore.Wait();
772+
773+
Assert.NotEmpty(_actual);
774+
Assert.IsType<TrackEvent>(_actual[0]);
775+
var actual = _actual[0] as TrackEvent;
776+
Debug.Assert(actual != null, nameof(actual) + " != null");
777+
Assert.True(actual.Properties.Count == 0);
778+
Assert.Equal(expectedEvent, actual.Event);
779+
Assert.Equal(expectedAnonymousId, actual.AnonymousId);
780+
}
781+
782+
[Fact]
783+
public void TestIdentifyEnrichment()
784+
{
785+
var expected = new JsonObject
786+
{
787+
["foo"] = "bar"
788+
};
789+
string expectedUserId = "newUserId";
790+
791+
_analytics.Add(_afterPlugin.Object);
792+
_analytics.Identify(expectedUserId, expected, @event =>
793+
{
794+
if (@event is IdentifyEvent identifyEvent)
795+
{
796+
identifyEvent.Traits["foo"] = "baz";
797+
}
798+
799+
return @event;
800+
});
801+
802+
// now we have tracked event, i.e. event added to startup queue
803+
// release the semaphore put on http client, so we startup queue will replay the events
804+
_httpSemaphore.Release();
805+
// now we need to wait for events being fully replayed before making assertions
806+
_assertSemaphore.Wait();
807+
808+
string actualUserId = _analytics.UserId();
809+
810+
Assert.NotEmpty(_actual);
811+
var actual = _actual[0] as IdentifyEvent;
812+
Debug.Assert(actual != null, nameof(actual) + " != null");
813+
Assert.Equal(expected, actual.Traits);
814+
Assert.Equal(expectedUserId, actualUserId);
815+
}
816+
817+
[Fact]
818+
public void TestScreenEnrichment()
819+
{
820+
var expected = new JsonObject
821+
{
822+
["foo"] = "bar"
823+
};
824+
string expectedTitle = "foo";
825+
string expectedCategory = "bar";
826+
827+
_analytics.Add(_afterPlugin.Object);
828+
_analytics.Screen(expectedTitle, expected, expectedCategory, @event =>
829+
{
830+
if (@event is ScreenEvent screenEvent)
831+
{
832+
screenEvent.Properties["foo"] = "baz";
833+
}
834+
835+
return @event;
836+
});
837+
838+
// now we have tracked event, i.e. event added to startup queue
839+
// release the semaphore put on http client, so we startup queue will replay the events
840+
_httpSemaphore.Release();
841+
// now we need to wait for events being fully replayed before making assertions
842+
_assertSemaphore.Wait();
843+
844+
Assert.NotEmpty(_actual);
845+
var actual = _actual[0] as ScreenEvent;
846+
Debug.Assert(actual != null, nameof(actual) + " != null");
847+
Assert.Equal(expected, actual.Properties);
848+
Assert.Equal(expectedTitle, actual.Name);
849+
Assert.Equal(expectedCategory, actual.Category);
850+
}
851+
852+
[Fact]
853+
public void TestPageEnrichment()
854+
{
855+
var expected = new JsonObject
856+
{
857+
["foo"] = "bar"
858+
};
859+
string expectedTitle = "foo";
860+
string expectedCategory = "bar";
861+
862+
_analytics.Add(_afterPlugin.Object);
863+
_analytics.Page(expectedTitle, expected, expectedCategory, @event =>
864+
{
865+
if (@event is PageEvent pageEvent)
866+
{
867+
pageEvent.Properties["foo"] = "baz";
868+
}
869+
870+
return @event;
871+
});
872+
873+
// now we have tracked event, i.e. event added to startup queue
874+
// release the semaphore put on http client, so we startup queue will replay the events
875+
_httpSemaphore.Release();
876+
// now we need to wait for events being fully replayed before making assertions
877+
_assertSemaphore.Wait();
878+
879+
Assert.NotEmpty(_actual);
880+
var actual = _actual[0] as PageEvent;
881+
Debug.Assert(actual != null, nameof(actual) + " != null");
882+
Assert.Equal(expected, actual.Properties);
883+
Assert.Equal(expectedTitle, actual.Name);
884+
Assert.Equal(expectedCategory, actual.Category);
885+
Assert.Equal("page", actual.Type);
886+
}
887+
888+
[Fact]
889+
public void TestGroupEnrichment()
890+
{
891+
var expected = new JsonObject
892+
{
893+
["foo"] = "bar"
894+
};
895+
string expectedGroupId = "foo";
896+
897+
_analytics.Add(_afterPlugin.Object);
898+
_analytics.Group(expectedGroupId, expected, @event =>
899+
{
900+
if (@event is GroupEvent groupEvent)
901+
{
902+
groupEvent.Traits["foo"] = "baz";
903+
}
904+
905+
return @event;
906+
});
907+
908+
// now we have tracked event, i.e. event added to startup queue
909+
// release the semaphore put on http client, so we startup queue will replay the events
910+
_httpSemaphore.Release();
911+
// now we need to wait for events being fully replayed before making assertions
912+
_assertSemaphore.Wait();
913+
914+
Assert.NotEmpty(_actual);
915+
var actual = _actual[0] as GroupEvent;
916+
Debug.Assert(actual != null, nameof(actual) + " != null");
917+
Assert.Equal(expected, actual.Traits);
918+
Assert.Equal(expectedGroupId, actual.GroupId);
919+
}
920+
921+
[Fact]
922+
public void TestAliasEnrichment()
923+
{
924+
string expected = "bar";
925+
926+
_analytics.Add(_afterPlugin.Object);
927+
_analytics.Alias(expected, @event =>
928+
{
929+
if (@event is AliasEvent aliasEvent)
930+
{
931+
aliasEvent.AnonymousId = "test";
932+
}
933+
934+
return @event;
935+
});
936+
937+
// now we have tracked event, i.e. event added to startup queue
938+
// release the semaphore put on http client, so we startup queue will replay the events
939+
_httpSemaphore.Release();
940+
// now we need to wait for events being fully replayed before making assertions
941+
_assertSemaphore.Wait();
942+
943+
Assert.NotEmpty(_actual);
944+
var actual = _actual.Find(o => o is AliasEvent) as AliasEvent;
945+
Debug.Assert(actual != null, nameof(actual) + " != null");
946+
Assert.Equal(expected, actual.UserId);
947+
Assert.Equal("test", actual.AnonymousId);
948+
}
949+
}
693950
}

0 commit comments

Comments
 (0)