diff --git a/README.md b/README.md index a405c26..8142a87 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # MRSAL AMQP -[![Release](https://img.shields.io/badge/release-2.1.0-blue.svg)](https://pypi.org/project/mrsal/) +[![Release](https://img.shields.io/badge/release-2.1.1-blue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/) [![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml) [![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/) diff --git a/mrsal/amqp/subclass.py b/mrsal/amqp/subclass.py index cb63898..c13cc05 100644 --- a/mrsal/amqp/subclass.py +++ b/mrsal/amqp/subclass.py @@ -292,7 +292,7 @@ def publish_message( exchange_type: str, queue_name: str, auto_declare: bool = True, - passive: bool = True, + passive: bool = True, prop: pika.BasicProperties | None = None, ) -> None: """Publish message to the exchange specifying routing key and properties. @@ -380,7 +380,6 @@ def publish_messages( # connect and use only blocking self.setup_blocking_connection() - if auto_declare: self._setup_exchange_and_queue( exchange_name=protocol.exchange_name, diff --git a/mrsal/superclass.py b/mrsal/superclass.py index 4e9290b..ecc1e32 100644 --- a/mrsal/superclass.py +++ b/mrsal/superclass.py @@ -89,66 +89,71 @@ def _setup_exchange_and_queue(self, if queue_args is None: queue_args = {} + if not passive: + if dlx_enable: + dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" + dlx_routing = dlx_routing_key or routing_key + try: + self._declare_exchange( + exchange=dlx_name, + exchange_type=exchange_type, + arguments=None, + durable=exch_durable, + passive=passive, + internal=internal, + auto_delete=auto_delete + ) + if self.verbose: + log.info(f"Dead letter exchange {dlx_name} declared successfully") + + except MrsalSetupError as e: + log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") + + queue_args.update({ + 'x-dead-letter-exchange': dlx_name, + 'x-dead-letter-routing-key': dlx_routing + }) + + if use_quorum_queues: + queue_args.update({ + 'x-queue-type': 'quorum', + 'x-quorum-initial-group-size': 3 + }) - if dlx_enable: - dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" - dlx_routing = dlx_routing_key or routing_key - try: - self._declare_exchange( - exchange=dlx_name, - exchange_type=exchange_type, - arguments=None, - durable=exch_durable, - passive=False, - internal=internal, - auto_delete=auto_delete - ) if self.verbose: - log.info(f"Dead letter exchange {dlx_name} declared successfully") - - except MrsalSetupError as e: - log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") - - queue_args.update({ - 'x-dead-letter-exchange': dlx_name, - 'x-dead-letter-routing-key': dlx_routing - }) - - if use_quorum_queues: - queue_args.update({ - 'x-queue-type': 'quorum', - 'x-quorum-initial-group-size': 3 - }) + log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability") - if self.verbose: - log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability") + # Add max length settings + if max_queue_length and max_queue_length > 0: + queue_args['x-max-length'] = max_queue_length + + if max_queue_length_bytes and max_queue_length_bytes > 0: + queue_args['x-max-length-bytes'] = max_queue_length_bytes - # Add max length settings - if max_queue_length and max_queue_length > 0: - queue_args['x-max-length'] = max_queue_length - - if max_queue_length_bytes and max_queue_length_bytes > 0: - queue_args['x-max-length-bytes'] = max_queue_length_bytes + # Add overflow behavior + if queue_overflow in ["drop-head", "reject-publish"]: + queue_args['x-overflow'] = queue_overflow - # Add overflow behavior - if queue_overflow in ["drop-head", "reject-publish"]: - queue_args['x-overflow'] = queue_overflow + # Add single active consumer + if single_active_consumer: + queue_args['x-single-active-consumer'] = True - # Add single active consumer - if single_active_consumer: - queue_args['x-single-active-consumer'] = True + # Add lazy queue setting + if lazy_queue: + queue_args['x-queue-mode'] = 'lazy' - # Add lazy queue setting - if lazy_queue: - queue_args['x-queue-mode'] = 'lazy' + if self.verbose and queue_args: + log.info(f"Queue {queue_name} configured with arguments: {queue_args}") + else: + queue_args = {} + if self.verbose: + log.info(f"Passive mode: checking existence of queue {queue_name} without configuration") - if self.verbose and queue_args: - log.info(f"Queue {queue_name} configured with arguments: {queue_args}") declare_exhange_dict = { 'exchange': exchange_name, 'exchange_type': exchange_type, - 'arguments': exch_args, + 'arguments': exch_args if not passive else None, 'durable': exch_durable, 'passive': passive, 'internal': internal, @@ -174,9 +179,13 @@ def _setup_exchange_and_queue(self, try: self._declare_exchange(**declare_exhange_dict) self._declare_queue(**declare_queue_dict) - self._declare_queue_binding(**declare_queue_binding_dict) + if not passive: + self._declare_queue_binding(**declare_queue_binding_dict) self.auto_declare_ok = True - log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") + if not passive: + log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") + else: + log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") except MrsalSetupError as e: log.error(f'Splæt! I failed the declaration setup with {e}', exc_info=True) self.auto_declare_ok = False @@ -207,66 +216,72 @@ async def _async_setup_exchange_and_queue(self, if queue_args is None: queue_args = {} - if dlx_enable: - dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" - dlx_routing = dlx_routing_key or routing_key - - try: - await self._async_declare_exchange( - exchange=dlx_name, - exchange_type=exchange_type, - arguments=None, - durable=exch_durable, - passive=False, - internal=internal, - auto_delete=auto_delete - ) + if not passive: + if dlx_enable: + dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" + dlx_routing = dlx_routing_key or routing_key + + try: + await self._async_declare_exchange( + exchange=dlx_name, + exchange_type=exchange_type, + arguments=None, + durable=exch_durable, + passive=passive, + internal=internal, + auto_delete=auto_delete + ) + + if self.verbose: + log.info(f"Dead letter exchange {dlx_name} declared successfully") + + except MrsalSetupError as e: + log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") + + queue_args.update({ + 'x-dead-letter-exchange': dlx_name, + 'x-dead-letter-routing-key': dlx_routing + }) + + if use_quorum_queues: + queue_args.update({ + 'x-queue-type': 'quorum', + 'x-quorum-initial-group-size': 3 # Good default for 3+ node clusters + }) if self.verbose: - log.info(f"Dead letter exchange {dlx_name} declared successfully") + log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability") - except MrsalSetupError as e: - log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") + if max_queue_length and max_queue_length > 0: + queue_args['x-max-length'] = max_queue_length + + if max_queue_length_bytes and max_queue_length_bytes > 0: + queue_args['x-max-length-bytes'] = max_queue_length_bytes - queue_args.update({ - 'x-dead-letter-exchange': dlx_name, - 'x-dead-letter-routing-key': dlx_routing - }) + # Add overflow behavior + if queue_overflow and queue_overflow in ["drop-head", "reject-publish"]: + queue_args['x-overflow'] = queue_overflow - if use_quorum_queues: - queue_args.update({ - 'x-queue-type': 'quorum', - 'x-quorum-initial-group-size': 3 # Good default for 3+ node clusters - }) - - if self.verbose: - log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability") + # Add single active consumer + if single_active_consumer: + queue_args['x-single-active-consumer'] = True - if max_queue_length and max_queue_length > 0: - queue_args['x-max-length'] = max_queue_length - - if max_queue_length_bytes and max_queue_length_bytes > 0: - queue_args['x-max-length-bytes'] = max_queue_length_bytes + # Add lazy queue setting + if lazy_queue: + queue_args['x-queue-mode'] = 'lazy' - # Add overflow behavior - if queue_overflow and queue_overflow in ["drop-head", "reject-publish"]: - queue_args['x-overflow'] = queue_overflow - - # Add single active consumer - if single_active_consumer: - queue_args['x-single-active-consumer'] = True - - # Add lazy queue setting - if lazy_queue: - queue_args['x-queue-mode'] = 'lazy' + if self.verbose and queue_args: + log.info(f"Queue {queue_name} configured with arguments: {queue_args}") + else: + queue_args = {} + if self.verbose: + log.info(f"Passive mode: checking existence of queue {queue_name} without configuration") - if self.verbose and queue_args: - log.info(f"Queue {queue_name} configured with arguments: {queue_args}") async_declare_exhange_dict = { 'exchange': exchange_name, 'exchange_type': exchange_type, - 'arguments': exch_args, + 'arguments': exch_args if not passive else None, 'durable': exch_durable, 'passive': passive, 'internal': internal, @@ -292,9 +307,13 @@ async def _async_setup_exchange_and_queue(self, # Declare exchange and queue exchange = await self._async_declare_exchange(**async_declare_exhange_dict) queue = await self._async_declare_queue(**async_declare_queue_dict) - await self._async_declare_queue_binding(queue=queue, exchange=exchange, **async_declare_queue_binding_dict) + if not passive: + await self._async_declare_queue_binding(queue=queue, exchange=exchange, **async_declare_queue_binding_dict) self.auto_declare_ok = True - log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") + if not passive: + log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") + else: + log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") if dlx_enable: log.info(f"You have a dead letter exhange {dlx_name} for fault tolerance -- use it well young grasshopper!") return queue diff --git a/pyproject.toml b/pyproject.toml index 240a393..0b4317b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ license = "GPL-3.0-or-later" maintainers = ["Jon E Nesvold "] name = "mrsal" readme = "README.md" -version = "2.1.0" +version = "2.1.1" homepage = "https://github.com/NeoMedSys/mrsal" repository = "https://github.com/NeoMedSys/mrsal" documentation = "https://neomedsys.github.io/mrsal/" diff --git a/reports/coverage/.coverage b/reports/coverage/.coverage index f1bdeb4..b4bab4b 100644 Binary files a/reports/coverage/.coverage and b/reports/coverage/.coverage differ diff --git a/reports/coverage/coverage.xml b/reports/coverage/coverage.xml index 8873827..12bd931 100644 --- a/reports/coverage/coverage.xml +++ b/reports/coverage/coverage.xml @@ -1,5 +1,5 @@ - + @@ -79,7 +79,7 @@ - + @@ -118,7 +118,7 @@ - + @@ -164,6 +164,7 @@ + @@ -190,205 +191,218 @@ - - - - - - - - + + + + + + - - - + + + + - - - - - - - - - + + + + + + + + + + + + + + + + - - - - - - - - - - - - + + + + + + + - - - - - - + + + + + + - - + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - + + + + - + - - - - - - - - + + + + - + + - - - + + + - + - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + - + - - + + - - + + + + + - - + - - - + + + - - - + + + + + + + - - - + + - - - - + + + + - - - - - - - + + + + + + - - - - + + + + + - - - - - + + + + + + + + + + + @@ -527,11 +541,12 @@ + - - - - + + + + @@ -539,22 +554,22 @@ - - - - - + + + + + - - - - - - + + + + + + - + @@ -562,96 +577,95 @@ - - - - + + + + - + - - - + + + - + - - - - + + + + - - - + + + - - + + - + - - - + + + - + - + - + - - + + - + - - + + - + - + - - - + + + - + - - + + - - - - - + + + + + - - - + + + - + - - + @@ -1311,26 +1325,34 @@ - - + + - - + + - + - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + diff --git a/reports/junit/junit.xml b/reports/junit/junit.xml index 52b7010..6388327 100644 --- a/reports/junit/junit.xml +++ b/reports/junit/junit.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/tests/test_mrsal_with_full_queue_config.py b/tests/test_mrsal_with_full_queue_config.py index 5789834..181776d 100644 --- a/tests/test_mrsal_with_full_queue_config.py +++ b/tests/test_mrsal_with_full_queue_config.py @@ -1,6 +1,7 @@ import pytest from unittest.mock import MagicMock from mrsal.amqp.subclass import MrsalBlockingAMQP +from pika.exceptions import ChannelClosedByBroker class TestPassiveDeclarationAndQueueSettings: @@ -112,3 +113,43 @@ def test_consumer_passive_declaration_default(self, mock_consumer): single_active_consumer=None, lazy_queue=None ) + + + def test_publisher_passive_false_with_queue_mismatch(self, mock_consumer): + """Test that publisher with passive=False can cause configuration conflicts""" + # Mock a RabbitMQ error when queue config doesn't match + mock_consumer._setup_exchange_and_queue.side_effect = ChannelClosedByBroker( + 406, "PRECONDITION_FAILED - inequivalent arg 'x-max-length'" + ) + + with pytest.raises(Exception): # Should fail with config mismatch + mock_consumer.publish_message( + exchange_name="test_exch", + routing_key="test_key", + message="test_message", + exchange_type="direct", + queue_name="test_queue", + passive=False, # This should cause problems! + # Publisher trying to configure queue = bad idea + ) + + def test_publisher_passive_true_no_conflicts(self, mock_consumer): + """Test that publisher with passive=True avoids configuration conflicts""" + # This should work fine - no config arguments sent + mock_consumer.publish_message( + exchange_name="test_exch", + routing_key="test_key", + message="test_message", + exchange_type="direct", + queue_name="test_queue", + passive=True # Safe mode - just check existence + ) + + # Should succeed without issues + mock_consumer._setup_exchange_and_queue.assert_called_with( + exchange_name="test_exch", + queue_name="test_queue", + exchange_type="direct", + routing_key="test_key", + passive=True + )