Skip to content

Commit dd8cf4c

Browse files
committed
[DEVEX-227] Updated ProjectState to keep dictionary of states instead of the single state
That gives possibility to cache multiple single stream projection states instead of rolling always a single one
1 parent 3644406 commit dd8cf4c

File tree

1 file changed

+45
-26
lines changed

1 file changed

+45
-26
lines changed

src/Kurrent.Client/Streams/GettingState/StateBuilder.cs

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,29 @@ public class GetStreamStateOptions<TState> : ReadStreamOptions where TState : no
150150
}
151151

152152
public static class KurrentClientGettingStateClientExtensions {
153+
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
154+
this KurrentClient eventStore,
155+
string streamName,
156+
IStateBuilder<TState> stateBuilder,
157+
GetStreamStateOptions<TState>? options,
158+
CancellationToken ct = default
159+
) where TState : notnull {
160+
StateAtPointInTime<TState>? stateAtPointInTime = null;
161+
162+
options ??= new GetStreamStateOptions<TState>();
163+
164+
if (options.GetSnapshot != null)
165+
stateAtPointInTime = await options.GetSnapshot(
166+
GetSnapshotOptions.ForStream(streamName),
167+
ct
168+
);
169+
170+
options.StreamPosition = stateAtPointInTime?.LastStreamPosition ?? StreamPosition.Start;
171+
172+
return await eventStore.ReadStreamAsync(streamName, options, ct)
173+
.GetStateAsync(stateBuilder, ct);
174+
}
175+
153176
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
154177
this IAsyncEnumerable<ResolvedEvent> messages,
155178
TState initialState,
@@ -178,21 +201,33 @@ public static async IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TS
178201
this IAsyncEnumerable<ResolvedEvent> messages,
179202
TState initialState,
180203
Func<TState, ResolvedEvent, TState> evolve,
204+
Func<ResolvedEvent, string>? getProjectedId,
181205
[EnumeratorCancellation] CancellationToken ct
182206
) where TState : notnull {
183-
var state = initialState;
184-
185207
if (messages is KurrentClient.ReadStreamResult readStreamResult) {
186208
if (await readStreamResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) {
187-
yield return new StateAtPointInTime<TState>(state);
209+
yield return new StateAtPointInTime<TState>(initialState);
188210

189211
yield break;
190212
}
191213
}
192214

215+
var states = new Dictionary<string, TState>();
216+
217+
getProjectedId ??= resolvedEvent => resolvedEvent.OriginalStreamId;
218+
193219
await foreach (var resolvedEvent in messages.WithCancellation(ct)) {
220+
var projectedId = getProjectedId(resolvedEvent);
221+
#if NET48
222+
var state = states.TryGetValue(projectedId, out TState? value) ? value : initialState;
223+
#else
224+
var state = states.GetValueOrDefault(projectedId, initialState);
225+
#endif
226+
194227
state = evolve(state, resolvedEvent);
195228

229+
states[projectedId] = state;
230+
196231
yield return new StateAtPointInTime<TState>(
197232
state,
198233
resolvedEvent.Event.EventNumber,
@@ -201,29 +236,13 @@ [EnumeratorCancellation] CancellationToken ct
201236
}
202237
}
203238

204-
public static async Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
205-
this KurrentClient eventStore,
206-
string streamName,
207-
IStateBuilder<TState> stateBuilder,
208-
GetStreamStateOptions<TState>? options,
209-
CancellationToken ct = default
210-
) where TState : notnull {
211-
StateAtPointInTime<TState>? stateAtPointInTime = null;
212-
213-
options ??= new GetStreamStateOptions<TState> ();
214-
215-
if (options.GetSnapshot != null) {
216-
stateAtPointInTime = await options.GetSnapshot(
217-
GetSnapshotOptions.ForStream(streamName),
218-
ct
219-
);
220-
}
221-
222-
options.StreamPosition = stateAtPointInTime?.LastStreamPosition ?? StreamPosition.Start;
223-
224-
return await eventStore.ReadStreamAsync(streamName, options, ct)
225-
.GetStateAsync(stateBuilder, ct);
226-
}
239+
public static IAsyncEnumerable<StateAtPointInTime<TState>> ProjectState<TState>(
240+
this IAsyncEnumerable<ResolvedEvent> messages,
241+
TState initialState,
242+
Func<TState, ResolvedEvent, TState> evolve,
243+
CancellationToken ct
244+
) where TState : notnull =>
245+
messages.ProjectState(initialState, evolve, null, ct);
227246

228247
public static Task<StateAtPointInTime<TState>> GetStateAsync<TState>(
229248
this KurrentClient eventStore,

0 commit comments

Comments
 (0)