Skip to content

Commit 0f6e44d

Browse files
committed
Dispose on token cancellation request
1 parent 12b6e39 commit 0f6e44d

File tree

6 files changed

+458
-162
lines changed

6 files changed

+458
-162
lines changed

Diff for: src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs

+83-37
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
using EventStore.Client.PersistentSubscriptions;
2-
using Grpc.Core;
31
using Microsoft.Extensions.Logging;
42

53
namespace EventStore.Client {
64
/// <summary>
75
/// Represents a persistent subscription connection.
86
/// </summary>
97
public class PersistentSubscription : IDisposable {
10-
private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult _persistentSubscriptionResult;
8+
private readonly EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult
9+
_persistentSubscriptionResult;
10+
1111
private readonly IAsyncEnumerator<PersistentSubscriptionMessage> _enumerator;
1212
private readonly Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> _eventAppeared;
1313
private readonly Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> _subscriptionDropped;
@@ -25,7 +25,8 @@ internal static async Task<PersistentSubscription> Confirm(
2525
EventStorePersistentSubscriptionsClient.PersistentSubscriptionResult persistentSubscriptionResult,
2626
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
2727
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
28-
ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default) {
28+
ILogger log, UserCredentials? userCredentials, CancellationToken cancellationToken = default
29+
) {
2930
var enumerator = persistentSubscriptionResult
3031
.Messages
3132
.GetAsyncEnumerator(cancellationToken);
@@ -34,11 +35,20 @@ internal static async Task<PersistentSubscription> Confirm(
3435

3536
return (result, enumerator.Current) switch {
3637
(true, PersistentSubscriptionMessage.SubscriptionConfirmation (var subscriptionId)) =>
37-
new PersistentSubscription(persistentSubscriptionResult, enumerator, subscriptionId, eventAppeared,
38-
subscriptionDropped, log, cancellationToken),
38+
new PersistentSubscription(
39+
persistentSubscriptionResult,
40+
enumerator,
41+
subscriptionId,
42+
eventAppeared,
43+
subscriptionDropped,
44+
log,
45+
cancellationToken
46+
),
3947
(true, PersistentSubscriptionMessage.NotFound) =>
40-
throw new PersistentSubscriptionNotFoundException(persistentSubscriptionResult.StreamName,
41-
persistentSubscriptionResult.GroupName),
48+
throw new PersistentSubscriptionNotFoundException(
49+
persistentSubscriptionResult.StreamName,
50+
persistentSubscriptionResult.GroupName
51+
),
4252
_ => throw new InvalidOperationException("Subscription could not be confirmed.")
4353
};
4454
}
@@ -49,14 +59,15 @@ private PersistentSubscription(
4959
IAsyncEnumerator<PersistentSubscriptionMessage> enumerator, string subscriptionId,
5060
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
5161
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped, ILogger log,
52-
CancellationToken cancellationToken) {
62+
CancellationToken cancellationToken
63+
) {
5364
_persistentSubscriptionResult = persistentSubscriptionResult;
54-
_enumerator = enumerator;
55-
SubscriptionId = subscriptionId;
56-
_eventAppeared = eventAppeared;
57-
_subscriptionDropped = subscriptionDropped;
58-
_log = log;
59-
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
65+
_enumerator = enumerator;
66+
SubscriptionId = subscriptionId;
67+
_eventAppeared = eventAppeared;
68+
_subscriptionDropped = subscriptionDropped;
69+
_log = log;
70+
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
6071

6172
Task.Run(Subscribe, _cts.Token);
6273
}
@@ -91,15 +102,15 @@ public Task Ack(params ResolvedEvent[] resolvedEvents) =>
91102
public Task Ack(IEnumerable<ResolvedEvent> resolvedEvents) =>
92103
Ack(resolvedEvents.Select(resolvedEvent => resolvedEvent.OriginalEvent.EventId));
93104

94-
95105
/// <summary>
96106
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
97107
/// </summary>
98108
/// <param name="action">The <see cref="PersistentSubscriptionNakEventAction"/> to take.</param>
99109
/// <param name="reason">A reason given.</param>
100110
/// <param name="eventIds">The <see cref="Uuid"/> of the <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
101111
/// <exception cref="ArgumentException">The number of eventIds exceeded the limit of 2000.</exception>
102-
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) => NackInternal(eventIds, action, reason);
112+
public Task Nack(PersistentSubscriptionNakEventAction action, string reason, params Uuid[] eventIds) =>
113+
NackInternal(eventIds, action, reason);
103114

104115
/// <summary>
105116
/// Acknowledge that a message has failed processing (this will tell the server it has not been processed).
@@ -108,10 +119,15 @@ public Task Ack(IEnumerable<ResolvedEvent> resolvedEvents) =>
108119
/// <param name="reason">A reason given.</param>
109120
/// <param name="resolvedEvents">The <see cref="ResolvedEvent" />s to nak. There should not be more than 2000 to nak at a time.</param>
110121
/// <exception cref="ArgumentException">The number of resolvedEvents exceeded the limit of 2000.</exception>
111-
public Task Nack(PersistentSubscriptionNakEventAction action, string reason,
112-
params ResolvedEvent[] resolvedEvents) =>
113-
Nack(action, reason,
114-
Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId));
122+
public Task Nack(
123+
PersistentSubscriptionNakEventAction action, string reason,
124+
params ResolvedEvent[] resolvedEvents
125+
) =>
126+
Nack(
127+
action,
128+
reason,
129+
Array.ConvertAll(resolvedEvents, resolvedEvent => resolvedEvent.OriginalEvent.EventId)
130+
);
115131

116132
/// <inheritdoc />
117133
public void Dispose() => SubscriptionDropped(SubscriptionDroppedReason.Disposed);
@@ -121,64 +137,94 @@ private async Task Subscribe() {
121137

122138
try {
123139
while (await _enumerator.MoveNextAsync(_cts.Token).ConfigureAwait(false)) {
124-
if (_enumerator.Current is not PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
140+
if (_enumerator.Current is not
141+
PersistentSubscriptionMessage.Event(var resolvedEvent, var retryCount)) {
125142
continue;
126143
}
127144

128145
if (_enumerator.Current is PersistentSubscriptionMessage.NotFound) {
129146
if (_subscriptionDroppedInvoked != 0) {
130147
return;
131148
}
132-
SubscriptionDropped(SubscriptionDroppedReason.ServerError,
149+
150+
SubscriptionDropped(
151+
SubscriptionDroppedReason.ServerError,
133152
new PersistentSubscriptionNotFoundException(
134-
_persistentSubscriptionResult.StreamName, _persistentSubscriptionResult.GroupName));
153+
_persistentSubscriptionResult.StreamName,
154+
_persistentSubscriptionResult.GroupName
155+
)
156+
);
157+
135158
return;
136159
}
137-
160+
138161
_log.LogTrace(
139162
"Persistent Subscription {subscriptionId} received event {streamName}@{streamRevision} {position}",
140-
SubscriptionId, resolvedEvent.OriginalEvent.EventStreamId,
141-
resolvedEvent.OriginalEvent.EventNumber, resolvedEvent.OriginalEvent.Position);
163+
SubscriptionId,
164+
resolvedEvent.OriginalEvent.EventStreamId,
165+
resolvedEvent.OriginalEvent.EventNumber,
166+
resolvedEvent.OriginalEvent.Position
167+
);
142168

143169
try {
144170
await _eventAppeared(
145171
this,
146172
resolvedEvent,
147173
retryCount,
148-
_cts.Token).ConfigureAwait(false);
174+
_cts.Token
175+
).ConfigureAwait(false);
149176
} catch (Exception ex) when (ex is ObjectDisposedException or OperationCanceledException) {
150177
if (_subscriptionDroppedInvoked != 0) {
151178
return;
152179
}
153180

154-
_log.LogWarning(ex,
181+
_log.LogWarning(
182+
ex,
155183
"Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
156-
SubscriptionId);
184+
SubscriptionId
185+
);
157186

158187
SubscriptionDropped(SubscriptionDroppedReason.Disposed);
159188

160189
return;
161190
} catch (Exception ex) {
162-
_log.LogError(ex,
191+
_log.LogError(
192+
ex,
163193
"Persistent Subscription {subscriptionId} was dropped because the subscriber made an error.",
164-
SubscriptionId);
194+
SubscriptionId
195+
);
196+
165197
SubscriptionDropped(SubscriptionDroppedReason.SubscriberError, ex);
166198

167199
return;
168200
}
169201
}
170202
} catch (Exception ex) {
171203
if (_subscriptionDroppedInvoked == 0) {
172-
_log.LogError(ex,
173-
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
174-
SubscriptionId);
175-
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
204+
if (_cts.Token.IsCancellationRequested) {
205+
_log.LogInformation(
206+
"Subscription {subscriptionId} was dropped because cancellation was requested.",
207+
SubscriptionId
208+
);
209+
210+
SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
211+
} else {
212+
_log.LogError(
213+
ex,
214+
"Persistent Subscription {subscriptionId} was dropped because an error occurred on the server.",
215+
SubscriptionId
216+
);
217+
218+
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
219+
}
176220
}
177221
} finally {
178222
if (_subscriptionDroppedInvoked == 0) {
179223
_log.LogError(
180224
"Persistent Subscription {subscriptionId} was unexpectedly terminated.",
181-
SubscriptionId);
225+
SubscriptionId
226+
);
227+
182228
SubscriptionDropped(SubscriptionDroppedReason.ServerError);
183229
}
184230
}

Diff for: src/EventStore.Client.Streams/StreamSubscription.cs

+15-6
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,22 @@ await _checkpointReached(this, position, _cts.Token)
135135
SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
136136
} catch (Exception ex) {
137137
if (_subscriptionDroppedInvoked == 0) {
138-
_log.LogError(
139-
ex,
140-
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
141-
SubscriptionId
142-
);
138+
if (_cts.IsCancellationRequested) {
139+
_log.LogInformation(
140+
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
141+
SubscriptionId
142+
);
143+
144+
SubscriptionDropped(SubscriptionDroppedReason.Disposed, ex);
145+
} else {
146+
_log.LogError(
147+
ex,
148+
"Subscription {subscriptionId} was dropped because an error occurred on the server.",
149+
SubscriptionId
150+
);
143151

144-
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
152+
SubscriptionDropped(SubscriptionDroppedReason.ServerError, ex);
153+
}
145154
}
146155
}
147156
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// ReSharper disable InconsistentNaming
2+
3+
using EventStore.Client.Streams.Tests.Subscriptions;
4+
5+
namespace EventStore.Client.PersistentSubscriptions.Tests.SubscriptionToAll.Obsolete;
6+
7+
public class
8+
PersistentSubscriptionDropsDueToCancellationToken(PersistentSubscriptionDropsDueToCancellationToken.Fixture fixture)
9+
: IClassFixture<
10+
PersistentSubscriptionDropsDueToCancellationToken.Fixture> {
11+
static readonly string Group = Guid.NewGuid().ToString();
12+
static readonly string Stream = Guid.NewGuid().ToString();
13+
14+
[SupportsPSToAll.Fact]
15+
public async Task persistent_subscription_to_all_drops_due_to_cancellation_token() {
16+
var subscriptionDropped = new TaskCompletionSource<SubscriptionDroppedResult>();
17+
18+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
19+
20+
await fixture.Client.CreateToAllAsync(
21+
Group,
22+
cancellationToken: cts.Token,
23+
settings: new PersistentSubscriptionSettings()
24+
);
25+
26+
using var subscription = await fixture.Client.SubscribeToAllAsync(
27+
Group,
28+
async (s, e, r, ct) => await s.Ack(e),
29+
(sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)),
30+
userCredentials: TestCredentials.Root,
31+
cancellationToken: cts.Token
32+
)
33+
.WithTimeout();
34+
35+
// wait until the cancellation token cancels
36+
await Task.Delay(TimeSpan.FromSeconds(3));
37+
38+
var result = await subscriptionDropped.Task.WithTimeout();
39+
result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed);
40+
}
41+
42+
[SupportsPSToAll.Fact]
43+
public async Task persistent_subscription_to_stream_drops_due_to_cancellation_token() {
44+
var subscriptionDropped = new TaskCompletionSource<SubscriptionDroppedResult>();
45+
46+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
47+
48+
await fixture.Client.CreateToStreamAsync(
49+
Group,
50+
Stream,
51+
cancellationToken: cts.Token,
52+
settings: new PersistentSubscriptionSettings()
53+
);
54+
55+
using var subscription = await fixture.Client.SubscribeToStreamAsync(
56+
Group,
57+
Stream,
58+
async (s, e, r, ct) => await s.Ack(e),
59+
(sub, reason, ex) => subscriptionDropped.SetResult(new SubscriptionDroppedResult(reason, ex)),
60+
userCredentials: TestCredentials.Root,
61+
cancellationToken: cts.Token
62+
)
63+
.WithTimeout();
64+
65+
// wait until the cancellation token cancels
66+
await Task.Delay(TimeSpan.FromSeconds(3));
67+
68+
var result = await subscriptionDropped.Task.WithTimeout();
69+
result.Reason.ShouldBe(SubscriptionDroppedReason.Disposed);
70+
}
71+
72+
public class Fixture : EventStoreClientFixture {
73+
protected override Task Given() {
74+
return Task.CompletedTask;
75+
}
76+
77+
protected override Task When() => Task.CompletedTask;
78+
}
79+
}

0 commit comments

Comments
 (0)