Skip to content

Commit 7cbfa23

Browse files
jamil7Jamil Maqdis Anton
and
Jamil Maqdis Anton
authored
New read api (#8)
* Changed Read's api * Removed ReadingDirection. Refactored ReadVersion and StartPosition. * Fixed tests. Co-authored-by: Jamil Maqdis Anton <[email protected]>
1 parent 41eb898 commit 7cbfa23

File tree

5 files changed

+186
-127
lines changed

5 files changed

+186
-127
lines changed

src/Read.fs

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,36 +3,62 @@ namespace SqlStreamStore.FSharp
33
open SqlStreamStore.Streams
44

55
module Read =
6-
let readFromAllStream (store: SqlStreamStore.IStreamStore)
7-
(readingDirection: ReadingDirection)
8-
(startPositionInclusive: StartPosition)
9-
(msgCount: int)
10-
: Async<Result<ReadAllPage, string>> =
11-
ReadRaw.readFromAllStream store readingDirection startPositionInclusive msgCount
6+
let allForwards (store: SqlStreamStore.IStreamStore)
7+
(startPositionInclusive: StartPosition)
8+
(msgCount: int)
9+
: Async<Result<ReadAllPage, string>> =
10+
ReadRaw.allForwards store startPositionInclusive msgCount
1211
|> ExceptionsHandler.asyncExceptionHandler
1312

14-
let readFromStream (store: SqlStreamStore.IStreamStore)
15-
(readingDirection: ReadingDirection)
13+
let allBackwards (store: SqlStreamStore.IStreamStore)
14+
(startPositionInclusive: StartPosition)
15+
(msgCount: int)
16+
: Async<Result<ReadAllPage, string>> =
17+
ReadRaw.allBackwards store startPositionInclusive msgCount
18+
|> ExceptionsHandler.asyncExceptionHandler
19+
20+
let streamForwards (store: SqlStreamStore.IStreamStore)
1621
(streamName: string)
1722
(readVersion: ReadVersion)
1823
(msgCount: int)
1924
: Async<Result<ReadStreamPage, string>> =
20-
ReadRaw.readFromStream store readingDirection streamName readVersion msgCount
25+
ReadRaw.streamForwards store streamName readVersion msgCount
26+
|> ExceptionsHandler.asyncExceptionHandler
27+
28+
let streamBackwards (store: SqlStreamStore.IStreamStore)
29+
(streamName: string)
30+
(readVersion: ReadVersion)
31+
(msgCount: int)
32+
: Async<Result<ReadStreamPage, string>> =
33+
ReadRaw.streamBackwards store streamName readVersion msgCount
34+
|> ExceptionsHandler.asyncExceptionHandler
35+
36+
let allForwardsPrefetch (store: SqlStreamStore.IStreamStore)
37+
(startPositionInclusive: StartPosition)
38+
(msgCount: int)
39+
: Async<Result<ReadAllPage, string>> =
40+
ReadRaw.allForwardsPrefetch store startPositionInclusive msgCount true
41+
|> ExceptionsHandler.asyncExceptionHandler
42+
43+
let allBackwardsPrefetch (store: SqlStreamStore.IStreamStore)
44+
(startPositionInclusive: StartPosition)
45+
(msgCount: int)
46+
: Async<Result<ReadAllPage, string>> =
47+
ReadRaw.allBackwardsPrefetch store startPositionInclusive msgCount true
2148
|> ExceptionsHandler.asyncExceptionHandler
2249

23-
let readFromAllStreamAndPrefetchJsonData (store: SqlStreamStore.IStreamStore)
24-
(readingDirection: ReadingDirection)
25-
(startPositionInclusive: StartPosition)
26-
(msgCount: int)
27-
: Async<Result<ReadAllPage, string>> =
28-
ReadRaw.readFromAllStream' store readingDirection startPositionInclusive msgCount true
50+
let streamForwardsPrefetch (store: SqlStreamStore.IStreamStore)
51+
(streamName: string)
52+
(readVersion: ReadVersion)
53+
(msgCount: int)
54+
: Async<Result<ReadStreamPage, string>> =
55+
ReadRaw.streamForwardsPrefetch store streamName readVersion msgCount true
2956
|> ExceptionsHandler.asyncExceptionHandler
3057

31-
let readFromStreamAndPrefetchJsonData (store: SqlStreamStore.IStreamStore)
32-
(readingDirection: ReadingDirection)
33-
(streamName: string)
34-
(readVersion: ReadVersion)
35-
(msgCount: int)
36-
: Async<Result<ReadStreamPage, string>> =
37-
ReadRaw.readFromStream' store readingDirection streamName readVersion msgCount true
58+
let streamBackwardsPrefetch (store: SqlStreamStore.IStreamStore)
59+
(streamName: string)
60+
(readVersion: ReadVersion)
61+
(msgCount: int)
62+
: Async<Result<ReadStreamPage, string>> =
63+
ReadRaw.streamBackwardsPrefetch store streamName readVersion msgCount true
3864
|> ExceptionsHandler.asyncExceptionHandler

src/ReadRaw.fs

Lines changed: 124 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,128 +4,170 @@ open System.Threading
44
open SqlStreamStore.Streams
55

66
module ReadRaw =
7-
let private fromReadVersion: ReadVersion -> int =
7+
let private fromReadVersionForwards: ReadVersion -> int =
88
function
9-
| ReadVersion.Start -> int (Position.Start)
10-
| ReadVersion.End -> int (Position.End)
9+
| ReadVersion.Any -> int (Position.Start)
1110
| ReadVersion.SpecificVersion version -> int (version)
1211

13-
let private fromStartPositionInclusive: StartPosition -> int64 =
12+
let private fromReadVersionBackwards: ReadVersion -> int =
1413
function
15-
| StartPosition.Start -> 0L
16-
| StartPosition.End -> -1L
14+
| ReadVersion.Any -> int (Position.End)
15+
| ReadVersion.SpecificVersion version -> int (version)
16+
17+
let private fromStartPositionInclusiveForwards: StartPosition -> int64 =
18+
function
19+
| StartPosition.Any -> 0L
20+
| StartPosition.SpecificPosition position -> position
21+
22+
let private fromStartPositionInclusiveBackwards: StartPosition -> int64 =
23+
function
24+
| StartPosition.Any -> -1L
1725
| StartPosition.SpecificPosition position -> position
1826

19-
let readFromAllStream (store: SqlStreamStore.IStreamStore)
20-
(readingDirection: ReadingDirection)
21-
(startPositionInclusive: StartPosition)
22-
(msgCount: int)
23-
: Async<ReadAllPage> =
27+
let allForwards (store: SqlStreamStore.IStreamStore)
28+
(startPositionInclusive: StartPosition)
29+
(msgCount: int)
30+
: Async<ReadAllPage> =
2431
async {
25-
return! match readingDirection with
26-
| ReadingDirection.Forward ->
27-
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
28-
| ReadingDirection.Backward ->
29-
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
32+
return! store.ReadAllForwards(fromStartPositionInclusiveForwards startPositionInclusive, msgCount)
3033
|> Async.awaitTaskWithInnerException
3134
}
3235

33-
let readFromStream (store: SqlStreamStore.IStreamStore)
34-
(readingDirection: ReadingDirection)
35-
(streamName: string)
36-
(readVersion: ReadVersion)
37-
(msgCount: int)
38-
: Async<ReadStreamPage> =
36+
let allBackwards (store: SqlStreamStore.IStreamStore)
37+
(startPositionInclusive: StartPosition)
38+
(msgCount: int)
39+
: Async<ReadAllPage> =
3940
async {
40-
return! match readingDirection with
41-
| ReadingDirection.Forward ->
42-
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
43-
| ReadingDirection.Backward ->
44-
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
41+
return! store.ReadAllBackwards(fromStartPositionInclusiveBackwards startPositionInclusive, msgCount)
4542
|> Async.awaitTaskWithInnerException
4643
}
4744

48-
let readFromAllStream' (store: SqlStreamStore.IStreamStore)
49-
(readingDirection: ReadingDirection)
50-
(startPositionInclusive: StartPosition)
51-
(msgCount: int)
52-
(prefetchJson: bool)
53-
: Async<ReadAllPage> =
45+
let streamForwards (store: SqlStreamStore.IStreamStore)
46+
(streamName: string)
47+
(readVersion: ReadVersion)
48+
(msgCount: int)
49+
: Async<ReadStreamPage> =
5450
async {
55-
return! match readingDirection with
56-
| ReadingDirection.Forward ->
57-
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
58-
| ReadingDirection.Backward ->
59-
store.ReadAllBackwards
60-
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
51+
return! store.ReadStreamForwards(StreamId(streamName), fromReadVersionForwards readVersion, msgCount)
6152
|> Async.awaitTaskWithInnerException
6253
}
6354

64-
let readFromStream' (store: SqlStreamStore.IStreamStore)
65-
(readingDirection: ReadingDirection)
55+
let streamBackwards (store: SqlStreamStore.IStreamStore)
6656
(streamName: string)
6757
(readVersion: ReadVersion)
6858
(msgCount: int)
69-
(prefetchJson: bool)
7059
: Async<ReadStreamPage> =
7160
async {
72-
return! match readingDirection with
73-
| ReadingDirection.Forward ->
74-
store.ReadStreamForwards
75-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
76-
| ReadingDirection.Backward ->
77-
store.ReadStreamBackwards
78-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
61+
return! store.ReadStreamBackwards(StreamId(streamName), fromReadVersionBackwards readVersion, msgCount)
7962
|> Async.awaitTaskWithInnerException
8063
}
8164

82-
let readFromAllStream'' (store: SqlStreamStore.IStreamStore)
83-
(readingDirection: ReadingDirection)
65+
let allForwardsPrefetch (store: SqlStreamStore.IStreamStore)
8466
(startPositionInclusive: StartPosition)
8567
(msgCount: int)
8668
(prefetchJson: bool)
87-
(cancellationToken: CancellationToken)
8869
: Async<ReadAllPage> =
8970
async {
90-
return! match readingDirection with
91-
| ReadingDirection.Forward ->
92-
store.ReadAllForwards
93-
(fromStartPositionInclusive startPositionInclusive,
94-
msgCount,
95-
prefetchJson,
96-
cancellationToken)
97-
| ReadingDirection.Backward ->
98-
store.ReadAllBackwards
99-
(fromStartPositionInclusive startPositionInclusive,
100-
msgCount,
101-
prefetchJson,
102-
cancellationToken)
71+
return! store.ReadAllForwards
72+
(fromStartPositionInclusiveForwards startPositionInclusive, msgCount, prefetchJson)
73+
|> Async.awaitTaskWithInnerException
74+
}
75+
76+
let allBackwardsPrefetch (store: SqlStreamStore.IStreamStore)
77+
(startPositionInclusive: StartPosition)
78+
(msgCount: int)
79+
(prefetchJson: bool)
80+
: Async<ReadAllPage> =
81+
async {
82+
return! store.ReadAllBackwards
83+
(fromStartPositionInclusiveBackwards startPositionInclusive, msgCount, prefetchJson)
84+
|> Async.awaitTaskWithInnerException
85+
}
86+
87+
let streamForwardsPrefetch (store: SqlStreamStore.IStreamStore)
88+
(streamName: string)
89+
(readVersion: ReadVersion)
90+
(msgCount: int)
91+
(prefetchJson: bool)
92+
: Async<ReadStreamPage> =
93+
async {
94+
return! store.ReadStreamForwards
95+
(StreamId(streamName), fromReadVersionForwards readVersion, msgCount, prefetchJson)
96+
|> Async.awaitTaskWithInnerException
97+
}
98+
99+
let streamBackwardsPrefetch (store: SqlStreamStore.IStreamStore)
100+
(streamName: string)
101+
(readVersion: ReadVersion)
102+
(msgCount: int)
103+
(prefetchJson: bool)
104+
: Async<ReadStreamPage> =
105+
async {
106+
return! store.ReadStreamBackwards
107+
(StreamId(streamName), fromReadVersionBackwards readVersion, msgCount, prefetchJson)
108+
|> Async.awaitTaskWithInnerException
109+
}
110+
111+
let allForwards' (store: SqlStreamStore.IStreamStore)
112+
(startPositionInclusive: StartPosition)
113+
(msgCount: int)
114+
(prefetchJson: bool)
115+
(cancellationToken: CancellationToken)
116+
: Async<ReadAllPage> =
117+
async {
118+
return! store.ReadAllForwards
119+
(fromStartPositionInclusiveForwards startPositionInclusive,
120+
msgCount,
121+
prefetchJson,
122+
cancellationToken)
123+
|> Async.awaitTaskWithInnerException
124+
}
125+
126+
let allBackwards' (store: SqlStreamStore.IStreamStore)
127+
(startPositionInclusive: StartPosition)
128+
(msgCount: int)
129+
(prefetchJson: bool)
130+
(cancellationToken: CancellationToken)
131+
: Async<ReadAllPage> =
132+
async {
133+
return! store.ReadAllBackwards
134+
(fromStartPositionInclusiveBackwards startPositionInclusive,
135+
msgCount,
136+
prefetchJson,
137+
cancellationToken)
138+
|> Async.awaitTaskWithInnerException
139+
}
140+
141+
let streamForwards' (store: SqlStreamStore.IStreamStore)
142+
(streamName: string)
143+
(readVersion: ReadVersion)
144+
(msgCount: int)
145+
(prefetchJson: bool)
146+
(cancellationToken: CancellationToken)
147+
: Async<ReadStreamPage> =
148+
async {
149+
return! store.ReadStreamForwards
150+
(StreamId(streamName),
151+
fromReadVersionForwards readVersion,
152+
msgCount,
153+
prefetchJson,
154+
cancellationToken)
103155
|> Async.awaitTaskWithInnerException
104156
}
105157

106-
let readFromStream'' (store: SqlStreamStore.IStreamStore)
107-
(readingDirection: ReadingDirection)
158+
let streamBackwards' (store: SqlStreamStore.IStreamStore)
108159
(streamName: string)
109160
(readVersion: ReadVersion)
110161
(msgCount: int)
111162
(prefetchJson: bool)
112163
(cancellationToken: CancellationToken)
113164
: Async<ReadStreamPage> =
114165
async {
115-
return! match readingDirection with
116-
| ReadingDirection.Forward ->
117-
store.ReadStreamForwards
118-
(StreamId(streamName),
119-
fromReadVersion readVersion,
120-
msgCount,
121-
prefetchJson,
122-
cancellationToken)
123-
| ReadingDirection.Backward ->
124-
store.ReadStreamBackwards
125-
(StreamId(streamName),
126-
fromReadVersion readVersion,
127-
msgCount,
128-
prefetchJson,
129-
cancellationToken)
166+
return! store.ReadStreamBackwards
167+
(StreamId(streamName),
168+
fromReadVersionBackwards readVersion,
169+
msgCount,
170+
prefetchJson,
171+
cancellationToken)
130172
|> Async.awaitTaskWithInnerException
131173
}

src/Types.fs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,10 @@ type AppendVersion =
2020

2121
[<RequireQualifiedAccessAttribute>]
2222
type ReadVersion =
23-
| Start
24-
| End
23+
| Any
2524
| SpecificVersion of uint
2625

2726
[<RequireQualifiedAccessAttribute>]
2827
type StartPosition =
29-
| Start
30-
| End
28+
| Any
3129
| SpecificPosition of int64
32-
33-
[<RequireQualifiedAccessAttribute>]
34-
type ReadingDirection =
35-
| Forward
36-
| Backward

tests/ReadRawTests.fs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ let tests =
3333
do! AppendRaw.appendNewMessages inMemStore streamName appendVersion msgList
3434
|> Async.Ignore
3535

36-
let readVersion = ReadVersion.Start
36+
let readVersion = ReadVersion.Any
3737

38-
let! readResult = ReadRaw.readFromStream inMemStore ReadingDirection.Forward streamName readVersion 10
38+
let! readResult = ReadRaw.streamForwards inMemStore streamName readVersion 10
3939

4040
readResult.Messages
4141
|> Array.sortBy (fun msg -> msg.MessageId)
@@ -65,9 +65,9 @@ let tests =
6565
do! AppendRaw.appendNewMessages inMemStore streamName appendVersion msgList
6666
|> Async.Ignore
6767

68-
let readVersion = ReadVersion.End
68+
let readVersion = ReadVersion.Any
6969

70-
let! readResult = ReadRaw.readFromStream inMemStore ReadingDirection.Backward streamName readVersion 10
70+
let! readResult = ReadRaw.streamBackwards inMemStore streamName readVersion 10
7171

7272
readResult.Messages
7373
|> Array.sortBy (fun msg -> msg.MessageId)
@@ -101,7 +101,7 @@ let tests =
101101
do! AppendRaw.appendNewMessage inMemStore stream2 appendVersion msg2
102102
|> Async.Ignore
103103

104-
let! readResult = ReadRaw.readFromAllStream inMemStore ReadingDirection.Forward StartPosition.Start 10
104+
let! readResult = ReadRaw.allForwards inMemStore StartPosition.Any 10
105105

106106
readResult.Messages
107107
|> Array.sortBy (fun msg -> msg.MessageId)
@@ -135,7 +135,7 @@ let tests =
135135
do! AppendRaw.appendNewMessage inMemStore stream2 appendVersion msg2
136136
|> Async.Ignore
137137

138-
let! readResult = ReadRaw.readFromAllStream inMemStore ReadingDirection.Backward StartPosition.End 10
138+
let! readResult = ReadRaw.allBackwards inMemStore StartPosition.Any 10
139139

140140
readResult.Messages
141141
|> Array.sortBy (fun msg -> msg.MessageId)

0 commit comments

Comments
 (0)