Skip to content

Commit 16661ef

Browse files
authored
Merge pull request #1078 from viktorshevchenko210/workflow-reliability
Fixed consistency of WorkflowConsumer when persisting workflow
2 parents d0a2f74 + 4cf59da commit 16661ef

File tree

11 files changed

+225
-17
lines changed

11 files changed

+225
-17
lines changed

src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public interface IWorkflowRepository
1212

1313
Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
1414

15+
Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default);
16+
1517
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);
1618

1719
[Obsolete]

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5555
finally
5656
{
5757
WorkflowActivity.Enrich(result);
58-
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
58+
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken);
5959
await QueueProvider.QueueWork(itemId, QueueType.Index);
6060
_greylist.Remove($"wf:{itemId}");
6161
}
@@ -68,7 +68,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6868
{
6969
foreach (var sub in result.Subscriptions)
7070
{
71-
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
71+
await TryProcessSubscription(sub, _persistenceStore, cancellationToken);
7272
}
7373

7474
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
@@ -98,12 +98,8 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
9898

9999
}
100100

101-
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
101+
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
102102
{
103-
//TODO: move to own class
104-
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
105-
106-
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
107103
if (subscription.EventName != Event.EventTypeActivity)
108104
{
109105
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,25 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _
4646
}
4747
}
4848

49+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
50+
{
51+
lock (_instances)
52+
{
53+
var existing = _instances.First(x => x.Id == workflow.Id);
54+
_instances.Remove(existing);
55+
_instances.Add(workflow);
56+
57+
lock (_subscriptions)
58+
{
59+
foreach (var subscription in subscriptions)
60+
{
61+
subscription.Id = Guid.NewGuid().ToString();
62+
_subscriptions.Add(subscription);
63+
}
64+
}
65+
}
66+
}
67+
4968
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken _ = default)
5069
{
5170
lock (_instances)

src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
5050

5151
public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);
5252

53+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
54+
{
55+
await PersistWorkflow(workflow, cancellationToken);
56+
57+
foreach(var subscription in subscriptions)
58+
{
59+
await CreateEventSubscription(subscription, cancellationToken);
60+
}
61+
}
62+
5363
public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
5464
public Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);
5565

src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,32 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
151151
await db.SaveChangesAsync(cancellationToken);
152152
}
153153
}
154+
155+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
156+
{
157+
using (var db = ConstructDbContext())
158+
{
159+
var uid = new Guid(workflow.Id);
160+
var existingEntity = await db.Set<PersistedWorkflow>()
161+
.Where(x => x.InstanceId == uid)
162+
.Include(wf => wf.ExecutionPointers)
163+
.ThenInclude(ep => ep.ExtensionAttributes)
164+
.Include(wf => wf.ExecutionPointers)
165+
.AsTracking()
166+
.FirstAsync(cancellationToken);
167+
168+
var workflowPersistable = workflow.ToPersistable(existingEntity);
169+
170+
foreach (var subscription in subscriptions)
171+
{
172+
subscription.Id = Guid.NewGuid().ToString();
173+
var subscriptionPersistable = subscription.ToPersistable();
174+
db.Set<PersistedSubscription>().Add(subscriptionPersistable);
175+
}
176+
177+
await db.SaveChangesAsync(cancellationToken);
178+
}
179+
}
154180

155181
public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
156182
{

src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
149149
await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken);
150150
}
151151

152+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
153+
{
154+
using (var session = await _database.Client.StartSessionAsync())
155+
{
156+
session.StartTransaction();
157+
await PersistWorkflow(workflow, cancellationToken);
158+
await EventSubscriptions.InsertManyAsync(subscriptions, cancellationToken: cancellationToken);
159+
await session.CommitTransactionAsync();
160+
}
161+
}
162+
152163
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
153164
{
154165
var now = asAt.ToUniversalTime().Ticks;

src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Raven.Client.Documents;
22
using Raven.Client.Documents.Linq;
33
using Raven.Client.Documents.Operations;
4+
using Raven.Client.Documents.Session;
45
using System;
56
using System.Collections.Generic;
67
using System.Linq;
@@ -56,21 +57,41 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
5657
{
5758
using (var session = _database.OpenAsyncSession())
5859
{
59-
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId);
60-
session.Advanced.Patch<WorkflowInstance, int>(workflow.Id, x => x.Version, workflow.Version);
61-
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Description, workflow.Description);
62-
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Reference, workflow.Reference);
63-
session.Advanced.Patch<WorkflowInstance, ExecutionPointerCollection>(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers);
64-
session.Advanced.Patch<WorkflowInstance, long?>(workflow.Id, x => x.NextExecution, workflow.NextExecution);
65-
session.Advanced.Patch<WorkflowInstance, WorkflowStatus>(workflow.Id, x => x.Status, workflow.Status);
66-
session.Advanced.Patch<WorkflowInstance, object>(workflow.Id, x => x.Data, workflow.Data);
67-
session.Advanced.Patch<WorkflowInstance, DateTime>(workflow.Id, x => x.CreateTime, workflow.CreateTime);
68-
session.Advanced.Patch<WorkflowInstance, DateTime?>(workflow.Id, x => x.CompleteTime, workflow.CompleteTime);
60+
PatchSession(session, workflow);
61+
await session.SaveChangesAsync(cancellationToken);
62+
}
63+
}
64+
65+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
66+
{
67+
using (var session = _database.OpenAsyncSession())
68+
{
69+
PatchSession(session, workflow);
70+
71+
foreach (var subscription in subscriptions)
72+
{
73+
await session.StoreAsync(subscription, cancellationToken);
74+
}
6975

7076
await session.SaveChangesAsync(cancellationToken);
7177
}
7278
}
7379

80+
private void PatchSession(IAsyncDocumentSession session, WorkflowInstance workflow)
81+
{
82+
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId);
83+
session.Advanced.Patch<WorkflowInstance, int>(workflow.Id, x => x.Version, workflow.Version);
84+
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Description, workflow.Description);
85+
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Reference, workflow.Reference);
86+
session.Advanced.Patch<WorkflowInstance, ExecutionPointerCollection>(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers);
87+
session.Advanced.Patch<WorkflowInstance, long?>(workflow.Id, x => x.NextExecution, workflow.NextExecution);
88+
session.Advanced.Patch<WorkflowInstance, WorkflowStatus>(workflow.Id, x => x.Status, workflow.Status);
89+
session.Advanced.Patch<WorkflowInstance, object>(workflow.Id, x => x.Data, workflow.Data);
90+
session.Advanced.Patch<WorkflowInstance, DateTime>(workflow.Id, x => x.CreateTime, workflow.CreateTime);
91+
session.Advanced.Patch<WorkflowInstance, DateTime?>(workflow.Id, x => x.CompleteTime, workflow.CompleteTime);
92+
93+
}
94+
7495
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
7596
{
7697
var now = asAt.ToUniversalTime().Ticks;

src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,43 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
6161
var response = await _client.PutItemAsync(request, cancellationToken);
6262
}
6363

64+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
65+
{
66+
var transactionWriteItemsRequest = new TransactWriteItemsRequest()
67+
{
68+
TransactItems = new List<TransactWriteItem>()
69+
{
70+
{
71+
new TransactWriteItem()
72+
{
73+
Put = new Put()
74+
{
75+
TableName = $"{_tablePrefix}-{WORKFLOW_TABLE}",
76+
Item = workflow.ToDynamoMap()
77+
}
78+
}
79+
}
80+
}
81+
};
82+
83+
foreach(var subscription in subscriptions)
84+
{
85+
subscription.Id = Guid.NewGuid().ToString();
86+
87+
transactionWriteItemsRequest.TransactItems.Add(new TransactWriteItem()
88+
{
89+
Put = new Put()
90+
{
91+
TableName = $"{_tablePrefix}-{SUBCRIPTION_TABLE}",
92+
Item = subscription.ToDynamoMap(),
93+
ConditionExpression = "attribute_not_exists(id)"
94+
}
95+
});
96+
}
97+
98+
await _client.TransactWriteItemsAsync(transactionWriteItemsRequest, cancellationToken);
99+
}
100+
64101
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
65102
{
66103
var result = new List<string>();

src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,16 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
227227
await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken);
228228
}
229229

230+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
231+
{
232+
await PersistWorkflow(workflow, cancellationToken);
233+
234+
foreach(var subscription in subscriptions)
235+
{
236+
await CreateEventSubscription(subscription, cancellationToken);
237+
}
238+
}
239+
230240
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
231241
{
232242
throw new NotImplementedException();

src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, Cancellat
4747
return workflow.Id;
4848
}
4949

50+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
51+
{
52+
await PersistWorkflow(workflow, cancellationToken);
53+
54+
foreach (var subscription in subscriptions)
55+
{
56+
await CreateEventSubscription(subscription, cancellationToken);
57+
}
58+
}
59+
5060
public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
5161
{
5262
var str = JsonConvert.SerializeObject(workflow, _serializerSettings);

test/WorkflowCore.UnitTests/BasePersistenceFixture.cs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,72 @@ public void PersistWorkflow()
185185
var current = Subject.GetWorkflowInstance(workflowId).Result;
186186
current.ShouldBeEquivalentTo(newWorkflow);
187187
}
188+
189+
[Fact]
190+
public void PersistWorkflow_with_subscriptions()
191+
{
192+
var workflow = new WorkflowInstance
193+
{
194+
Data = new TestData { Value1 = 7 },
195+
Description = "My Description",
196+
Status = WorkflowStatus.Runnable,
197+
NextExecution = 0,
198+
Version = 1,
199+
WorkflowDefinitionId = "My Workflow",
200+
CreateTime = new DateTime(2000, 1, 1).ToUniversalTime(),
201+
ExecutionPointers = new ExecutionPointerCollection(),
202+
Reference = Guid.NewGuid().ToString()
203+
};
204+
205+
workflow.ExecutionPointers.Add(new ExecutionPointer
206+
{
207+
Id = Guid.NewGuid().ToString(),
208+
Active = true,
209+
StepId = 0,
210+
Scope = new List<string> { "1", "2", "3", "4" },
211+
EventName = "Event1"
212+
});
213+
214+
workflow.ExecutionPointers.Add(new ExecutionPointer
215+
{
216+
Id = Guid.NewGuid().ToString(),
217+
Active = true,
218+
StepId = 1,
219+
Scope = new List<string> { "1", "2", "3", "4" },
220+
EventName = "Event2",
221+
});
222+
223+
var workflowId = Subject.CreateNewWorkflow(workflow).Result;
224+
workflow.NextExecution = 0;
225+
226+
List<EventSubscription> subscriptions = new List<EventSubscription>();
227+
foreach (var pointer in workflow.ExecutionPointers)
228+
{
229+
var subscription = new EventSubscription()
230+
{
231+
WorkflowId = workflowId,
232+
StepId = pointer.StepId,
233+
ExecutionPointerId = pointer.Id,
234+
EventName = pointer.EventName,
235+
EventKey = workflowId,
236+
SubscribeAsOf = DateTime.UtcNow,
237+
SubscriptionData = "data"
238+
};
239+
240+
subscriptions.Add(subscription);
241+
}
242+
243+
Subject.PersistWorkflow(workflow, subscriptions).Wait();
244+
245+
var current = Subject.GetWorkflowInstance(workflowId).Result;
246+
current.ShouldBeEquivalentTo(workflow);
247+
248+
foreach (var pointer in workflow.ExecutionPointers)
249+
{
250+
subscriptions = Subject.GetSubscriptions(pointer.EventName, workflowId, DateTime.UtcNow).Result.ToList();
251+
subscriptions.Should().HaveCount(1);
252+
}
253+
}
188254

189255
[Fact]
190256
public void ConcurrentPersistWorkflow()

0 commit comments

Comments
 (0)