-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathServiceRequest.fs
261 lines (227 loc) · 12.2 KB
/
ServiceRequest.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/// QueueProcessing based on state machine agent
module Microsoft.FSharpLu.Actor.ServiceRequests
open FSharp.Control
open FSharp.Reflection
/// A stateless request: it carries nothing but an input payload of type 'Input.
/// No state is threaded between processing steps for this kind of request.
type StatelessRequest<'Input> =
    { /// Payload supplied when the request was created
      input : 'Input }
/// A stateful request: an input payload of type 'Input paired with
/// a state value of type 'State.
type StatefulRequest<'Input, 'State> =
    { /// Payload supplied when the request was created
      input : 'Input
      /// State value attached to the request
      state : 'State }
/// Implements dynamic casting of a service request type into StatefulRequest<_,_>
/// using precomputed reflective operations on the union type 'Request.
/// The reflection tables are static, so they are built once per instantiation of 'Request.
type CachedReflection<'Request> () =
    /// Union cases of 'Request (the arrays below are indexed by union case tag)
    static let caseInfo =
        FSharpType.GetUnionCases(typeof<'Request>)

    /// Precomputed constructor for each union case
    static let caseConstructors =
        caseInfo
        |> Array.map FSharpValue.PreComputeUnionConstructor

    /// Precomputed field reader for each union case
    static let caseInfoReaders =
        caseInfo
        |> Array.map FSharpValue.PreComputeUnionReader

    /// Precomputed reader returning the case tag of a 'Request value
    static let readCaseTag =
        FSharpValue.PreComputeUnionTagReader(typeof<'Request>)

    /// Try to view a service request as a StatefulRequest<'Input, 'State>.
    /// Returns Some when the first field of the union case used to build `r`
    /// is exactly a StatefulRequest<'Input, 'State>; returns None otherwise
    /// (case without fields, null first field, first field of another type,
    /// or a StatefulRequest instantiated with different type arguments).
    static member tryCastAsStatefulRequest<'Input, 'State> (r: 'Request) =
        let tagNumber = readCaseTag r
        let fields = caseInfoReaders.[tagNumber] (r :> obj)
        if fields.Length = 0 then
            // Union case carrying no payload: nothing to cast
            None
        else
            // The type-test pattern rejects null and any non-matching type,
            // so no reflection call can throw here.
            match fields.[0] with
            | :? StatefulRequest<'Input, 'State> as statefulRequest -> Some statefulRequest
            | _ -> None

    /// Cast a service request as StatefulRequest<'Input, 'State>.
    /// Raises an exception (via failwith) when the request is not of that shape.
    static member castAsStatefulRequest<'Input, 'State> (r:'Request) =
        match CachedReflection<'Request>.tryCastAsStatefulRequest<'Input, 'State> r with
        | Some v -> v
        | None -> failwith "Invalid request type: expecting a service request of type StatefulRequest<_,_>"

    /// Return the Discriminated Union constructor that was used to create the specified request.
    /// The returned function rebuilds a 'Request of the same union case from a StatefulRequest.
    static member getStatefulRequestConstructor<'Input, 'State> (r: 'Request) =
        let tagNumber = readCaseTag r
        let constructor = caseConstructors.[tagNumber]
        let cons (statefulRequest:StatefulRequest<'Input,'State>) : 'Request =
            constructor [| statefulRequest:>obj |] :?> 'Request
        cons

    /// Extract the state of a request using type reflection
    /// (not ideal but best we can do given lack of higher-order type polymorphism in F#).
    /// Returns None when the request is not a StatefulRequest<'Input, 'State>.
    static member tryExtractState<'Input, 'State, 'Request> (r:'Request) =
        CachedReflection< 'Request>.tryCastAsStatefulRequest<'Input, 'State> r
        |> Option.map (fun y -> y.state)
/// Small sample exercising CachedReflection.tryExtractState on an example request type.
/// The bindings below are plain module-level values.
module TestExtract =
    /// Example of service request type: one stateless case and one stateful case
    type ServiceRequests =
        | Request1 of StatelessRequest<int>
        | Request2 of StatefulRequest<unit, string>

    /// A request built from the stateful case
    let aRequest = Request2 { StatefulRequest.input = (); state = "foo" }

    /// A request built from the stateless case
    let anotherRequest = Request1 { StatelessRequest.input = 1 }

    /// State extracted from the stateful request
    let x = CachedReflection<_>.tryExtractState<unit, string, ServiceRequests> aRequest

    /// State extraction attempted on the stateless request.
    /// NOTE(review): despite the name, tryExtractState returns an option; for a
    /// StatelessRequest payload the cast does not match and this evaluates to None.
    let shouldThrow = CachedReflection<_>.tryExtractState<unit, string, ServiceRequests> anotherRequest
open Microsoft.FSharpLu.Actor.StateMachine
open Microsoft.FSharpLu.Actor.StateMachine.Agent
open Microsoft.FSharpLu.Actor.QueueScheduler
/// Each request is wrapped inside an envelope prior to being pushed on a queue.
/// Requirement: 'Request must be either of type
///    StatelessRequest<'Input> or StatefulRequest<'Input, 'State>
/// The F# type system does not allow us to statically enforce this restriction,
/// so unfortunately we can only validate this requirement dynamically via reflection and casting...
type Envelope<'Header, 'Request> =
    {
        /// Request metadata used to run state machine-based agents
        metadata : RequestMetadata option
        /// Custom header information
        header : 'Header
        /// The request itself
        request : 'Request // 'Request must be StatelessRequest<'Input> or StatefulRequest<'Input, 'State> for some 'Input and 'State
        /// If the request results from a returned callee instruction (i.e. another agent called via Transition.Call has returned)
        /// then this holds the result value returned by the callee.
        calleeReturnResult : obj option
    }

    /// Create a new envelope with the same request but updated state.
    /// Requirement: 'Request must be of type StatefulRequest<'Input, 'State>;
    /// otherwise the underlying cast raises an exception.
    member envelope.updateState<'Input, 'State> (newState:'State) =
        let statefulRequest = CachedReflection<'Request>.castAsStatefulRequest<'Input, 'State> envelope.request
        // Rebuild the request with the same union case it was created with
        let cons = CachedReflection<'Request>.getStatefulRequestConstructor<'Input, 'State> envelope.request
        {
            envelope with request = cons { statefulRequest with state = newState }
        }

    /// Create a new envelope with the same request but updated state and metadata.
    /// Requirement: 'Request must be of type StatefulRequest<'Input, 'State>;
    /// otherwise the underlying cast raises an exception.
    member envelope.updateMetadataAndState<'Input, 'State> (metadata:RequestMetadata) (newState:'State) =
        let statefulRequest = CachedReflection<'Request>.castAsStatefulRequest<'Input, 'State> envelope.request
        let cons = CachedReflection<'Request>.getStatefulRequestConstructor<'Input, 'State> envelope.request
        {
            envelope with
                request = cons { statefulRequest with state = newState }
                metadata = Some metadata
        }

    /// Embed a callee returned result in the request envelope (the value is stored boxed)
    member e.embedResult<'t> (result:'t) =
        { e with calleeReturnResult = Some (result :> obj) }

    /// Return the callee result cast to type 't.
    /// Fails when the envelope holds no callee result, or when the stored
    /// result cannot be viewed as a 't.
    member e.ReturnResult<'t> () =
        match e.calleeReturnResult with
        | None -> failwith "This request was not produced from a returned callee."
        | Some result ->
            match result with
            | :? 't as r -> r
            | null ->
                // A boxed `()` or `None` is a null reference at runtime, so the
                // type test above cannot match it. When 't admits null as a value
                // (its default value boxes to null) hand that value back; otherwise
                // report the mismatch without dereferencing the null payload.
                let defaultValue = Unchecked.defaultof<'t>
                if isNull (box defaultValue) then
                    defaultValue
                else
                    failwithf "The actual agent return type is %s instead of the expected %s" "null" typeof<'t>.FullName
            | _ ->
                failwithf "The actual agent return type is %s instead of the expected %s" (result.GetType().FullName) typeof<'t>.FullName
/// Operations made available to a request handler while it processes a request.
/// Constructor parameters:
///  - queue: the queue scheduling system that spawned requests are posted to
///  - envelope: the envelope of the current request being processed by the handler
type Operations<'QueueMessage, 'Header, 'Request>
    (
        queue : IQueueingAPI<'QueueMessage, Envelope<'Header, 'Request>>,
        envelope : Envelope<'Header, 'Request>
    ) =

    /// Wrap a request in an envelope that reuses the header of the current
    /// envelope and carries no callee result.
    let wrap metadata (request : 'Request) : Envelope<'Header, 'Request> =
        {
            metadata = metadata
            request = request
            header = envelope.header
            calleeReturnResult = None
        }

    /// Spawn a new request with no request metadata yet, preserving the same header
    member __.spawnNewRequest (r: 'Request) =
        queue.post (wrap None r)

    /// Spawn a request with the specified request metadata, preserving the same header
    member __.spawnRequest metadata (r: 'Request) =
        queue.post (wrap metadata r)

    /// Spawn a new request that is already wrapped in an envelope
    member __.spawnRequestEnvelop (e:Envelope<'Header,'Request>) =
        queue.post e

    /// Return the callee result of the current envelope, cast to type 't
    member __.ReturnResult<'t> () =
        envelope.ReturnResult<'t>()
/// Context parameters passed to every request handler.
///  - 'QueueMessage is the message type of the underlying queueing system.
///  - 'Header, 'Request and 'CustomContext are types provided by the API consumer:
///    the first two define how requests get encoded, while 'CustomContext
///    defines custom fields that get passed to every request handler.
[<NoEquality; NoComparison>]
type QueueingContext<'QueueMessage, 'Header, 'Request, 'CustomContext> =
    {
        /// The underlying queueing system
        queue : IQueueingAPI<'QueueMessage, Envelope<'Header, 'Request>>

        /// The message that has just been dequeued and needs to be processed
        queuedMessage : 'QueueMessage

        /// The storage system used to record join/fork points
        joinStore : Join.IStorage<Envelope<'Header, 'Request>>

        /// Custom context defined by the QueueProcessing API user
        customContext : 'CustomContext
    }
/// A scheduler factory: instantiates a scheduling API for any possible state type 'State.
type ISchedulerFactory<'QueueMessage, 'Header, 'Request, 'CustomContext> =
    /// Build the scheduler for a given queueing context and request envelope
    abstract create<'Input, 'State> :
        QueueingContext<'QueueMessage, 'Header, 'Request, 'CustomContext>
            -> Envelope<'Header, 'Request>
            -> Scheduler<'State, Envelope<'Header, 'Request>>
/// A scheduler factory based on an in-memory queueing system
type InMemorySchedulerFactory<'QueueMessage, 'Header, 'Request, 'CustomContext> () =
    interface ISchedulerFactory<'QueueMessage, 'Header, 'Request, 'CustomContext> with
        /// Build a scheduler whose operations go through the context's queue and join store
        member __.create<'Input, 'State> context (envelope:Envelope<'Header, 'Request>) : Scheduler<'State, Envelope<'Header, 'Request>> =
            {
                joinStore = context.joinStore
                // Nothing to do on in-process sleep or on state change
                onInProcessSleep = fun _ -> async.Return ()
                onGoto = fun _ -> async.Return ()
                // Spawned requests are posted straight to the queue
                spawn = fun spawned -> context.queue.post spawned
                // Embedding a state produces a copy of the envelope with new metadata and state
                embed = fun requestMetadata newState ->
                    envelope.updateMetadataAndState<'Input, 'State> requestMetadata newState
                embedCallReturnValue =
                    { new ICallReturnEmbedder<_> with
                        member __.embed<'t> (result:'t) m = m.embedResult result
                    }
            }
/// Type of a request handler: given a scheduler factory, the queueing context and
/// the request envelope, it asynchronously produces the status of the request.
type Handler<'QueueMessage, 'Header, 'Request, 'CustomContext, 'Result> =
    ISchedulerFactory<'QueueMessage, 'Header, 'Request, 'CustomContext>
        -> QueueingContext<'QueueMessage, 'Header, 'Request, 'CustomContext>
        -> Envelope<'Header, 'Request>
        -> Async<RequestStatus<Envelope<'Header, 'Request>, 'Result>>
/// Process a service request by executing a state machine agent via Agent.executeWithResult.
/// The request, wrapped in an envelope, is handled by an agent built from the
/// provided transition function.
///
/// This helper function can be used to define state-machine handlers for each service request.
///
/// Requirement: 'Request must be a type of the form
///      type 'Request = ... | SomeRequest of StatefulRequest<'Input, 'State> | ...
/// for any possible value `SomeRequest { input = ...; state = ...}`
/// of `envelope.request`; otherwise the cast performed at the start of the handler fails.
///
/// `requestConstructor` is a ghost parameter: it is never called and only helps
/// infer that the request type is of kind `StatefulRequest<'Input, 'State>`
/// in handlers defined with `run`.
let run
    title
    tags
    (requestConstructor: StatefulRequest<'Input, 'State> -> 'Request)
    (transition: Operations<_,_,_> -> 'Input -> 'State -> Async<Transition<'State, 'Result, Envelope<'Header, 'Request>>>)
    : Handler<'QueueMessage, 'Header, 'Request, 'CustomContext, 'Result>
    = fun schedulerFactory context envelope ->
        async {
            // Recover the input and current state carried by the request
            let statefulRequest = CachedReflection<'Request>.castAsStatefulRequest<'Input, 'State> envelope.request
            let operations = Operations<_,_,_>(context.queue, envelope)

            let agent : Agent<'State, 'Result, Envelope<'Header, 'Request>> =
                {
                    Agent.title = title
                    logger = Microsoft.FSharpLu.Logging.TraceTags.info
                    tags = tags
                    transition = transition operations statefulRequest.input
                    maximumInprocessSleep = System.TimeSpan.FromMinutes(5.0)
                    scheduler = schedulerFactory.create<'Input, 'State> context envelope
                }

            // Reuse the metadata attached to the envelope, or create one for a brand new request
            let! metadata =
                match envelope.metadata with
                | Some existing -> async.Return existing
                | None -> Agent.createRequest<Envelope<'Header, 'Request>> context.joinStore

            let! instruction = Agent.executeWithResult statefulRequest.state metadata agent

            // Translate the agent's execution instruction into a request status
            let status =
                match instruction with
                | ExecutionInstruction.Completed result ->
                    RequestStatus.Completed result
                | ExecutionInstruction.Suspended ->
                    RequestStatus.Suspended
                | ExecutionInstruction.Coreturn request ->
                    RequestStatus.Coreturn request
                | ExecutionInstruction.SleepAndResume sleepTime ->
                    RequestStatus.SleepAndResume sleepTime
                | ExecutionInstruction.SleepAndResumeAt (sleepTime, s) ->
                    // Resume later with an envelope carrying the new state
                    RequestStatus.SleepAndResumeWith (sleepTime, envelope.updateState<'Input, 'State> s)

            return status
        }