diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5374cb9..622447c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,7 +10,7 @@ on: jobs: cancel_previous: permissions: write-all - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: styfle/cancel-workflow-action@0.9.1 with: @@ -18,7 +18,7 @@ jobs: build: needs: cancel_previous - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 76bba31..62f2906 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,7 +9,7 @@ on: jobs: release: permissions: write-all - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 environment: deployment steps: diff --git a/Analytics-CSharp/Segment/Analytics/Configuration.cs b/Analytics-CSharp/Segment/Analytics/Configuration.cs index d8d997b..79de436 100644 --- a/Analytics-CSharp/Segment/Analytics/Configuration.cs +++ b/Analytics-CSharp/Segment/Analytics/Configuration.cs @@ -85,7 +85,7 @@ public Configuration(string writeKey, IStorageProvider storageProvider = default, IHTTPClientProvider httpClientProvider = default, IList flushPolicies = default, - EventPipelineProvider eventPipelineProvider = default) + IEventPipelineProvider eventPipelineProvider = default) { WriteKey = writeKey; FlushAt = flushAt; diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs index a52565b..4411e6f 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using global::System; using global::System.Linq; using Segment.Analytics.Policies; @@ -70,9 +71,12 @@ public SyncEventPipeline( public void Put(RawEvent @event) => _writeChannel.Send(@event); public void Flush() { - FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(1,1)); - _writeChannel.Send(flushEvent); - flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken); + if (Running && !_uploadChannel.isCancelled) + { + FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(0)); + _writeChannel.Send(flushEvent); + flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken); + } } public void Start() diff --git a/Tests/Utilities/EventPipelineTest.cs b/Tests/Utilities/EventPipelineTest.cs index 02d20d2..45ff74c 100644 --- a/Tests/Utilities/EventPipelineTest.cs +++ b/Tests/Utilities/EventPipelineTest.cs @@ -10,6 +10,7 @@ using Segment.Serialization; using Tests.Utils; using Xunit; +using System.Linq; namespace Tests.Utilities { @@ -205,5 +206,145 @@ public async Task TestFlushInterruptedWhenNoFileExist(IEventPipelineProvider pro _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(0)); _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(0)); } + + [Theory] + [MemberData(nameof(GetPipelineProvider))] + public void TestConfigWithEventPipelineProviders(IEventPipelineProvider provider) + { + // Just validate that the provider is used in the configuration + var config = new Configuration( + writeKey: "123", + autoAddSegmentDestination: false, + useSynchronizeDispatcher: true, + flushInterval: 0, + flushAt: 2, + httpClientProvider: new MockHttpClientProvider(_mockHttpClient), + storageProvider: new MockStorageProvider(_storage), + eventPipelineProvider: provider + ); + var analytics = new Analytics(config); + analytics.Track("test"); + } + + [Fact] + public void TestSyncEventPipelineProviderWaits() + { + const int iterations = 100; + const int newAnalyticsEvery = 10; + const int eventCount = 10; + + int totalTracks = 0; + int totalUploads = 0; + + _mockHttpClient + .Setup(client => client.Upload(It.IsAny())) + .Callback(bytes => + { + string content = System.Text.Encoding.UTF8.GetString(bytes); + int count = content.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1; + totalUploads += count; + }) + .ReturnsAsync(true); + + var config = new Configuration( + writeKey: "123", + useSynchronizeDispatcher: true, + flushInterval: 100000, + flushAt: eventCount * 2, + httpClientProvider: new MockHttpClientProvider(_mockHttpClient), + storageProvider: new InMemoryStorageProvider(), + eventPipelineProvider: new SyncEventPipelineProvider() + ); + + var analytics = new Analytics(config); + for (int j = 0; j < iterations; j++) + { + if (j % newAnalyticsEvery == 0) + { + analytics = new Analytics(config); + } + _mockHttpClient.Invocations.Clear(); + for (int i = 0; i < eventCount; i++) + { + analytics.Track($"test {i}"); + totalTracks++; + } + analytics.Flush(); + +#pragma warning disable CS4014 // Silly compiler, this isn't an invocation so it doesn't need to be awaited + _mockHttpClient.Verify(client => client.Upload(It.IsAny()), Times.AtLeastOnce, $"Iteration {j} of {eventCount}"); +#pragma warning restore CS4014 + IInvocation lastUploadInvocation = _mockHttpClient.Invocations.Last(invocation => invocation.Method.Name == "Upload"); + int testsUploaded = System.Text.Encoding.UTF8 + .GetString((byte[])lastUploadInvocation.Arguments[0]) + .Split(new string[] { "test" }, StringSplitOptions.None).Length - 1; + Assert.Equal(eventCount, testsUploaded); + } + Assert.Equal(totalTracks, totalUploads); + } + + [Fact] + public void TestRepeatedFlushesDontHang() + { + var config = new Configuration( + writeKey: "123", + useSynchronizeDispatcher: true, + flushInterval: 0, + flushAt: 1, + httpClientProvider: new MockHttpClientProvider(_mockHttpClient), + storageProvider: new MockStorageProvider(_storage), + eventPipelineProvider: new SyncEventPipelineProvider(5000) + ); + var analytics = new Analytics(config); + analytics.Track("test"); + DateTime startTime = DateTime.Now; + analytics.Flush(); + analytics.Flush(); + analytics.Flush(); + analytics.Flush(); + analytics.Flush(); + Assert.True(DateTime.Now - startTime < TimeSpan.FromMilliseconds(100)); + } + + [Fact] + public void TestConfigWithCustomEventPipelineProvider() + { + // Just validate that the provider is used in the configuration + var config = new Configuration( + writeKey: "123", + useSynchronizeDispatcher: true, + flushInterval: 0, + flushAt: 1, + httpClientProvider: new MockHttpClientProvider(_mockHttpClient), + storageProvider: new MockStorageProvider(_storage), + eventPipelineProvider: new CustomEventPipelineProvider() + ); + Assert.Throws(() => { + var analytics = new Analytics(config); + analytics.Track("test"); + analytics.Flush(); + }); + } + + + public class CustomEventPipelineProvider : IEventPipelineProvider + { + public CustomEventPipelineProvider() {} + public IEventPipeline Create(Analytics analytics, string key) + { + return new CustomEventPipeline(analytics, key); + } + + private class CustomEventPipeline : IEventPipeline + { + public CustomEventPipeline(Analytics analytics, string key) {} + public bool Running => throw new NotImplementedException(); + public string ApiHost { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + public void Flush() => throw new NotImplementedException(); + public void Put(RawEvent @event) => throw new NotImplementedException(); + public void Start() => throw new NotImplementedException(); + public void Stop() => throw new NotImplementedException(); + } + } } }