Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Implemented notifications are:

1. Celery task
2. HTTP request
3. RabbitMQ Message


## YAML file configuration
Expand Down Expand Up @@ -51,3 +52,44 @@ eventisc.trigger("foo_created", {"foo": "bar"}) # Should fire only celery

```


## RabbitMQ / Pika special behaviour

Pika does not support multithreading, pika-pool is used to have thread-safety but still have long-lived persistent
connections.

You should periodically call `eventisc.heartbeat` if you're planning on supporting rabbit listeners. This ensures
server heartbeats are handled and reduces connection churn:

```python
import threading

def heartbeat_thread():
while True:
time.sleep(60) # 60 is pika's default as of 1.2.0
app.heartbeat()


t = threading.Thread(target=heartbeat_thread, daemon=True)
t.start()
```

Alternatively, users can avoid connection lost errors by setting the `stale` parameter lower than the heartbeat
timeout:

```yaml
listeners:
- kind: rabbit
event_name_regex: .*
url: "amqp://localhost:5672/?heartbeat=90" # heartbeat timeout set to 90 secs
queue_kwargs:
queue:
queue: myqueue
publish_kwargs:
exchange: ""
routing_key: "{queue_kwargs['queue']['queue']}"
stale: 60
```

This will *not* avoid connection churn, but it should reduce the chances of getting a connection error when sending a
message.
14 changes: 12 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,19 @@ def get_version():
extras_require={ # Optional
'requests': ['requests'],
'celery': ['celery'],
'pika': ['pika'],
'pika': [
'pika',
'pika-pool @ git+https://github.com/radiocutfm/pika-pool.git@20220124#egg=pika_pool',
],
'dev': ['check-manifest', 'responses', 'celery', 'requests', 'pika'],
'test': ['coverage', 'responses', 'celery', 'requests', 'pika'],
'test': [
'coverage',
'responses',
'celery',
'requests',
'pika',
'pika-pool @ git+https://github.com/radiocutfm/pika-pool.git@20220124#egg=pika_pool'
],
},

# List additional URLs that are relevant to your project as a dict.
Expand Down
11 changes: 11 additions & 0 deletions src/eventisc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ def trigger(self, event_name, event_data):
count += 1
return count

def heartbeat(self):
for lis in self.listeners:
try:
lis.heartbeat()
except AttributeError:
pass


class Listener(ABC):
_registry = {}
Expand Down Expand Up @@ -213,3 +220,7 @@ def get_current_app():

def trigger(event_name, event_data):
return get_current_app().trigger(event_name, event_data)


def heartbeat():
return get_current_app().heartbeat()
85 changes: 35 additions & 50 deletions src/eventisc/rabbit_listener.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,72 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from . import Listener
import pika
import json
import logging
import threading
from functools import wraps

import pika
import pika_pool

logger = logging.getLogger(__name__)

# Reduce pika's verbosity
logging.getLogger("pika").setLevel(logging.WARNING)


def reconnection_handler(f):
def retry_on_connection_error(f):
@wraps(f)
def decorator(self, *args, **kwargs):
def decorated(*args, **kwargs):
try:
ret = f(self, *args, **kwargs)
ret = f(*args, **kwargs)
except pika.exceptions.AMQPConnectionError:
logger.info("Error connecting RabbitMQ, deleting channel...")
del self.local.channel
ret = f(self, *args, **kwargs)
logger.warning("AMQPConnection error executing %s. Retrying...", f.__name__)
ret = f(*args, **kwargs)
except Exception as e:
logger.warning("Ignoring exception %s from decorated function %s: %s", type(e), f.__name__, e)
raise
return ret
return decorator
return decorated


class RabbitListener(Listener):
kind = "rabbit"

def __init__(self, event_name, url, queue_kwargs=None, publish_kwargs=None, filter=None):
def __init__(self, event_name, url, queue_kwargs=None, publish_kwargs=None, filter=None, stale=None, declare=True):
super(RabbitListener, self).__init__(event_name, filter)
self.url = url
self.queue_kwargs = queue_kwargs
self.publish_kwargs = publish_kwargs

self.local = threading.local()
self.lock = threading.Lock()
self.connection = None
self.pool = pika_pool.QueuedPool(
create=lambda: pika.BlockingConnection(pika.URLParameters(self.url)),
stale=stale
)

self.declare_queue_and_exchange()
if declare:
self.declare_queue_and_exchange()

@reconnection_handler
def heartbeat(self):
self.pool.process_data_events()

@retry_on_connection_error
def declare_queue_and_exchange(self):
queue = self.queue_kwargs.get("queue", {})
self.get_channel().queue_declare(**queue)
exchange = self.queue_kwargs.get("exchange", {})
if exchange:
self.get_channel().exchange_declare(**exchange)
self.get_channel().queue_bind(exchange=exchange.get("exchange"), queue=queue.get("queue"))

def get_channel(self):
self._connect()

if hasattr(self.local, 'channel'):
return self.local.channel

try:
self.local.channel = self.connection.channel()
except pika.exceptions.AMQPConnectionError:
logger.info("Error connecting RabbitMQ, getting new connection...")
self._connect(force_reconnect=True)
self.local.channel = self.connection.channel()

return self.local.channel

def _connect(self, force_reconnect=False):
# thread safe connection aquire
with self.lock:
if force_reconnect:
self.connection = None

if self.connection is not None:
return

self.connection = pika.BlockingConnection(pika.URLParameters(self.url))

if not hasattr(self.local, "channel"):
self.local.channel = self.connection.channel()
with self.pool.acquire() as conn:
conn.channel.queue_declare(**queue)
if exchange:
conn.channel.exchange_declare(**exchange)
conn.channel.queue_bind(exchange=exchange.get("exchange"), queue=queue.get("queue"))

@reconnection_handler
@retry_on_connection_error
def _do_notify(self, event_name, event_data):
if self.publish_kwargs:
data = dict((k, self.format(v, event_name, event_data, queue_kwargs=self.queue_kwargs))
for (k, v) in self.publish_kwargs.items())
else:
data = None
data["body"] = json.dumps(event_data)
self.get_channel().basic_publish(**data)
with self.pool.acquire() as conn:
conn.channel.basic_publish(**data)
return True
79 changes: 37 additions & 42 deletions tests/test_rabbit.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import re
from unittest import TestCase
from unittest import mock

Expand All @@ -7,57 +8,51 @@


class TestRabbitListenerApp(TestCase):
def tearDown(self):
eventisc.default_app = None

def _default_rabbit_config(self):
return {
"event_name_regex": ".*",
"kind": "rabbit",
"url": "http://rabbit:5672/end",
"queue_kwargs": {
"exchange": {
"exchange": "some-exchange",
"exchange_type": "direct"
},
"queue": {
"queue": "some-queue"
}
},
"publish_kwargs": {
"exchange": "{queue_kwargs['exchange']['exchange']}",
"routing_key": "{queue_kwargs['queue']['queue']}"
}
}
@mock.patch("eventisc.rabbit_listener.pika.BlockingConnection", autospec=True)
def test_message_publish(self, pika_mock):
listener = eventisc.rabbit_listener.RabbitListener(
event_name=re.compile(".*"),
url="amqp://rabbit:5672",
publish_kwargs={"exchange": "", "routing_key": "testrun"},
queue_kwargs={"queue": "testrun"},
declare=False,
)

@mock.patch.object(eventisc.rabbit_listener, "pika")
def test_rabbit_listener(self, pika_mock):
app = eventisc.init_default_app(listeners=[self._default_rabbit_config()])
with listener.pool.acquire() as conn:
channel = conn.channel

channel_mock = app.listeners[0].local.channel
listener.notify("test_event", {"some": "data"})

app.trigger('client_created', {"id": 14, "topic": "client", "action": "created"})
channel.basic_publish.assert_called_once_with(
body='{"some": "data"}',
exchange="",
routing_key="testrun",
)

channel_mock.basic_publish.assert_called_once_with(
exchange='some-exchange',
routing_key='some-queue',
body='{"id": 14, "topic": "client", "action": "created"}'
@mock.patch("eventisc.rabbit_listener.pika.BlockingConnection", autospec=True)
def test_declare_queue_and_exchange(self, pika_mock):
listener = eventisc.rabbit_listener.RabbitListener(
event_name=re.compile(".*"),
url="amqp://rabbit:5672",
publish_kwargs={"exchange": "", "routing_key": "testrun"},
queue_kwargs={
"exchange": {"exchange": "some-exchange", "exchange_type": "direct"},
"queue": {"queue": "some-queue"},
},
declare=False,
)

@mock.patch.object(eventisc.rabbit_listener, "pika")
def test_pika_connection(self, pika_mock):
app = eventisc.init_default_app(listeners=[self._default_rabbit_config()])
with listener.pool.acquire() as conn:
channel = conn.channel

channel_mock = app.listeners[0].local.channel
listener.declare_queue_and_exchange()

channel_mock.queue_declare.assert_called_once_with(queue="some-queue")
channel.queue_declare.assert_called_once_with(queue="some-queue")

channel_mock.exchange_declare.assert_called_once_with(
exchange="some-exchange",
exchange_type="direct"
channel.exchange_declare.assert_called_once_with(
exchange="some-exchange", exchange_type="direct"
)

channel_mock.queue_bind.assert_called_once_with(
exchange="some-exchange",
queue="some-queue"
channel.queue_bind.assert_called_once_with(
exchange="some-exchange", queue="some-queue"
)