From 1b30873f80583727363f12a5d6c64b794726d3a4 Mon Sep 17 00:00:00 2001 From: Mat Trudel Date: Tue, 16 Jan 2024 10:51:05 -0500 Subject: [PATCH] Track response metrics on chunked responses (#291) --- lib/bandit/http1/adapter.ex | 23 ++++++++++++++++++++--- lib/bandit/http2/adapter.ex | 12 ++++++++++-- lib/bandit/pipeline.ex | 9 +++++++-- lib/bandit/telemetry.ex | 6 ++---- mix.lock | 2 +- test/bandit/http1/request_test.exs | 7 ++++--- test/bandit/http2/plug_test.exs | 29 +++++++++++++++++++++++++++++ 7 files changed, 73 insertions(+), 15 deletions(-) diff --git a/lib/bandit/http1/adapter.ex b/lib/bandit/http1/adapter.ex index a4155528..aeee1546 100644 --- a/lib/bandit/http1/adapter.ex +++ b/lib/bandit/http1/adapter.ex @@ -495,9 +495,26 @@ defmodule Bandit.HTTP1.Adapter do end @impl Plug.Conn.Adapter - def chunk(%__MODULE__{write_state: :chunking_out, socket: socket}, chunk) do - byte_size = chunk |> IO.iodata_length() |> Integer.to_string(16) - ThousandIsland.Socket.send(socket, [byte_size, "\r\n", chunk, "\r\n"]) + def chunk(%__MODULE__{write_state: :chunking_out} = req, chunk) do + byte_size = chunk |> IO.iodata_length() + payload = [Integer.to_string(byte_size, 16), "\r\n", chunk, "\r\n"] + + case ThousandIsland.Socket.send(req.socket, payload) do + :ok -> + metrics = Map.update(req.metrics, :resp_body_bytes, byte_size, &(&1 + byte_size)) + + metrics = + if byte_size == 0 do + Map.put(metrics, :resp_end_time, Bandit.Telemetry.monotonic_time()) + else + metrics + end + + {:ok, nil, %{req | metrics: metrics}} + + {:error, reason} -> + {:error, reason} + end end def chunk(_, _), do: :ok diff --git a/lib/bandit/http2/adapter.ex b/lib/bandit/http2/adapter.ex index 3a168ba2..28cf9ed5 100644 --- a/lib/bandit/http2/adapter.ex +++ b/lib/bandit/http2/adapter.ex @@ -231,8 +231,16 @@ defmodule Bandit.HTTP2.Adapter do # Moreover, if the caller is chunking out on a HEAD, 204 or 304 response, the underlying # stream will have been closed in send_chunked/3 above, and so this call will return an # `{:error, :not_owner}` error here (which we ignore, but it's still kinda odd) - _ = send_data(adapter, chunk, IO.iodata_length(chunk) == 0) - :ok + + byte_size = chunk |> IO.iodata_length() + adapter = send_data(adapter, chunk, byte_size == 0) + + if byte_size == 0 do + metrics = Map.put(adapter.metrics, :resp_end_time, Bandit.Telemetry.monotonic_time()) + {:ok, nil, %{adapter | metrics: metrics}} + else + {:ok, nil, adapter} + end end @impl Plug.Conn.Adapter diff --git a/lib/bandit/pipeline.ex b/lib/bandit/pipeline.ex index 5d01d57e..803fb8d4 100644 --- a/lib/bandit/pipeline.ex +++ b/lib/bandit/pipeline.ex @@ -148,8 +148,13 @@ defmodule Bandit.Pipeline do Plug.Conn.send_resp(conn) %Plug.Conn{state: :chunked, adapter: {mod, req}} -> - mod.chunk(req, "") - conn + req = + case mod.chunk(req, "") do + {:ok, _, req} -> req + _ -> req + end + + %{conn | adapter: {mod, req}} %Plug.Conn{} -> conn diff --git a/lib/bandit/telemetry.ex b/lib/bandit/telemetry.ex index 33aab2fc..68e2b77b 100644 --- a/lib/bandit/telemetry.ex +++ b/lib/bandit/telemetry.ex @@ -41,15 +41,13 @@ defmodule Bandit.Telemetry do breaks. Not included for HTTP/2 requests * `req_body_bytes`: The length of the request body, in octets * `resp_start_time`: The time that the response started, in `:native` units - * `resp_end_time`: The time that the response completed, in `:native` units. Not included - for chunked responses + * `resp_end_time`: The time that the response completed, in `:native` units * `resp_line_bytes`: The length of the response line, in octets. Includes all line breaks. Not included for HTTP/2 requests * `resp_header_bytes`: The length of the response headers, in octets. Includes all line breaks. Not included for HTTP/2 requests * `resp_body_bytes`: The length of the response body, in octets. If the response is - compressed, this is the size of the compressed payload as sent on the wire. Set to 0 for - chunked responses + compressed, this is the size of the compressed payload as sent on the wire * `resp_uncompressed_body_bytes`: The length of the original, uncompressed body. Only included for responses which are compressed * `resp_compression_method`: The method of compression, as sent in the `Content-Encoding` diff --git a/mix.lock b/mix.lock index ebcf6c97..dd452fd2 100644 --- a/mix.lock +++ b/mix.lock @@ -20,7 +20,7 @@ "nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"}, - "plug": {:hex, :plug, "1.15.2", "94cf1fa375526f30ff8770837cb804798e0045fd97185f0bb9e5fcd858c792a3", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02731fa0c2dcb03d8d21a1d941bdbbe99c2946c0db098eee31008e04c6283615"}, + "plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"}, "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, "req": {:hex, :req, "0.4.8", "2b754a3925ddbf4ad78c56f30208ced6aefe111a7ea07fb56c23dccc13eb87ae", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "7146e51d52593bb7f20d00b5308a5d7d17d663d6e85cd071452b613a8277100c"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, diff --git a/test/bandit/http1/request_test.exs b/test/bandit/http1/request_test.exs index 01dc3c15..ca049821 100644 --- a/test/bandit/http1/request_test.exs +++ b/test/bandit/http1/request_test.exs @@ -1252,7 +1252,7 @@ defmodule HTTP1RequestTest do |> send_chunked(200) |> chunk("OK") - # This is a pretty bogus wayr to get an error out of socket sending, but it's easy to set up + # This is a pretty bogus way to get an error out of socket sending, but it's easy to set up {_, adapter} = conn.adapter ThousandIsland.Socket.close(adapter.socket) @@ -1709,8 +1709,9 @@ defmodule HTTP1RequestTest do req_header_bytes: 49, resp_line_bytes: 17, resp_header_bytes: 119, - resp_body_bytes: 0, - resp_start_time: integer() + resp_body_bytes: 2, + resp_start_time: integer(), + resp_end_time: integer() }, %{ connection_telemetry_span_context: reference(), diff --git a/test/bandit/http2/plug_test.exs b/test/bandit/http2/plug_test.exs index 25cf1ef6..07291d56 100644 --- a/test/bandit/http2/plug_test.exs +++ b/test/bandit/http2/plug_test.exs @@ -704,6 +704,35 @@ defmodule HTTP2PlugTest do ] end + test "it should add resp metrics to `stop` events for chunked responses", context do + {:ok, collector_pid} = + start_supervised({Bandit.TelemetryCollector, [[:bandit, :request, :stop]]}) + + Req.get!(context.req, url: "/chunk_test") + + assert Bandit.TelemetryCollector.get_events(collector_pid) + ~> [ + {[:bandit, :request, :stop], + %{ + monotonic_time: integer(), + duration: integer(), + req_header_end_time: integer(), + resp_body_bytes: 4, + resp_start_time: integer(), + resp_end_time: integer() + }, + %{ + connection_telemetry_span_context: reference(), + telemetry_span_context: reference(), + conn: struct_like(Plug.Conn, []), + status: 200, + method: "GET", + request_target: {"https", "localhost", integer(), "/chunk_test"}, + stream_id: integer() + }} + ] + end + test "it should add resp metrics to `stop` events for sendfile responses", context do {:ok, collector_pid} = start_supervised({Bandit.TelemetryCollector, [[:bandit, :request, :stop]]})