Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for before_consume option #139

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions lib/broadway_rabbitmq/amqp_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,25 @@ defmodule BroadwayRabbitMQ.AmqpClient do
the `:declare` and `:bindings` options (described above).
"""
],
before_consume: [
type: {:fun, 1},
doc: """
A function that takes the AMQP channel that the producer
is connected to and can run arbitrary setup. This function can return
`:ok` if everything went well or `{:error, reason}`. In the error case then
the producer will consider the connection failed and will try to reconnect
later (same behavior as when the connection drops, for example).
This function is run **before** the consumer starts consuming messages, but
**after** the queue is declared and the bindings are set up. Until this function
returns, the messages will keep queueing up in the RabbitMQ. This function is useful
to execute additional logic before we start processing messages, for example, to
load existing state from any database or to wait for some external system to
be ready. This could make sure that the messages are not lost before the system
is fully ready to process them. Make sure that this function does not block for
too long, otherwise, the RabbitMQ server might consider the connection dead or the
messages can get expired if configured for the queue.
"""
],
consume_options: [
type: :keyword_list,
default: [],
Expand Down Expand Up @@ -194,7 +213,8 @@ defmodule BroadwayRabbitMQ.AmqpClient do
qos: Keyword.fetch!(opts, :qos),
metadata: Keyword.fetch!(opts, :metadata),
consume_options: Keyword.fetch!(opts, :consume_options),
after_connect: Keyword.get(opts, :after_connect, fn _channel -> :ok end)
after_connect: Keyword.get(opts, :after_connect, fn _channel -> :ok end),
before_consume: Keyword.get(opts, :before_consume, fn _channel -> :ok end)
}}
else
{:error, %NimbleOptions.ValidationError{} = error} -> {:error, Exception.message(error)}
Expand Down Expand Up @@ -372,9 +392,29 @@ defmodule BroadwayRabbitMQ.AmqpClient do
end

@impl true
def consume(channel, %{queue: queue, consume_options: consume_options} = _config) do
{:ok, consumer_tag} = Basic.consume(channel, queue, _consumer_pid = self(), consume_options)
consumer_tag
def consume(channel, %{queue: queue, consume_options: consume_options} = config) do
case call_before_consume(config, channel) do
:ok ->
Basic.consume(channel, queue, _consumer_pid = self(), consume_options)

{:error, reason} ->
close_channel(config, channel)
{:error, reason}
end
end

defp call_before_consume(config, channel) do
case config.before_consume.(channel) do
:ok ->
:ok

{:error, reason} ->
{:error, reason}

other ->
close_channel(config, channel)
raise "unexpected return value from the :before_consume function: #{inspect(other)}"
end
end

@impl true
Expand Down
2 changes: 1 addition & 1 deletion lib/broadway_rabbitmq/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ defmodule BroadwayRabbitMQ.Producer do
# We monitor the channel but link to the connection (in the client, not here).
channel_ref = Process.monitor(channel.pid)
backoff = backoff && Backoff.reset(backoff)
consumer_tag = client.consume(channel, config)
{:ok, consumer_tag} = client.consume(channel, config)

%{
state
Expand Down
2 changes: 1 addition & 1 deletion lib/broadway_rabbitmq/rabbitmq_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule BroadwayRabbitMQ.RabbitmqClient do
@callback ack(channel :: Channel.t(), delivery_tag :: Basic.delivery_tag()) :: any
@callback reject(channel :: Channel.t(), delivery_tag :: Basic.delivery_tag(), opts :: keyword) ::
any
@callback consume(channel :: Channel.t(), config) :: Basic.consumer_tag()
@callback consume(channel :: Channel.t(), config) :: {:ok, Basic.consumer_tag()} | {:error, any}
@callback cancel(channel :: Channel.t(), Basic.consumer_tag()) :: :ok | Basic.error()
@callback close_connection(config, channel :: Channel.t()) :: :ok | {:error, any}
end
24 changes: 22 additions & 2 deletions test/broadway_rabbitmq/ampq_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ defmodule BroadwayRabbitMQ.AmqpClientTest do
bindings: [],
declare_opts: nil,
queue: "queue",
after_connect: after_connect
after_connect: after_connect,
before_consume: before_consume
}} = AmqpClient.init(queue: "queue")

assert after_connect.(:channel) == :ok
assert before_consume.(:channel) == :ok
end

test "connection name" do
Expand All @@ -25,6 +27,7 @@ defmodule BroadwayRabbitMQ.AmqpClientTest do
describe "validate init options" do
test "supported options" do
after_connect = fn _ -> :ok end
before_consume = fn _ -> :ok end

connection = [
username: nil,
Expand All @@ -51,6 +54,7 @@ defmodule BroadwayRabbitMQ.AmqpClientTest do
connection: connection,
qos: qos,
after_connect: after_connect,
before_consume: before_consume,
consume_options: [no_ack: true, exclusive: false]
]

Expand All @@ -67,6 +71,7 @@ defmodule BroadwayRabbitMQ.AmqpClientTest do
declare_opts: nil,
queue: "queue",
after_connect: after_connect,
before_consume: before_consume,
consume_options: [no_ack: true, exclusive: false]
}}
end
Expand Down Expand Up @@ -347,6 +352,21 @@ defmodule BroadwayRabbitMQ.AmqpClientTest do
message = "unexpected return value from the :after_connect function: :bad_return_value"
assert_raise RuntimeError, message, fn -> AmqpClient.setup_channel(config) end
end

@tag :capture_log
test "raises if :before_consume returns a bad value" do
{:ok, config} =
AmqpClient.init(
queue: "",
declare: [auto_delete: true],
before_consume: fn _channel -> :bad_return_value end
)

assert {:ok, %AMQP.Channel{} = channel} = AmqpClient.setup_channel(config)

message = "unexpected return value from the :before_consume function: :bad_return_value"
assert_raise RuntimeError, message, fn -> AmqpClient.consume(channel, config) end
end
end

@tag :integration
Expand All @@ -358,7 +378,7 @@ defmodule BroadwayRabbitMQ.AmqpClientTest do
assert {:ok, %AMQP.Channel{} = channel} = AmqpClient.setup_channel(config)

# Consume from the queue.
consumer_tag = AmqpClient.consume(channel, config)
{:ok, consumer_tag} = AmqpClient.consume(channel, config)
assert_receive {:basic_consume_ok, %{consumer_tag: ^consumer_tag}}

# Publish a message and ack it.
Expand Down
2 changes: 1 addition & 1 deletion test/broadway_rabbitmq/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ defmodule BroadwayRabbitMQ.ProducerTest do
@impl true
def consume(_channel, _config) do
send(self(), {:basic_consume_ok, %{consumer_tag: :fake_consumer_tag}})
:fake_consumer_tag
{:ok, :fake_consumer_tag}
end

@impl true
Expand Down
Loading