Skip to content

Commit 692bcdb

Browse files
authored
Add GetAll module (#14)
1 parent a7aa351 commit 692bcdb

File tree

3 files changed

+79
-20
lines changed

3 files changed

+79
-20
lines changed

src/Get.fs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ open SqlStreamStore.Streams
55

66
module Get =
77

8+
// A function to help wit type inference in this module
89
let private curriedMap : (ReadStreamPage -> 'a) -> AsyncResult<ReadStreamPage, exn> -> AsyncResult<'a, exn> =
910
AsyncResult.map
1011

@@ -37,6 +38,39 @@ module Get =
3738
let nextStreamVersion =
3839
curriedMap (fun page -> page.NextStreamVersion)
3940

41+
let nextStreamPage =
42+
AsyncResult.bind (fun (page: ReadStreamPage) -> page.ReadNext |> AsyncResult.ofTask)
43+
44+
module GetAll =
45+
46+
// A function to help wit type inference in this module
47+
let private curriedMap : (ReadAllPage -> 'a) -> AsyncResult<ReadAllPage, exn> -> AsyncResult<'a, exn> =
48+
AsyncResult.map
49+
50+
let messages =
51+
curriedMap (fun page -> page.Messages |> Array.toList)
52+
53+
let messagesData =
54+
messages
55+
>> AsyncResult.bind (List.traverseAsyncResultM (fun msg -> msg.GetJsonData()))
56+
57+
let messagesDataAs<'data> =
58+
messages
59+
>> AsyncResult.bind (List.traverseAsyncResultM (fun msg -> msg.GetJsonDataAs<'data>()))
60+
61+
let direction = curriedMap (fun page -> page.Direction)
62+
63+
let fromPosition =
64+
curriedMap (fun page -> page.FromPosition)
65+
66+
let isEnd = curriedMap (fun page -> page.IsEnd)
67+
68+
let nextPosition =
69+
curriedMap (fun page -> page.NextPosition)
70+
71+
let nextAllStreamPage =
72+
AsyncResult.bind (fun (page: ReadAllPage) -> page.ReadNext |> AsyncResult.ofTask)
73+
4074

4175
namespace SqlStreamStore.FSharp.EventSourcing
4276

@@ -59,6 +93,10 @@ module Get =
5993
events<'event>
6094
>> AsyncResult.bind (List.traverseAsyncResultM (fun event -> event.data))
6195

96+
let eventDataAsString<'event> =
97+
events<'event>
98+
>> AsyncResult.bind (List.traverseAsyncResultM (fun event -> event.dataAsString))
99+
62100
let eventsAndEventsData<'event> =
63101
fun (page: AsyncResult<ReadStreamPage, exn>) ->
64102
asyncResult {
@@ -67,6 +105,28 @@ module Get =
67105
return List.zip events' data
68106
}
69107

108+
module GetAll =
109+
let events<'event> =
110+
GetAll.messages
111+
>> Async.map (
112+
Result.bind (
113+
List.filter (fun msg -> Seq.contains msg.Type (getEventUnionCases<'event> ()))
114+
>> List.traverseResultM StreamEvent.ofStreamMessage<'event>
115+
)
116+
)
117+
118+
let eventsData<'event> =
119+
events<'event>
120+
>> AsyncResult.bind (List.traverseAsyncResultM (fun event -> event.data))
121+
70122
let eventDataAsString<'event> =
71123
events<'event>
72124
>> AsyncResult.bind (List.traverseAsyncResultM (fun event -> event.dataAsString))
125+
126+
let eventsAndEventsData<'event> =
127+
fun (page: AsyncResult<ReadAllPage, exn>) ->
128+
asyncResult {
129+
let! events' = events<'event> page
130+
let! data = List.traverseAsyncResultM (fun event -> event.data) events'
131+
return List.zip events' data
132+
}

src/SqlStreamStore.FSharp.fsproj

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,26 @@
1010
<PackageLicenseExpression>MIT</PackageLicenseExpression>
1111
<PackageTags>SqlStreamStore; FSharp; postgresql; cqrs; event-sourcing; event-store; stream-store</PackageTags>
1212
</PropertyGroup>
13-
13+
1414
<ItemGroup>
15-
<PackageReference Update="FSharp.Core" Version="5.0.0" />
16-
<PackageReference Include="FSharp.Prelude" Version="3.0.0" />
17-
<PackageReference Include="FSharp.SystemTextJson" Version="0.16.6" />
18-
<PackageReference Include="Npgsql" Version="5.0.4" />
19-
<PackageReference Include="SqlStreamStore" Version="1.2.0-beta.8" />
20-
<PackageReference Include="SqlStreamStore.Postgres" Version="1.2.0-beta.8" />
15+
<PackageReference Update="FSharp.Core" Version="5.0.0"/>
16+
<PackageReference Include="FSharp.Prelude" Version="3.0.0"/>
17+
<PackageReference Include="FSharp.SystemTextJson" Version="0.16.6"/>
18+
<PackageReference Include="Npgsql" Version="5.0.4"/>
19+
<PackageReference Include="SqlStreamStore" Version="1.2.0-beta.8"/>
20+
<PackageReference Include="SqlStreamStore.Postgres" Version="1.2.0-beta.8"/>
2121
</ItemGroup>
22-
22+
2323
<ItemGroup>
24-
<Compile Include="Infrastructure.fs" />
25-
<Compile Include="SqlStreamStoreExtensions.fs" />
26-
<Compile Include="StreamEvent.fs" />
27-
<Compile Include="Create.fs" />
28-
<Compile Include="Connect.fs" />
29-
<Compile Include="Append.fs" />
30-
<Compile Include="Read.fs" />
31-
<Compile Include="Get.fs" />
32-
<Compile Include="Subscribe.fs" />
24+
<Compile Include="Infrastructure.fs"/>
25+
<Compile Include="SqlStreamStoreExtensions.fs"/>
26+
<Compile Include="StreamEvent.fs"/>
27+
<Compile Include="Create.fs"/>
28+
<Compile Include="Connect.fs"/>
29+
<Compile Include="Append.fs"/>
30+
<Compile Include="Read.fs"/>
31+
<Compile Include="Get.fs"/>
32+
<Compile Include="Subscribe.fs"/>
3333
</ItemGroup>
34-
34+
3535
</Project>

src/SqlStreamStoreExtensions.fs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module SqlStreamExtensions =
1717
let private getJsonDataAs<'a> (streamMessage: StreamMessage) =
1818
asyncResult {
1919
let! json = getJsonData streamMessage
20-
return JayJson.decode<'a> json
20+
return! JayJson.decode<'a> json
2121
}
2222

2323
type StreamMessage with
@@ -264,7 +264,6 @@ module SqlStreamExtensions =
264264

265265
readStreamForwards this streamId fromVersionInclusive' maxCount' prefetch' cancellationToken'
266266

267-
268267
/// Lists Streams in SQLStreamStore.
269268
/// Defaults: maxCount = 1000, continuationToken = null
270269
member this.ListStreams(?maxCount: int, ?continuationToken: string) =

0 commit comments

Comments
 (0)