diff --git a/README.md b/README.md index 40831e3..4265679 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ Implemented notifications are: 1. Celery task 2. HTTP request +3. RabbitMQ Message ## YAML file configuration @@ -51,3 +52,44 @@ eventisc.trigger("foo_created", {"foo": "bar"}) # Should fire only celery ``` + +## RabbitMQ / Pika special behaviour + +Pika does not support multithreading, pika-pool is used to have thread-safety but still have long-lived persistent +connections. + +You should periodically call `eventisc.heartbeat` if you're planning on supporting rabbit listeners. This ensures +server heartbeats are handled and reduces connection churn: + +```python +import threading + +def heartbeat_thread(): + while True: + time.sleep(60) # 60 is pika's default as of 1.2.0 + app.heartbeat() + + +t = threading.Thread(target=heartbeat_thread, daemon=True) +t.start() +``` + +Alternatively, users can avoid connection lost errors by setting the `stale` parameter lower than the heartbeat +timeout: + +```yaml +listeners: + - kind: rabbit + event_name_regex: .* + url: "amqp://localhost:5672/?heartbeat=90" # heartbeat timeout set to 90 secs + queue_kwargs: + queue: + queue: myqueue + publish_kwargs: + exchange: "" + routing_key: "{queue_kwargs['queue']['queue']}" + stale: 60 +``` + +This will *not* avoid connection churn, but it should reduce the chances of getting a connection error when sending a +message. diff --git a/setup.py b/setup.py index 401396c..5487cdf 100644 --- a/setup.py +++ b/setup.py @@ -94,9 +94,19 @@ def get_version(): extras_require={ # Optional 'requests': ['requests'], 'celery': ['celery'], - 'pika': ['pika'], + 'pika': [ + 'pika', + 'pika-pool @ git+https://github.com/radiocutfm/pika-pool.git@20220124#egg=pika_pool', + ], 'dev': ['check-manifest', 'responses', 'celery', 'requests', 'pika'], - 'test': ['coverage', 'responses', 'celery', 'requests', 'pika'], + 'test': [ + 'coverage', + 'responses', + 'celery', + 'requests', + 'pika', + 'pika-pool @ git+https://github.com/radiocutfm/pika-pool.git@20220124#egg=pika_pool' + ], }, # List additional URLs that are relevant to your project as a dict. diff --git a/src/eventisc/__init__.py b/src/eventisc/__init__.py index 52ea717..0f39bac 100644 --- a/src/eventisc/__init__.py +++ b/src/eventisc/__init__.py @@ -50,6 +50,13 @@ def trigger(self, event_name, event_data): count += 1 return count + def heartbeat(self): + for lis in self.listeners: + try: + lis.heartbeat() + except AttributeError: + pass + class Listener(ABC): _registry = {} @@ -213,3 +220,7 @@ def get_current_app(): def trigger(event_name, event_data): return get_current_app().trigger(event_name, event_data) + + +def heartbeat(): + return get_current_app().heartbeat() diff --git a/src/eventisc/rabbit_listener.py b/src/eventisc/rabbit_listener.py index 0ba3a3b..e897078 100644 --- a/src/eventisc/rabbit_listener.py +++ b/src/eventisc/rabbit_listener.py @@ -1,82 +1,65 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import from . import Listener -import pika import json import logging -import threading from functools import wraps +import pika +import pika_pool + logger = logging.getLogger(__name__) +# Reduce pika's verbosity +logging.getLogger("pika").setLevel(logging.WARNING) + -def reconnection_handler(f): +def retry_on_connection_error(f): @wraps(f) - def decorator(self, *args, **kwargs): + def decorated(*args, **kwargs): try: - ret = f(self, *args, **kwargs) + ret = f(*args, **kwargs) except pika.exceptions.AMQPConnectionError: - logger.info("Error connecting RabbitMQ, deleting channel...") - del self.local.channel - ret = f(self, *args, **kwargs) + logger.warning("AMQPConnection error executing %s. Retrying...", f.__name__) + ret = f(*args, **kwargs) + except Exception as e: + logger.warning("Ignoring exception %s from decorated function %s: %s", type(e), f.__name__, e) + raise return ret - return decorator + return decorated class RabbitListener(Listener): kind = "rabbit" - def __init__(self, event_name, url, queue_kwargs=None, publish_kwargs=None, filter=None): + def __init__(self, event_name, url, queue_kwargs=None, publish_kwargs=None, filter=None, stale=None, declare=True): super(RabbitListener, self).__init__(event_name, filter) self.url = url self.queue_kwargs = queue_kwargs self.publish_kwargs = publish_kwargs - self.local = threading.local() - self.lock = threading.Lock() - self.connection = None + self.pool = pika_pool.QueuedPool( + create=lambda: pika.BlockingConnection(pika.URLParameters(self.url)), + stale=stale + ) - self.declare_queue_and_exchange() + if declare: + self.declare_queue_and_exchange() - @reconnection_handler + def heartbeat(self): + self.pool.process_data_events() + + @retry_on_connection_error def declare_queue_and_exchange(self): queue = self.queue_kwargs.get("queue", {}) - self.get_channel().queue_declare(**queue) exchange = self.queue_kwargs.get("exchange", {}) - if exchange: - self.get_channel().exchange_declare(**exchange) - self.get_channel().queue_bind(exchange=exchange.get("exchange"), queue=queue.get("queue")) - - def get_channel(self): - self._connect() - - if hasattr(self.local, 'channel'): - return self.local.channel - - try: - self.local.channel = self.connection.channel() - except pika.exceptions.AMQPConnectionError: - logger.info("Error connecting RabbitMQ, getting new connection...") - self._connect(force_reconnect=True) - self.local.channel = self.connection.channel() - - return self.local.channel - - def _connect(self, force_reconnect=False): - # thread safe connection aquire - with self.lock: - if force_reconnect: - self.connection = None - - if self.connection is not None: - return - - self.connection = pika.BlockingConnection(pika.URLParameters(self.url)) - - if not hasattr(self.local, "channel"): - self.local.channel = self.connection.channel() + with self.pool.acquire() as conn: + conn.channel.queue_declare(**queue) + if exchange: + conn.channel.exchange_declare(**exchange) + conn.channel.queue_bind(exchange=exchange.get("exchange"), queue=queue.get("queue")) - @reconnection_handler + @retry_on_connection_error def _do_notify(self, event_name, event_data): if self.publish_kwargs: data = dict((k, self.format(v, event_name, event_data, queue_kwargs=self.queue_kwargs)) @@ -84,4 +67,6 @@ def _do_notify(self, event_name, event_data): else: data = None data["body"] = json.dumps(event_data) - self.get_channel().basic_publish(**data) + with self.pool.acquire() as conn: + conn.channel.basic_publish(**data) + return True diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index 136d728..7257ea8 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import re from unittest import TestCase from unittest import mock @@ -7,57 +8,51 @@ class TestRabbitListenerApp(TestCase): - def tearDown(self): - eventisc.default_app = None - - def _default_rabbit_config(self): - return { - "event_name_regex": ".*", - "kind": "rabbit", - "url": "http://rabbit:5672/end", - "queue_kwargs": { - "exchange": { - "exchange": "some-exchange", - "exchange_type": "direct" - }, - "queue": { - "queue": "some-queue" - } - }, - "publish_kwargs": { - "exchange": "{queue_kwargs['exchange']['exchange']}", - "routing_key": "{queue_kwargs['queue']['queue']}" - } - } + @mock.patch("eventisc.rabbit_listener.pika.BlockingConnection", autospec=True) + def test_message_publish(self, pika_mock): + listener = eventisc.rabbit_listener.RabbitListener( + event_name=re.compile(".*"), + url="amqp://rabbit:5672", + publish_kwargs={"exchange": "", "routing_key": "testrun"}, + queue_kwargs={"queue": "testrun"}, + declare=False, + ) - @mock.patch.object(eventisc.rabbit_listener, "pika") - def test_rabbit_listener(self, pika_mock): - app = eventisc.init_default_app(listeners=[self._default_rabbit_config()]) + with listener.pool.acquire() as conn: + channel = conn.channel - channel_mock = app.listeners[0].local.channel + listener.notify("test_event", {"some": "data"}) - app.trigger('client_created', {"id": 14, "topic": "client", "action": "created"}) + channel.basic_publish.assert_called_once_with( + body='{"some": "data"}', + exchange="", + routing_key="testrun", + ) - channel_mock.basic_publish.assert_called_once_with( - exchange='some-exchange', - routing_key='some-queue', - body='{"id": 14, "topic": "client", "action": "created"}' + @mock.patch("eventisc.rabbit_listener.pika.BlockingConnection", autospec=True) + def test_declare_queue_and_exchange(self, pika_mock): + listener = eventisc.rabbit_listener.RabbitListener( + event_name=re.compile(".*"), + url="amqp://rabbit:5672", + publish_kwargs={"exchange": "", "routing_key": "testrun"}, + queue_kwargs={ + "exchange": {"exchange": "some-exchange", "exchange_type": "direct"}, + "queue": {"queue": "some-queue"}, + }, + declare=False, ) - @mock.patch.object(eventisc.rabbit_listener, "pika") - def test_pika_connection(self, pika_mock): - app = eventisc.init_default_app(listeners=[self._default_rabbit_config()]) + with listener.pool.acquire() as conn: + channel = conn.channel - channel_mock = app.listeners[0].local.channel + listener.declare_queue_and_exchange() - channel_mock.queue_declare.assert_called_once_with(queue="some-queue") + channel.queue_declare.assert_called_once_with(queue="some-queue") - channel_mock.exchange_declare.assert_called_once_with( - exchange="some-exchange", - exchange_type="direct" + channel.exchange_declare.assert_called_once_with( + exchange="some-exchange", exchange_type="direct" ) - channel_mock.queue_bind.assert_called_once_with( - exchange="some-exchange", - queue="some-queue" + channel.queue_bind.assert_called_once_with( + exchange="some-exchange", queue="some-queue" )