diff --git a/lib/nimble_pool.ex b/lib/nimble_pool.ex index 3920892..28af782 100644 --- a/lib/nimble_pool.ex +++ b/lib/nimble_pool.ex @@ -318,12 +318,24 @@ defmodule NimblePool do pool_state ) :: :ok + @doc """ + Handle pings due to pool inactivity. + + Runs when the idle pool timer detects no activity for a duration exceeding :pool_idle_timeout (in milliseconds). + A pool is idle if no checkouts or checkins occur within this period. + + This callback is optional. + """ + @doc callback: :pool + @callback handle_ping(pool_state) :: {:ok, pool_state} | {:remove, user_reason(), pool_state} + @optional_callbacks init_pool: 1, handle_checkin: 4, handle_info: 2, handle_enqueue: 2, handle_update: 3, handle_ping: 2, + handle_ping: 1, terminate_worker: 3, terminate_pool: 2, handle_cancelled: 2 @@ -377,6 +389,10 @@ defmodule NimblePool do for each cycle of the `handle_ping/2` optional callback. Defaults to no limit. See `handle_ping/2` for more details. + * `:pool_idle_timeout` - Timeout in milliseconds to consider the pool idle. + If set, starts a periodic timer at this interval to ping the pool via the + optional `handle_ping/1` callback if there's no activity. Defaults to no timeout. + """ @spec start_link(keyword) :: GenServer.on_start() def start_link(opts) when is_list(opts) do @@ -389,6 +405,7 @@ defmodule NimblePool do {lazy, opts} = Keyword.pop(opts, :lazy, false) {worker_idle_timeout, opts} = Keyword.pop(opts, :worker_idle_timeout, nil) {max_idle_pings, opts} = Keyword.pop(opts, :max_idle_pings, -1) + {pool_idle_timeout, opts} = Keyword.pop(opts, :pool_idle_timeout, nil) unless is_atom(worker) do raise ArgumentError, "worker must be an atom, got: #{inspect(worker)}" @@ -400,7 +417,7 @@ defmodule NimblePool do GenServer.start_link( __MODULE__, - {worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings}, + {worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings, pool_idle_timeout}, opts ) end @@ -521,7 +538,7 @@ defmodule NimblePool do ## Callbacks @impl true - def init({worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings}) do + def init({worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings, pool_idle_timeout}) do Process.flag(:trap_exit, true) case Code.ensure_loaded(worker) do @@ -544,6 +561,16 @@ defmodule NimblePool do end end + if pool_idle_timeout do + if function_exported?(worker, :handle_ping, 1) do + Process.send_after(self(), :check_idle_pool, pool_idle_timeout) + else + IO.warn( + ":pool_idle_timeout was given but the worker does not export a handle_ping/1 callback" + ) + end + end + with {:ok, pool_state} <- do_init_pool(worker, arg) do {pool_state, resources, async} = if is_nil(lazy) do @@ -565,7 +592,9 @@ defmodule NimblePool do state: pool_state, lazy: lazy, worker_idle_timeout: worker_idle_timeout, - max_idle_pings: max_idle_pings + max_idle_pings: max_idle_pings, + pool_idle_timeout: pool_idle_timeout, + last_activity_ts: System.monotonic_time(:millisecond) } {:ok, state} @@ -582,11 +611,18 @@ defmodule NimblePool do case handle_enqueue(worker, command, pool_state) do {:ok, command, pool_state} -> - {:noreply, maybe_checkout(command, mon_ref, deadline, from, %{state | state: pool_state})} + {:noreply, + maybe_checkout(command, mon_ref, deadline, from, %{ + state + | state: pool_state, + last_activity_ts: System.monotonic_time(:millisecond) + })} {:skip, exception, pool_state} -> state = remove_request(%{state | state: pool_state}, ref, mon_ref) - {:reply, {:skipped, exception}, state} + + {:reply, {:skipped, exception}, + %{state | last_activity_ts: System.monotonic_time(:millisecond)}} end end @@ -637,7 +673,13 @@ defmodule NimblePool do end state = remove_request(state, ref, mon_ref) - {:noreply, maybe_checkout(%{state | resources: resources})} + + {:noreply, + maybe_checkout(%{ + state + | resources: resources, + last_activity_ts: System.monotonic_time(:millisecond) + })} %{} -> exit(:unexpected_checkin) @@ -723,6 +765,26 @@ defmodule NimblePool do end end + def handle_info(:check_idle_pool, state) do + %{ + last_activity_ts: last_ts, + pool_idle_timeout: timeout + } = state + + Process.send_after(self(), :check_idle_pool, timeout) + + diff = System.monotonic_time(:millisecond) - last_ts + + if diff > timeout do + case do_check_idle_pool(state) do + {:ok, new_state} -> {:noreply, new_state} + {:stop, reason, new_state} -> {:stop, {:shutdown, reason}, new_state} + end + else + {:noreply, state} + end + end + @impl true def handle_info(msg, state) do maybe_handle_info(msg, state) @@ -865,7 +927,14 @@ defmodule NimblePool do GenServer.reply({pid, ref}, worker_client_state) requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state}) - %{state | resources: resources, requests: requests, state: pool_state} + + %{ + state + | resources: resources, + requests: requests, + state: pool_state, + last_activity_ts: System.monotonic_time(:millisecond) + } {:remove, reason, pool_state} -> state = remove_worker(reason, worker_server_state, %{state | state: pool_state}) @@ -924,6 +993,25 @@ defmodule NimblePool do end end + defp do_check_idle_pool(state) do + %{ + worker: worker, + state: pool_state + } = state + + if function_exported?(worker, :handle_ping, 1) do + case worker.handle_ping(pool_state) do + {:ok, pool_state} -> + {:ok, %{state | state: pool_state}} + + {:stop, user_reason, new_state} -> + {:stop, user_reason, %{state | state: new_state}} + end + else + {:ok, state} + end + end + defp check_idle_resources(resources, state) do now_in_ms = System.monotonic_time(:millisecond) do_check_idle_resources(resources, now_in_ms, state, :queue.new(), state.max_idle_pings) diff --git a/test/nimble_pool_test.exs b/test/nimble_pool_test.exs index cd8de78..342731f 100644 --- a/test/nimble_pool_test.exs +++ b/test/nimble_pool_test.exs @@ -100,6 +100,10 @@ defmodule NimblePoolTest do TestAgent.next(pool_state, :handle_ping, [worker_state, pool_state]) end + def handle_ping(pool_state) do + TestAgent.next(pool_state, :handle_ping, [pool_state]) + end + def terminate_worker(reason, worker_state, pid) do TestAgent.next(pid, :terminate_worker, [reason, worker_state, pid]) end @@ -1395,7 +1399,7 @@ defmodule NimblePoolTest do end end - describe "handle_ping" do + describe "handle_ping/2" do test "ping only idle workers" do parent = self() @@ -1658,6 +1662,145 @@ defmodule NimblePoolTest do end end + describe "handle_ping/1" do + test "ping pool and terminate after timeout" do + parent = self() + + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + init_worker: fn next -> {:ok, :worker2, next} end, + init_worker: fn next -> {:ok, :worker3, next} end, + handle_ping: fn pool_state -> + send(parent, :ping_pool_idle) + {:stop, :my_reason, pool_state} + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_pool: fn reason, _state -> + send(parent, {:terminated_pool, reason}) + :ok + end + ], + pool_size: 3, + pool_idle_timeout: 5 + ) + + assert_receive(:ping_pool_idle) + assert_receive({:terminated_pool, {:shutdown, :my_reason}}) + end + + test "ping pool and terminate after 3x timeout" do + parent = self() + + stateful_pool!( + [ + init_worker: fn next -> {:ok, :worker1, next} end, + init_worker: fn next -> {:ok, :worker2, next} end, + init_worker: fn next -> {:ok, :worker3, next} end, + handle_ping: fn pool_state -> + send(parent, :ping_pool_idle_1) + {:ok, pool_state} + end, + handle_ping: fn pool_state -> + send(parent, :ping_pool_idle_2) + {:ok, pool_state} + end, + handle_ping: fn pool_state -> + send(parent, :ping_pool_idle_3) + {:stop, :my_reason, pool_state} + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_pool: fn reason, _state -> + send(parent, {:terminated_pool, reason}) + :ok + end + ], + pool_size: 3, + pool_idle_timeout: 10 + ) + + assert_receive :ping_pool_idle_1, 20 + assert_receive :ping_pool_idle_2, 20 + assert_receive :ping_pool_idle_3, 20 + + assert_receive({:terminated_pool, {:shutdown, :my_reason}}) + end + + test "do not call if pool is not idle" do + parent = self() + + {_, pool} = + stateful_pool!( + [ + init_worker: fn next -> + send(parent, {:pool_started, System.monotonic_time(:millisecond)}) + {:ok, :worker1, next} + end, + init_worker: fn next -> {:ok, :worker2, next} end, + init_worker: fn next -> {:ok, :worker3, next} end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_checkin: fn :client_state_in, _from, worker_state, pool_state -> + {:ok, worker_state, pool_state} + end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_checkin: fn :client_state_in, _from, worker_state, pool_state -> + {:ok, worker_state, pool_state} + end, + handle_checkout: fn :checkout, _from, worker_state, pool_state -> + {:ok, :client_state_out, worker_state, pool_state} + end, + handle_checkin: fn :client_state_in, _from, worker_state, pool_state -> + {:ok, worker_state, pool_state} + end, + handle_ping: fn pool_state -> + send(parent, {:ping_pool_idle, System.monotonic_time(:millisecond)}) + {:stop, :my_reason, pool_state} + end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_worker: fn _reason, _, state -> {:ok, state} end, + terminate_pool: fn reason, _state -> + send(parent, {:terminated_pool, reason}) + :ok + end + ], + pool_size: 3, + pool_idle_timeout: 10 + ) + + assert_receive {:pool_started, started_ts} + + assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + Process.sleep(5) + {:result, :client_state_in} + end) == :result + + assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + Process.sleep(5) + {:result, :client_state_in} + end) == :result + + assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out -> + Process.sleep(5) + {:result, :client_state_in} + end) == :result + + assert_receive({:ping_pool_idle, idle_ts}) + + assert_receive({:terminated_pool, {:shutdown, :my_reason}}) + + assert idle_ts - started_ts > 30 + end + end + describe "terminate_pool" do test "should terminate workers and call parent when terminating" do parent = self()