Skip to content

Commit 1173149

Browse files
authored
DEV-125 - Fix partial append on error append to stream (#283)
* Fix partial append on error append to stream * Catch for non rpc errors * Add trace log back and remove redundant catch for non rpc exceptions * Do nothing with rpc exception * Move RequestStream.CompleteAsync in try block * Log exceptions in catch and refactor * Update `GetVersion` to account pre-release tag * Add interceptor on request stream
1 parent a0c283b commit 1173149

File tree

3 files changed

+143
-95
lines changed

3 files changed

+143
-95
lines changed

src/EventStore.Client.Streams/EventStoreClient.Append.cs

Lines changed: 103 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -98,103 +98,126 @@ private async ValueTask<IWriteResult> AppendToStreamInternal(
9898
EventStoreClientOperationOptions operationOptions,
9999
TimeSpan? deadline,
100100
UserCredentials? userCredentials,
101-
CancellationToken cancellationToken) {
101+
CancellationToken cancellationToken
102+
) {
103+
using var call = new Streams.Streams.StreamsClient(callInvoker).Append(
104+
EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken)
105+
);
102106

103-
using var call = new Streams.Streams.StreamsClient(
104-
callInvoker).Append(EventStoreCallOptions.CreateNonStreaming(
105-
Settings, deadline, userCredentials, cancellationToken));
107+
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);
106108

107-
IWriteResult writeResult;
108-
try {
109-
await call.RequestStream.WriteAsync(header).ConfigureAwait(false);
110-
111-
foreach (var e in eventData) {
112-
_log.LogTrace("Appending event to stream - {streamName}@{eventId} {eventType}.",
113-
header.Options.StreamIdentifier, e.EventId, e.Type);
114-
await call.RequestStream.WriteAsync(new AppendReq {
109+
foreach (var e in eventData) {
110+
await call.RequestStream.WriteAsync(
111+
new AppendReq {
115112
ProposedMessage = new AppendReq.Types.ProposedMessage {
116113
Id = e.EventId.ToDto(),
117114
Data = ByteString.CopyFrom(e.Data.Span),
118115
CustomMetadata = ByteString.CopyFrom(e.Metadata.Span),
119116
Metadata = {
120-
{Constants.Metadata.Type, e.Type},
121-
{Constants.Metadata.ContentType, e.ContentType}
117+
{ Constants.Metadata.Type, e.Type },
118+
{ Constants.Metadata.ContentType, e.ContentType }
122119
}
123-
}
124-
}).ConfigureAwait(false);
125-
}
126-
} finally {
127-
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
120+
},
121+
}
122+
).ConfigureAwait(false);
123+
}
128124

129-
var response = await call.ResponseAsync.ConfigureAwait(false);
130-
131-
if (response.Success != null) {
132-
writeResult = new SuccessResult(response.Success.CurrentRevisionOptionCase ==
133-
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
134-
? StreamRevision.None
135-
: new StreamRevision(response.Success.CurrentRevision),
136-
response.Success.PositionOptionCase == AppendResp.Types.Success.PositionOptionOneofCase.Position
137-
? new Position(response.Success.Position.CommitPosition,
138-
response.Success.Position.PreparePosition)
139-
: default);
140-
_log.LogDebug("Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
141-
header.Options.StreamIdentifier, writeResult.LogPosition, writeResult.NextExpectedStreamRevision);
142-
} else {
143-
if (response.WrongExpectedVersion != null) {
144-
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
145-
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase.CurrentNoStream =>
146-
StreamRevision.None,
147-
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
148-
};
125+
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
149126

150-
_log.LogDebug(
151-
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
152-
header.Options.StreamIdentifier, new StreamRevision(header.Options.Revision),
153-
actualStreamRevision);
154-
155-
if (operationOptions.ThrowOnAppendFailure) {
156-
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
157-
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
158-
throw new WrongExpectedVersionException(header.Options.StreamIdentifier!,
159-
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
160-
actualStreamRevision);
161-
}
127+
var response = await call.ResponseAsync.ConfigureAwait(false);
162128

163-
var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
164-
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedAny =>
165-
StreamState.Any,
166-
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedNoStream =>
167-
StreamState.NoStream,
168-
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedStreamExists =>
169-
StreamState.StreamExists,
170-
_ => StreamState.Any
171-
};
172-
173-
throw new WrongExpectedVersionException(header.Options.StreamIdentifier!,
174-
expectedStreamState, actualStreamRevision);
175-
}
129+
if (response.Success != null)
130+
return HandleSuccessAppend(response, header);
176131

177-
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
178-
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase.ExpectedRevision) {
179-
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier!,
180-
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
181-
actualStreamRevision);
182-
} else {
183-
writeResult = new WrongExpectedVersionResult(header.Options.StreamIdentifier!,
184-
StreamRevision.None,
185-
actualStreamRevision);
186-
}
132+
if (response.WrongExpectedVersion == null)
133+
throw new InvalidOperationException("The operation completed with an unexpected result.");
187134

188-
} else {
189-
throw new InvalidOperationException("The operation completed with an unexpected result.");
190-
}
135+
return HandleWrongExpectedRevision(response, header, operationOptions);
136+
}
137+
138+
private IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
139+
var currentRevision = response.Success.CurrentRevisionOptionCase ==
140+
AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
141+
? StreamRevision.None
142+
: new StreamRevision(response.Success.CurrentRevision);
143+
144+
var position = response.Success.PositionOptionCase ==
145+
AppendResp.Types.Success.PositionOptionOneofCase.Position
146+
? new Position(response.Success.Position.CommitPosition, response.Success.Position.PreparePosition)
147+
: default;
148+
149+
_log.LogDebug(
150+
"Append to stream succeeded - {streamName}@{logPosition}/{nextExpectedVersion}.",
151+
header.Options.StreamIdentifier,
152+
position,
153+
currentRevision);
154+
155+
return new SuccessResult(currentRevision, position);
156+
}
157+
158+
private IWriteResult HandleWrongExpectedRevision(
159+
AppendResp response, AppendReq header, EventStoreClientOperationOptions operationOptions
160+
) {
161+
var actualStreamRevision =
162+
response.WrongExpectedVersion.CurrentRevisionOptionCase switch {
163+
AppendResp.Types.WrongExpectedVersion.CurrentRevisionOptionOneofCase
164+
.CurrentNoStream =>
165+
StreamRevision.None,
166+
_ => new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
167+
};
168+
169+
_log.LogDebug(
170+
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
171+
header.Options.StreamIdentifier,
172+
new StreamRevision(header.Options.Revision),
173+
actualStreamRevision
174+
);
175+
176+
if (operationOptions.ThrowOnAppendFailure) {
177+
if (response.WrongExpectedVersion.ExpectedRevisionOptionCase == AppendResp.Types
178+
.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
179+
.ExpectedRevision) {
180+
throw new WrongExpectedVersionException(
181+
header.Options.StreamIdentifier!,
182+
new StreamRevision(response.WrongExpectedVersion.ExpectedRevision),
183+
actualStreamRevision
184+
);
191185
}
186+
187+
var expectedStreamState =
188+
response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
189+
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
190+
.ExpectedAny =>
191+
StreamState.Any,
192+
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
193+
.ExpectedNoStream =>
194+
StreamState.NoStream,
195+
AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
196+
.ExpectedStreamExists =>
197+
StreamState.StreamExists,
198+
_ => StreamState.Any
199+
};
200+
201+
throw new WrongExpectedVersionException(
202+
header.Options.StreamIdentifier!,
203+
expectedStreamState,
204+
actualStreamRevision
205+
);
192206
}
193207

194-
return writeResult;
208+
var expectedRevision = response.WrongExpectedVersion.ExpectedRevisionOptionCase
209+
== AppendResp.Types.WrongExpectedVersion.ExpectedRevisionOptionOneofCase
210+
.ExpectedRevision
211+
? new StreamRevision(response.WrongExpectedVersion.ExpectedRevision)
212+
: StreamRevision.None;
213+
214+
return new WrongExpectedVersionResult(
215+
header.Options.StreamIdentifier!,
216+
expectedRevision,
217+
actualStreamRevision
218+
);
195219
}
196220

197-
198221
private class StreamAppender : IDisposable {
199222
private readonly EventStoreClientSettings _settings;
200223
private readonly CancellationToken _cancellationToken;
@@ -355,4 +378,4 @@ private static async IAsyncEnumerable<T> ReadAllAsync<T>(ChannelReader<T> reader
355378
#endif
356379
}
357380
}
358-
}
381+
}

src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Diagnostics.CodeAnalysis;
21
using Grpc.Core;
32
using Grpc.Core.Interceptors;
43
using static EventStore.Client.Constants;
@@ -55,7 +54,7 @@ AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation
5554
var response = continuation(context);
5655

5756
return new AsyncClientStreamingCall<TRequest, TResponse>(
58-
response.RequestStream,
57+
response.RequestStream.Apply(ConvertRpcException),
5958
response.ResponseAsync.Apply(ConvertRpcException),
6059
response.ResponseHeadersAsync,
6160
response.GetStatus,
@@ -103,7 +102,15 @@ public static IAsyncStreamReader<TRequest> Apply<TRequest>(this IAsyncStreamRead
103102

104103
public static Task<TResponse> Apply<TResponse>(this Task<TResponse> task, Func<RpcException, Exception> convertException) =>
105104
task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t.Result);
106-
105+
106+
public static IClientStreamWriter<TRequest> Apply<TRequest>(
107+
this IClientStreamWriter<TRequest> writer, Func<RpcException, Exception> convertException
108+
) =>
109+
new ExceptionConverterStreamWriter<TRequest>(writer, convertException);
110+
111+
public static Task Apply(this Task task, Func<RpcException, Exception> convertException) =>
112+
task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t);
113+
107114
public static AccessDeniedException ToAccessDeniedException(this RpcException exception) =>
108115
new(exception.Message, exception);
109116

@@ -142,3 +149,17 @@ public async Task<bool> MoveNext(CancellationToken cancellationToken) {
142149
}
143150
}
144151
}
152+
153+
class ExceptionConverterStreamWriter<TRequest>(
154+
IClientStreamWriter<TRequest> writer,
155+
Func<RpcException, Exception> convertException
156+
)
157+
: IClientStreamWriter<TRequest> {
158+
public WriteOptions? WriteOptions {
159+
get => writer.WriteOptions;
160+
set => writer.WriteOptions = value;
161+
}
162+
163+
public Task WriteAsync(TRequest message) => writer.WriteAsync(message).Apply(convertException);
164+
public Task CompleteAsync() => writer.CompleteAsync().Apply(convertException);
165+
}

test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Text;
12
using Grpc.Core;
23

34
namespace EventStore.Client.Streams.Tests.Append;
@@ -438,38 +439,41 @@ public async Task with_timeout_stream_revision_fails_when_operation_expired() {
438439

439440
ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded);
440441
}
441-
442+
442443
[Fact]
443444
public async Task when_events_enumerator_throws_the_write_does_not_succeed() {
444445
var streamName = Fixture.GetStreamName();
445446

446447
await Fixture.Streams
447-
.AppendToStreamAsync(streamName, StreamRevision.None, GetEvents())
448+
.AppendToStreamAsync(
449+
streamName,
450+
StreamRevision.None,
451+
GetEvents(),
452+
userCredentials: new UserCredentials(TestCredentials.Root.Username!, TestCredentials.Root.Password!)
453+
)
448454
.ShouldThrowAsync<EnumerationFailedException>();
449455

450-
var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);
451-
452-
var state = await result.ReadState;
456+
var state = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start)
457+
.ReadState;
453458

454459
state.ShouldBe(ReadState.StreamNotFound);
455-
460+
456461
return;
457462

458463
IEnumerable<EventData> GetEvents() {
459-
var i = 0;
460-
foreach (var evt in Fixture.CreateTestEvents(5)) {
461-
if (i++ % 3 == 0)
464+
for (var i = 0; i < 5; i++) {
465+
if (i % 3 == 0)
462466
throw new EnumerationFailedException();
463467

464-
yield return evt;
468+
yield return Fixture.CreateTestEvents(1).First();
465469
}
466470
}
467471
}
468472

469473
class EnumerationFailedException : Exception { }
470-
474+
471475
public static IEnumerable<object?[]> ArgumentOutOfRangeTestCases() {
472476
yield return new object?[] { StreamState.Any };
473477
yield return new object?[] { ulong.MaxValue - 1UL };
474478
}
475-
}
479+
}

0 commit comments

Comments
 (0)