Skip to content

Commit

Permalink
Multiple amqp consumer publisher (#575)
Browse files Browse the repository at this point in the history
* Convert amqp consumer and publisher code in macros

* Adapt surrounding code

* Add operations configurations queues

* Add individual consumer and producers per context

* Adapt tests

* Bump gen_rmq version to 5.0.1

* Improve queue names

* Use module alias instead of constant
  • Loading branch information
arbulu89 authored Feb 19, 2025
1 parent e19d45b commit 7f1f955
Show file tree
Hide file tree
Showing 22 changed files with 286 additions and 136 deletions.
49 changes: 36 additions & 13 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,50 @@ import Config
config :wanda, Wanda.Messaging.Publisher, adapter: Wanda.Messaging.Adapters.AMQP

config :wanda,
children: [Wanda.Messaging.Adapters.AMQP.Publisher, Wanda.Messaging.Adapters.AMQP.Consumer]
children: [
Wanda.Executions.Messaging.Publisher,
Wanda.Executions.Messaging.Consumer,
Wanda.Operations.Messaging.Publisher,
Wanda.Operations.Messaging.Consumer
]

config :wanda, :messaging, adapter: Wanda.Messaging.Adapters.AMQP

config :wanda, Wanda.Messaging.Adapters.AMQP,
consumer: [
queue: "trento.checks.executions",
exchange: "trento.checks",
routing_key: "executions",
prefetch_count: "10",
connection: "amqp://wanda:wanda@localhost:5672"
checks: [
consumer: [
queue: "trento.checks.executions",
exchange: "trento.checks",
routing_key: "executions",
prefetch_count: "10",
connection: "amqp://wanda:wanda@localhost:5672"
],
publisher: [
exchange: "trento.checks",
connection: "amqp://wanda:wanda@localhost:5672"
],
processor: Wanda.Messaging.Adapters.AMQP.Processor
],
publisher: [
exchange: "trento.checks",
connection: "amqp://wanda:wanda@localhost:5672"
],
processor: Wanda.Messaging.Adapters.AMQP.Processor
operations: [
consumer: [
queue: "trento.operations.requests",
exchange: "trento.operations",
routing_key: "requests",
prefetch_count: "10",
connection: "amqp://wanda:wanda@localhost:5672"
],
publisher: [
exchange: "trento.operations",
connection: "amqp://wanda:wanda@localhost:5672"
],
processor: Wanda.Messaging.Adapters.AMQP.Processor
]

config :wanda, Wanda.Catalog, catalog_paths: ["priv/catalog"]

config :wanda, Wanda.Policy, execution_server_impl: Wanda.Executions.Server
config :wanda, Wanda.Policy,
execution_server_impl: Wanda.Executions.Server,
operation_server_impl: Wanda.Operations.Server

# Phoenix configuration

Expand Down
18 changes: 14 additions & 4 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,21 @@ config :wanda, WandaWeb.Endpoint,
watchers: []

config :wanda, Wanda.Messaging.Adapters.AMQP,
consumer: [
connection: "amqp://wanda:wanda@localhost:5674"
checks: [
consumer: [
connection: "amqp://wanda:wanda@localhost:5674"
],
publisher: [
connection: "amqp://wanda:wanda@localhost:5674"
]
],
publisher: [
connection: "amqp://wanda:wanda@localhost:5674"
operations: [
consumer: [
connection: "amqp://wanda:wanda@localhost:5674"
],
publisher: [
connection: "amqp://wanda:wanda@localhost:5674"
]
]

# ## SSL Support
Expand Down
23 changes: 17 additions & 6 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,24 @@ config :wanda, WandaWeb.Endpoint,
config :logger, level: :info

config :wanda, Wanda.Messaging.Adapters.AMQP,
consumer: [
connection: "amqp://wanda:wanda@localhost:5672"
checks: [
consumer: [
connection: "amqp://wanda:wanda@localhost:5672"
],
publisher: [
connection: "amqp://wanda:wanda@localhost:5672"
],
processor: Wanda.Messaging.Adapters.AMQP.Processor
],
publisher: [
connection: "amqp://wanda:wanda@localhost:5672"
],
processor: Wanda.Messaging.Adapters.AMQP.Processor
operations: [
consumer: [
connection: "amqp://wanda:wanda@localhost:5672"
],
publisher: [
connection: "amqp://wanda:wanda@localhost:5672"
],
processor: Wanda.Messaging.Adapters.AMQP.Processor
]

config :wanda,
operations_enabled: false
Expand Down
18 changes: 14 additions & 4 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,21 @@ if config_env() in [:prod, :demo] do
"""

config :wanda, Wanda.Messaging.Adapters.AMQP,
consumer: [
connection: amqp_url
checks: [
consumer: [
connection: amqp_url
],
publisher: [
connection: amqp_url
]
],
publisher: [
connection: amqp_url
operations: [
consumer: [
connection: amqp_url
],
publisher: [
connection: amqp_url
]
]

cors_enabled = System.get_env("CORS_ENABLED", "true") == "true"
Expand Down
67 changes: 48 additions & 19 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,58 @@ config :wanda, Wanda.Catalog,
config :wanda, :messaging, adapter: Wanda.Messaging.Adapters.AMQP

config :wanda, Wanda.Messaging.Adapters.AMQP,
consumer: [
queue: "trento.test.checks.executions",
exchange: "trento.test.checks",
routing_key: "executions",
prefetch_count: "10",
connection: "amqp://wanda:wanda@localhost:5674",
queue_options: [
durable: false,
auto_delete: true
checks: [
consumer: [
queue: "trento.test.checks.executions",
exchange: "trento.test.checks",
routing_key: "executions",
prefetch_count: "10",
connection: "amqp://wanda:wanda@localhost:5674",
queue_options: [
durable: false,
auto_delete: true
],
deadletter_queue_options: [
durable: false,
auto_delete: true
]
],
deadletter_queue_options: [
durable: false,
auto_delete: true
]
],
publisher: [
exchange: "trento.test.checks",
connection: "amqp://wanda:wanda@localhost:5674"
publisher: [
exchange: "trento.test.checks",
connection: "amqp://wanda:wanda@localhost:5674"
],
processor: GenRMQ.Processor.Mock
],
processor: GenRMQ.Processor.Mock
operations: [
consumer: [
queue: "trento.test.operations.requests",
exchange: "trento.test.operations",
routing_key: "requests",
prefetch_count: "10",
connection: "amqp://wanda:wanda@localhost:5674",
queue_options: [
durable: false,
auto_delete: true
],
deadletter_queue_options: [
durable: false,
auto_delete: true
]
],
publisher: [
exchange: "trento.test.operations",
connection: "amqp://wanda:wanda@localhost:5674"
],
processor: GenRMQ.Processor.Mock
]

config :wanda,
children: [Wanda.Messaging.Adapters.AMQP.Publisher, Wanda.Messaging.Adapters.AMQP.Consumer]
children: [
Wanda.Executions.Messaging.Publisher,
Wanda.Executions.Messaging.Consumer,
Wanda.Operations.Messaging.Publisher,
Wanda.Operations.Messaging.Consumer
]

config :joken,
access_token_signer: "s2ZdE+3+ke1USHEJ5O45KT364KiXPYaB9cJPdH3p60t8yT0nkLexLBNw8TFSzC7k"
Expand Down
7 changes: 5 additions & 2 deletions demo/fake_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ defmodule Wanda.Executions.FakeServer do
}

@default_config [sleep: 2_000]

@publisher Wanda.Executions.Messaging.Publisher

@impl true
def start_execution(
execution_id,
Expand All @@ -35,7 +38,7 @@ defmodule Wanda.Executions.FakeServer do

Executions.create_execution!(execution_id, group_id, targets)
execution_started = Messaging.Mapper.to_execution_started(execution_id, group_id, targets)
:ok = Messaging.publish("results", execution_started)
:ok = Messaging.publish(@publisher, "results", execution_started)

Process.sleep(Keyword.get(config, :sleep, 2_000))

Expand All @@ -56,7 +59,7 @@ defmodule Wanda.Executions.FakeServer do

execution_completed = Messaging.Mapper.to_execution_completed(evaluation_result, target_type)

:ok = Messaging.publish("results", execution_completed)
:ok = Messaging.publish(@publisher, "results", execution_completed)
end

@impl true
Expand Down
7 changes: 7 additions & 0 deletions lib/wanda/executions/messaging/consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Wanda.Executions.Messaging.Consumer do
@moduledoc """
Executions messagging consumer module
"""

use Wanda.Messaging.Adapters.AMQP.Consumer, id: __MODULE__, name: :checks
end
7 changes: 7 additions & 0 deletions lib/wanda/executions/messaging/publisher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Wanda.Executions.Messaging.Publisher do
@moduledoc """
Executions messagging publisher module
"""

use Wanda.Messaging.Adapters.AMQP.Publisher, id: __MODULE__, name: :checks
end
8 changes: 5 additions & 3 deletions lib/wanda/executions/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ defmodule Wanda.Executions.Server do

alias Wanda.EvaluationEngine

alias Wanda.Executions.Messaging.Publisher

require Logger

@default_target_type "cluster"
Expand Down Expand Up @@ -104,8 +106,8 @@ defmodule Wanda.Executions.Server do

Executions.create_execution!(execution_id, group_id, targets)

:ok = Messaging.publish("results", execution_started)
:ok = Messaging.publish("agents", facts_gathering_requested)
:ok = Messaging.publish(Publisher, "results", execution_started)
:ok = Messaging.publish(Publisher, "agents", facts_gathering_requested)

Process.send_after(self(), :timeout, timeout)

Expand Down Expand Up @@ -214,7 +216,7 @@ defmodule Wanda.Executions.Server do
target_type = Map.get(env, "target_type", @default_target_type)

execution_completed = Messaging.Mapper.to_execution_completed(result, target_type)
:ok = Messaging.publish("results", execution_completed)
:ok = Messaging.publish(Publisher, "results", execution_completed)
end

defp via_tuple(group_id),
Expand Down
6 changes: 3 additions & 3 deletions lib/wanda/messaging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ defmodule Wanda.Messaging do
Publishes messages to the message bus
"""

@spec publish(String.t(), any()) :: :ok | {:error, any()}
def publish(topic, message) do
adapter().publish(topic, message)
@spec publish(module(), String.t(), any()) :: :ok | {:error, any()}
def publish(publisher, topic, message) do
adapter().publish(publisher, topic, message)
end

defp adapter,
Expand Down
7 changes: 2 additions & 5 deletions lib/wanda/messaging/adapters/amqp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ defmodule Wanda.Messaging.Adapters.AMQP do

@behaviour Wanda.Messaging.Adapters.Behaviour

alias Wanda.Messaging.Adapters.AMQP.Publisher

@impl true

def publish(routing_key, message) do
def publish(publisher, routing_key, message) do
message
|> Trento.Contracts.to_event(source: "github.com/trento-project/wanda")
|> Publisher.publish_message(routing_key)
|> publisher.publish_message(routing_key)
end
end
Loading

0 comments on commit 7f1f955

Please sign in to comment.