Skip to content

Commit eea757d

Browse files
author
ada0
committed
merge ada2k/httpaf#expose-write-failures-in-flush
1 parent e46d522 commit eea757d

9 files changed

Lines changed: 211 additions & 75 deletions

File tree

examples/lwt/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(executables
22
(libraries h1 h1-lwt-unix h1_examples base stdio lwt lwt.unix)
33
(optional true)
4-
(names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade))
4+
(names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade lwt_chunked))
55

66
(alias
77
(name runtest)

examples/lwt/lwt_chunked.ml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
open Base
2+
open Lwt.Infix
3+
module Arg = Stdlib.Arg
4+
5+
open H1_lwt_unix
6+
7+
let request_handler (_ : Unix.sockaddr) reqd =
8+
let body = H1.Reqd.respond_with_streaming reqd (H1.Response.create ~headers:(H1.Headers.of_list ["connection", "close"]) `OK) in
9+
let rec respond_loop i =
10+
H1.Body.Writer.write_string body (Printf.sprintf "Chunk %i\n" i);
11+
H1.Body.Writer.flush_with_reason body (function
12+
| `Closed -> Stdio.print_endline "closed"
13+
| `Written -> Stdio.print_endline "written"; Lwt.bind (Lwt_unix.sleep 5.) (fun () -> respond_loop (i+1)) |> ignore
14+
);
15+
Lwt.return_unit
16+
in ignore (respond_loop 0)
17+
18+
let error_handler (_ : Unix.sockaddr) = H1_examples.Server.error_handler
19+
20+
let main port =
21+
let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in
22+
Lwt.async (fun () ->
23+
Lwt_io.establish_server_with_client_socket
24+
listen_address
25+
(Server.create_connection_handler ~upgrade_handler:None ~request_handler ~error_handler)
26+
>|= fun _server ->
27+
Stdio.printf "Listening on port %i.\n" port);
28+
let forever, _ = Lwt.wait () in
29+
Lwt_main.run forever
30+
;;
31+
32+
let () =
33+
Stdlib.Sys.set_signal Stdlib.Sys.sigpipe Stdlib.Sys.Signal_ignore;
34+
let port = ref 8080 in
35+
Arg.parse
36+
["-p", Arg.Set_int port, " Listening port number (8080 by default)"]
37+
ignore
38+
"Echoes POST requests. Runs forever.";
39+
main !port
40+
;;

lib/body.ml

Lines changed: 71 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -102,53 +102,78 @@ module Reader = struct
102102
end
103103

104104
module Writer = struct
105+
module Writer = Serialize.Writer
106+
105107
type encoding =
106108
| Identity
107109
| Chunked of { mutable written_final_chunk : bool }
108110

109111
type t =
110-
{ faraday : Faraday.t
111-
; encoding : encoding
112-
; when_ready_to_write : unit -> unit
113-
; buffered_bytes : int ref
112+
{ faraday : Faraday.t
113+
; writer : Writer.t
114+
; encoding : encoding
115+
; buffered_bytes : int ref
114116
}
115117

116-
let of_faraday faraday ~encoding ~when_ready_to_write =
118+
let of_faraday faraday writer ~encoding =
117119
let encoding =
118120
match encoding with
119121
| `Fixed _ | `Close_delimited -> Identity
120122
| `Chunked -> Chunked { written_final_chunk = false }
121123
in
122124
{ faraday
123125
; encoding
124-
; when_ready_to_write
126+
; writer
125127
; buffered_bytes = ref 0
126128
}
127129

128-
let create buffer ~encoding ~when_ready_to_write =
129-
of_faraday (Faraday.of_bigstring buffer) ~encoding ~when_ready_to_write
130+
let create buffer writer ~encoding =
131+
of_faraday (Faraday.of_bigstring buffer) writer ~encoding
130132

131133
let write_char t c =
132-
Faraday.write_char t.faraday c
134+
if not (Faraday.is_closed t.faraday) then
135+
Faraday.write_char t.faraday c
133136

134137
let write_string t ?off ?len s =
135-
Faraday.write_string ?off ?len t.faraday s
138+
if not (Faraday.is_closed t.faraday) then
139+
Faraday.write_string ?off ?len t.faraday s
136140

137141
let write_bigstring t ?off ?len b =
138-
Faraday.write_bigstring ?off ?len t.faraday b
142+
if not (Faraday.is_closed t.faraday) then
143+
Faraday.write_bigstring ?off ?len t.faraday b
139144

140145
let schedule_bigstring t ?off ?len (b:Bigstringaf.t) =
141-
Faraday.schedule_bigstring ?off ?len t.faraday b
146+
if not (Faraday.is_closed t.faraday) then
147+
Faraday.schedule_bigstring ?off ?len t.faraday b
142148

143-
let ready_to_write t = t.when_ready_to_write ()
149+
let ready_to_write t = Writer.wakeup t.writer
144150

145151
let flush t kontinue =
146152
Faraday.flush t.faraday kontinue;
147153
ready_to_write t
148154

155+
let flush_with_reason t kontinue =
156+
if Writer.is_closed t.writer then
157+
kontinue `Closed
158+
else begin
159+
Faraday.flush_with_reason t.faraday (fun reason ->
160+
let result =
161+
match reason with
162+
| Nothing_pending | Shift -> `Written
163+
| Drain -> `Closed
164+
in
165+
kontinue result);
166+
ready_to_write t
167+
end
168+
149169
let is_closed t =
150170
Faraday.is_closed t.faraday
151171

172+
let close_and_drain t =
173+
Faraday.close t.faraday;
174+
(* Resolve all pending flushes *)
175+
ignore (Faraday.drain t.faraday : int)
176+
152177
let close t =
153178
Faraday.close t.faraday;
154179
ready_to_write t;
@@ -166,33 +191,39 @@ module Writer = struct
166191
in
167192
faraday_has_output || additional_encoding_output
168193

169-
let transfer_to_writer t writer =
194+
let transfer_to_writer t =
170195
let faraday = t.faraday in
171-
begin match Faraday.operation faraday with
172-
| `Yield -> ()
173-
| `Close ->
174-
(match t.encoding with
175-
| Identity -> ()
176-
| Chunked ({ written_final_chunk } as chunked) ->
177-
if not written_final_chunk then begin
178-
chunked.written_final_chunk <- true;
179-
Serialize.Writer.schedule_chunk writer [];
180-
end);
181-
Serialize.Writer.unyield writer;
182-
| `Writev iovecs ->
183-
let buffered = t.buffered_bytes in
184-
begin match IOVec.shiftv iovecs !buffered with
185-
| [] -> ()
186-
| iovecs ->
187-
let lengthv = IOVec.lengthv iovecs in
188-
buffered := !buffered + lengthv;
189-
begin match t.encoding with
190-
| Identity -> Serialize.Writer.schedule_fixed writer iovecs
191-
| Chunked _ -> Serialize.Writer.schedule_chunk writer iovecs
192-
end;
193-
Serialize.Writer.flush writer (fun () ->
194-
Faraday.shift faraday lengthv;
195-
buffered := !buffered - lengthv)
196-
end
196+
if Writer.is_closed t.writer then
197+
close_and_drain t
198+
else begin
199+
match Faraday.operation faraday with
200+
| `Yield -> ()
201+
| `Close ->
202+
(match t.encoding with
203+
| Identity -> ()
204+
| Chunked ({ written_final_chunk } as chunked) ->
205+
if not written_final_chunk then begin
206+
chunked.written_final_chunk <- true;
207+
Serialize.Writer.schedule_chunk t.writer [];
208+
end);
209+
Serialize.Writer.unyield t.writer;
210+
| `Writev iovecs ->
211+
let buffered = t.buffered_bytes in
212+
begin match IOVec.shiftv iovecs !buffered with
213+
| [] -> ()
214+
| iovecs ->
215+
let lengthv = IOVec.lengthv iovecs in
216+
buffered := !buffered + lengthv;
217+
begin match t.encoding with
218+
| Identity -> Serialize.Writer.schedule_fixed t.writer iovecs
219+
| Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs
220+
end;
221+
Serialize.Writer.flush t.writer (fun result ->
222+
match result with
223+
| `Closed -> close_and_drain t
224+
| `Written ->
225+
Faraday.shift faraday lengthv;
226+
buffered := !buffered - lengthv)
227+
end
197228
end
198229
end

lib/client_connection.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ module Oneshot = struct
7171
| `Error `Bad_request ->
7272
failwith "H1.Client_connection.request: invalid body length"
7373
in
74-
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size)
75-
~encoding ~when_ready_to_write:(fun () -> Writer.wakeup writer)
74+
Body.Writer.create (Bigstringaf.create config.request_body_buffer_size) writer
75+
~encoding
7676
in
7777
let t =
7878
{ request
@@ -89,7 +89,7 @@ module Oneshot = struct
8989

9090
let flush_request_body t =
9191
if Body.Writer.has_pending_output t.request_body
92-
then Body.Writer.transfer_to_writer t.request_body t.writer
92+
then Body.Writer.transfer_to_writer t.request_body
9393
;;
9494

9595
let set_error_and_handle_without_shutdown t error =

lib/h1.mli

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -494,23 +494,29 @@ module Body : sig
494494
modified until a subsequent call to {!flush} has successfully
495495
completed. *)
496496

497-
val flush : t -> (unit -> unit) -> unit
498-
(** [flush t f] makes all bytes in [t] available for writing to the awaiting
499-
output channel. Once those bytes have reached that output channel, [f]
500-
will be called.
497+
val flush_with_reason : t -> ([ `Written | `Closed ] -> unit) -> unit
498+
(** [flush_with_reason t f] makes all bytes in [t] available for writing to the awaiting output
499+
channel. Once those bytes have reached that output channel, [f `Written] will be
500+
called. If instead, the output channel is closed before all of those bytes are
501+
successfully written, [f `Closed] will be called.
501502
502503
The type of the output channel is runtime-dependent, as are guarantees
503504
about whether those packets have been queued for delivery or have
504505
actually been received by the intended recipient. *)
505506

507+
val flush: t -> (unit -> unit) -> unit
508+
(** [flush t f] is identical to [flush_with_reason t], except ignoring the result of the flush.
509+
In most situations, you should use flush_with_reason and properly handle a closed output channel. *)
510+
506511
val close : t -> unit
507512
(** [close t] closes [t], causing subsequent write calls to raise. If
508513
[t] is writable, this will cause any pending output to become available
509514
to the output channel. *)
510515

511516
val is_closed : t -> bool
512-
(** [is_closed t] is [true] if {!close} has been called on [t] and [false]
513-
otherwise. A closed [t] may still have pending output. *)
517+
(** [is_closed t] is [true] if {!close} has been called on [t], or if the attached
518+
output channel is closed (e.g. because [report_write_result `Closed] has been
519+
called). A closed [t] may still have pending output. *)
514520
end
515521

516522
end

lib/reqd.ml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response =
175175
"H1.Reqd.respond_with_streaming: invalid response body length"
176176
in
177177
let response_body =
178-
Body.Writer.create t.response_body_buffer ~encoding
179-
~when_ready_to_write:(fun () -> Writer.wakeup t.writer)
178+
Body.Writer.create t.response_body_buffer t.writer ~encoding
180179
in
181180
Writer.write_response t.writer response;
182181
if t.persistent then
@@ -288,6 +287,5 @@ let flush_request_body t =
288287

289288
let flush_response_body t =
290289
match t.response_state with
291-
| Streaming (_, response_body) ->
292-
Body.Writer.transfer_to_writer response_body t.writer
290+
| Streaming (_, response_body) -> Body.Writer.transfer_to_writer response_body
293291
| _ -> ()

lib/serialize.ml

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,18 +89,18 @@ let schedule_bigstring_chunk t chunk =
8989
module Writer = struct
9090
type t =
9191
{ buffer : Bigstringaf.t
92-
(* The buffer that the encoder uses for buffered writes. Managed by the
93-
* control module for the encoder. *)
92+
(* The buffer that the encoder uses for buffered writes. Managed by the
93+
* control module for the encoder. *)
9494
; encoder : Faraday.t
95-
(* The encoder that handles encoding for writes. Uses the [buffer]
96-
* referenced above internally. *)
95+
(* The encoder that handles encoding for writes. Uses the [buffer]
96+
* referenced above internally. *)
9797
; mutable drained_bytes : int
98-
(* The number of bytes that were not written due to the output stream
99-
* being closed before all buffered output could be written. Useful for
100-
* detecting error cases. *)
98+
(* The number of bytes that were not written due to the output stream
99+
* being closed before all buffered output could be written. Useful for
100+
* detecting error cases. *)
101101
; mutable wakeup : Optional_thunk.t
102-
(* The callback from the runtime to be invoked when output is ready to be
103-
* flushed. *)
102+
(* The callback from the runtime to be invoked when output is ready to be
103+
* flushed. *)
104104
}
105105

106106
let create ?(buffer_size=0x800) () =
@@ -158,13 +158,19 @@ module Writer = struct
158158
;;
159159

160160
let flush t f =
161-
flush t.encoder f
161+
flush_with_reason t.encoder (fun reason ->
162+
let result =
163+
match reason with
164+
| Nothing_pending | Shift -> `Written
165+
| Drain -> `Closed
166+
in
167+
f result)
162168

163169
let unyield t =
164170
(* This would be better implemented by a function that just takes the
165171
encoder out of a yielded state if it's in that state. Requires a change
166172
to the faraday library. *)
167-
flush t (fun () -> ())
173+
flush t (fun _result -> ())
168174

169175
let yield t =
170176
Faraday.yield t.encoder

lib/server_connection.ml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,9 @@ let set_error_and_handle ?request t error =
177177
"H1.Server_connection.error_handler: invalid response body \
178178
length"
179179
in
180-
Body.Writer.of_faraday (Writer.faraday writer) ~encoding
181-
~when_ready_to_write:(fun () -> Writer.wakeup writer)))
180+
Body.Writer.of_faraday (Writer.faraday writer) writer ~encoding
181+
)
182+
)
182183

183184
let report_exn t exn = set_error_and_handle t (`Exn exn)
184185

0 commit comments

Comments
 (0)