From cfa5c2a657503a84a3bf4acd978eb036382f2433 Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:38:46 +0200 Subject: [PATCH 1/3] Add test to reproduce race condition If the host is stopped as soon as WorkflowCompleted LifeCycleEvent is raised, some persistence providers cannot persist the 'Completed' state in time. --- src/WorkflowCore.Testing/WorkflowTest.cs | 16 ++++++- .../Scenarios/StopScenario.cs | 47 +++++++++++++++++++ .../Scenarios/DynamoStopScenario.cs | 18 +++++++ .../Scenarios/MongoStopScenario.cs | 16 +++++++ .../Scenarios/MysqlStopScenario.cs | 16 +++++++ .../Scenarios/PostgresStopScenario.cs | 16 +++++++ .../Scenarios/RedisStopScenario.cs | 16 +++++++ .../Scenarios/SqlServerStopScenario.cs | 16 +++++++ .../Scenarios/SqliteStopScenario.cs | 17 +++++++ .../SqliteCollection.cs | 2 +- .../SqlitePersistenceProviderFixture.cs | 7 +-- 11 files changed, 180 insertions(+), 7 deletions(-) create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs create mode 100644 test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs create mode 100644 test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs create mode 100644 test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs create mode 100644 test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs create mode 100644 test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs create mode 100644 test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs create mode 100644 test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs diff --git a/src/WorkflowCore.Testing/WorkflowTest.cs b/src/WorkflowCore.Testing/WorkflowTest.cs index bf0eb97ab..1bda5ef1e 100644 --- a/src/WorkflowCore.Testing/WorkflowTest.cs +++ b/src/WorkflowCore.Testing/WorkflowTest.cs @@ -17,6 +17,7 @@ public abstract class WorkflowTest : IDisposable protected IWorkflowHost Host; protected IPersistenceProvider PersistenceProvider; protected List UnhandledStepErrors = new List(); + private bool isDisposed; protected virtual void Setup() { @@ -116,9 +117,22 @@ protected TData GetData(string workflowId) return (TData)instance.Data; } + protected virtual void Dispose(bool disposing) + { + if (!isDisposed) + { + if (disposing) + { + Host.Stop(); + } + isDisposed = true; + } + } + public void Dispose() { - Host.Stop(); + Dispose(disposing: true); + GC.SuppressFinalize(this); } } diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs new file mode 100644 index 000000000..e487a4053 --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs @@ -0,0 +1,47 @@ +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.Models.LifeCycleEvents; +using WorkflowCore.Testing; +using Xunit; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class StopScenario : WorkflowTest + { + public class StopWorkflow : IWorkflow + { + public string Id => "StopWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder.StartWith(context => ExecutionResult.Next()); + } + } + + public StopScenario() => Setup(); + + [Fact] + public async Task Scenario() + { + var tcs = new TaskCompletionSource(); + Host.OnLifeCycleEvent += async (evt) => + { + if (evt is WorkflowCompleted) + { + await Host.StopAsync(CancellationToken.None); + tcs.SetResult(default); + } + }; + + var workflowId = StartWorkflow(default); + await tcs.Task; + + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + } + + protected override void Dispose(bool disposing) { } + } +} diff --git a/test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs b/test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs new file mode 100644 index 000000000..6900460d3 --- /dev/null +++ b/test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs @@ -0,0 +1,18 @@ +using System; +using Amazon.DynamoDBv2; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.DynamoDB.Scenarios +{ + [Collection("DynamoDb collection")] + public class DynamoStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + var cfg = new AmazonDynamoDBConfig {ServiceURL = DynamoDbDockerSetup.ConnectionString}; + services.AddWorkflow(x => x.UseAwsDynamoPersistence(DynamoDbDockerSetup.Credentials, cfg, "tests-")); + } + } +} diff --git a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs new file mode 100644 index 000000000..050c203c0 --- /dev/null +++ b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MongoDB.Scenarios +{ + [Collection("Mongo collection")] + public class MongoStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseMongoDB(MongoDockerSetup.ConnectionString, "integration-tests")); + } + } +} diff --git a/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs new file mode 100644 index 000000000..e00b738d1 --- /dev/null +++ b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MySQL.Scenarios +{ + [Collection("Mysql collection")] + public class MysqlStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseMySQL(MysqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs new file mode 100644 index 000000000..995681160 --- /dev/null +++ b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.PostgreSQL.Scenarios +{ + [Collection("Postgres collection")] + public class PostgresStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UsePostgreSQL(PostgresDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs b/test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs new file mode 100644 index 000000000..fad60c3fc --- /dev/null +++ b/test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.Redis.Scenarios +{ + [Collection("Redis collection")] + public class RedisStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseRedisPersistence(RedisDockerSetup.ConnectionString, "scenario-")); + } + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs new file mode 100644 index 000000000..de4e06ec1 --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs new file mode 100644 index 000000000..0b3d7e188 --- /dev/null +++ b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs @@ -0,0 +1,17 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using WorkflowCore.Tests.Sqlite; +using Xunit; + +namespace WorkflowCore.Tests.Sqlite.Scenarios +{ + [Collection("Sqlite collection")] + public class SqliteStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlite(SqliteSetup.ConnectionString, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs b/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs index 69ed8c41a..41906a519 100644 --- a/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs +++ b/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs @@ -10,7 +10,7 @@ public class SqliteCollection : ICollectionFixture public class SqliteSetup : IDisposable { - public string ConnectionString { get; set; } + public static string ConnectionString { get; set; } public SqliteSetup() { diff --git a/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs b/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs index f8318ad9b..ded55751f 100644 --- a/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs +++ b/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs @@ -10,18 +10,15 @@ namespace WorkflowCore.Tests.Sqlite [Collection("Sqlite collection")] public class SqlitePersistenceProviderFixture : BasePersistenceFixture { - string _connectionString; - public SqlitePersistenceProviderFixture(SqliteSetup setup) { - _connectionString = setup.ConnectionString; } protected override IPersistenceProvider Subject { get - { - var db = new EntityFrameworkPersistenceProvider(new SqliteContextFactory(_connectionString), true, false); + { + var db = new EntityFrameworkPersistenceProvider(new SqliteContextFactory(SqliteSetup.ConnectionString), true, false); db.EnsureStoreExists(); return db; } From 59cca7326ab3d4e7d8aac1e3048e0fc77e733f2b Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:39:02 +0200 Subject: [PATCH 2/3] Bump Docker.DotNet to 3.125.10 --- test/Docker.Testify/Docker.Testify.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Docker.Testify/Docker.Testify.csproj b/test/Docker.Testify/Docker.Testify.csproj index a83648df4..cff2d13e8 100644 --- a/test/Docker.Testify/Docker.Testify.csproj +++ b/test/Docker.Testify/Docker.Testify.csproj @@ -1,7 +1,7 @@  - + From f150cd4ca984606da4b5e62f58117db8b5461a5b Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 13:56:49 +0200 Subject: [PATCH 3/3] Prevent information loss on host shutdown Do not pass global CancellationToken to persistence operations on host shutdown. This way it is ensured that persistence operations are not cancelled. --- src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index b092f41f6..ad845beed 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance finally { WorkflowActivity.Enrich(result); - await _persistenceStore.PersistWorkflow(workflow, result?.Subscriptions, cancellationToken); + await _persistenceStore.PersistWorkflow(workflow, result?.Subscriptions); await QueueProvider.QueueWork(itemId, QueueType.Index); _greylist.Remove($"wf:{itemId}"); }