Skip to content

Commit 7b1b13c

Browse files
committed
[DEVEX-227] Moved code responsible for GettingState to a dedicated file
1 parent f983649 commit 7b1b13c

File tree

4 files changed

+213
-192
lines changed

4 files changed

+213
-192
lines changed

Diff for: src/Kurrent.Client/Streams/DecisionMaking/AggregateStore.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ Task<IWriteResult> HandleAsync(
3333
);
3434
}
3535

36+
public interface IAggregateStore<TAggregate> : IAggregateStore<TAggregate, object>
37+
where TAggregate : IAggregate<object>;
38+
3639
public static class AggregateStoreExtensions {
3740
public static Task<IWriteResult> AddAsync<TAggregate, TEvent>(
3841
this IAggregateStore<TAggregate, TEvent> aggregateStore,
@@ -94,9 +97,6 @@ public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
9497
);
9598
}
9699

97-
public interface IAggregateStore<TAggregate> : IAggregateStore<TAggregate, object>
98-
where TAggregate : IAggregate<object>;
99-
100100
public class AggregateStoreOptions<TState> where TState : notnull {
101101
#if NET48
102102
public IStateBuilder<TState> StateBuilder { get; set; } = null!;

Diff for: src/Kurrent.Client/Streams/GettingState/GetState.cs

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
using EventStore.Client;
2+
3+
namespace Kurrent.Client.Streams.GettingState;
4+
5+
public record GetStateOptions<TState> where TState : notnull {
6+
public StateAtPointInTime<TState>? CurrentState { get; set; }
7+
}
8+
9+
public class GetStreamStateOptions<TState> : ReadStreamOptions where TState : notnull {
10+
public GetSnapshot<TState>? GetSnapshot { get; set; }
11+
}
12+
13+
public delegate ValueTask<StateAtPointInTime<TState>> GetSnapshot<TState>(
14+
GetSnapshotOptions options,
15+
CancellationToken ct = default
16+
) where TState : notnull;
17+
18+
public record GetSnapshotOptions {
19+
public string? StreamName { get; set; }
20+
21+
public string? SnapshotVersion { get; set; }
22+
23+
public static GetSnapshotOptions ForStream(string streamName) =>
24+
new GetSnapshotOptions { StreamName = streamName };
25+
26+
public static GetSnapshotOptions ForAll() =>
27+
new GetSnapshotOptions();
28+
}
29+
30+
public static class KurrentClientGettingStateClientExtensions {
31+
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
32+
this KurrentClient eventStore,
33+
string streamName,
34+
IStateBuilder<TState> stateBuilder,
35+
GetStreamStateOptions<TState>? options,
36+
CancellationToken ct = default
37+
) where TState : notnull {
38+
StateAtPointInTime<TState>? stateAtPointInTime = null;
39+
40+
options ??= new GetStreamStateOptions<TState>();
41+
42+
if (options.GetSnapshot != null)
43+
stateAtPointInTime = await options.GetSnapshot(
44+
GetSnapshotOptions.ForStream(streamName),
45+
ct
46+
);
47+
48+
options.StreamPosition = stateAtPointInTime?.LastStreamPosition ?? StreamPosition.Start;
49+
50+
return await eventStore.ReadStreamAsync(streamName, options, ct)
51+
.GetStateAsync(stateBuilder, ct);
52+
}
53+
54+
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
55+
this IAsyncEnumerable<ResolvedEvent> messages,
56+
TState initialState,
57+
Func<TState, ResolvedEvent, TState> evolve,
58+
CancellationToken ct
59+
) where TState : notnull {
60+
var state = initialState;
61+
62+
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
63+
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound)
64+
return new StateAtPointInTime<TState>(state);
65+
}
66+
67+
ResolvedEvent? lastEvent = null;
68+
69+
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
70+
lastEvent = resolvedEvent;
71+
72+
state = evolve(state, resolvedEvent);
73+
}
74+
75+
return new StateAtPointInTime<TState>(state, lastEvent?.Event.EventNumber, lastEvent?.Event.Position);
76+
}
77+
78+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
79+
this KurrentClient eventStore,
80+
string streamName,
81+
IStateBuilder<TState> streamStateBuilder,
82+
CancellationToken ct = default
83+
) where TState : notnull =>
84+
eventStore.GetStateAsync(streamName, streamStateBuilder, new GetStreamStateOptions<TState>(), ct);
85+
86+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState, TEvent>(
87+
this KurrentClient eventStore,
88+
string streamName,
89+
GetStreamStateOptions<TState> options,
90+
CancellationToken ct = default
91+
) where TState : IState<TEvent>, new() =>
92+
eventStore.GetStateAsync(
93+
streamName,
94+
StateBuilder.For<TState, TEvent>(),
95+
options,
96+
ct
97+
);
98+
99+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState, TEvent>(
100+
this KurrentClient eventStore,
101+
string streamName,
102+
CancellationToken ct = default
103+
) where TState : IState<TEvent>, new() =>
104+
eventStore.GetStateAsync<TState, TEvent>(streamName, new GetStreamStateOptions<TState>(), ct);
105+
106+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
107+
this KurrentClient eventStore,
108+
string streamName,
109+
CancellationToken ct = default
110+
) where TState : IState<object>, new() =>
111+
eventStore.GetStateAsync<TState, object>(streamName, new GetStreamStateOptions<TState>(), ct);
112+
}
113+
114+
public static class KurrentClientGettingStateReadAndSubscribeExtensions {
115+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
116+
this KurrentClient.ReadStreamResult readStreamResult,
117+
IStateBuilder<TState> stateBuilder,
118+
GetStateOptions<TState> options,
119+
CancellationToken ct = default
120+
) where TState : notnull =>
121+
stateBuilder.GetAsync(readStreamResult, options, ct);
122+
123+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
124+
this KurrentClient.ReadStreamResult readStreamResult,
125+
IStateBuilder<TState> stateBuilder,
126+
CancellationToken ct = default
127+
) where TState : notnull =>
128+
stateBuilder.GetAsync(readStreamResult, new GetStateOptions<TState>(), ct);
129+
130+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
131+
this KurrentClient.ReadAllStreamResult readAllStreamResult,
132+
IStateBuilder<TState> stateBuilder,
133+
GetStateOptions<TState> options,
134+
CancellationToken ct = default
135+
) where TState : notnull =>
136+
stateBuilder.GetAsync(readAllStreamResult, options, ct);
137+
138+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
139+
this KurrentClient.ReadAllStreamResult readAllStreamResult,
140+
IStateBuilder<TState> stateBuilder,
141+
CancellationToken ct = default
142+
) where TState : notnull =>
143+
stateBuilder.GetAsync(readAllStreamResult, new GetStateOptions<TState>(), ct);
144+
145+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
146+
this KurrentClient.StreamSubscriptionResult subscriptionResult,
147+
IStateBuilder<TState> stateBuilder,
148+
GetStateOptions<TState> options,
149+
CancellationToken ct = default
150+
) where TState : notnull =>
151+
stateBuilder.GetAsync(subscriptionResult, options, ct);
152+
153+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
154+
this KurrentClient.StreamSubscriptionResult subscriptionResult,
155+
IStateBuilder<TState> stateBuilder,
156+
CancellationToken ct = default
157+
) where TState : notnull =>
158+
stateBuilder.GetAsync(subscriptionResult, new GetStateOptions<TState>(), ct);
159+
160+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
161+
this KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult subscriptionResult,
162+
IStateBuilder<TState> stateBuilder,
163+
GetStateOptions<TState> options,
164+
CancellationToken ct = default
165+
) where TState : notnull =>
166+
stateBuilder.GetAsync(subscriptionResult, options, ct);
167+
168+
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
169+
this KurrentPersistentSubscriptionsClient.PersistentSubscriptionResult subscriptionResult,
170+
IStateBuilder<TState> stateBuilder,
171+
CancellationToken ct = default
172+
) where TState : notnull =>
173+
stateBuilder.GetAsync(subscriptionResult, new GetStateOptions<TState>(), ct);
174+
}

Diff for: src/Kurrent.Client/Streams/GettingState/ProjectState.cs

+36-17
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,27 @@ namespace Kurrent.Client.Streams.GettingState;
55

66
public class ProjectStateOptions<TState> {
77
public Func<ResolvedEvent, string>? GetProjectedId { get; set; }
8-
8+
99
public IStateCache<TState>? StateCache { get; set; }
1010
}
1111

1212
public static class KurrentClientProjectStateExtensions {
1313
public static async IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
1414
this IAsyncEnumerable<ResolvedEvent> messages,
15-
TState initialState,
1615
Func<TState, ResolvedEvent, TState> evolve,
16+
Func<ResolvedEvent, CancellationToken, ValueTask<TState>> getInitialState,
1717
ProjectStateOptions<TState>? options,
1818
[EnumeratorCancellation] CancellationToken ct
1919
) where TState : notnull {
20-
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
21-
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) {
22-
yield return new StateAtPointInTime<TState>(initialState);
23-
24-
yield break;
25-
}
26-
}
27-
2820
var getProjectedId = options?.GetProjectedId ?? (resolvedEvent => resolvedEvent.OriginalStreamId);
2921
var stateCache = options?.StateCache ?? new DictionaryStateCache<TState>();
3022

3123
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
32-
var projectedId = getProjectedId(resolvedEvent);
33-
24+
var projectedId = getProjectedId(resolvedEvent);
25+
var initialState = await getInitialState(resolvedEvent, ct);
26+
3427
var state = await stateCache.GetValueOrDefaultAsync(projectedId, initialState, ct).ConfigureAwait(false);
35-
28+
3629
state = evolve(state, resolvedEvent);
3730

3831
await stateCache.SetValueAsync(projectedId, state, ct).ConfigureAwait(false);
@@ -47,13 +40,39 @@ [EnumeratorCancellation] CancellationToken ct
4740

4841
public static IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
4942
this IAsyncEnumerable<ResolvedEvent> messages,
50-
TState initialState,
5143
Func<TState, ResolvedEvent, TState> evolve,
44+
Func<ResolvedEvent, TState> getInitialState,
45+
ProjectStateOptions<TState>? options,
5246
CancellationToken ct
5347
) where TState : notnull =>
54-
messages.ProjectState(initialState, evolve, null, ct);
55-
}
48+
messages.ProjectState(
49+
evolve,
50+
(resolvedEvent, _) => new ValueTask<TState>(getInitialState(resolvedEvent)),
51+
options,
52+
ct
53+
);
54+
55+
public static IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
56+
this IAsyncEnumerable<ResolvedEvent> messages,
57+
Func<TState, ResolvedEvent, TState> evolve,
58+
Func<ResolvedEvent, TState> getInitialState,
59+
CancellationToken ct
60+
) where TState : notnull =>
61+
messages.ProjectState(
62+
evolve,
63+
(resolvedEvent, _) => new ValueTask<TState>(getInitialState(resolvedEvent)),
64+
null,
65+
ct
66+
);
5667

68+
public static IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
69+
this IAsyncEnumerable<ResolvedEvent> messages,
70+
Func<TState, ResolvedEvent, TState> evolve,
71+
Func<ResolvedEvent, CancellationToken, ValueTask<TState>> getInitialState,
72+
CancellationToken ct
73+
) where TState : notnull =>
74+
messages.ProjectState(evolve, getInitialState, null, ct);
75+
}
5776

5877
public interface IStateCache<TState> {
5978
public ValueTask<TState> GetValueOrDefaultAsync(string key, TState defaultValue, CancellationToken ct = default);
@@ -66,7 +85,7 @@ public class DictionaryStateCache<TState> : IStateCache<TState> {
6685

6786
public ValueTask<TState> GetValueOrDefaultAsync(string key, TState defaultValue, CancellationToken ct = default) {
6887
#if NET48
69-
var state = _states.TryGetValue(key, out TState? value) ? value : defaultValue;
88+
var state = _states.TryGetValue(key, out TState? value) ? value : defaultValue;
7089
#else
7190
var state = _states.GetValueOrDefault(key, defaultValue);
7291
#endif

0 commit comments

Comments
 (0)