Skip to content

Commit f88e77c

Browse files
author
Jamil Maqdis Anton
committed
Refactor async.Delay calls to more familiar async blocks. Await tasks with awaitTaskWithInnerException instead of normal AwaitTask to handle AggregateExceptions.
1 parent c5a4c05 commit f88e77c

File tree

3 files changed

+90
-61
lines changed

3 files changed

+90
-61
lines changed

src/AppendRaw.fs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,26 @@ module AppendRaw =
2626
(appendVersion: AppendVersion)
2727
(messageDetails: MessageDetails)
2828
: Async<AppendResult> =
29-
async.Delay(fun () ->
30-
store.AppendToStream
31-
(StreamId(streamName),
32-
fromAppendVersion appendVersion,
33-
[| newStreamMessageFromMessageDetails messageDetails |])
34-
|> Async.AwaitTask)
29+
async {
30+
return! store.AppendToStream
31+
(StreamId(streamName),
32+
fromAppendVersion appendVersion,
33+
[| newStreamMessageFromMessageDetails messageDetails |])
34+
|> Async.awaitTaskWithInnerException
35+
}
36+
3537

3638
let appendNewMessages (store: SqlStreamStore.IStreamStore)
3739
(streamName: string)
3840
(appendVersion: AppendVersion)
3941
(messages: MessageDetails list)
4042
: Async<AppendResult> =
41-
async.Delay(fun () ->
42-
store.AppendToStream
43-
(StreamId(streamName),
44-
fromAppendVersion appendVersion,
45-
messages
46-
|> List.map newStreamMessageFromMessageDetails
47-
|> List.toArray)
48-
|> Async.AwaitTask)
43+
async {
44+
return! store.AppendToStream
45+
(StreamId(streamName),
46+
fromAppendVersion appendVersion,
47+
messages
48+
|> List.map newStreamMessageFromMessageDetails
49+
|> List.toArray)
50+
|> Async.awaitTaskWithInnerException
51+
}

src/Postgres.fs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ module Postgres =
6161
new SqlStreamStore.PostgresStreamStore(SqlStreamStore.PostgresStreamStoreSettings(config))
6262

6363
let createSchemaRaw (store: SqlStreamStore.PostgresStreamStore): Async<unit> =
64-
async.Delay(fun () -> store.CreateSchemaIfNotExists() |> Async.AwaitTask)
64+
async {
65+
return! store.CreateSchemaIfNotExists()
66+
|> Async.awaitTaskWithInnerException'
67+
}
6568

6669
let createSchema (store: SqlStreamStore.PostgresStreamStore): Async<Result<unit, string>> =
6770
createSchemaRaw store

src/ReadRaw.fs

Lines changed: 69 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,41 +21,45 @@ module ReadRaw =
2121
(startPositionInclusive: StartPosition)
2222
(msgCount: int)
2323
: Async<ReadAllPage> =
24-
async.Delay(fun () ->
25-
match readingDirection with
26-
| ReadingDirection.Forward ->
27-
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
28-
| ReadingDirection.Backward ->
29-
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
30-
|> Async.AwaitTask)
24+
async {
25+
return! match readingDirection with
26+
| ReadingDirection.Forward ->
27+
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
28+
| ReadingDirection.Backward ->
29+
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
30+
|> Async.awaitTaskWithInnerException
31+
}
3132

3233
let readFromStream (store: SqlStreamStore.IStreamStore)
3334
(readingDirection: ReadingDirection)
3435
(streamName: string)
3536
(readVersion: ReadVersion)
3637
(msgCount: int)
3738
: Async<ReadStreamPage> =
38-
async.Delay(fun () ->
39-
match readingDirection with
40-
| ReadingDirection.Forward ->
41-
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
42-
| ReadingDirection.Backward ->
43-
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
44-
|> Async.AwaitTask)
39+
async {
40+
return! match readingDirection with
41+
| ReadingDirection.Forward ->
42+
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
43+
| ReadingDirection.Backward ->
44+
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
45+
|> Async.awaitTaskWithInnerException
46+
}
4547

4648
let readFromAllStream' (store: SqlStreamStore.IStreamStore)
4749
(readingDirection: ReadingDirection)
4850
(startPositionInclusive: StartPosition)
4951
(msgCount: int)
5052
(prefetchJson: bool)
5153
: Async<ReadAllPage> =
52-
async.Delay(fun () ->
53-
match readingDirection with
54-
| ReadingDirection.Forward ->
55-
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
56-
| ReadingDirection.Backward ->
57-
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
58-
|> Async.AwaitTask)
54+
async {
55+
return! match readingDirection with
56+
| ReadingDirection.Forward ->
57+
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
58+
| ReadingDirection.Backward ->
59+
store.ReadAllBackwards
60+
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
61+
|> Async.awaitTaskWithInnerException
62+
}
5963

6064
let readFromStream' (store: SqlStreamStore.IStreamStore)
6165
(readingDirection: ReadingDirection)
@@ -64,13 +68,16 @@ module ReadRaw =
6468
(msgCount: int)
6569
(prefetchJson: bool)
6670
: Async<ReadStreamPage> =
67-
async.Delay(fun () ->
68-
match readingDirection with
69-
| ReadingDirection.Forward ->
70-
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
71-
| ReadingDirection.Backward ->
72-
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
73-
|> Async.AwaitTask)
71+
async {
72+
return! match readingDirection with
73+
| ReadingDirection.Forward ->
74+
store.ReadStreamForwards
75+
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
76+
| ReadingDirection.Backward ->
77+
store.ReadStreamBackwards
78+
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
79+
|> Async.awaitTaskWithInnerException
80+
}
7481

7582
let readFromAllStream'' (store: SqlStreamStore.IStreamStore)
7683
(readingDirection: ReadingDirection)
@@ -79,15 +86,22 @@ module ReadRaw =
7986
(prefetchJson: bool)
8087
(cancellationToken: CancellationToken)
8188
: Async<ReadAllPage> =
82-
async.Delay(fun () ->
83-
match readingDirection with
84-
| ReadingDirection.Forward ->
85-
store.ReadAllForwards
86-
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
87-
| ReadingDirection.Backward ->
88-
store.ReadAllBackwards
89-
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
90-
|> Async.AwaitTask)
89+
async {
90+
return! match readingDirection with
91+
| ReadingDirection.Forward ->
92+
store.ReadAllForwards
93+
(fromStartPositionInclusive startPositionInclusive,
94+
msgCount,
95+
prefetchJson,
96+
cancellationToken)
97+
| ReadingDirection.Backward ->
98+
store.ReadAllBackwards
99+
(fromStartPositionInclusive startPositionInclusive,
100+
msgCount,
101+
prefetchJson,
102+
cancellationToken)
103+
|> Async.awaitTaskWithInnerException
104+
}
91105

92106
let readFromStream'' (store: SqlStreamStore.IStreamStore)
93107
(readingDirection: ReadingDirection)
@@ -97,12 +111,21 @@ module ReadRaw =
97111
(prefetchJson: bool)
98112
(cancellationToken: CancellationToken)
99113
: Async<ReadStreamPage> =
100-
async.Delay(fun () ->
101-
match readingDirection with
102-
| ReadingDirection.Forward ->
103-
store.ReadStreamForwards
104-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
105-
| ReadingDirection.Backward ->
106-
store.ReadStreamBackwards
107-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
108-
|> Async.AwaitTask)
114+
async {
115+
return! match readingDirection with
116+
| ReadingDirection.Forward ->
117+
store.ReadStreamForwards
118+
(StreamId(streamName),
119+
fromReadVersion readVersion,
120+
msgCount,
121+
prefetchJson,
122+
cancellationToken)
123+
| ReadingDirection.Backward ->
124+
store.ReadStreamBackwards
125+
(StreamId(streamName),
126+
fromReadVersion readVersion,
127+
msgCount,
128+
prefetchJson,
129+
cancellationToken)
130+
|> Async.awaitTaskWithInnerException
131+
}

0 commit comments

Comments
 (0)