Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename TransactAsync -> Transact #314

Merged
merged 5 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `Equinox`: `Decider.Transact(interpret : 'state -> Async<'event list>)` [#314](https://github.com/jet/equinox/pull/314)

### Changed

- `eqx`/`Equinox.Tool`: Flip `-P` option to opt _in_ to pretty printing [#313](https://github.com/jet/equinox/pull/313)
- `Equinox`: rename `Decider.TransactAsync` to `Transact` [#314](https://github.com/jet/equinox/pull/314)
- `CosmosStore`: Require `Microsoft.Azure.Cosmos` v `3.0.25` [#310](https://github.com/jet/equinox/pull/310)
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
Expand Down
17 changes: 5 additions & 12 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ Equinox’s Command Handling consists of < 200 lines including interfaces and
comments in https://github.com/jet/equinox/tree/master/src/Equinox - the
elements you'll touch in a normal application are:

- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Flow.fs#L34) -
- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Core.fs#L34) -
internal implementation of Optimistic Concurrency Control / retry loop used
by `Decider`. It's recommended to at least scan this file as it defines the
Transaction semantics that are central to Equinox and the overall `Decider` concept.
Expand Down Expand Up @@ -854,13 +854,6 @@ let create resolve =
Service(resolve)
```

The `Decider`-related functions in a given Aggregate establish the access
patterns used across when Service methods access streams (see below). Typically
these are relatively straightforward calls forwarding to a `Equinox.Decider`
equivalent (see [`src/Equinox/Decider.fs`](src/Equinox/Decider.fs)), which in
turn use the Optimistic Concurrency retry-loop in
[`src/Equinox/Flow.fs`](src/Equinox/Flow.fs).

`Read` above will do a roundtrip to the Store in order to fetch the most recent
state (in `AllowStale` mode, the store roundtrip can be optimized out by
reading through the cache). This Synchronous Read can be used to
Expand Down Expand Up @@ -1326,7 +1319,7 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St

member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
decider.TransactAsync(fun state -> async {
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpret commands) state })
```
Expand Down Expand Up @@ -1368,7 +1361,7 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt
interpret x.State |> accumulated.AddRange
/// Invoke an Async decision function, gathering the events (if any) that
/// it decides are necessary into the `Accumulated` sequence
member x.TransactAsync(interpret : 'state -> Async<'event list>) : Async<unit> = async {
member x.Transact(interpret : 'state -> Async<'event list>) : Async<unit> = async {
let! events = interpret x.State
accumulated.AddRange events }
/// Invoke a decision function, while also propagating a result yielded as
Expand All @@ -1379,15 +1372,15 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt
result
/// Invoke a decision function, while also propagating a result yielded as
/// the fst of an (result, events) pair
member x.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = async {
member x.Transact(decide : 'state -> Async<'result * 'event list>) : Async<'result> = async {
let! result, newEvents = decide x.State
accumulated.AddRange newEvents
return result }

type Service ... =
member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
decider.TransactAsync(fun state -> async {
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
let acc = Accumulator(Fold.fold, state)
for cmd in commands do
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's

> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` and `TrySync` that take a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`?

The bulk of the implementation is in [`Equinox/Flow.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Flow.fs)
The bulk of the implementation is in [`Equinox/Decider.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs)

There are [sequence diagrams in Documentation MD](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#code-diagrams-for-equinoxeventstore--equinoxsqlstreamstore) but I'll summarize here:

Expand All @@ -763,7 +763,7 @@ b. for CosmosDB, the `expectedVersion` can actually be an `expectedEtag` - this

(The second usage did not necessitate an interface change - i.e. the Token mechanism was introduced to handle the first case, and just happened to fit the second case)

> Alternatively, I'm seeing in `proReactor` that there's a decide that does version checking. Is this recommended? [code](https://github.com/jet/dotnet-templates/blob/3329510601450ab77bcc40df7a407c5f0e3c8464/propulsion-reactor/TodoSummary.fs#L30-L52)
> Alternatively, I'm seeing in `proReactor` that there's a `decide` that does version checking. Is this recommended? [code](https://github.com/jet/dotnet-templates/blob/3329510601450ab77bcc40df7a407c5f0e3c8464/propulsion-reactor/TodoSummary.fs#L30-L52)

If you need to know the version in your actual handler, QueryEx and other such APIs alongside Transact expose it (e.g. if you want to include a version to accompany a directly rendered piece of data). (Note that doing this - including a version in a rendering of something should not be a goto strategy - i.e. having APIs that pass around expectedVersion is not a good idea in general)

Expand Down
6 changes: 3 additions & 3 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt
member x.Transact(interpret : 'state -> 'event list) : unit =
interpret x.State |> accumulated.AddRange
/// Invoke an Async decision function, gathering the events (if any) that it decides are necessary into the `Accumulated` sequence
member x.TransactAsync(interpret : 'state -> Async<'event list>) : Async<unit> = async {
member x.Transact(interpret : 'state -> Async<'event list>) : Async<unit> = async {
let! events = interpret x.State
accumulated.AddRange events }
/// Invoke a decision function, while also propagating a result yielded as the fst of an (result, events) pair
Expand All @@ -121,7 +121,7 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt
accumulated.AddRange newEvents
result
/// Invoke a decision function, while also propagating a result yielded as the fst of an (result, events) pair
member x.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = async {
member x.Transact(decide : 'state -> Async<'result * 'event list>) : Async<'result> = async {
let! result, newEvents = decide x.State
accumulated.AddRange newEvents
return result }
Expand All @@ -138,7 +138,7 @@ type Service internal (resolve : CartId * Equinox.ResolveOption option -> Equino

member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
decider.TransactAsync(fun state -> async {
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
#if ACCUMULATOR
let acc = Accumulator(Fold.fold, state)
Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/SavedForLater.fs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.

let remove clientId (resolveCommand : ((SkuId->bool) -> Async<Command>)) : Async<unit> =
let decider = resolve clientId
decider.TransactAsync(fun (state : Fold.State) -> async {
decider.Transact(fun (state : Fold.State) -> async {
let contents = seq { for item in state -> item.skuId } |> set
let! cmd = resolveCommand contents.Contains
let _, events = decide maxSavedItems cmd state
Expand Down
File renamed without changes.
6 changes: 3 additions & 3 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ type Decider<'event, 'state>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member _.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> =
member _.Transact(decide : 'state -> Async<'result * 'event list>) : Async<'result> =
transact (fun context -> decide context.State) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state (including extended context), holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Uses <c>mapResult</c> to render the final outcome from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the outcome
/// 2. Uses <c>mapResult</c> to render the final 'view from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the 'view
member _.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'view) : Async<'view> =
transact decide mapResult

Expand Down
2 changes: 1 addition & 1 deletion src/Equinox/Equinox.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Flow.fs" />
<Compile Include="Core.fs" />
<Compile Include="Decider.fs" />
</ItemGroup>

Expand Down