diff --git a/README.md b/README.md index 8142a87..6e604cc 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # MRSAL AMQP -[![Release](https://img.shields.io/badge/release-2.1.1-blue.svg)](https://pypi.org/project/mrsal/) +[![Release](https://img.shields.io/badge/release-3.0.0-blue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/) [![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml) [![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/) @@ -11,7 +11,6 @@ Mrsal is a **production-ready** message broker abstraction on top of [RabbitMQ]( **What makes Mrsal production-ready:** -- **Intelligent Retry Logic**: Immediate retries + time-delayed retry cycles prevent message loss - **Dead Letter Exchange**: Automatic DLX setup with configurable retry cycles - **High Availability**: Quorum queues for data safety across cluster nodes - **Performance Tuning**: Queue limits, overflow behavior, lazy queues, prefetch control @@ -211,7 +210,6 @@ mrsal = MrsalBlockingAMQP( credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD), virtual_host=RABBITMQ_VHOST, dlx_enable=True, # Default: creates '.dlx' - max_retries=3 # Immediate retries before DLX ) # Advanced retry configuration with cycles @@ -228,22 +226,17 @@ mrsal.start_consumer( enable_retry_cycles=True, # Enable time-delayed retry cycles retry_cycle_interval=10, # Minutes between retry cycles max_retry_time_limit=60, # Total minutes before permanent failure - immediate_retry_delay=4, # Seconds between immediate retries - requeue=True # Enable requeuing for retries ) ``` **How the advanced retry logic works:** -1. **Immediate Retries**: Message fails → retry up to \`max_retries\` times with \`immediate_retry_delay\` seconds between attempts -2. **Retry Cycles**: After immediate retries exhausted → send to DLX with TTL for time-delayed retry +2. **Retry Cycles**: Send to DLX with TTL for time-delayed retry 3. **Cycle Tracking**: Each cycle increments counters and tracks total elapsed time 4. **Permanent Failure**: After \`max_retry_time_limit\` exceeded → message stays in DLX for manual review **Benefits:** -- Handles transient failures with immediate retries - Handles longer outages with time-delayed cycles -- Prevents infinite retry loops - Full observability with retry tracking - Manual intervention capability for persistent failures @@ -350,7 +343,6 @@ mrsal = MrsalBlockingAMQP( credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD), virtual_host=RABBITMQ_VHOST, dlx_enable=True, # Automatic DLX for failed orders - max_retries=3, # Immediate retries use_quorum_queues=True, # High availability prefetch_count=10 # Process up to 10 messages concurrently ) @@ -369,7 +361,6 @@ mrsal.start_consumer( enable_retry_cycles=True, # Enable retry cycles retry_cycle_interval=15, # 15 minutes between cycles max_retry_time_limit=120, # 2 hours total retry time - immediate_retry_delay=5, # 5 seconds between immediate retries # Queue performance settings max_queue_length=50000, # Handle large order volumes diff --git a/mrsal/amqp/subclass.py b/mrsal/amqp/subclass.py index c13cc05..d5f0d69 100644 --- a/mrsal/amqp/subclass.py +++ b/mrsal/amqp/subclass.py @@ -1,5 +1,4 @@ import asyncio -import time from mrsal.basemodels import MrsalProtocol import pika import json @@ -7,13 +6,13 @@ from mrsal.exceptions import MrsalAbortedSetup, MrsalNoAsyncioLoopError from logging import WARNING from pika.exceptions import ( - AMQPConnectionError, - ChannelClosedByBroker, - StreamLostError, - ConnectionClosedByBroker, - NackError, - UnroutableError - ) + AMQPConnectionError, + ChannelClosedByBroker, + StreamLostError, + ConnectionClosedByBroker, + NackError, + UnroutableError + ) from aio_pika import connect_robust, Message, Channel as AioChannel from typing import Callable, Type from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, before_sleep_log @@ -27,675 +26,640 @@ @dataclass class MrsalBlockingAMQP(Mrsal): - """ - :param int blocked_connection_timeout: blocked_connection_timeout - is the timeout, in seconds, - for the connection to remain blocked; if the timeout expires, - the connection will be torn down during connection tuning. - """ - blocked_connection_timeout: int = 60 # sec - - - def setup_blocking_connection(self) -> None: - """We can use setup_blocking_connection for establishing a connection to RabbitMQ server specifying connection parameters. - The connection is blocking which is only advisable to use for the apps with low througput. - - DISCLAIMER: If you expect a lot of traffic to the app or if its realtime then you should use async. - - Parameters - ---------- - context : Dict[str, str] - context is the structured map with information regarding the SSL options for connecting with rabbit server via TLS. - """ - connection_info = f""" - Mrsal connection parameters: - host={self.host}, - virtual_host={self.virtual_host}, - port={self.port}, - heartbeat={self.heartbeat}, - ssl={self.ssl} - """ - if self.verbose: - log.info(f"Establishing connection to RabbitMQ on {connection_info}") - credentials = pika.PlainCredentials(*self.credentials) - try: - self._connection = pika.BlockingConnection( - pika.ConnectionParameters( - host=self.host, - port=self.port, - ssl_options=self.get_ssl_context(async_conn=False), - virtual_host=self.virtual_host, - credentials=credentials, - heartbeat=self.heartbeat, - blocked_connection_timeout=self.blocked_connection_timeout, - ) - ) - - self._channel = self._connection.channel() - # Note: prefetch is set to 1 here as an example only. - # In production you will want to test with different prefetch values to find which one provides the best performance and usability for your solution. - # use a high number of prefecth if you think the pods with Mrsal installed can handle it. A prefetch 4 will mean up to 4 async runs before ack is required - self._channel.basic_qos(prefetch_count=self.prefetch_count) - log.info(f"Boom! Connection established with RabbitMQ on {connection_info}") - except (AMQPConnectionError, ChannelClosedByBroker, ConnectionClosedByBroker, StreamLostError) as e: - log.error(f"I tried to connect with the RabbitMQ server but failed with: {e}") - raise - except Exception as e: - log.error(f"Unexpected error caught: {e}") - - @retry( - retry=retry_if_exception_type(( - AMQPConnectionError, - ChannelClosedByBroker, - ConnectionClosedByBroker, - StreamLostError, - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - def start_consumer(self, - queue_name: str, - callback: Callable | None = None, - callback_args: dict[str, str | int | float | bool] | None = None, - auto_ack: bool = True, - inactivity_timeout: int | None = None, # just let conusmer wait patiently damn it - auto_declare: bool = True, - exchange_name: str | None = None, - exchange_type: str | None = None, - routing_key: str | None = None, - payload_model: Type | None = None, - requeue: bool = True, - dlx_enable: bool = True, - dlx_exchange_name: str | None = None, - dlx_routing_key: str | None = None, - use_quorum_queues: bool = True, - enable_retry_cycles: bool = True, - retry_cycle_interval: int = 10, # Minutes between cycles - max_retry_time_limit: int = 60, # Minutes total before permanent DLX - immediate_retry_delay: int = 4, # Seconds between immediate retries - max_queue_length: int | None = None, - max_queue_length_bytes: int | None = None, - queue_overflow: str | None = None, - single_active_consumer: bool | None = None, - lazy_queue: bool | None = None, - ) -> None: - """ - Start the consumer using blocking setup. - :param str queue_name: The queue to consume from - :param Callable callback: The callback function to process messages - :param dict callback_args: Optional arguments to pass to the callback - :param bool auto_ack: If True, messages are automatically acknowledged - :param int inactivity_timeout: Timeout for inactivity in the consumer loop - :param bool auto_declare: If True, will declare exchange/queue before consuming - :param bool passive: If True, only check if exchange/queue exists (False for consumers) - :param str exchange_name: Exchange name for auto_declare - :param str exchange_type: Exchange type for auto_declare - :param str routing_key: Routing key for auto_declare - :param Type payload_model: Pydantic model for payload validation - :param bool requeue: Whether to requeue failed messages - :param bool dlx_enable: Enable dead letter exchange - :param str dlx_exchange_name: Custom DLX exchange name - :param str dlx_routing_key: Custom DLX routing key - :param bool use_quorum_queues: Use quorum queues for durability - :param bool enable_retry_cycles: Enable DLX retry cycles - :param int retry_cycle_interval: Minutes between retry cycles - :param int max_retry_time_limit: Minutes total before permanent DLX - :param int immediate_retry_delay: Seconds between immediate retries - :param int max_queue_length: Maximum number of messages in queue - :param int max_queue_length_bytes: Maximum queue size in bytes - :param str queue_overflow: "drop-head" or "reject-publish" - :param bool single_active_consumer: Only one consumer processes at a time - :param bool lazy_queue: Store messages on disk to save memory - """ - # Connect and start the I/O loop - self.setup_blocking_connection() - retry_counts = {} - - if auto_declare: - if None in (exchange_name, queue_name, exchange_type, routing_key): - raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') - - self._setup_exchange_and_queue( - exchange_name=exchange_name, - queue_name=queue_name, - exchange_type=exchange_type, - routing_key=routing_key, - dlx_enable=dlx_enable, - dlx_exchange_name=dlx_exchange_name, - dlx_routing_key=dlx_routing_key, - use_quorum_queues=use_quorum_queues, - max_queue_length=max_queue_length, - max_queue_length_bytes=max_queue_length_bytes, - queue_overflow=queue_overflow, - single_active_consumer=single_active_consumer, - lazy_queue=lazy_queue - ) - - if not self.auto_declare_ok: - raise MrsalAbortedSetup('Auto declaration for the connection setup failed and is aborted') - - # Log consumer configuration - consumer_config = { - "queue": queue_name, - "exchange": exchange_name, - "max_length": max_queue_length or self.max_queue_length, - "overflow": queue_overflow or self.queue_overflow, - "single_consumer": single_active_consumer if single_active_consumer is not None else self.single_active_consumer, - "lazy": lazy_queue if lazy_queue is not None else self.lazy_queue - } - - log.info(f"Straight out of the swamps -- consumer boi listening with config: {consumer_config}") - - try: - for method_frame, properties, body in self._channel.consume( - queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout): - if method_frame: - if properties: - app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID given' - msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no msgID given' - delivery_tag = method_frame.delivery_tag - - if self.verbose: - log.info( - f""" - Message received with: - - Method Frame: {method_frame} - - Redelivery: {method_frame.redelivered} - - Exchange: {method_frame.exchange} - - Routing Key: {method_frame.routing_key} - - Delivery Tag: {method_frame.delivery_tag} - - Properties: {properties} - - Requeue: {requeue} - - Auto Ack: {auto_ack} - """ - ) - if auto_ack: - log.info(f'I successfully received a message with AutoAck from: {app_id} with messageID: {msg_id}') - - current_retry = retry_counts.get(delivery_tag, 0) - should_process = True - - if payload_model: - try: - self.validate_payload(body, payload_model) - except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: - log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}") - should_process = False - - if callback and should_process: - try: - if callback_args: - callback(*callback_args, method_frame, properties, body) - else: - callback(method_frame, properties, body) - except Exception as e: - log.error(f"Callback method failure: {e}") - log.error(f"Oh lordy lord message {msg_id} from {app_id} failed while running callback") - should_process = False - - if not should_process and not auto_ack: - if current_retry < self.max_retries and requeue: - # Increment retry count and requeue - retry_counts[delivery_tag] = current_retry + 1 - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=True) - log.info(f"Message {msg_id} requeued (attempt {current_retry + 1}/{self.max_retries})") - # Add delay before next retry - if current_retry + 1 < self.max_retries: - time.sleep(immediate_retry_delay) - else: - # Max retries reached or requeue disabled - retry_counts.pop(delivery_tag, None) - if dlx_enable and enable_retry_cycles: - # Use retry cycle logic - self._publish_to_dlx_with_retry_cycle( - method_frame, properties, body, "Callback processing failed", - exchange_name, routing_key, enable_retry_cycles, - retry_cycle_interval, max_retry_time_limit, dlx_exchange_name - ) - elif dlx_enable: - # Original DLX behavior - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) - log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") - log.info(f"Its the fault of polictial ideology imposing! Strangle an influencer if it makes you feel better!") - else: - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) - log.warning(f"No dead letter exchange declared for {queue_name}, proceeding to drop the message -- reflect on your life choices! byebye") - log.info(f"Dropped message content: {body}") - continue - - if not auto_ack and should_process: - retry_counts.pop(delivery_tag, None) - log.info(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken') - self._channel.basic_ack(delivery_tag=delivery_tag) - - except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e: - log.error(f"Ooooooopsie! I caught a connection error while consuming: {e}") - raise - except Exception as e: - log.error(f'Oh lordy lord! I failed consuming ze messaj with: {e}') - - @retry( - retry=retry_if_exception_type(( - NackError, - UnroutableError - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - def publish_message( - self, - exchange_name: str, - routing_key: str, - message: str | bytes | None, - exchange_type: str, - queue_name: str, - auto_declare: bool = True, + """ + :param int blocked_connection_timeout: blocked_connection_timeout + is the timeout, in seconds, + for the connection to remain blocked; if the timeout expires, + the connection will be torn down during connection tuning. + """ + blocked_connection_timeout: int = 60 # sec + + + def setup_blocking_connection(self) -> None: + """We can use setup_blocking_connection for establishing a connection to RabbitMQ server specifying connection parameters. + The connection is blocking which is only advisable to use for the apps with low througput. + + DISCLAIMER: If you expect a lot of traffic to the app or if its realtime then you should use async. + + Parameters + ---------- + context : Dict[str, str] + context is the structured map with information regarding the SSL options for connecting with rabbit server via TLS. + """ + connection_info = f""" + Mrsal connection parameters: + host={self.host}, + virtual_host={self.virtual_host}, + port={self.port}, + heartbeat={self.heartbeat}, + ssl={self.ssl} + """ + if self.verbose: + log.info(f"Establishing connection to RabbitMQ on {connection_info}") + credentials = pika.PlainCredentials(*self.credentials) + try: + self._connection = pika.BlockingConnection( + pika.ConnectionParameters( + host=self.host, + port=self.port, + ssl_options=self.get_ssl_context(async_conn=False), + virtual_host=self.virtual_host, + credentials=credentials, + heartbeat=self.heartbeat, + blocked_connection_timeout=self.blocked_connection_timeout, + ) + ) + + self._channel = self._connection.channel() + # Note: prefetch is set to 1 here as an example only. + # In production you will want to test with different prefetch values to find which one provides the best performance and usability for your solution. + # use a high number of prefecth if you think the pods with Mrsal installed can handle it. A prefetch 4 will mean up to 4 async runs before ack is required + self._channel.basic_qos(prefetch_count=self.prefetch_count) + log.info(f"Boom! Connection established with RabbitMQ on {connection_info}") + except (AMQPConnectionError, ChannelClosedByBroker, ConnectionClosedByBroker, StreamLostError) as e: + log.error(f"I tried to connect with the RabbitMQ server but failed with: {e}") + raise + except Exception as e: + log.error(f"Unexpected error caught: {e}") + + @retry( + retry=retry_if_exception_type(( + AMQPConnectionError, + ChannelClosedByBroker, + ConnectionClosedByBroker, + StreamLostError, + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + def start_consumer(self, + queue_name: str, + callback: Callable | None = None, + callback_args: dict[str, str | int | float | bool] | None = None, + auto_ack: bool = True, + inactivity_timeout: int | None = None, # just let conusmer wait patiently damn it + auto_declare: bool = True, + exchange_name: str | None = None, + exchange_type: str | None = None, + routing_key: str | None = None, + payload_model: Type | None = None, + dlx_enable: bool = True, + dlx_exchange_name: str | None = None, + dlx_routing_key: str | None = None, + use_quorum_queues: bool = True, + enable_retry_cycles: bool = True, + retry_cycle_interval: int = 10, # Minutes between cycles + max_retry_time_limit: int = 60, # Minutes total before permanent DLX + max_queue_length: int | None = None, + max_queue_length_bytes: int | None = None, + queue_overflow: str | None = None, + single_active_consumer: bool | None = None, + lazy_queue: bool | None = None, + ) -> None: + """ + Start the consumer using blocking setup. + :param str queue_name: The queue to consume from + :param Callable callback: The callback function to process messages + :param dict callback_args: Optional arguments to pass to the callback + :param bool auto_ack: If True, messages are automatically acknowledged + :param int inactivity_timeout: Timeout for inactivity in the consumer loop + :param bool auto_declare: If True, will declare exchange/queue before consuming + :param bool passive: If True, only check if exchange/queue exists (False for consumers) + :param str exchange_name: Exchange name for auto_declare + :param str exchange_type: Exchange type for auto_declare + :param str routing_key: Routing key for auto_declare + :param Type payload_model: Pydantic model for payload validation + :param bool dlx_enable: Enable dead letter exchange + :param str dlx_exchange_name: Custom DLX exchange name + :param str dlx_routing_key: Custom DLX routing key + :param bool use_quorum_queues: Use quorum queues for durability + :param bool enable_retry_cycles: Enable DLX retry cycles + :param int retry_cycle_interval: Minutes between retry cycles + :param int max_retry_time_limit: Minutes total before permanent DLX + :param int immediate_retry_delay: Seconds between immediate retries + :param int max_queue_length: Maximum number of messages in queue + :param int max_queue_length_bytes: Maximum queue size in bytes + :param str queue_overflow: "drop-head" or "reject-publish" + :param bool single_active_consumer: Only one consumer processes at a time + :param bool lazy_queue: Store messages on disk to save memory + """ + # Connect and start the I/O loop + self.setup_blocking_connection() + + if auto_declare: + if None in (exchange_name, queue_name, exchange_type, routing_key): + raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') + + self._setup_exchange_and_queue( + exchange_name=exchange_name, + queue_name=queue_name, + exchange_type=exchange_type, + routing_key=routing_key, + dlx_enable=dlx_enable, + dlx_exchange_name=dlx_exchange_name, + dlx_routing_key=dlx_routing_key, + use_quorum_queues=use_quorum_queues, + max_queue_length=max_queue_length, + max_queue_length_bytes=max_queue_length_bytes, + queue_overflow=queue_overflow, + single_active_consumer=single_active_consumer, + lazy_queue=lazy_queue + ) + + if not self.auto_declare_ok: + raise MrsalAbortedSetup('Auto declaration for the connection setup failed and is aborted') + + # Log consumer configuration + consumer_config = { + "queue": queue_name, + "exchange": exchange_name, + "max_length": max_queue_length or self.max_queue_length, + "overflow": queue_overflow or self.queue_overflow, + "single_consumer": single_active_consumer if single_active_consumer is not None else self.single_active_consumer, + "lazy": lazy_queue if lazy_queue is not None else self.lazy_queue + } + + log.info(f"Straight out of the swamps -- consumer boi listening with config: {consumer_config}") + + try: + for method_frame, properties, body in self._channel.consume( + queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout): + if method_frame: + if properties: + app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID given' + msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no msgID given' + delivery_tag = method_frame.delivery_tag + + if self.verbose: + log.info( + f""" + Message received with: + - Method Frame: {method_frame} + - Redelivery: {method_frame.redelivered} + - Exchange: {method_frame.exchange} + - Routing Key: {method_frame.routing_key} + - Delivery Tag: {method_frame.delivery_tag} + - Properties: {properties} + - Auto Ack: {auto_ack} + """ + ) + if auto_ack: + log.info(f'I successfully received a message with AutoAck from: {app_id} with messageID: {msg_id}') + + current_retry = properties.headers.get('x-delivery-count', 0) if properties.headers else 0 + log.info(f"Current retry is: {current_retry}") + should_process = True + + if payload_model: + try: + self.validate_payload(body, payload_model) + except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: + log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}") + should_process = False + + if callback and should_process: + try: + if callback_args: + callback(*callback_args, method_frame, properties, body) + else: + callback(method_frame, properties, body) + except Exception as e: + log.error(f"Callback method failure: {e}") + log.error(f"Oh lordy lord message {msg_id} from {app_id} failed while running callback") + should_process = False + + if not should_process and not auto_ack: + if dlx_enable and enable_retry_cycles: + # Use retry cycle logic + self._publish_to_dlx_with_retry_cycle( + method_frame, properties, body, "Callback processing failed", + exchange_name, routing_key, enable_retry_cycles, + retry_cycle_interval, max_retry_time_limit, dlx_exchange_name + ) + elif dlx_enable: + # Original DLX behavior + self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) + log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") + log.info(f"Its the fault of polictial ideology imposing! Strangle an influencer if it makes you feel better!") + else: + self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) + log.warning(f"No dead letter exchange declared for {queue_name}, proceeding to drop the message -- reflect on your life choices! byebye") + log.info(f"Dropped message content: {body}") + continue + + if not auto_ack and should_process: + log.info(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken') + self._channel.basic_ack(delivery_tag=delivery_tag) + + except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e: + log.error(f"Ooooooopsie! I caught a connection error while consuming: {e}") + raise + except Exception as e: + log.error(f'Oh lordy lord! I failed consuming ze messaj with: {e}') + + @retry( + retry=retry_if_exception_type(( + NackError, + UnroutableError + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + def publish_message( + self, + exchange_name: str, + routing_key: str, + message: str | bytes | None, + exchange_type: str, + queue_name: str, + auto_declare: bool = True, passive: bool = True, - prop: pika.BasicProperties | None = None, - ) -> None: - """Publish message to the exchange specifying routing key and properties. - - :param str exchange: The exchange to publish to - :param str routing_key: The routing key to bind on - :param bytes body: The message body; empty string if no body - :param pika.spec.BasicProperties properties: message properties - :param bool fast_setup: - - when True, will the method create the specified exchange, queue and bind them together using the routing kye. - - If False, this method will check if the specified exchange and queue already exist before publishing. - - :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. - :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. - """ - - if not isinstance(message, (str, bytes)): - raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') - # connect and use only blocking - self.setup_blocking_connection() - - if auto_declare: - if None in (exchange_name, queue_name, exchange_type, routing_key): - raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') - - self._setup_exchange_and_queue( - exchange_name=exchange_name, - queue_name=queue_name, - exchange_type=exchange_type, - routing_key=routing_key, - passive=passive - ) - try: - # Publish the message by serializing it in json dump - # NOTE! we are not dumping a json anymore here! This allows for more flexibility - self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message, properties=prop) - log.info(f"The message ({message!r}) is published to the exchange {exchange_name} with the routing key {routing_key}") - - except UnroutableError as e: - log.error(f"Producer could not publish message:{message!r} to the exchange {exchange_name} with a routing key {routing_key}: {e}", exc_info=True) - raise - except NackError as e: - log.error(f"Message NACKed by broker: {e}") - raise - except Exception as e: - log.error(f"Unexpected error while publishing message: {e}") - - @retry( - retry=retry_if_exception_type(( - NackError, - UnroutableError - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - def publish_messages( - self, - mrsal_protocol_collection: dict[str, dict[str, str | bytes]], - prop: pika.BasicProperties | None = None, - auto_declare: bool = True, - passive: bool = True - ) -> None: - """Publish message to the exchange specifying routing key and properties. - - mrsal_protocol_collection : dict[str, dict[str, str | bytes]] - This is a collection of the protcols needed for publishing to multiple exhanges at once - - expected collection: { - inbound_app_1: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, - inbound_app_2: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, - ., - . - } - - :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. - :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. - """ - - for inbound_app_id, mrsal_protocol in mrsal_protocol_collection.items(): - protocol = MrsalProtocol(**mrsal_protocol) - - if not isinstance(protocol.message, (str, bytes)): - raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') - - # connect and use only blocking - self.setup_blocking_connection() - if auto_declare: - self._setup_exchange_and_queue( - exchange_name=protocol.exchange_name, - queue_name=protocol.queue_name, - exchange_type=protocol.exchange_type, - routing_key=protocol.routing_key, - passive=passive - ) - try: - # Publish the message by serializing it in json dump - # NOTE! we are not dumping a json anymore here! This allows for more flexibility - self._channel.basic_publish( - exchange=protocol.exchange_name, - routing_key=protocol.routing_key, - body=protocol.message, - properties=prop - ) - log.info(f"The message for inbound app {inbound_app_id} with message -- ({protocol.message!r}) is published to the exchange {protocol.exchange_name} with the routing key {protocol.routing_key}. Oh baby baby") - - except UnroutableError as e: - log.error(f"Producer could not publish message:{protocol.message!r} to the exchange {protocol.exchange_name} with a routing key {protocol.routing_key}: {e}", exc_info=True) - raise - except NackError as e: - log.error(f"Message NACKed by broker: {e}") - raise - except Exception as e: - log.error(f"Unexpected error while publishing message: {e}") - - def _publish_to_dlx_with_retry_cycle( - self, - method_frame, properties, body, processing_error: str, - original_exchange: str, original_routing_key: str, - enable_retry_cycles: bool, retry_cycle_interval: int, - max_retry_time_limit: int, dlx_exchange_name: str | None): - """Publish message to DLX with retry cycle headers.""" - try: - # Use common logic from superclass - self._handle_dlx_with_retry_cycle_sync( - method_frame=method_frame, - properties=properties, - body=body, - processing_error=processing_error, - original_exchange=original_exchange, - original_routing_key=original_routing_key, - enable_retry_cycles=enable_retry_cycles, - retry_cycle_interval=retry_cycle_interval, - max_retry_time_limit=max_retry_time_limit, - dlx_exchange_name=dlx_exchange_name - ) - - # Acknowledge original message - self._channel.basic_ack(delivery_tag=method_frame.delivery_tag) - - except Exception as e: - log.error(f"Failed to send message to DLX: {e}") - self._channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True) - - def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): - """Blocking implementation of DLX publishing.""" - # Convert properties dict to pika.BasicProperties - pika_properties = pika.BasicProperties( - headers=properties.get('headers'), - delivery_mode=properties.get('delivery_mode', 2), - content_type=properties.get('content_type', 'application/json'), - expiration=properties.get('expiration') - ) - - self._channel.basic_publish( - exchange=dlx_exchange, - routing_key=routing_key, - body=body, - properties=pika_properties - ) + prop: pika.BasicProperties | None = None, + ) -> None: + """Publish message to the exchange specifying routing key and properties. + + :param str exchange: The exchange to publish to + :param str routing_key: The routing key to bind on + :param bytes body: The message body; empty string if no body + :param pika.spec.BasicProperties properties: message properties + :param bool fast_setup: + - when True, will the method create the specified exchange, queue and bind them together using the routing kye. + - If False, this method will check if the specified exchange and queue already exist before publishing. + + :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. + :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. + """ + + if not isinstance(message, (str, bytes)): + raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') + # connect and use only blocking + self.setup_blocking_connection() + + if auto_declare: + if None in (exchange_name, queue_name, exchange_type, routing_key): + raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') + + self._setup_exchange_and_queue( + exchange_name=exchange_name, + queue_name=queue_name, + exchange_type=exchange_type, + routing_key=routing_key, + passive=passive + ) + try: + # Publish the message by serializing it in json dump + # NOTE! we are not dumping a json anymore here! This allows for more flexibility + self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message, properties=prop) + log.info(f"The message ({message!r}) is published to the exchange {exchange_name} with the routing key {routing_key}") + + except UnroutableError as e: + log.error(f"Producer could not publish message:{message!r} to the exchange {exchange_name} with a routing key {routing_key}: {e}", exc_info=True) + raise + except NackError as e: + log.error(f"Message NACKed by broker: {e}") + raise + except Exception as e: + log.error(f"Unexpected error while publishing message: {e}") + + @retry( + retry=retry_if_exception_type(( + NackError, + UnroutableError + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + def publish_messages( + self, + mrsal_protocol_collection: dict[str, dict[str, str | bytes]], + prop: pika.BasicProperties | None = None, + auto_declare: bool = True, + passive: bool = True + ) -> None: + """Publish message to the exchange specifying routing key and properties. + + mrsal_protocol_collection : dict[str, dict[str, str | bytes]] + This is a collection of the protcols needed for publishing to multiple exhanges at once + + expected collection: { + inbound_app_1: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, + inbound_app_2: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, + ., + . + } + + :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. + :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. + """ + + for inbound_app_id, mrsal_protocol in mrsal_protocol_collection.items(): + protocol = MrsalProtocol(**mrsal_protocol) + + if not isinstance(protocol.message, (str, bytes)): + raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') + + # connect and use only blocking + self.setup_blocking_connection() + if auto_declare: + self._setup_exchange_and_queue( + exchange_name=protocol.exchange_name, + queue_name=protocol.queue_name, + exchange_type=protocol.exchange_type, + routing_key=protocol.routing_key, + passive=passive + ) + try: + # Publish the message by serializing it in json dump + # NOTE! we are not dumping a json anymore here! This allows for more flexibility + self._channel.basic_publish( + exchange=protocol.exchange_name, + routing_key=protocol.routing_key, + body=protocol.message, + properties=prop + ) + log.info(f"The message for inbound app {inbound_app_id} with message -- ({protocol.message!r}) is published to the exchange {protocol.exchange_name} with the routing key {protocol.routing_key}. Oh baby baby") + + except UnroutableError as e: + log.error(f"Producer could not publish message:{protocol.message!r} to the exchange {protocol.exchange_name} with a routing key {protocol.routing_key}: {e}", exc_info=True) + raise + except NackError as e: + log.error(f"Message NACKed by broker: {e}") + raise + except Exception as e: + log.error(f"Unexpected error while publishing message: {e}") + + def _publish_to_dlx_with_retry_cycle( + self, + method_frame, properties, body, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int, dlx_exchange_name: str | None): + """Publish message to DLX with retry cycle headers.""" + try: + # Use common logic from superclass + self._handle_dlx_with_retry_cycle_sync( + method_frame=method_frame, + properties=properties, + body=body, + processing_error=processing_error, + original_exchange=original_exchange, + original_routing_key=original_routing_key, + enable_retry_cycles=enable_retry_cycles, + retry_cycle_interval=retry_cycle_interval, + max_retry_time_limit=max_retry_time_limit, + dlx_exchange_name=dlx_exchange_name + ) + + # Acknowledge original message + self._channel.basic_ack(delivery_tag=method_frame.delivery_tag) + + except Exception as e: + log.error(f"Failed to send message to DLX: {e}") + self._channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True) + + def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): + """Blocking implementation of DLX publishing.""" + # Convert properties dict to pika.BasicProperties + pika_properties = pika.BasicProperties( + headers=properties.get('headers'), + delivery_mode=properties.get('delivery_mode', 2), + content_type=properties.get('content_type', 'application/json'), + expiration=properties.get('expiration') + ) + + self._channel.basic_publish( + exchange=dlx_exchange, + routing_key=routing_key, + body=body, + properties=pika_properties + ) class MrsalAsyncAMQP(Mrsal): - """Handles asynchronous connection with RabbitMQ using aio-pika.""" - async def setup_async_connection(self): - """Setup an asynchronous connection to RabbitMQ using aio-pika.""" - log.info(f"Establishing async connection to RabbitMQ on {self.host}:{self.port}") - try: - self._connection = await connect_robust( - host=self.host, - port=self.port, - login=self.credentials[0], - password=self.credentials[1], - virtualhost=self.virtual_host, - ssl=self.ssl, - ssl_context=self.get_ssl_context(), - heartbeat=self.heartbeat - ) - self._channel = await self._connection.channel() - await self._channel.set_qos(prefetch_count=self.prefetch_count) - log.info("Async connection established successfully.") - except (AMQPConnectionError, StreamLostError, ChannelClosedByBroker, ConnectionClosedByBroker) as e: - log.error(f"Error establishing async connection: {e}", exc_info=True) - raise - except Exception as e: - log.error(f'Oh my lordy lord! I caugth an unexpected exception while trying to connect: {e}', exc_info=True) - - @retry( - retry=retry_if_exception_type(( - AMQPConnectionError, - ChannelClosedByBroker, - ConnectionClosedByBroker, - StreamLostError, - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - async def start_consumer( - self, - queue_name: str, - callback: Callable | None = None, - callback_args: dict[str, str | int | float | bool] | None = None, - auto_ack: bool = False, - auto_declare: bool = True, - exchange_name: str | None = None, - exchange_type: str | None = None, - routing_key: str | None = None, - payload_model: Type | None = None, - requeue: bool = True, - dlx_enable: bool = True, - dlx_exchange_name: str | None = None, - dlx_routing_key: str | None = None, - use_quorum_queues: bool = True, - enable_retry_cycles: bool = True, - retry_cycle_interval: int = 10, # Minutes between cycles - max_retry_time_limit: int = 60, # Minutes total before permanent DLX - immediate_retry_delay: int = 4, # Seconds between immediate retries - max_queue_length: int | None = None, - max_queue_length_bytes: int | None = None, - queue_overflow: str | None = None, - single_active_consumer: bool | None = None, - lazy_queue: bool | None = None, - - ): - """Start the async consumer with the provided setup.""" - retry_counts = {} - - # Check if there's a connection; if not, create one - try: - asyncio.get_running_loop() - except RuntimeError: - raise MrsalNoAsyncioLoopError(f'Young grasshopper! You forget to add asyncio.run(mrsal.start_consumer(...))') - if not self._connection: - await self.setup_async_connection() - - - self._channel: AioChannel = await self._connection.channel() - await self._channel.set_qos(prefetch_count=self.prefetch_count) - - if auto_declare: - if None in (exchange_name, queue_name, exchange_type, routing_key): - raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') - - queue = await self._async_setup_exchange_and_queue( - exchange_name=exchange_name, - queue_name=queue_name, - exchange_type=exchange_type, - routing_key=routing_key, - dlx_enable=dlx_enable, - dlx_exchange_name=dlx_exchange_name, - dlx_routing_key=dlx_routing_key, - use_quorum_queues=use_quorum_queues, - max_queue_length=max_queue_length, - max_queue_length_bytes=max_queue_length_bytes, - queue_overflow=queue_overflow, - single_active_consumer=single_active_consumer, - lazy_queue=lazy_queue - ) - - if not self.auto_declare_ok: - if self._connection: - await self._connection.close() - raise MrsalAbortedSetup('Auto declaration failed during setup.') - - # Log consumer configuration - consumer_config = { - "queue": queue_name, - "exchange": exchange_name, - "max_length": max_queue_length or self.max_queue_length, - "overflow": queue_overflow or self.queue_overflow, - "single_consumer": single_active_consumer if single_active_consumer is not None else self.single_active_consumer, - "lazy": lazy_queue if lazy_queue is not None else self.lazy_queue - } - - log.info(f"Straight out of the swamps -- consumer boi listening with config: {consumer_config}") - - # async with queue.iterator() as queue_iter: - async for message in queue.iterator(): - if message is None: - continue - - # Extract message metadata - delivery_tag = message.delivery_tag - app_id = message.app_id if hasattr(message, 'app_id') else 'NoAppID' - msg_id = message.app_id if hasattr(message, 'message_id') else 'NoMsgID' - - # add this so it is in line with Pikas awkawrdly old ways - properties = config.AioPikaAttributes(app_id=app_id, message_id=msg_id) - properties.headers = message.headers - - if self.verbose: - log.info(f""" - Message received with: - - Redelivery: {message.redelivered} - - Exchange: {message.exchange} - - Routing Key: {message.routing_key} - - Delivery Tag: {message.delivery_tag} - - Requeue: {requeue} - - Auto Ack: {auto_ack} - """) - - if auto_ack: - await message.ack() - log.info(f'I successfully received a message from: {app_id} with messageID: {msg_id}') - - current_retry = retry_counts.get(delivery_tag, 0) - should_process = True - - if payload_model: - try: - self.validate_payload(message.body, payload_model) - except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: - log.error(f"Payload validation failed: {e}", exc_info=True) - should_process = False - - if callback and should_process: - try: - if callback_args: - await callback(*callback_args, message, properties, message.body) - else: - await callback(message, properties, message.body) - except Exception as e: - log.error(f"Splæt! Error processing message with callback: {e}", exc_info=True) - should_process = False - - if not should_process and not auto_ack: - if current_retry < self.max_retries and requeue: - # Increment retry count and requeue - retry_counts[delivery_tag] = current_retry + 1 - # Note: aio-pika doesn't allow modifying headers after receiving, - # so we rely on the broker's redelivery mechanism - await message.reject(requeue=True) - log.info(f"Message {msg_id} requeued (attempt {current_retry}/{self.max_retries})") - # Add delay before next retry - if current_retry + 1 < self.max_retries: - await asyncio.sleep(immediate_retry_delay) - else: - # Max retries reached or requeue disabled - retry_counts.pop(delivery_tag, None) - if dlx_enable and enable_retry_cycles: - # Use retry cycle logic - await self._async_publish_to_dlx_with_retry_cycle( - message, properties, "Callback processing failed", - exchange_name, routing_key, enable_retry_cycles, - retry_cycle_interval, max_retry_time_limit, dlx_exchange_name - ) - elif dlx_enable: - # Original DLX behavior - await message.reject(requeue=False) - log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") - else: - await message.reject(requeue=False) - log.warning(f"No dead letter exchange for {queue_name} declared, proceeding to drop the message -- Ponder you life choices! byebye") - log.info(f"Dropped message content: {message.body}") - continue - - if not auto_ack: - retry_counts.pop(delivery_tag, None) - await message.ack() - log.info(f'Young grasshopper! Message ({msg_id}) from {app_id} received and properly processed.') - - async def _async_publish_to_dlx_with_retry_cycle(self, message, properties, processing_error: str, - original_exchange: str, original_routing_key: str, - enable_retry_cycles: bool, retry_cycle_interval: int, - max_retry_time_limit: int, dlx_exchange_name: str | None): - """Async publish message to DLX with retry cycle headers.""" - try: - # Use common logic from superclass - await self._handle_dlx_with_retry_cycle_async( - message=message, - properties=properties, - processing_error=processing_error, - original_exchange=original_exchange, - original_routing_key=original_routing_key, - enable_retry_cycles=enable_retry_cycles, - retry_cycle_interval=retry_cycle_interval, - max_retry_time_limit=max_retry_time_limit, - dlx_exchange_name=dlx_exchange_name - ) - - # Acknowledge original message - await message.ack() - - except Exception as e: - log.error(f"Failed to send message to DLX: {e}") - await message.reject(requeue=True) - - async def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): - """Async implementation of DLX publishing.""" - - # Create aio-pika message - message = Message( - body, - headers=properties.get('headers'), - content_type=properties.get('content_type', 'application/json'), - delivery_mode=properties.get('delivery_mode', 2) - ) - - # Set expiration if provided - if 'expiration' in properties: - message.expiration = int(properties['expiration']) - - # Get exchange and publish - exchange = await self._channel.get_exchange(dlx_exchange) - await exchange.publish(message, routing_key=routing_key) + """Handles asynchronous connection with RabbitMQ using aio-pika.""" + async def setup_async_connection(self): + """Setup an asynchronous connection to RabbitMQ using aio-pika.""" + log.info(f"Establishing async connection to RabbitMQ on {self.host}:{self.port}") + try: + self._connection = await connect_robust( + host=self.host, + port=self.port, + login=self.credentials[0], + password=self.credentials[1], + virtualhost=self.virtual_host, + ssl=self.ssl, + ssl_context=self.get_ssl_context(), + heartbeat=self.heartbeat + ) + self._channel = await self._connection.channel() + await self._channel.set_qos(prefetch_count=self.prefetch_count) + log.info("Async connection established successfully.") + except (AMQPConnectionError, StreamLostError, ChannelClosedByBroker, ConnectionClosedByBroker) as e: + log.error(f"Error establishing async connection: {e}", exc_info=True) + raise + except Exception as e: + log.error(f'Oh my lordy lord! I caugth an unexpected exception while trying to connect: {e}', exc_info=True) + + @retry( + retry=retry_if_exception_type(( + AMQPConnectionError, + ChannelClosedByBroker, + ConnectionClosedByBroker, + StreamLostError, + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + async def start_consumer( + self, + queue_name: str, + callback: Callable | None = None, + callback_args: dict[str, str | int | float | bool] | None = None, + auto_ack: bool = False, + auto_declare: bool = True, + exchange_name: str | None = None, + exchange_type: str | None = None, + routing_key: str | None = None, + payload_model: Type | None = None, + dlx_enable: bool = True, + dlx_exchange_name: str | None = None, + dlx_routing_key: str | None = None, + use_quorum_queues: bool = True, + enable_retry_cycles: bool = True, + retry_cycle_interval: int = 10, # Minutes between cycles + max_retry_time_limit: int = 60, # Minutes total before permanent DLX + max_queue_length: int | None = None, + max_queue_length_bytes: int | None = None, + queue_overflow: str | None = None, + single_active_consumer: bool | None = None, + lazy_queue: bool | None = None, + + ): + """Start the async consumer with the provided setup.""" + # Check if there's a connection; if not, create one + try: + asyncio.get_running_loop() + except RuntimeError: + raise MrsalNoAsyncioLoopError(f'Young grasshopper! You forget to add asyncio.run(mrsal.start_consumer(...))') + if not self._connection: + await self.setup_async_connection() + + + self._channel: AioChannel = await self._connection.channel() + await self._channel.set_qos(prefetch_count=self.prefetch_count) + + if auto_declare: + if None in (exchange_name, queue_name, exchange_type, routing_key): + raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') + + queue = await self._async_setup_exchange_and_queue( + exchange_name=exchange_name, + queue_name=queue_name, + exchange_type=exchange_type, + routing_key=routing_key, + dlx_enable=dlx_enable, + dlx_exchange_name=dlx_exchange_name, + dlx_routing_key=dlx_routing_key, + use_quorum_queues=use_quorum_queues, + max_queue_length=max_queue_length, + max_queue_length_bytes=max_queue_length_bytes, + queue_overflow=queue_overflow, + single_active_consumer=single_active_consumer, + lazy_queue=lazy_queue + ) + + if not self.auto_declare_ok: + if self._connection: + await self._connection.close() + raise MrsalAbortedSetup('Auto declaration failed during setup.') + + # Log consumer configuration + consumer_config = { + "queue": queue_name, + "exchange": exchange_name, + "max_length": max_queue_length or self.max_queue_length, + "overflow": queue_overflow or self.queue_overflow, + "single_consumer": single_active_consumer if single_active_consumer is not None else self.single_active_consumer, + "lazy": lazy_queue if lazy_queue is not None else self.lazy_queue + } + + log.info(f"Straight out of the swamps -- consumer boi listening with config: {consumer_config}") + + # async with queue.iterator() as queue_iter: + async for message in queue.iterator(): + if message is None: + continue + + # Extract message metadata + delivery_tag = message.delivery_tag + app_id = message.app_id if hasattr(message, 'app_id') else 'NoAppID' + msg_id = message.app_id if hasattr(message, 'message_id') else 'NoMsgID' + + # add this so it is in line with Pikas awkawrdly old ways + properties = config.AioPikaAttributes(app_id=app_id, message_id=msg_id) + properties.headers = message.headers + + if self.verbose: + log.info(f""" + Message received with: + - Redelivery: {message.redelivered} + - Exchange: {message.exchange} + - Routing Key: {message.routing_key} + - Delivery Tag: {message.delivery_tag} + - Auto Ack: {auto_ack} + """) + + if auto_ack: + await message.ack() + log.info(f'I successfully received a message from: {app_id} with messageID: {msg_id}') + + current_retry = message.headers.get('x-delivery-count', 0) if message.headers else 0 + should_process = True + + if payload_model: + try: + self.validate_payload(message.body, payload_model) + except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: + log.error(f"Payload validation failed: {e}", exc_info=True) + should_process = False + + if callback and should_process: + try: + if callback_args: + await callback(*callback_args, message, properties, message.body) + else: + await callback(message, properties, message.body) + except Exception as e: + log.error(f"Splæt! Error processing message with callback: {e}", exc_info=True) + should_process = False + + if not should_process and not auto_ack: + if dlx_enable and enable_retry_cycles: + # Use retry cycle logic + await self._async_publish_to_dlx_with_retry_cycle( + message, properties, "Callback processing failed", + exchange_name, routing_key, enable_retry_cycles, + retry_cycle_interval, max_retry_time_limit, dlx_exchange_name + ) + elif dlx_enable: + # Original DLX behavior + await message.reject(requeue=False) + log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") + else: + await message.reject(requeue=False) + log.warning(f"No dead letter exchange for {queue_name} declared, proceeding to drop the message -- Ponder you life choices! byebye") + log.info(f"Dropped message content: {message.body}") + continue + + if not auto_ack: + await message.ack() + log.info(f'Young grasshopper! Message ({msg_id}) from {app_id} received and properly processed.') + + async def _async_publish_to_dlx_with_retry_cycle(self, message, properties, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int, dlx_exchange_name: str | None): + """Async publish message to DLX with retry cycle headers.""" + try: + # Use common logic from superclass + await self._handle_dlx_with_retry_cycle_async( + message=message, + properties=properties, + processing_error=processing_error, + original_exchange=original_exchange, + original_routing_key=original_routing_key, + enable_retry_cycles=enable_retry_cycles, + retry_cycle_interval=retry_cycle_interval, + max_retry_time_limit=max_retry_time_limit, + dlx_exchange_name=dlx_exchange_name + ) + + # Acknowledge original message + await message.ack() + + except Exception as e: + log.error(f"Failed to send message to DLX: {e}") + await message.reject(requeue=True) + + async def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): + """Async implementation of DLX publishing.""" + + # Create aio-pika message + message = Message( + body, + headers=properties.get('headers'), + content_type=properties.get('content_type', 'application/json'), + delivery_mode=properties.get('delivery_mode', 2) + ) + + # Set expiration if provided + if 'expiration' in properties: + message.expiration = int(properties['expiration']) + + # Get exchange and publish + exchange = await self._channel.get_exchange(dlx_exchange) + await exchange.publish(message, routing_key=routing_key) diff --git a/mrsal/superclass.py b/mrsal/superclass.py index ecc1e32..cf53230 100644 --- a/mrsal/superclass.py +++ b/mrsal/superclass.py @@ -49,7 +49,6 @@ class Mrsal: heartbeat: int = 60 # sec dlx_enable: bool = True dlx_exchange_name = None - max_retries: int = 3 use_quorum_queues: bool = True max_queue_length: int = 10000 # Good default for most use cases max_queue_length_bytes: int | None = None # Optional memory limit diff --git a/pyproject.toml b/pyproject.toml index 0b4317b..6a2db74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ license = "GPL-3.0-or-later" maintainers = ["Jon E Nesvold "] name = "mrsal" readme = "README.md" -version = "2.1.1" +version = "3.0.0" homepage = "https://github.com/NeoMedSys/mrsal" repository = "https://github.com/NeoMedSys/mrsal" documentation = "https://neomedsys.github.io/mrsal/" diff --git a/reports/coverage/.coverage b/reports/coverage/.coverage index b4bab4b..e5b5b51 100644 Binary files a/reports/coverage/.coverage and b/reports/coverage/.coverage differ diff --git a/reports/coverage/coverage.xml b/reports/coverage/coverage.xml index 12bd931..349a963 100644 --- a/reports/coverage/coverage.xml +++ b/reports/coverage/coverage.xml @@ -1,6 +1,6 @@ - - + + /home/runner/work/mrsal/mrsal @@ -79,7 +79,7 @@ - + @@ -118,7 +118,7 @@ - + @@ -155,13 +155,13 @@ - + - + - - + + @@ -169,34 +169,34 @@ - - - + + + - - + + - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + - - - - + + + + @@ -204,212 +204,211 @@ - + - - - - + + + + - + - + - - - + + + - - + + - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + - - - - - + + + + + - - + + - + - - - - - + + + + + - + - - - - - - - + + + + + + + - - + + - - - - - + + + + + - + - - - - - - - + + + + + + + - - + + - - - - + + + + - + - - - - - - - + + + + + + + - + - - + + - - + + - + - - - - + + + + - + - + - - - + + + - - - - + + + + - + - - - - + + + + - - + + - - - - - - - + + + + + + + - - + + - - - + + + - + - + - - + + - - + + - - - + + + - + - + - - + + - - + + - - - - + + + - + - + @@ -420,257 +419,235 @@ - + - + - - + + - - - - + + + + - - + + - - - - - - - - - - - - + + + + + + + + + + + + + + - + - - - - - - + + + + + + + + - - + - - + + - - - - + - + + + + + - - - - - - - - - - + + + + - - - - - - - - - + + + + + + + + + - - - - - - + + + + + + + - - - - - - - - - - - - - + + + + + + + + + - - + + + + + + + + - - - - - - - - - - - + + + + - - - - - - - - - + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + - - - - - - - - - - - + + + + + + + + + + + + + + + + + - - - + + + + + + + + + - + - - - - - - - - - + + + + + - + - - - + - - - - - - + + - - - - - - - - - - - - - - - - - - - - - + + + + + + - + - - - - @@ -1064,7 +1041,7 @@ - + @@ -1090,233 +1067,160 @@ + + - + - - + + - + - - - - - - - - - - - + + + + + + + + + + - - - + + + + + + + - - - - - - - - - - - + + + + + + + + - - - + - - - - - - - - - + - + + - - - - - - - - + + + + + + + + - - + + + + + + + + - - - - - - - - - - + + + - + + + + - + - + + + + + + + + + + + + - + - - - - - - - - - + + + + + + + + + + - - + - + - - - - - + + + - - - - - + - + - - - + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/reports/junit/junit.xml b/reports/junit/junit.xml index 6388327..ec42f9e 100644 --- a/reports/junit/junit.xml +++ b/reports/junit/junit.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_mrsal_dlx_retry.py b/tests/test_mrsal_dlx_retry.py index bcea1e7..94a70b2 100644 --- a/tests/test_mrsal_dlx_retry.py +++ b/tests/test_mrsal_dlx_retry.py @@ -7,585 +7,415 @@ @dataclass class ExpectedPayload: - id: int - name: str - active: bool + id: int + name: str + active: bool class AsyncIteratorMock: - """Mock async iterator for aio-pika queue.iterator()""" - def __init__(self, items): - self.items = iter(items) - - def __aiter__(self): - return self - - async def __anext__(self): - try: - return next(self.items) - except StopIteration: - raise StopAsyncIteration - - -class TestRetryMechanism: - """Test retry mechanism with DLX cycles""" - - @pytest.fixture - def mock_consumer(self): - """Create a mock consumer with mocked connection and channel""" - consumer = MrsalBlockingAMQP( - host="localhost", - port=5672, - credentials=("user", "password"), - virtual_host="testboi", - ssl=False, - verbose=False, - prefetch_count=1, - heartbeat=60, - dlx_enable=True, - max_retries=2, - use_quorum_queues=True, - blocked_connection_timeout=60 - ) - - # Mock connection and channel - consumer._connection = MagicMock() - consumer._channel = MagicMock() - consumer.auto_declare_ok = True - - # Mock setup methods - consumer.setup_blocking_connection = MagicMock() - consumer._setup_exchange_and_queue = MagicMock() - - return consumer - - def test_immediate_retry_params_exist(self, mock_consumer): - """Test that start_consumer accepts immediate retry parameters""" - # Should not raise error when passing immediate retry parameters - mock_consumer._channel.consume.return_value = [] - - try: - mock_consumer.start_consumer( - queue_name="test_queue", - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - immediate_retry_delay=5, - enable_retry_cycles=True, - retry_cycle_interval=10, - max_retry_time_limit=60 - ) - except Exception as e: - if "unexpected keyword argument" in str(e): - pytest.fail(f"start_consumer doesn't accept retry parameters: {e}") - - def test_retry_cycle_params_exist(self, mock_consumer): - """Test that start_consumer accepts retry cycle parameters""" - mock_consumer._channel.consume.return_value = [] - - try: - mock_consumer.start_consumer( - queue_name="test_queue", - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - enable_retry_cycles=True, - retry_cycle_interval=15, - max_retry_time_limit=90 - ) - except Exception as e: - if "unexpected keyword argument" in str(e): - pytest.fail(f"start_consumer doesn't accept retry cycle parameters: {e}") - - @patch('mrsal.amqp.subclass.time.sleep') - def test_immediate_retry_delay(self, mock_sleep, mock_consumer): - """Test that immediate retry delay is applied between retries""" - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 - mock_properties = MagicMock() - mock_properties.message_id = 'test_msg' - invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' - - # Create generator that yields same message multiple times (simulating redelivery) - def consume_generator(): - for _ in range(3): # 3 attempts (initial + 2 retries) - yield (mock_method_frame, mock_properties, invalid_body) - - mock_consumer._channel.consume.return_value = consume_generator() - - mock_consumer.start_consumer( - queue_name="test_queue", - callback=Mock(), - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - immediate_retry_delay=3 # 3 seconds delay - ) - - # Should have called sleep during retries - mock_sleep.assert_called_with(3) - - def test_validation_failure_with_retry_cycles_disabled(self, mock_consumer): - """Test that original DLX behavior is used when retry cycles disabled.""" - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 - mock_method_frame.routing_key = "test_key" - mock_properties = MagicMock() - mock_properties.message_id = 'test_msg' - mock_properties.app_id = 'test_app' - invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' - - # Create generator that simulates redelivery cycle - def consume_generator(): - # Initial delivery + max_retries (2) = 3 total attempts - for _ in range(3): - yield (mock_method_frame, mock_properties, invalid_body) - - mock_consumer._channel.consume.return_value = consume_generator() - - mock_consumer.start_consumer( - queue_name="test_queue", - callback=Mock(), - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - dlx_enable=True, - enable_retry_cycles=False # Disable retry cycles - ) - - # Should use original behavior (basic_nack with requeue=False) after max retries - nack_calls = mock_consumer._channel.basic_nack.call_args_list - - # First calls should be requeue=True (immediate retries) - for i in range(mock_consumer.max_retries): - assert nack_calls[i][1]['requeue'] == True - - # Final call should be requeue=False (goes to DLX) - final_nack_call = nack_calls[-1] - assert final_nack_call[1]['delivery_tag'] == 123 - assert final_nack_call[1]['requeue'] == False # Goes to DLX - - def test_validation_failure_with_retry_cycles_enabled(self, mock_consumer): - """Test that retry cycle DLX logic is used when enabled""" - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 - mock_method_frame.routing_key = "test_key" - mock_properties = MagicMock() - mock_properties.message_id = 'test_msg' - mock_properties.app_id = 'test_app' - mock_properties.headers = None - invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' - - # Create generator that simulates redelivery cycle - def consume_generator(): - # Initial delivery + max_retries (2) = 3 total attempts - for _ in range(3): - yield (mock_method_frame, mock_properties, invalid_body) - - mock_consumer._channel.consume.return_value = consume_generator() - - # Mock the DLX publishing method - mock_consumer._publish_to_dlx = MagicMock() - - mock_consumer.start_consumer( - queue_name="test_queue", - callback=Mock(), - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - dlx_enable=True, - enable_retry_cycles=True, # Enable retry cycles - retry_cycle_interval=10, - max_retry_time_limit=60 - ) - - # Should call the DLX retry cycle method instead of basic_nack - mock_consumer._publish_to_dlx.assert_called() - - def test_callback_failure_triggers_retry(self, mock_consumer): - """Test that callback failures trigger the retry mechanism""" - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 456 - mock_properties = MagicMock() - mock_properties.message_id = 'callback_test' - valid_body = b'{"id": 123, "name": "Test", "active": true}' - - def consume_generator(): - for _ in range(3): - yield (mock_method_frame, mock_properties, valid_body) - - mock_consumer._channel.consume.return_value = consume_generator() - - # Mock callback that raises exception - failing_callback = Mock(side_effect=Exception("Callback failed")) - - mock_consumer.start_consumer( - queue_name="test_queue", - callback=failing_callback, - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - immediate_retry_delay=1 - ) - - # Callback should be called multiple times due to retries - assert failing_callback.call_count >= 2 - - # Should have nack calls with requeue - nack_calls = mock_consumer._channel.basic_nack.call_args_list - assert len(nack_calls) > 0 - assert nack_calls[0][1]['delivery_tag'] == 456 - - def test_successful_processing_acks_message(self, mock_consumer): - """Test that successful processing acknowledges the message""" - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 789 - mock_properties = MagicMock() - mock_properties.message_id = 'success_test' - mock_properties.app_id = 'test_app' - valid_body = b'{"id": 123, "name": "Test", "active": true}' - - mock_consumer._channel.consume.return_value = [ - (mock_method_frame, mock_properties, valid_body) - ] - - successful_callback = Mock() # No exception - - mock_consumer.start_consumer( - queue_name="test_queue", - callback=successful_callback, - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload - ) - - # Should acknowledge the message - mock_consumer._channel.basic_ack.assert_called_with(delivery_tag=789) - - # Should not have any nack calls - mock_consumer._channel.basic_nack.assert_not_called() - - -class TestAsyncRetryCycles: - """Test async consumer retry cycle functionality""" - - @pytest.fixture - def mock_async_consumer(self): - """Create a mock async consumer""" - consumer = MrsalAsyncAMQP( - host="localhost", - port=5672, - credentials=("user", "password"), - virtual_host="testboi", - ssl=False, - verbose=False, - prefetch_count=1, - heartbeat=60, - dlx_enable=True, - max_retries=2, - use_quorum_queues=True - ) - - # Mock connection and channel - consumer._connection = AsyncMock() - consumer._channel = AsyncMock() - consumer.auto_declare_ok = True - - # Mock setup methods - consumer.setup_async_connection = AsyncMock() - consumer._async_setup_exchange_and_queue = AsyncMock() - - return consumer - - @pytest.mark.asyncio - async def test_async_start_consumer_has_retry_cycle_params(self, mock_async_consumer): - """Test that async start_consumer accepts retry cycle parameters.""" - # Mock minimal setup - mock_queue = AsyncMock() - mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue - - # FIX: Create proper async iterator mock - async_iterator = AsyncIteratorMock([]) # Empty iterator to avoid infinite loop - - # Mock the iterator method to return our async iterator (not a coroutine) - mock_queue.iterator = Mock(return_value=async_iterator) - - # Should not raise error with retry cycle parameters - await mock_async_consumer.start_consumer( - queue_name="test_queue", - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - enable_retry_cycles=True, - retry_cycle_interval=15, - max_retry_time_limit=90, - immediate_retry_delay=5 - ) - - @pytest.mark.asyncio - @patch('mrsal.amqp.subclass.asyncio.sleep') - async def test_async_immediate_retry_delay(self, mock_async_sleep, mock_async_consumer): - """Test that immediate retry delay is applied in async consumer.""" - # Mock message that will fail processing - mock_message = MagicMock() - mock_message.delivery_tag = 123 - mock_message.app_id = 'test_app' - mock_message.headers = None - mock_message.body = b'{"id": "wrong_type", "name": "Test", "active": true}' - mock_message.reject = AsyncMock() - - mock_properties = MagicMock() - mock_properties.headers = None - - # Mock queue iterator to return message multiple times (simulating redelivery) - mock_queue = AsyncMock() - mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue - - # FIX: Create proper async iterator with multiple redeliveries - async_iterator = AsyncIteratorMock([mock_message] * 3) # 3 redeliveries - mock_queue.iterator = Mock(return_value=async_iterator) - - await mock_async_consumer.start_consumer( - queue_name="test_queue", - callback=AsyncMock(side_effect=Exception("Processing failed")), - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - immediate_retry_delay=2 # 2 seconds delay - ) - - # Verify asyncio.sleep was called with the delay - mock_async_sleep.assert_called_with(2) - - @pytest.mark.asyncio - async def test_async_validation_failure_with_cycles_disabled(self, mock_async_consumer): - """Test async consumer with validation failure and retry cycles disabled""" - # Mock message with invalid payload - mock_message = MagicMock() - mock_message.delivery_tag = 123 - mock_message.app_id = 'test_app' - mock_message.headers = None - mock_message.body = b'{"id": "wrong_type", "name": "Test", "active": true}' - mock_message.reject = AsyncMock() - mock_message.ack = AsyncMock() - - mock_properties = MagicMock() - mock_properties.headers = None - - mock_queue = AsyncMock() - mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue - - # Simulate multiple redeliveries - async_iterator = AsyncIteratorMock([mock_message] * 3) - mock_queue.iterator = Mock(return_value=async_iterator) - - await mock_async_consumer.start_consumer( - queue_name="test_queue", - callback=AsyncMock(), - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - dlx_enable=True, - enable_retry_cycles=False - ) - - # Should have called reject with requeue=False eventually - mock_message.reject.assert_called() - - @pytest.mark.asyncio - async def test_async_successful_processing(self, mock_async_consumer): - """Test async consumer successful message processing""" - # Mock message with valid payload - mock_message = MagicMock() - mock_message.delivery_tag = 456 - mock_message.app_id = 'test_app' - mock_message.headers = None - mock_message.body = b'{"id": 123, "name": "Test", "active": true}' - mock_message.ack = AsyncMock() - mock_message.reject = AsyncMock() - - mock_properties = MagicMock() - mock_properties.headers = None - - mock_queue = AsyncMock() - mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue - - # Single successful message - async_iterator = AsyncIteratorMock([mock_message]) - mock_queue.iterator = Mock(return_value=async_iterator) - - successful_callback = AsyncMock() - - await mock_async_consumer.start_consumer( - queue_name="test_queue", - callback=successful_callback, - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload - ) - - # Should acknowledge the message - mock_message.ack.assert_called() - - # Should not reject - mock_message.reject.assert_not_called() - - @pytest.mark.asyncio - async def test_async_callback_failure_retry_cycles(self, mock_async_consumer): - """Test async consumer callback failure with retry cycles enabled""" - mock_message = MagicMock() - mock_message.delivery_tag = 789 - mock_message.app_id = 'test_app' - mock_message.headers = None - mock_message.body = b'{"id": 123, "name": "Test", "active": true}' - mock_message.ack = AsyncMock() - mock_message.reject = AsyncMock() - - mock_properties = MagicMock() - mock_properties.headers = None - - mock_queue = AsyncMock() - mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue - - # Multiple message deliveries for retry testing - async_iterator = AsyncIteratorMock([mock_message] * 3) - mock_queue.iterator = Mock(return_value=async_iterator) - - # Mock the DLX publishing method - mock_async_consumer._publish_to_dlx = AsyncMock() - - failing_callback = AsyncMock(side_effect=Exception("Async callback failed")) - - await mock_async_consumer.start_consumer( - queue_name="test_queue", - callback=failing_callback, - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - dlx_enable=True, - enable_retry_cycles=True, - retry_cycle_interval=5, - max_retry_time_limit=30 - ) - - # Callback should be called multiple times - assert failing_callback.call_count >= 2 + """Mock async iterator for aio-pika queue.iterator()""" + def __init__(self, items): + self.items = iter(items) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.items) + except StopIteration: + raise StopAsyncIteration + + +class TestDLXRetryCycleOnly: + """Test DLX retry cycle mechanism WITHOUT immediate retries""" + + @pytest.fixture + def mock_consumer(self): + """Create a mock consumer with mocked connection and channel""" + consumer = MrsalBlockingAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + ssl=False, + verbose=False, + prefetch_count=1, + heartbeat=60, + dlx_enable=True, + use_quorum_queues=True, + blocked_connection_timeout=60 + ) + + consumer._connection = MagicMock() + consumer._channel = MagicMock() + consumer.auto_declare_ok = True + consumer.setup_blocking_connection = MagicMock() + consumer._setup_exchange_and_queue = MagicMock() + + return consumer + + def test_validation_failure_sends_to_dlx_immediately(self, mock_consumer): + """ + Test that validation failures send message directly to DLX + NO immediate retries - goes straight to DLX + """ + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 456 + mock_method_frame.routing_key = "test_key" + mock_method_frame.exchange = "test_exchange" + + invalid_body = b'{"id": "not_an_int", "name": "Test", "active": true}' + + props = MagicMock() + props.message_id = 'validation_test' + props.app_id = 'test_app' + props.headers = None + + mock_consumer._channel.consume.return_value = [ + (mock_method_frame, props, invalid_body) + ] + + mock_consumer._publish_to_dlx_with_retry_cycle = MagicMock() + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=Mock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Should NOT nack with requeue + mock_consumer._channel.basic_nack.assert_not_called() + + # Should call DLX immediately (no immediate retries) + mock_consumer._publish_to_dlx_with_retry_cycle.assert_called_once() + + # Verify the call had correct parameters + call_args = mock_consumer._publish_to_dlx_with_retry_cycle.call_args + assert call_args[0][0] == mock_method_frame + assert call_args[0][1] == props + assert call_args[0][2] == invalid_body + + def test_callback_failure_sends_to_dlx_with_cycles_enabled(self, mock_consumer): + """ + Test callback failure with retry_cycles enabled. + Should send to DLX immediately for retry cycle + """ + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 789 + mock_method_frame.routing_key = "test_key" + mock_method_frame.exchange = "test_exchange" + + valid_body = b'{"id": 123, "name": "Test", "active": true}' + + props = MagicMock() + props.message_id = 'callback_fail' + props.app_id = 'test_app' + props.headers = None + + mock_consumer._channel.consume.return_value = [ + (mock_method_frame, props, valid_body) + ] + + # Callback that always fails + failing_callback = Mock(side_effect=Exception("Processing failed")) + mock_consumer._publish_to_dlx_with_retry_cycle = MagicMock() + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=failing_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Callback should be called once (no immediate retries) + assert failing_callback.call_count == 1 + + # Should send to DLX with retry cycle + mock_consumer._publish_to_dlx_with_retry_cycle.assert_called_once() + + def test_callback_failure_with_retry_cycles_disabled(self, mock_consumer): + """ + Test callback failure with retry_cycles disabled. + Should nack to DLX without retry cycle logic + """ + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 999 + mock_method_frame.routing_key = "test_key" + + valid_body = b'{"id": 123, "name": "Test", "active": true}' + + props = MagicMock() + props.message_id = 'callback_no_cycle' + props.app_id = 'test_app' + props.headers = None + + mock_consumer._channel.consume.return_value = [ + (mock_method_frame, props, valid_body) + ] + + failing_callback = Mock(side_effect=Exception("Processing failed")) + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=failing_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=False # Disabled + ) + + # Should nack with requeue=False (goes to DLX) + nack_calls = mock_consumer._channel.basic_nack.call_args_list + assert len(nack_calls) == 1 + assert nack_calls[0][1]['delivery_tag'] == 999 + assert nack_calls[0][1]['requeue'] == False + + def test_successful_processing_acks_message(self, mock_consumer): + """Test that successful processing acks immediately without retries""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 111 + + valid_body = b'{"id": 456, "name": "Success", "active": false}' + + props = MagicMock() + props.message_id = 'success_msg' + props.app_id = 'test_app' + props.headers = None + + mock_consumer._channel.consume.return_value = [ + (mock_method_frame, props, valid_body) + ] + + successful_callback = Mock() # No exception + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=successful_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload + ) + + # Should ack immediately + mock_consumer._channel.basic_ack.assert_called_once_with(delivery_tag=111) + + # Should NOT nack or send to DLX + mock_consumer._channel.basic_nack.assert_not_called() + + def test_no_dlx_drops_message(self, mock_consumer): + """Test that without DLX, failed messages are dropped""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 222 + + invalid_body = b'{"id": "bad", "name": "Test", "active": true}' + + props = MagicMock() + props.message_id = 'no_dlx' + props.app_id = 'test_app' + props.headers = None + + mock_consumer._channel.consume.return_value = [ + (mock_method_frame, props, invalid_body) + ] + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=Mock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=False # No DLX + ) + + # Should nack with requeue=False (message dropped) + nack_calls = mock_consumer._channel.basic_nack.call_args_list + assert len(nack_calls) == 1 + assert nack_calls[0][1]['requeue'] == False + + +class TestAsyncDLXRetryCycleOnly: + """Test async consumer with DLX retry cycles (no immediate retries)""" + + @pytest.fixture + def mock_async_consumer(self): + consumer = MrsalAsyncAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + ssl=False, + verbose=False, + prefetch_count=1, + heartbeat=60, + dlx_enable=True, + use_quorum_queues=True + ) + + consumer._connection = AsyncMock() + consumer._channel = AsyncMock() + consumer.auto_declare_ok = True + consumer.setup_async_connection = AsyncMock() + consumer._async_setup_exchange_and_queue = AsyncMock() + + return consumer + + @pytest.mark.asyncio + async def test_async_validation_failure_sends_to_dlx(self, mock_async_consumer): + """Test async validation failure sends to DLX immediately""" + invalid_body = b'{"id": "wrong", "name": "Test", "active": true}' + + mock_msg = MagicMock() + mock_msg.delivery_tag = 123 + mock_msg.app_id = 'async_app' + mock_msg.headers = None + mock_msg.body = invalid_body + mock_msg.reject = AsyncMock() + mock_msg.ack = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + async_iterator = AsyncIteratorMock([mock_msg]) + mock_queue.iterator = Mock(return_value=async_iterator) + + mock_async_consumer._publish_to_dlx = AsyncMock() + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=AsyncMock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + enable_retry_cycles=True, + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Should NOT reject (handled by DLX publish) + mock_msg.reject.assert_not_called() + + # Should call DLX + mock_async_consumer._publish_to_dlx.assert_called_once() + + @pytest.mark.asyncio + async def test_async_successful_processing(self, mock_async_consumer): + """Test async successful message processing""" + valid_body = b'{"id": 789, "name": "AsyncSuccess", "active": true}' + + mock_msg = MagicMock() + mock_msg.delivery_tag = 456 + mock_msg.app_id = 'async_app' + mock_msg.headers = None + mock_msg.body = valid_body + mock_msg.ack = AsyncMock() + mock_msg.reject = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + async_iterator = AsyncIteratorMock([mock_msg]) + mock_queue.iterator = Mock(return_value=async_iterator) + + successful_callback = AsyncMock() + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=successful_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload + ) + + # Should ack + mock_msg.ack.assert_called_once() + + # Should NOT reject + mock_msg.reject.assert_not_called() class TestDLXRetryHeaders: - """Test DLX retry cycle header management""" - - @pytest.fixture - def mock_consumer(self): - consumer = MrsalBlockingAMQP( - host="localhost", - port=5672, - credentials=("user", "password"), - virtual_host="testboi", - dlx_enable=True, - max_retries=2 - ) - consumer._connection = MagicMock() - consumer._channel = MagicMock() - return consumer - - def test_retry_cycle_info_extraction(self, mock_consumer): - """Test extraction of retry cycle information from headers""" - # Mock properties with retry headers - mock_properties = MagicMock() - mock_properties.headers = { - 'x-cycle-count': 2, - 'x-first-failure': '2024-01-01T10:00:00Z', - 'x-total-elapsed': 120000 # 2 minutes in ms - } - - retry_info = mock_consumer._get_retry_cycle_info(mock_properties) - - assert retry_info['cycle_count'] == 2 - assert retry_info['first_failure'] == '2024-01-01T10:00:00Z' - assert retry_info['total_elapsed'] == 120000 - - def test_retry_cycle_info_defaults(self, mock_consumer): - """Test default values when no retry headers present""" - mock_properties = MagicMock() - mock_properties.headers = None - - retry_info = mock_consumer._get_retry_cycle_info(mock_properties) - - assert retry_info['cycle_count'] == 0 - assert retry_info['first_failure'] is None - assert retry_info['total_elapsed'] == 0 - - def test_should_continue_retry_cycles_time_limit(self, mock_consumer): - """Test retry cycle time limit checking""" - # Within time limit - retry_info = {'total_elapsed': 30000} # 30 seconds - should_continue = mock_consumer._should_continue_retry_cycles( - retry_info, enable_retry_cycles=True, max_retry_time_limit=1 # 1 minute - ) - assert should_continue is True - - # Exceeded time limit - retry_info = {'total_elapsed': 120000} # 2 minutes - should_continue = mock_consumer._should_continue_retry_cycles( - retry_info, enable_retry_cycles=True, max_retry_time_limit=1 # 1 minute - ) - assert should_continue is False - - def test_should_continue_retry_cycles_disabled(self, mock_consumer): - """Test retry cycle disabled""" - retry_info = {'total_elapsed': 0} - should_continue = mock_consumer._should_continue_retry_cycles( - retry_info, enable_retry_cycles=False, max_retry_time_limit=60 - ) - assert should_continue is False - - def test_create_retry_cycle_headers(self, mock_consumer): - """Test creation of retry cycle headers""" - original_headers = {'custom-header': 'value'} - - headers = mock_consumer._create_retry_cycle_headers( - original_headers=original_headers, - cycle_count=1, - first_failure='2024-01-01T10:00:00Z', - processing_error='Test error', - should_cycle=True, - original_exchange='test_exchange', - original_routing_key='test_key' - ) - - assert headers['x-cycle-count'] == 2 # cycle_count + 1 - assert headers['x-first-failure'] == '2024-01-01T10:00:00Z' - assert headers['x-processing-error'] == 'Test error' - assert headers['x-retry-exhausted'] is False - assert headers['x-dead-letter-exchange'] == 'test_exchange' - assert headers['x-dead-letter-routing-key'] == 'test_key' - assert headers['custom-header'] == 'value' # Original header preserved + """Test DLX retry cycle header management""" + + @pytest.fixture + def mock_consumer(self): + consumer = MrsalBlockingAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + dlx_enable=True + ) + consumer._connection = MagicMock() + consumer._channel = MagicMock() + return consumer + + def test_retry_cycle_info_extraction(self, mock_consumer): + """Test extraction of retry cycle information from headers""" + mock_properties = MagicMock() + mock_properties.headers = { + 'x-cycle-count': 2, + 'x-first-failure': '2024-01-01T10:00:00Z', + 'x-total-elapsed': 120000 + } + + retry_info = mock_consumer._get_retry_cycle_info(mock_properties) + + assert retry_info['cycle_count'] == 2 + assert retry_info['first_failure'] == '2024-01-01T10:00:00Z' + assert retry_info['total_elapsed'] == 120000 + + def test_should_continue_retry_cycles_time_limit(self, mock_consumer): + """Test retry cycle time limit checking""" + # Within time limit + retry_info = {'total_elapsed': 30000} + should_continue = mock_consumer._should_continue_retry_cycles( + retry_info, enable_retry_cycles=True, max_retry_time_limit=1 + ) + assert should_continue is True + + # Exceeded time limit + retry_info = {'total_elapsed': 120000} + should_continue = mock_consumer._should_continue_retry_cycles( + retry_info, enable_retry_cycles=True, max_retry_time_limit=1 + ) + assert should_continue is False