Skip to content

Commit 2dfee88

Browse files
committed
contribution of new py-amqp/asyncio based sensor, bump to v1.1.2
1 parent e79bced commit 2dfee88

File tree

8 files changed

+599
-5
lines changed

8 files changed

+599
-5
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
# Change Log
2+
# 1.1.2
3+
- Added new py-amqp based sensor using asyncio (3.8) for concurrency. Details on why: https://github.com/StackStorm/st2/discussions/5743
4+
- Added pip dependency for `amqp==5.0.6` (py-amqp)
25

36
# 1.1.1
47
- Updated pip dependency to pika `1.3.x` to support python >= 3.7

README.md

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
Pack which allows integration with [RabbitMQ](http://www.rabbitmq.com/).
44

55
## Configuration
6-
7-
Configuration is required to use the RabbitMQ sensor. Copy the example configuration
6+
### rabbitmq.RabbitMQQueueSensor Sensor
7+
Configuration is required to use the `pika` based RabbitMQ sensor. Copy the example configuration
88
in [rabbitmq.yaml.example](./rabbitmq.yaml.example) to `/opt/stackstorm/configs/rabbitmq.yaml`
99
and edit as required.
1010

@@ -23,6 +23,7 @@ You can also use dynamic values from the datastore. See the
2323
You can specify multiple queues using this syntax:
2424

2525
```yaml
26+
---
2627
sensor_config:
2728
rabbitmq_queue_sensor:
2829
queues:
@@ -33,6 +34,40 @@ sensor_config:
3334
- queue2
3435
```
3536
37+
### rabbitmq.QueueWatcherAMQP Sensor
38+
You must configure this sensor under the top level configuration key `amqp_watcher_sensor_config`. This is to ensure backwards compatibility with other sensors.
39+
40+
The below config will declare a simple 'Classic' queue, with a 'direct' exchange with routing key to route messages to that queue (for publishers).
41+
42+
```yaml
43+
---
44+
amqp_watcher_sensor_config:
45+
host: "rabbitmq.domain.com"
46+
port: 5672
47+
username: "guest"
48+
password: "guest"
49+
queues:
50+
- queue: "temp_queue"
51+
type: "classic"
52+
exchanges:
53+
- exchange: "temp_exchange"
54+
type: "direct"
55+
bindings:
56+
- routing_key: "temp.messages"
57+
queue: "temp_queue"
58+
```
59+
The sensor then monitors any declared queues for new messages, and dispatches the trigger `rabbitmq.amqp_msg_rx` with the data:
60+
```json
61+
{"queue": "queue_name", "body": "message body"}
62+
```
63+
64+
If the message body is a serialized string of JSON, it will be deserialized and loaded before being dispatched.
65+
66+
#### Queues / Exchanges Config Parameters
67+
The sensor uses passthrough via `**kwargs` for declares and binds on `queues` and `exchanges` list items, so follow the [documentation of `py-yaml`](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html) for the `exchange_declare()`, `queue_declare()`, and `queue_bind()` methods.
68+
69+
**Note:** The `exchange` param is passed into `queue_bind()` explicitly via the sensor inferring it from config structure, and is not required.
70+
3671
## Actions
3772

3873
* ``list_exchanges`` - List available exchanges.
@@ -50,11 +85,19 @@ The following action will publish a message to a remote RabbitMQ server with a s
5085
$ st2 run rabbitmq.publish_message host=localhost port=5673 virtual_host=sensu exchange=metrics exchange_type=topic username=sensu password=password message="foo.bar.baz 1 1436802746"
5186
```
5287

53-
5488
## Sensors
5589

5690
* ``new_message`` - Sensor that triggers a rabbitmq.new_message with a payload containing the queue and the body
5791

92+
Configured in the pack config under the key `sensor_config`
93+
94+
This sensor uses Python's `threading` for concurrency via `pika`'s included methods.
95+
5896
This sensor should only be used with ``fanout`` and ``topic`` exchanges, this way it doesn't affect the behavior of the app since messages will still be delivered to other consumers / subscribers.
5997
If it's used with ``direct`` or ``headers`` exchanges, those messages won't be delivered to other consumers so it will affect app behavior and potentially break it.
6098

99+
* `amqp_msg_rx` - Sensor that triggers rabbitmq.amqp_msg_rx with a payload containing the queue and the body
100+
101+
Configured in the pack config under the key `amqp_watcher_sensor_config`
102+
103+
This sensor uses `asyncio` for concurrency and `py-amqp` and to handle the declaration of configured queues and exchanges on the remote RabbitMQ instance, and the subsequent consumption of messages.

config.schema.yaml

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
sensor_config:
33
description: "RabbitMQ Sensor settings"
44
type: "object"
5-
required: true
5+
required: false
66
additionalProperties: true
77
properties:
88
host:
@@ -41,3 +41,131 @@ sensor_config:
4141
- "json"
4242
- "pickle"
4343
required: false
44+
amqp_watcher_sensor_config:
45+
description: "The config for a py-amqp sensor that creates and monitors queues in RabbitMQ for messages"
46+
required: false
47+
additionalProperties: false
48+
type: "object"
49+
properties:
50+
host:
51+
description: "The RabbitMQ host to connect to"
52+
type: "string"
53+
required: true
54+
port:
55+
description: "Connection port for RabbitMQ (Default: 5672)"
56+
type: "number"
57+
username:
58+
description: "Username for authenticating to RabbitMQ"
59+
type: "string"
60+
required: true
61+
password:
62+
description: "Password for authenticating to RabbitMQ"
63+
type: "string"
64+
required: true
65+
secret: true
66+
queues:
67+
description: "A list of queues to be declared (created if missing)"
68+
type: "array"
69+
required: false
70+
additionalProperties: false
71+
items:
72+
type: "object"
73+
additionalProperties: false
74+
required: true
75+
properties:
76+
queue:
77+
description: "The name of the queue"
78+
type: "string"
79+
required: true
80+
type:
81+
description: "The type of the queue"
82+
type: "string"
83+
required: false
84+
enum:
85+
- "classic"
86+
- "quorum"
87+
- "stream"
88+
passive:
89+
description: "Configure this queue as passive?"
90+
type: "boolean"
91+
required: false
92+
durable:
93+
description: "Configure this queue as durable?"
94+
type: "boolean"
95+
required: false
96+
exclusive:
97+
description: "Configure this queue as exclusive?"
98+
type: "boolean"
99+
required: false
100+
auto_delete:
101+
description: "Configure this queue to auto delete?"
102+
type: "boolean"
103+
required: false
104+
arguments:
105+
description: "Additional arguments to be pased during Queue declare"
106+
type: "object"
107+
required: false
108+
additionalProperties: true
109+
exchanges:
110+
description: "A list of exchanges to be declared (created if missing)"
111+
type: "array"
112+
required: false
113+
additionalProperties: false
114+
items:
115+
type: "object"
116+
additionalProperties: false
117+
required: true
118+
properties:
119+
exchange:
120+
description: "The name of the exchange"
121+
type: "string"
122+
required: true
123+
type:
124+
description: "The type of the exchange"
125+
type: "string"
126+
required: false
127+
enum:
128+
- "direct"
129+
- "fanout"
130+
- "headers"
131+
- "topic"
132+
passive:
133+
description: "Configure this exchange as passive?"
134+
type: "boolean"
135+
required: false
136+
durable:
137+
description: "Configure this exchange as durable?"
138+
type: "boolean"
139+
required: false
140+
auto_delete:
141+
description: "Configure this exchange to auto delete?"
142+
type: "boolean"
143+
required: false
144+
arguments:
145+
description: "Additional arguments to be pased during Exchange declare"
146+
type: "object"
147+
required: false
148+
additionalProperties: true
149+
bindings:
150+
description: "A list of bindings to be declared for this exchange"
151+
type: "array"
152+
required: false
153+
additionalProperties: false
154+
items:
155+
type: "object"
156+
additionalProperties: false
157+
required: true
158+
properties:
159+
routing_key:
160+
description: "The 'Routing Key' to bind with the queue"
161+
type: "string"
162+
required: true
163+
queue:
164+
description: "The name of the Queue that will be bound with the routing_key"
165+
type: "string"
166+
required: true
167+
arguments:
168+
description: "Additional arguments to be pased during binding"
169+
type: "object"
170+
required: false
171+
additionalProperties: true

pack.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ keywords:
99
- aqmp
1010
- stomp
1111
- message broker
12-
version: 1.1.1
12+
version: 1.1.2
1313
python_versions:
1414
- "3"
1515
author: StackStorm, Inc.

rabbitmq.yaml.example

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,17 @@ sensor_config:
77
queues:
88
- "queue1"
99
deserialization_method: "json"
10+
amqp_watcher_sensor_config:
11+
host: "rabbitmq.domain.com"
12+
port: 5672
13+
username: "guest"
14+
password: "guest"
15+
queues:
16+
- queue: "temp_queue"
17+
type: "classic"
18+
exchanges:
19+
- exchange: "temp_exchange"
20+
type: "direct"
21+
bindings:
22+
- routing_key: "temp.messages"
23+
queue: "temp_queue"

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pika~=1.3.1
2+
amqp==5.0.6

0 commit comments

Comments
 (0)