From f6dbceed26acc7f1780e96d6f9b50e8db5d31075 Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Wed, 20 Mar 2024 21:58:07 -0300 Subject: [PATCH 1/2] feat: implement handle_cancelled/3 worker callback --- lib/nimble_pool.ex | 47 ++++++++++++- test/nimble_pool_test.exs | 136 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 2 deletions(-) diff --git a/lib/nimble_pool.ex b/lib/nimble_pool.ex index d76dafe..8951ef2 100644 --- a/lib/nimble_pool.ex +++ b/lib/nimble_pool.ex @@ -297,6 +297,29 @@ defmodule NimblePool do pool_state ) :: :ok + @doc """ + Handle cancelled checkout requests. + + This callback is executed when a checkout request is cancelled unexpectedly. + + The context argument may be `:queued` or `:checked_out`: + + * `:queued` means the cancellation happened before resource checkout. This may happen + when the pool is starving under load and can not serve resources. Since no checkout + happened the worker_state argument will be `nil`. + + * `:checked_out` means the cancellation happened after resource checkout. This may happen + when the function given to `checkout!/4` raises. + + This callback is optional. + """ + @doc callback: :worker + @callback handle_cancelled( + worker_state :: worker_state | nil, + pool_state, + context :: :queued | :checked_out + ) :: :ok + @optional_callbacks init_pool: 1, handle_checkin: 4, handle_info: 2, @@ -304,7 +327,8 @@ defmodule NimblePool do handle_update: 3, handle_ping: 2, terminate_worker: 3, - terminate_pool: 2 + terminate_pool: 2, + handle_cancelled: 3 @doc """ Defines a pool to be started under the supervision tree. @@ -745,19 +769,38 @@ defmodule NimblePool do {:noreply, %{state | resources: resources, async: async, state: pool_state}} end - defp cancel_request_ref(ref, reason, %{requests: requests} = state) do + defp cancel_request_ref( + ref, + reason, + %{requests: requests, worker: worker, state: pool_state} = state + ) do case requests do # Exited or timed out before we could serve it %{^ref => {_, mon_ref, :command, _command, _deadline}} -> + if function_exported?(worker, :handle_cancelled, 3) do + args = [nil, pool_state, :queued] + apply_worker_callback(worker, :handle_cancelled, args) + end + {:noreply, remove_request(state, ref, mon_ref)} # Exited or errored during client processing %{^ref => {_, mon_ref, :state, worker_server_state}} -> + if function_exported?(worker, :handle_cancelled, 3) do + args = [worker_server_state, pool_state, :checked_out] + apply_worker_callback(worker, :handle_cancelled, args) + end + state = remove_request(state, ref, mon_ref) {:noreply, remove_worker(reason, worker_server_state, state)} # The client timed out, sent us a message, and we dropped the deadlined request %{} -> + if function_exported?(worker, :handle_cancelled, 3) do + args = [nil, pool_state, :queued] + apply_worker_callback(worker, :handle_cancelled, args) + end + {:noreply, state} end end diff --git a/test/nimble_pool_test.exs b/test/nimble_pool_test.exs index dcea5ab..7a20403 100644 --- a/test/nimble_pool_test.exs +++ b/test/nimble_pool_test.exs @@ -107,6 +107,10 @@ defmodule NimblePoolTest do def terminate_pool(reason, pool_state) do TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state]) end + + def handle_cancelled(worker_state, pool_state, context) do + TestAgent.next(pool_state, :handle_cancelled, [worker_state, pool_state, context]) + end end defp stateless_pool!(instructions, opts \\ []) do @@ -1019,6 +1023,7 @@ defmodule NimblePoolTest do handle_checkout: fn :checkout, _from, _next, pool_state -> {:ok, :client_state_out, :server_state_out, pool_state} end, + handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end, terminate_worker: fn reason, :server_state_out, state -> send(parent, {:terminate, reason}) {:ok, state} @@ -1054,6 +1059,7 @@ defmodule NimblePoolTest do handle_update: fn :update, _next, pool_state -> {:ok, :updated_state, pool_state} end, + handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end, terminate_worker: fn reason, :updated_state, pool_state -> send(parent, {:terminate, reason}) {:ok, pool_state} @@ -1680,4 +1686,134 @@ defmodule NimblePoolTest do assert termination_time_pool > termination_time_worker end end + + describe "handle_cancelled" do + test "should run when client raise after checkout" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_cancelled: fn worker_state, _pool_state, :checked_out -> + send(parent, {:ping, worker_state}) + :ok + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end + ], + pool_size: 1 + ) + + assert_raise( + RuntimeError, + fn -> + NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + raise "unexpected error" + end) + end + ) + + assert_receive({:ping, :worker1}) + + NimblePool.stop(pool, :shutdown) + end + + test "should run when checkout timeout and known request ref" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_cancelled: fn nil, _pool_state, :queued -> + send(parent, {:ping, nil}) + :ok + end, + handle_checkin: fn :client_state_in, _from, next, pool_state -> + {:ok, next, pool_state} + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end + ], + pool_size: 1 + ) + + task1 = + Task.async(fn -> + NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + send(parent, :lock) + assert_receive :release + {:result, :client_state_in} + end) + end) + + assert_receive :lock + + assert {:timeout, {NimblePool, :checkout, _}} = + catch_exit( + NimblePool.checkout!( + pool, + :checkout, + fn _ref, :client_state_out -> raise "should never execute this line" end, + 1 + ) + ) + + send(task1.pid, :release) + + assert_receive({:ping, nil}) + + NimblePool.stop(pool, :shutdown) + end + + test "should run when checkout timeout and unkown request ref" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + handle_cancelled: fn nil, _pool_state, :queued -> + send(parent, {:ping, nil}) + :ok + end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_checkin: fn :client_state_in, _from, next, pool_state -> + {:ok, next, pool_state} + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end + ], + pool_size: 1 + ) + + :sys.suspend(pool) + + assert {:timeout, {NimblePool, :checkout, _}} = + catch_exit( + NimblePool.checkout!( + pool, + :checkout, + fn _ref, :client_state_out -> raise "should never execute this line" end, + 1 + ) + ) + + :sys.resume(pool) + + assert_receive({:ping, nil}) + + assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + {:result, :client_state_in} + end) == :result + + NimblePool.stop(pool, :shutdown) + end + end end From 15923e4d666a55083bd8e077430229f0dfb42d86 Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Thu, 21 Mar 2024 18:48:40 -0300 Subject: [PATCH 2/2] chore: remove worker state from handle cancelled callback --- lib/nimble_pool.ex | 24 +++++++++++------------- test/nimble_pool_test.exs | 26 +++++++++++++------------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/lib/nimble_pool.ex b/lib/nimble_pool.ex index 8951ef2..3920892 100644 --- a/lib/nimble_pool.ex +++ b/lib/nimble_pool.ex @@ -305,19 +305,17 @@ defmodule NimblePool do The context argument may be `:queued` or `:checked_out`: * `:queued` means the cancellation happened before resource checkout. This may happen - when the pool is starving under load and can not serve resources. Since no checkout - happened the worker_state argument will be `nil`. + when the pool is starving under load and can not serve resources. * `:checked_out` means the cancellation happened after resource checkout. This may happen when the function given to `checkout!/4` raises. This callback is optional. """ - @doc callback: :worker + @doc callback: :pool @callback handle_cancelled( - worker_state :: worker_state | nil, - pool_state, - context :: :queued | :checked_out + context :: :queued | :checked_out, + pool_state ) :: :ok @optional_callbacks init_pool: 1, @@ -328,7 +326,7 @@ defmodule NimblePool do handle_ping: 2, terminate_worker: 3, terminate_pool: 2, - handle_cancelled: 3 + handle_cancelled: 2 @doc """ Defines a pool to be started under the supervision tree. @@ -777,8 +775,8 @@ defmodule NimblePool do case requests do # Exited or timed out before we could serve it %{^ref => {_, mon_ref, :command, _command, _deadline}} -> - if function_exported?(worker, :handle_cancelled, 3) do - args = [nil, pool_state, :queued] + if function_exported?(worker, :handle_cancelled, 2) do + args = [:queued, pool_state] apply_worker_callback(worker, :handle_cancelled, args) end @@ -786,8 +784,8 @@ defmodule NimblePool do # Exited or errored during client processing %{^ref => {_, mon_ref, :state, worker_server_state}} -> - if function_exported?(worker, :handle_cancelled, 3) do - args = [worker_server_state, pool_state, :checked_out] + if function_exported?(worker, :handle_cancelled, 2) do + args = [:checked_out, pool_state] apply_worker_callback(worker, :handle_cancelled, args) end @@ -796,8 +794,8 @@ defmodule NimblePool do # The client timed out, sent us a message, and we dropped the deadlined request %{} -> - if function_exported?(worker, :handle_cancelled, 3) do - args = [nil, pool_state, :queued] + if function_exported?(worker, :handle_cancelled, 2) do + args = [:queued, pool_state] apply_worker_callback(worker, :handle_cancelled, args) end diff --git a/test/nimble_pool_test.exs b/test/nimble_pool_test.exs index 7a20403..cd8de78 100644 --- a/test/nimble_pool_test.exs +++ b/test/nimble_pool_test.exs @@ -108,8 +108,8 @@ defmodule NimblePoolTest do TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state]) end - def handle_cancelled(worker_state, pool_state, context) do - TestAgent.next(pool_state, :handle_cancelled, [worker_state, pool_state, context]) + def handle_cancelled(context, pool_state) do + TestAgent.next(pool_state, :handle_cancelled, [context, pool_state]) end end @@ -1023,7 +1023,7 @@ defmodule NimblePoolTest do handle_checkout: fn :checkout, _from, _next, pool_state -> {:ok, :client_state_out, :server_state_out, pool_state} end, - handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end, + handle_cancelled: fn :checked_out, _pool_state -> :ok end, terminate_worker: fn reason, :server_state_out, state -> send(parent, {:terminate, reason}) {:ok, state} @@ -1059,7 +1059,7 @@ defmodule NimblePoolTest do handle_update: fn :update, _next, pool_state -> {:ok, :updated_state, pool_state} end, - handle_cancelled: fn :server_state_out, _pool_state, :checked_out -> :ok end, + handle_cancelled: fn :checked_out, _pool_state -> :ok end, terminate_worker: fn reason, :updated_state, pool_state -> send(parent, {:terminate, reason}) {:ok, pool_state} @@ -1698,8 +1698,8 @@ defmodule NimblePoolTest do handle_checkout: fn :checkout, _from, worker_state, pool_state -> {:ok, :client_state_out, worker_state, pool_state} end, - handle_cancelled: fn worker_state, _pool_state, :checked_out -> - send(parent, {:ping, worker_state}) + handle_cancelled: fn :checked_out, _pool_state -> + send(parent, :ping) :ok end, terminate_worker: fn _reason, _, state -> {:ok, state} end @@ -1716,7 +1716,7 @@ defmodule NimblePoolTest do end ) - assert_receive({:ping, :worker1}) + assert_receive(:ping) NimblePool.stop(pool, :shutdown) end @@ -1731,8 +1731,8 @@ defmodule NimblePoolTest do handle_checkout: fn :checkout, _from, worker_state, pool_state -> {:ok, :client_state_out, worker_state, pool_state} end, - handle_cancelled: fn nil, _pool_state, :queued -> - send(parent, {:ping, nil}) + handle_cancelled: fn :queued, _pool_state -> + send(parent, :ping) :ok end, handle_checkin: fn :client_state_in, _from, next, pool_state -> @@ -1766,7 +1766,7 @@ defmodule NimblePoolTest do send(task1.pid, :release) - assert_receive({:ping, nil}) + assert_receive(:ping) NimblePool.stop(pool, :shutdown) end @@ -1778,8 +1778,8 @@ defmodule NimblePoolTest do stateful_pool!( [ init_worker: fn next -> {:ok, :worker1, next} end, - handle_cancelled: fn nil, _pool_state, :queued -> - send(parent, {:ping, nil}) + handle_cancelled: fn :queued, _pool_state -> + send(parent, :ping) :ok end, handle_checkout: fn :checkout, _from, worker_state, pool_state -> @@ -1807,7 +1807,7 @@ defmodule NimblePoolTest do :sys.resume(pool) - assert_receive({:ping, nil}) + assert_receive(:ping) assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> {:result, :client_state_in}