@@ -7,32 +7,32 @@ module Patterns.Domain.ExactlyOnceIngester
77
88open FSharp.UMX // %
99
10- type IngestResult < 'req , 'res > = { accepted : 'res []; closed : bool ; residual : 'req [] }
10+ type IngestResult < 'req , 'res > = { accepted: 'res []; closed: bool ; residual: 'req [] }
1111
1212module Internal =
1313
1414 let unknown <[< Measure >] 'm > = UMX.tag - 1
15- let next <[< Measure >] 'm > ( value : int < 'm >) = UMX.tag< 'm>( UMX.untag value + 1 )
15+ let next <[< Measure >] 'm > ( value : int < 'm >) = UMX.tag< 'm>( UMX.untag value + 1 )
1616
1717/// Ensures any given item is only added to the series exactly once by virtue of the following protocol:
1818/// 1 . Caller obtains an origin epoch via ActiveIngestionEpochId, storing that alongside the source item
1919/// 2 . Caller deterministically obtains that origin epoch to supply to Ingest/TryIngest such that retries can be idempotent
2020type Service <[< Measure >] 'id , 'req , 'res , 'outcome > internal
21- ( log : Serilog.ILogger,
22- readActiveEpoch : unit -> Async< int< 'id>>,
23- markActiveEpoch : int< 'id> -> Async< unit>,
24- ingest : int< 'id> * 'req [] -> Async< IngestResult< 'req, 'res>>,
25- mapResults : 'res [] -> 'outcome seq,
21+ ( log: Serilog.ILogger,
22+ readActiveEpoch: unit -> Async< int< 'id>>,
23+ markActiveEpoch: int< 'id> -> Async< unit>,
24+ ingest: int< 'id> * 'req [] -> Async< IngestResult< 'req, 'res>>,
25+ mapResults: 'res [] -> 'outcome seq,
2626 linger) =
2727
28- let uninitializedSentinel : int = % Internal.unknown
28+ let uninitializedSentinel : int = % Internal.unknown
2929 let mutable currentEpochId_ = uninitializedSentinel
3030 let currentEpochId () = if currentEpochId_ <> uninitializedSentinel then Some % currentEpochId_ else None
3131
32- let tryIngest ( reqs : ( int < 'id > * 'req )[][]) =
32+ let tryIngest ( reqs : ( int < 'id > * 'req )[][]) =
3333 let rec aux ingestedItems items = async {
3434 let epochId = items |> Seq.map fst |> Seq.min
35- let epochItems , futureEpochItems = items |> Array.partition ( fun ( e , _ : 'req ) -> e = epochId)
35+ let epochItems , futureEpochItems = items |> Array.partition ( fun ( e , _ : 'req ) -> e = epochId)
3636 let! res = ingest ( epochId, Array.map snd epochItems)
3737 let ingestedItemIds = Array.append ingestedItems res.accepted
3838 let logLevel =
@@ -56,7 +56,7 @@ type Service<[<Measure>]'id, 'req, 'res, 'outcome> internal
5656
5757 /// In the overall processing using an Ingester, we frequently have a Scheduler running N streams concurrently
5858 /// If each thread works in isolation, they'll conflict with each other as they feed the Items into the batch in epochs.Ingest
59- /// Instead, we enable concurrent requests to coalesce by having requests converge in this AsyncBatchingGate
59+ /// Instead, we enable concurrent requests to coalesce by having requests converge in this Batcher
6060 /// This has the following critical effects:
6161 /// - Traffic to CosmosDB is naturally constrained to a single flight in progress
6262 /// (BatchingGate does not release next batch for execution until current has succeeded or throws)
@@ -65,11 +65,11 @@ type Service<[<Measure>]'id, 'req, 'res, 'outcome> internal
6565 /// a) back-off, re-read and retry if there's a concurrent write Optimistic Concurrency Check failure when writing the stream
6666 /// b) enter a prolonged period of retries if multiple concurrent writes trigger rate limiting and 429s from CosmosDB
6767 /// c) readers will less frequently encounter sustained 429s on the batch
68- let batchedIngest = Equinox.Core.AsyncBatchingGate ( tryIngest, linger)
68+ let batchedIngest = Equinox.Core.Batching.Batcher ( tryIngest, linger)
6969
7070 /// Run the requests over a chain of epochs.
7171 /// Returns the subset that actually got handled this time around (i.e., exclusive of items that did not trigger writes per the idempotency rules).
72- member _.IngestMany ( originEpoch , reqs ) : Async < 'outcome seq > = async {
72+ member _.IngestMany ( originEpoch , reqs ): Async < 'outcome seq > = async {
7373 if Array.isEmpty reqs then return Seq.empty else
7474
7575 let! results = batchedIngest.Execute [| for x in reqs -> originEpoch, x |]
@@ -80,7 +80,7 @@ type Service<[<Measure>]'id, 'req, 'res, 'outcome> internal
8080 /// The fact that any Ingest call for a given item (or set of items) always commences from the same origin is key to exactly once insertion guarantee.
8181 /// Caller should first store this alongside the item in order to deterministically be able to start from the same origin in idempotent retry cases.
8282 /// Uses cached values as epoch transitions are rare, and caller needs to deal with the inherent race condition in any case
83- member _.ActiveIngestionEpochId () : Async < int < 'id >> =
83+ member _.ActiveIngestionEpochId (): Async < int < 'id >> =
8484 match currentEpochId () with
8585 | Some currentEpochId -> async { return currentEpochId }
8686 | None -> readActiveEpoch()
0 commit comments