Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add handle_ping/1 pool callback #47

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 95 additions & 7 deletions lib/nimble_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,24 @@ defmodule NimblePool do
pool_state
) :: :ok

@doc """
Handle pings due to pool inactivity.

Runs when the idle pool timer detects no activity for a duration exceeding :pool_idle_timeout (in milliseconds).
A pool is idle if no checkouts or checkins occur within this period.

This callback is optional.
"""
@doc callback: :pool
@callback handle_ping(pool_state) :: {:ok, pool_state} | {:remove, user_reason(), pool_state}

@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
handle_update: 3,
handle_ping: 2,
handle_ping: 1,
terminate_worker: 3,
terminate_pool: 2,
handle_cancelled: 2
Expand Down Expand Up @@ -377,6 +389,10 @@ defmodule NimblePool do
for each cycle of the `handle_ping/2` optional callback.
Defaults to no limit. See `handle_ping/2` for more details.

* `:pool_idle_timeout` - Timeout in milliseconds to consider the pool idle.
If set, starts a periodic timer at this interval to ping the pool via the
optional `handle_ping/1` callback if there's no activity. Defaults to no timeout.

"""
@spec start_link(keyword) :: GenServer.on_start()
def start_link(opts) when is_list(opts) do
Expand All @@ -389,6 +405,7 @@ defmodule NimblePool do
{lazy, opts} = Keyword.pop(opts, :lazy, false)
{worker_idle_timeout, opts} = Keyword.pop(opts, :worker_idle_timeout, nil)
{max_idle_pings, opts} = Keyword.pop(opts, :max_idle_pings, -1)
{pool_idle_timeout, opts} = Keyword.pop(opts, :pool_idle_timeout, nil)

unless is_atom(worker) do
raise ArgumentError, "worker must be an atom, got: #{inspect(worker)}"
Expand All @@ -400,7 +417,7 @@ defmodule NimblePool do

GenServer.start_link(
__MODULE__,
{worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings},
{worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings, pool_idle_timeout},
opts
)
end
Expand Down Expand Up @@ -521,7 +538,7 @@ defmodule NimblePool do
## Callbacks

@impl true
def init({worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings}) do
def init({worker, arg, pool_size, lazy, worker_idle_timeout, max_idle_pings, pool_idle_timeout}) do
Process.flag(:trap_exit, true)

case Code.ensure_loaded(worker) do
Expand All @@ -544,6 +561,16 @@ defmodule NimblePool do
end
end

if pool_idle_timeout do
if function_exported?(worker, :handle_ping, 1) do
Process.send_after(self(), :check_idle_pool, pool_idle_timeout)
else
IO.warn(
":pool_idle_timeout was given but the worker does not export a handle_ping/1 callback"
)
end
end

with {:ok, pool_state} <- do_init_pool(worker, arg) do
{pool_state, resources, async} =
if is_nil(lazy) do
Expand All @@ -565,7 +592,9 @@ defmodule NimblePool do
state: pool_state,
lazy: lazy,
worker_idle_timeout: worker_idle_timeout,
max_idle_pings: max_idle_pings
max_idle_pings: max_idle_pings,
pool_idle_timeout: pool_idle_timeout,
last_activity_ts: System.monotonic_time(:millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is exactly my concern. If we have to track this, this is now state necessary on every single nimble pool... but not all of them may need it. It is better if you track this on Finch side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I think the implementation overhead is very low, I agree that not all clients may need it.

So it’s all good with me. Feel free to close this PR and the issue, and I’ll move forward with the implementation on the Finch side. Thanks for the feedback!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for trying it out! And, if the Finch side for some reason is not enough, we already have what is necessary to implement it here. :)

}

{:ok, state}
Expand All @@ -582,11 +611,18 @@ defmodule NimblePool do

case handle_enqueue(worker, command, pool_state) do
{:ok, command, pool_state} ->
{:noreply, maybe_checkout(command, mon_ref, deadline, from, %{state | state: pool_state})}
{:noreply,
maybe_checkout(command, mon_ref, deadline, from, %{
state
| state: pool_state,
last_activity_ts: System.monotonic_time(:millisecond)
})}

{:skip, exception, pool_state} ->
state = remove_request(%{state | state: pool_state}, ref, mon_ref)
{:reply, {:skipped, exception}, state}

{:reply, {:skipped, exception},
%{state | last_activity_ts: System.monotonic_time(:millisecond)}}
end
end

Expand Down Expand Up @@ -637,7 +673,13 @@ defmodule NimblePool do
end

state = remove_request(state, ref, mon_ref)
{:noreply, maybe_checkout(%{state | resources: resources})}

{:noreply,
maybe_checkout(%{
state
| resources: resources,
last_activity_ts: System.monotonic_time(:millisecond)
})}

%{} ->
exit(:unexpected_checkin)
Expand Down Expand Up @@ -723,6 +765,26 @@ defmodule NimblePool do
end
end

def handle_info(:check_idle_pool, state) do
%{
last_activity_ts: last_ts,
pool_idle_timeout: timeout
} = state

Process.send_after(self(), :check_idle_pool, timeout)

diff = System.monotonic_time(:millisecond) - last_ts

if diff > timeout do
case do_check_idle_pool(state) do
{:ok, new_state} -> {:noreply, new_state}
{:stop, reason, new_state} -> {:stop, {:shutdown, reason}, new_state}
end
else
{:noreply, state}
end
end

@impl true
def handle_info(msg, state) do
maybe_handle_info(msg, state)
Expand Down Expand Up @@ -865,7 +927,14 @@ defmodule NimblePool do
GenServer.reply({pid, ref}, worker_client_state)

requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state})
%{state | resources: resources, requests: requests, state: pool_state}

%{
state
| resources: resources,
requests: requests,
state: pool_state,
last_activity_ts: System.monotonic_time(:millisecond)
}

{:remove, reason, pool_state} ->
state = remove_worker(reason, worker_server_state, %{state | state: pool_state})
Expand Down Expand Up @@ -924,6 +993,25 @@ defmodule NimblePool do
end
end

defp do_check_idle_pool(state) do
%{
worker: worker,
state: pool_state
} = state

if function_exported?(worker, :handle_ping, 1) do
case worker.handle_ping(pool_state) do
{:ok, pool_state} ->
{:ok, %{state | state: pool_state}}

{:stop, user_reason, new_state} ->
{:stop, user_reason, %{state | state: new_state}}
end
else
{:ok, state}
end
end

defp check_idle_resources(resources, state) do
now_in_ms = System.monotonic_time(:millisecond)
do_check_idle_resources(resources, now_in_ms, state, :queue.new(), state.max_idle_pings)
Expand Down
145 changes: 144 additions & 1 deletion test/nimble_pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ defmodule NimblePoolTest do
TestAgent.next(pool_state, :handle_ping, [worker_state, pool_state])
end

def handle_ping(pool_state) do
TestAgent.next(pool_state, :handle_ping, [pool_state])
end

def terminate_worker(reason, worker_state, pid) do
TestAgent.next(pid, :terminate_worker, [reason, worker_state, pid])
end
Expand Down Expand Up @@ -1395,7 +1399,7 @@ defmodule NimblePoolTest do
end
end

describe "handle_ping" do
describe "handle_ping/2" do
test "ping only idle workers" do
parent = self()

Expand Down Expand Up @@ -1658,6 +1662,145 @@ defmodule NimblePoolTest do
end
end

describe "handle_ping/1" do
test "ping pool and terminate after timeout" do
parent = self()

stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
init_worker: fn next -> {:ok, :worker2, next} end,
init_worker: fn next -> {:ok, :worker3, next} end,
handle_ping: fn pool_state ->
send(parent, :ping_pool_idle)
{:stop, :my_reason, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_pool: fn reason, _state ->
send(parent, {:terminated_pool, reason})
:ok
end
],
pool_size: 3,
pool_idle_timeout: 5
)

assert_receive(:ping_pool_idle)
assert_receive({:terminated_pool, {:shutdown, :my_reason}})
end

test "ping pool and terminate after 3x timeout" do
parent = self()

stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
init_worker: fn next -> {:ok, :worker2, next} end,
init_worker: fn next -> {:ok, :worker3, next} end,
handle_ping: fn pool_state ->
send(parent, :ping_pool_idle_1)
{:ok, pool_state}
end,
handle_ping: fn pool_state ->
send(parent, :ping_pool_idle_2)
{:ok, pool_state}
end,
handle_ping: fn pool_state ->
send(parent, :ping_pool_idle_3)
{:stop, :my_reason, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_pool: fn reason, _state ->
send(parent, {:terminated_pool, reason})
:ok
end
],
pool_size: 3,
pool_idle_timeout: 10
)

assert_receive :ping_pool_idle_1, 20
assert_receive :ping_pool_idle_2, 20
assert_receive :ping_pool_idle_3, 20

assert_receive({:terminated_pool, {:shutdown, :my_reason}})
end

test "do not call if pool is not idle" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next ->
send(parent, {:pool_started, System.monotonic_time(:millisecond)})
{:ok, :worker1, next}
end,
init_worker: fn next -> {:ok, :worker2, next} end,
init_worker: fn next -> {:ok, :worker3, next} end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_checkin: fn :client_state_in, _from, worker_state, pool_state ->
{:ok, worker_state, pool_state}
end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_checkin: fn :client_state_in, _from, worker_state, pool_state ->
{:ok, worker_state, pool_state}
end,
handle_checkout: fn :checkout, _from, worker_state, pool_state ->
{:ok, :client_state_out, worker_state, pool_state}
end,
handle_checkin: fn :client_state_in, _from, worker_state, pool_state ->
{:ok, worker_state, pool_state}
end,
handle_ping: fn pool_state ->
send(parent, {:ping_pool_idle, System.monotonic_time(:millisecond)})
{:stop, :my_reason, pool_state}
end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_worker: fn _reason, _, state -> {:ok, state} end,
terminate_pool: fn reason, _state ->
send(parent, {:terminated_pool, reason})
:ok
end
],
pool_size: 3,
pool_idle_timeout: 10
)

assert_receive {:pool_started, started_ts}

assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
Process.sleep(5)
{:result, :client_state_in}
end) == :result

assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
Process.sleep(5)
{:result, :client_state_in}
end) == :result

assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
Process.sleep(5)
{:result, :client_state_in}
end) == :result

assert_receive({:ping_pool_idle, idle_ts})

assert_receive({:terminated_pool, {:shutdown, :my_reason}})

assert idle_ts - started_ts > 30
end
end

describe "terminate_pool" do
test "should terminate workers and call parent when terminating" do
parent = self()
Expand Down
Loading