Skip to content

Commit 617816a

Browse files
committed
refactor request queue mechanics
This is a prelude to #159 which introduces upgrade requests, and much of the diff is identical, with a few trivial changes in `Reqd` and a few major changes in `Server_connection`. The goals here are: 1. Make `Reqd` return better types that encapsulate its state instead of requiring the user to probe it with `requires_<input|output>` and `is_complete`. 2. Try to make queue management easier to reason about by folding bits of logic from `advance_request_queue_if_necessary` into `next_read_operation` and `next_write_operation` such that we only perform side-effects when the operation in question demands it. One of the ways I tried to make this easier to reason about was to make the `yield_<reader|writer>` and `next_<read|write>_operation` functions very parallel. For one, the extra logic in `yield_writer` was puzzling. Ideally, if you're calling `yield_writer`, you're doing so because you just called `next_action` and were told to `Yield`, so all of the conditions being checked should not be possible. Looking at the next-operation functions, they both start out with a short-circuit for shutting down when the server can no longer make progress (reader is closed and queue is empty). This doesn't feel like it belongs here. Perhaps this check should be part of `advance_request_queue` with some extra logic triggering in `shutdown_reader`? After that, the next-operation functions use some very simple probing of the input/output state of `Reqd` to determine what to do next. Only in the case of `Complete` do we move into a separate function (to make it easier to read): `_final_<read|write>_operation`. In these functions, we decide if we should shutdown the respective reader/writer or consider the `reqd` complete and move it off the queue. When we do shift it off, we recursively ask for the next operation given the new queue state. In all cases, before we return the result, we wakeup the other side so that it too can evaluate the next operation given the new queue state. Though on the surface, these pieces feel fairly straightforward, there are still a slew of re-entrancy bugs to consider. I think there are two things that we can do to make this drastically easier to manage: 1. We call `t.request_handler` in two places, and this is mostly because we want to keep the invariant that the head of the request queue has already been passed off to the handler. I feel like splitting this up into a simple queue of unhandled requests and a [Reqd.t option] that represents the current request would be easier to manage. 2. It would be nice to schedule calls. Things like waking up the writer before you let the read loop know its next operation just immediately makes my mind fall apart and lose track of state. There's a fairly obvious solution of asking for a `schedule : (unit -> unit) -> unit` function from the runtime that promises to not call the thunk synchronously, but rather waits until it is outside of the read and write loops. But maybe we can solve it using what we have now, like establishing a contract that when the reader/writer is woken up, they must schedule their work for a fresh call stack and not immediately ask for operations. I added a `Queue.clear` to shutdown, not because it was necessary in any sense, but because it was part of `advance_request_queue_if_necessary`, which could have come into play in certain situations where `shutdown` was called from the runtime (e.g., in case of some exception). I would like to note that despite the fact that all tests pass, I have very little confidence in this being correct right now and would like to do some further testing within the actual runtimes.
1 parent e8e8f89 commit 617816a

2 files changed

Lines changed: 64 additions & 55 deletions

File tree

lib/reqd.ml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -243,15 +243,6 @@ let output_state t : Output_state.t =
243243
| Waiting -> Waiting
244244
;;
245245

246-
let is_complete t =
247-
match input_state t with
248-
| Ready -> false
249-
| Complete ->
250-
(match output_state t with
251-
| Waiting | Ready -> false
252-
| Complete -> true)
253-
;;
254-
255246
let flush_request_body t =
256247
let request_body = request_body t in
257248
if Body.has_pending_output request_body

lib/server_connection.ml

Lines changed: 64 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ let current_reqd_exn t =
8181

8282
let yield_reader t k =
8383
if is_closed t
84-
then failwith "on_wakeup_reader on closed conn"
84+
then failwith "yield_reader on closed conn"
8585
else if Optional_thunk.is_some t.wakeup_reader
8686
then failwith "yield_reader: only one callback can be registered at a time"
8787
else t.wakeup_reader <- Optional_thunk.some k
@@ -155,6 +155,7 @@ let error_code t =
155155
else None
156156

157157
let shutdown t =
158+
Queue.clear t.request_queue;
158159
shutdown_reader t;
159160
shutdown_writer t;
160161
wakeup_reader t;
@@ -182,53 +183,44 @@ let set_error_and_handle ?request t error =
182183
let report_exn t exn =
183184
set_error_and_handle t (`Exn exn)
184185

185-
let advance_request_queue_if_necessary t =
186-
if is_active t then begin
187-
let reqd = current_reqd_exn t in
188-
if Reqd.persistent_connection reqd then begin
189-
if Reqd.is_complete reqd then begin
190-
ignore (Queue.take t.request_queue);
191-
if not (Queue.is_empty t.request_queue)
192-
then t.request_handler (current_reqd_exn t);
193-
wakeup_reader t;
194-
end
195-
end else begin
196-
(* Take the head of the queue, close the remaining request bodies, clear
197-
* the queue, and push the head back on. We do not plan on processing any
198-
* more requests after the current one. *)
199-
ignore (Queue.take t.request_queue);
200-
Queue.iter Reqd.close_request_body t.request_queue;
201-
Queue.clear t.request_queue;
202-
Queue.push reqd t.request_queue;
203-
if Reqd.is_complete reqd
204-
then shutdown t
205-
else
206-
match Reqd.input_state reqd with
207-
| Ready -> ()
208-
| Complete -> shutdown_reader t
209-
end
210-
end else if Reader.is_closed t.reader
211-
then shutdown t
212-
213-
let _next_read_operation t =
214-
advance_request_queue_if_necessary t;
215-
if is_active t
216-
then (
186+
let advance_request_queue t =
187+
ignore (Queue.take t.request_queue);
188+
if not (Queue.is_empty t.request_queue)
189+
then t.request_handler (Queue.peek_exn t.request_queue);
190+
;;
191+
192+
let rec _next_read_operation t =
193+
if not (is_active t) then (
194+
if Reader.is_closed t.reader
195+
then shutdown t;
196+
Reader.next t.reader
197+
) else (
217198
let reqd = current_reqd_exn t in
218199
match Reqd.input_state reqd with
219200
| Ready -> Reader.next t.reader
220-
| Complete ->
221-
if Reqd.persistent_connection reqd
222-
then `Yield
223-
else (
224-
shutdown_reader t;
225-
Reader.next t.reader)
201+
| Complete -> _final_read_operation_for t reqd
226202
)
227-
else Reader.next t.reader
203+
204+
and _final_read_operation_for t reqd =
205+
let next =
206+
if not (Reqd.persistent_connection reqd) then (
207+
shutdown_reader t;
208+
Reader.next t.reader;
209+
) else (
210+
match Reqd.output_state reqd with
211+
| Waiting | Ready -> `Yield
212+
| Complete ->
213+
advance_request_queue t;
214+
_next_read_operation t;
215+
)
216+
in
217+
wakeup_writer t;
218+
next
228219
;;
229220

230221
let next_read_operation t =
231222
match _next_read_operation t with
223+
(* XXX(dpatti): These two [`Error _] constructors are never returned *)
232224
| `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close
233225
| `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close
234226
| (`Read | `Yield | `Close) as operation -> operation
@@ -259,13 +251,39 @@ let read t bs ~off ~len =
259251
let read_eof t bs ~off ~len =
260252
read_with_more t bs ~off ~len Complete
261253

262-
let next_write_operation t =
263-
advance_request_queue_if_necessary t;
264-
if is_active t
265-
then (
254+
let rec _next_write_operation t =
255+
if not (is_active t) then (
256+
if Reader.is_closed t.reader
257+
then shutdown t;
258+
Writer.next t.writer
259+
) else (
266260
let reqd = current_reqd_exn t in
267-
Reqd.flush_response_body reqd);
268-
Writer.next t.writer
261+
match Reqd.output_state reqd with
262+
| Waiting -> `Yield
263+
| Ready ->
264+
Reqd.flush_response_body reqd;
265+
Writer.next t.writer
266+
| Complete -> _final_write_operation_for t reqd
267+
)
268+
269+
and _final_write_operation_for t reqd =
270+
let next =
271+
if not (Reqd.persistent_connection reqd) then (
272+
shutdown_writer t;
273+
Writer.next t.writer;
274+
) else (
275+
match Reqd.input_state reqd with
276+
| Ready -> assert false
277+
| Complete ->
278+
advance_request_queue t;
279+
_next_write_operation t;
280+
)
281+
in
282+
wakeup_reader t;
283+
next
284+
;;
285+
286+
let next_write_operation t = _next_write_operation t
269287

270288
let report_write_result t result =
271289
Writer.report_result t.writer result

0 commit comments

Comments
 (0)