diff --git a/README.md b/README.md index 5fa90c9..fb5a194 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # MRSAL AMQP -[![Release](https://img.shields.io/badge/release-1.1.3-blue.svg)](https://pypi.org/project/mrsal/) +[![Release](https://img.shields.io/badge/release-1.2.0-blue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/) [![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml) [![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/) diff --git a/mrsal/amqp/subclass.py b/mrsal/amqp/subclass.py index 923ebd7..81f9c2d 100644 --- a/mrsal/amqp/subclass.py +++ b/mrsal/amqp/subclass.py @@ -1,4 +1,5 @@ import asyncio +import time from mrsal.basemodels import MrsalProtocol import pika import json @@ -6,14 +7,14 @@ from mrsal.exceptions import MrsalAbortedSetup, MrsalNoAsyncioLoopError from logging import WARNING from pika.exceptions import ( - AMQPConnectionError, - ChannelClosedByBroker, - StreamLostError, - ConnectionClosedByBroker, - NackError, - UnroutableError - ) -from aio_pika import connect_robust, Channel as AioChannel + AMQPConnectionError, + ChannelClosedByBroker, + StreamLostError, + ConnectionClosedByBroker, + NackError, + UnroutableError + ) +from aio_pika import connect_robust, Message, Channel as AioChannel from typing import Callable, Type from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, before_sleep_log from pydantic import ValidationError @@ -26,486 +27,592 @@ @dataclass class MrsalBlockingAMQP(Mrsal): - """ - :param int blocked_connection_timeout: blocked_connection_timeout - is the timeout, in seconds, - for the connection to remain blocked; if the timeout expires, - the connection will be torn down during connection tuning. - """ - blocked_connection_timeout: int = 60 # sec - - - def setup_blocking_connection(self) -> None: - """We can use setup_blocking_connection for establishing a connection to RabbitMQ server specifying connection parameters. - The connection is blocking which is only advisable to use for the apps with low througput. - - DISCLAIMER: If you expect a lot of traffic to the app or if its realtime then you should use async. - - Parameters - ---------- - context : Dict[str, str] - context is the structured map with information regarding the SSL options for connecting with rabbit server via TLS. - """ - connection_info = f""" - Mrsal connection parameters: - host={self.host}, - virtual_host={self.virtual_host}, - port={self.port}, - heartbeat={self.heartbeat}, - ssl={self.ssl} - """ - if self.verbose: - log.info(f"Establishing connection to RabbitMQ on {connection_info}") - credentials = pika.PlainCredentials(*self.credentials) - try: - self._connection = pika.BlockingConnection( - pika.ConnectionParameters( - host=self.host, - port=self.port, - ssl_options=self.get_ssl_context(async_conn=False), - virtual_host=self.virtual_host, - credentials=credentials, - heartbeat=self.heartbeat, - blocked_connection_timeout=self.blocked_connection_timeout, - ) - ) - - self._channel = self._connection.channel() - # Note: prefetch is set to 1 here as an example only. - # In production you will want to test with different prefetch values to find which one provides the best performance and usability for your solution. - # use a high number of prefecth if you think the pods with Mrsal installed can handle it. A prefetch 4 will mean up to 4 async runs before ack is required - self._channel.basic_qos(prefetch_count=self.prefetch_count) - log.info(f"Boom! Connection established with RabbitMQ on {connection_info}") - except (AMQPConnectionError, ChannelClosedByBroker, ConnectionClosedByBroker, StreamLostError) as e: - log.error(f"I tried to connect with the RabbitMQ server but failed with: {e}") - raise - except Exception as e: - log.error(f"Unexpected error caught: {e}") - - @retry( - retry=retry_if_exception_type(( - AMQPConnectionError, - ChannelClosedByBroker, - ConnectionClosedByBroker, - StreamLostError, - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - def start_consumer(self, - queue_name: str, - callback: Callable | None = None, - callback_args: dict[str, str | int | float | bool] | None = None, - auto_ack: bool = True, - inactivity_timeout: int | None = None, # just let conusmer wait patiently damn it - auto_declare: bool = True, - exchange_name: str | None = None, - exchange_type: str | None = None, - routing_key: str | None = None, - payload_model: Type | None = None, - requeue: bool = True, - dlx_enable: bool = True, - dlx_exchange_name: str | None = None, - dlx_routing_key: str | None = None, - use_quorum_queues: bool = True - ) -> None: - """ - Start the consumer using blocking setup. - :param queue: The queue to consume from. - :param auto_ack: If True, messages are automatically acknowledged. - :param inactivity_timeout: Timeout for inactivity in the consumer loop. - :param callback: The callback function to process messages. - :param callback_args: Optional arguments to pass to the callback. - """ - # Connect and start the I/O loop - self.setup_blocking_connection() - retry_counts = {} - - if auto_declare: - if None in (exchange_name, queue_name, exchange_type, routing_key): - raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') - - self._setup_exchange_and_queue( - exchange_name=exchange_name, - queue_name=queue_name, - exchange_type=exchange_type, - routing_key=routing_key, - dlx_enable=dlx_enable, - dlx_exchange_name=dlx_exchange_name, - dlx_routing_key=dlx_routing_key, - use_quorum_queues=use_quorum_queues - ) - - if not self.auto_declare_ok: - raise MrsalAbortedSetup('Auto declaration for the connection setup failed and is aborted') - - log.info(f"Straigh out of the swamps -- consumer boi listening on queue: {queue_name} to the exchange {exchange_name}. Waiting for messages...") - - try: - for method_frame, properties, body in self._channel.consume( - queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout): - if method_frame: - if properties: - app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID given' - msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no msgID given' - delivery_tag = method_frame.delivery_tag - - if self.verbose: - log.info( - f""" - Message received with: - - Method Frame: {method_frame} - - Redelivery: {method_frame.redelivered} - - Exchange: {method_frame.exchange} - - Routing Key: {method_frame.routing_key} - - Delivery Tag: {method_frame.delivery_tag} - - Properties: {properties} - - Requeue: {requeue} - - Auto Ack: {auto_ack} - """ - ) - if auto_ack: - log.info(f'I successfully received a message with AutoAck from: {app_id} with messageID: {msg_id}') - - current_retry = retry_counts.get(delivery_tag, 0) - should_process = True - - if payload_model: - try: - self.validate_payload(body, payload_model) - except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: - log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}") - should_process = False - - if callback and should_process: - try: - if callback_args: - callback(*callback_args, method_frame, properties, body) - else: - callback(method_frame, properties, body) - except Exception as e: - log.error(f"Callback method failure: {e}") - log.error(f"Oh lordy lord message {msg_id} from {app_id} failed while running callback") - should_process = False - - if not should_process and not auto_ack: - if current_retry < self.max_retries and requeue: - # Increment retry count and requeue - retry_counts[delivery_tag] = current_retry + 1 - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=True) - log.info(f"Message {msg_id} requeued (attempt {current_retry + 1}/{self.max_retries})") - else: - # Max retries reached or requeue disabled - retry_counts.pop(delivery_tag, None) - if dlx_enable: - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) - log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") - log.info(f"Its the fault of polictial ideology imposing! Strangle an influencer if it makes you feel better!") - else: - self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) - log.warning(f"No dead letter exchange declared for {queue_name}, proceeding to drop the message -- reflect on your life choices! byebye") - log.info(f"Dropped message content: {body}") - continue - - if not auto_ack and should_process: - retry_counts.pop(delivery_tag, None) - log.info(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken') - self._channel.basic_ack(delivery_tag=delivery_tag) - - except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e: - log.error(f"Ooooooopsie! I caught a connection error while consuming: {e}") - raise - except Exception as e: - log.error(f'Oh lordy lord! I failed consuming ze messaj with: {e}') - - @retry( - retry=retry_if_exception_type(( - NackError, - UnroutableError - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - def publish_message( - self, - exchange_name: str, - routing_key: str, - message: str | bytes | None, - exchange_type: str, - queue_name: str, - auto_declare: bool = True, - prop: pika.BasicProperties | None = None, - ) -> None: - """Publish message to the exchange specifying routing key and properties. - - :param str exchange: The exchange to publish to - :param str routing_key: The routing key to bind on - :param bytes body: The message body; empty string if no body - :param pika.spec.BasicProperties properties: message properties - :param bool fast_setup: - - when True, will the method create the specified exchange, queue and bind them together using the routing kye. - - If False, this method will check if the specified exchange and queue already exist before publishing. - - :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. - :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. - """ - - if not isinstance(message, (str, bytes)): - raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') - # connect and use only blocking - self.setup_blocking_connection() - - if auto_declare: - if None in (exchange_name, queue_name, exchange_type, routing_key): - raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') - - self._setup_exchange_and_queue( - exchange_name=exchange_name, - queue_name=queue_name, - exchange_type=exchange_type, - routing_key=routing_key - ) - try: - # Publish the message by serializing it in json dump - # NOTE! we are not dumping a json anymore here! This allows for more flexibility - self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message, properties=prop) - log.info(f"The message ({message!r}) is published to the exchange {exchange_name} with the routing key {routing_key}") - - except UnroutableError as e: - log.error(f"Producer could not publish message:{message!r} to the exchange {exchange_name} with a routing key {routing_key}: {e}", exc_info=True) - raise - except NackError as e: - log.error(f"Message NACKed by broker: {e}") - raise - except Exception as e: - log.error(f"Unexpected error while publishing message: {e}") - - @retry( - retry=retry_if_exception_type(( - NackError, - UnroutableError - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - def publish_messages( - self, - mrsal_protocol_collection: dict[str, dict[str, str | bytes]], - prop: pika.BasicProperties | None = None, - ) -> None: - """Publish message to the exchange specifying routing key and properties. - - mrsal_protocol_collection : dict[str, dict[str, str | bytes]] - This is a collection of the protcols needed for publishing to multiple exhanges at once - - expected collection: { - inbound_app_1: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, - inbound_app_2: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, - ., - . - } - - :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. - :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. - """ - - for inbound_app_id, mrsal_protocol in mrsal_protocol_collection.items(): - protocol = MrsalProtocol(**mrsal_protocol) - - if not isinstance(protocol.message, (str, bytes)): - raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') - # connect and use only blocking - self.setup_blocking_connection() - self._setup_exchange_and_queue( - exchange_name=protocol.exchange_name, - queue_name=protocol.queue_name, - exchange_type=protocol.exchange_type, - routing_key=protocol.routing_key - ) - try: - # Publish the message by serializing it in json dump - # NOTE! we are not dumping a json anymore here! This allows for more flexibility - self._channel.basic_publish( - exchange=protocol.exchange_name, - routing_key=protocol.routing_key, - body=protocol.message, - properties=prop - ) - log.info(f"The message for inbound app {inbound_app_id} with message -- ({protocol.message!r}) is published to the exchange {protocol.exchange_name} with the routing key {protocol.routing_key}. Oh baby baby") - - except UnroutableError as e: - log.error(f"Producer could not publish message:{protocol.message!r} to the exchange {protocol.exchange_name} with a routing key {protocol.routing_key}: {e}", exc_info=True) - raise - except NackError as e: - log.error(f"Message NACKed by broker: {e}") - raise - except Exception as e: - log.error(f"Unexpected error while publishing message: {e}") + """ + :param int blocked_connection_timeout: blocked_connection_timeout + is the timeout, in seconds, + for the connection to remain blocked; if the timeout expires, + the connection will be torn down during connection tuning. + """ + blocked_connection_timeout: int = 60 # sec + + + def setup_blocking_connection(self) -> None: + """We can use setup_blocking_connection for establishing a connection to RabbitMQ server specifying connection parameters. + The connection is blocking which is only advisable to use for the apps with low througput. + + DISCLAIMER: If you expect a lot of traffic to the app or if its realtime then you should use async. + + Parameters + ---------- + context : Dict[str, str] + context is the structured map with information regarding the SSL options for connecting with rabbit server via TLS. + """ + connection_info = f""" + Mrsal connection parameters: + host={self.host}, + virtual_host={self.virtual_host}, + port={self.port}, + heartbeat={self.heartbeat}, + ssl={self.ssl} + """ + if self.verbose: + log.info(f"Establishing connection to RabbitMQ on {connection_info}") + credentials = pika.PlainCredentials(*self.credentials) + try: + self._connection = pika.BlockingConnection( + pika.ConnectionParameters( + host=self.host, + port=self.port, + ssl_options=self.get_ssl_context(async_conn=False), + virtual_host=self.virtual_host, + credentials=credentials, + heartbeat=self.heartbeat, + blocked_connection_timeout=self.blocked_connection_timeout, + ) + ) + + self._channel = self._connection.channel() + # Note: prefetch is set to 1 here as an example only. + # In production you will want to test with different prefetch values to find which one provides the best performance and usability for your solution. + # use a high number of prefecth if you think the pods with Mrsal installed can handle it. A prefetch 4 will mean up to 4 async runs before ack is required + self._channel.basic_qos(prefetch_count=self.prefetch_count) + log.info(f"Boom! Connection established with RabbitMQ on {connection_info}") + except (AMQPConnectionError, ChannelClosedByBroker, ConnectionClosedByBroker, StreamLostError) as e: + log.error(f"I tried to connect with the RabbitMQ server but failed with: {e}") + raise + except Exception as e: + log.error(f"Unexpected error caught: {e}") + + @retry( + retry=retry_if_exception_type(( + AMQPConnectionError, + ChannelClosedByBroker, + ConnectionClosedByBroker, + StreamLostError, + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + def start_consumer(self, + queue_name: str, + callback: Callable | None = None, + callback_args: dict[str, str | int | float | bool] | None = None, + auto_ack: bool = True, + inactivity_timeout: int | None = None, # just let conusmer wait patiently damn it + auto_declare: bool = True, + exchange_name: str | None = None, + exchange_type: str | None = None, + routing_key: str | None = None, + payload_model: Type | None = None, + requeue: bool = True, + dlx_enable: bool = True, + dlx_exchange_name: str | None = None, + dlx_routing_key: str | None = None, + use_quorum_queues: bool = True, + enable_retry_cycles: bool = True, + retry_cycle_interval: int = 10, # Minutes between cycles + max_retry_time_limit: int = 60, # Minutes total before permanent DLX + immediate_retry_delay: int = 4, # Seconds between immediate retries + ) -> None: + """ + Start the consumer using blocking setup. + :param queue: The queue to consume from. + :param auto_ack: If True, messages are automatically acknowledged. + :param inactivity_timeout: Timeout for inactivity in the consumer loop. + :param callback: The callback function to process messages. + :param callback_args: Optional arguments to pass to the callback. + """ + # Connect and start the I/O loop + self.setup_blocking_connection() + retry_counts = {} + + if auto_declare: + if None in (exchange_name, queue_name, exchange_type, routing_key): + raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') + + self._setup_exchange_and_queue( + exchange_name=exchange_name, + queue_name=queue_name, + exchange_type=exchange_type, + routing_key=routing_key, + dlx_enable=dlx_enable, + dlx_exchange_name=dlx_exchange_name, + dlx_routing_key=dlx_routing_key, + use_quorum_queues=use_quorum_queues + ) + + if not self.auto_declare_ok: + raise MrsalAbortedSetup('Auto declaration for the connection setup failed and is aborted') + + log.info(f"Straigh out of the swamps -- consumer boi listening on queue: {queue_name} to the exchange {exchange_name}. Waiting for messages...") + + try: + for method_frame, properties, body in self._channel.consume( + queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout): + if method_frame: + if properties: + app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID given' + msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no msgID given' + delivery_tag = method_frame.delivery_tag + + if self.verbose: + log.info( + f""" + Message received with: + - Method Frame: {method_frame} + - Redelivery: {method_frame.redelivered} + - Exchange: {method_frame.exchange} + - Routing Key: {method_frame.routing_key} + - Delivery Tag: {method_frame.delivery_tag} + - Properties: {properties} + - Requeue: {requeue} + - Auto Ack: {auto_ack} + """ + ) + if auto_ack: + log.info(f'I successfully received a message with AutoAck from: {app_id} with messageID: {msg_id}') + + current_retry = retry_counts.get(delivery_tag, 0) + should_process = True + + if payload_model: + try: + self.validate_payload(body, payload_model) + except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: + log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}") + should_process = False + + if callback and should_process: + try: + if callback_args: + callback(*callback_args, method_frame, properties, body) + else: + callback(method_frame, properties, body) + except Exception as e: + log.error(f"Callback method failure: {e}") + log.error(f"Oh lordy lord message {msg_id} from {app_id} failed while running callback") + should_process = False + + if not should_process and not auto_ack: + if current_retry < self.max_retries and requeue: + # Increment retry count and requeue + retry_counts[delivery_tag] = current_retry + 1 + self._channel.basic_nack(delivery_tag=delivery_tag, requeue=True) + log.info(f"Message {msg_id} requeued (attempt {current_retry + 1}/{self.max_retries})") + # Add delay before next retry + if current_retry + 1 < self.max_retries: + time.sleep(immediate_retry_delay) + else: + # Max retries reached or requeue disabled + retry_counts.pop(delivery_tag, None) + if dlx_enable and enable_retry_cycles: + # Use retry cycle logic + self._publish_to_dlx_with_retry_cycle( + method_frame, properties, body, "Callback processing failed", + exchange_name, routing_key, enable_retry_cycles, + retry_cycle_interval, max_retry_time_limit + ) + elif dlx_enable: + # Original DLX behavior + self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) + log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") + log.info(f"Its the fault of polictial ideology imposing! Strangle an influencer if it makes you feel better!") + else: + self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False) + log.warning(f"No dead letter exchange declared for {queue_name}, proceeding to drop the message -- reflect on your life choices! byebye") + log.info(f"Dropped message content: {body}") + continue + + if not auto_ack and should_process: + retry_counts.pop(delivery_tag, None) + log.info(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken') + self._channel.basic_ack(delivery_tag=delivery_tag) + + except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e: + log.error(f"Ooooooopsie! I caught a connection error while consuming: {e}") + raise + except Exception as e: + log.error(f'Oh lordy lord! I failed consuming ze messaj with: {e}') + + @retry( + retry=retry_if_exception_type(( + NackError, + UnroutableError + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + def publish_message( + self, + exchange_name: str, + routing_key: str, + message: str | bytes | None, + exchange_type: str, + queue_name: str, + auto_declare: bool = True, + prop: pika.BasicProperties | None = None, + ) -> None: + """Publish message to the exchange specifying routing key and properties. + + :param str exchange: The exchange to publish to + :param str routing_key: The routing key to bind on + :param bytes body: The message body; empty string if no body + :param pika.spec.BasicProperties properties: message properties + :param bool fast_setup: + - when True, will the method create the specified exchange, queue and bind them together using the routing kye. + - If False, this method will check if the specified exchange and queue already exist before publishing. + + :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. + :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. + """ + + if not isinstance(message, (str, bytes)): + raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') + # connect and use only blocking + self.setup_blocking_connection() + + if auto_declare: + if None in (exchange_name, queue_name, exchange_type, routing_key): + raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') + + self._setup_exchange_and_queue( + exchange_name=exchange_name, + queue_name=queue_name, + exchange_type=exchange_type, + routing_key=routing_key + ) + try: + # Publish the message by serializing it in json dump + # NOTE! we are not dumping a json anymore here! This allows for more flexibility + self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message, properties=prop) + log.info(f"The message ({message!r}) is published to the exchange {exchange_name} with the routing key {routing_key}") + + except UnroutableError as e: + log.error(f"Producer could not publish message:{message!r} to the exchange {exchange_name} with a routing key {routing_key}: {e}", exc_info=True) + raise + except NackError as e: + log.error(f"Message NACKed by broker: {e}") + raise + except Exception as e: + log.error(f"Unexpected error while publishing message: {e}") + + @retry( + retry=retry_if_exception_type(( + NackError, + UnroutableError + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + def publish_messages( + self, + mrsal_protocol_collection: dict[str, dict[str, str | bytes]], + prop: pika.BasicProperties | None = None, + ) -> None: + """Publish message to the exchange specifying routing key and properties. + + mrsal_protocol_collection : dict[str, dict[str, str | bytes]] + This is a collection of the protcols needed for publishing to multiple exhanges at once + + expected collection: { + inbound_app_1: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, + inbound_app_2: {message: bytes | str, routing_key: str, queue_name: str, exchange_type: str, exchange_name: str}, + ., + . + } + + :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see `BlockingChannel.confirm_delivery`) is returned via `Basic.Return` followed by `Basic.Ack`. + :raises NackError: raised when a message published in publisher-acknowledgements mode is Nack'ed by the broker. See `BlockingChannel.confirm_delivery`. + """ + + for inbound_app_id, mrsal_protocol in mrsal_protocol_collection.items(): + protocol = MrsalProtocol(**mrsal_protocol) + + if not isinstance(protocol.message, (str, bytes)): + raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict') + # connect and use only blocking + self.setup_blocking_connection() + self._setup_exchange_and_queue( + exchange_name=protocol.exchange_name, + queue_name=protocol.queue_name, + exchange_type=protocol.exchange_type, + routing_key=protocol.routing_key + ) + try: + # Publish the message by serializing it in json dump + # NOTE! we are not dumping a json anymore here! This allows for more flexibility + self._channel.basic_publish( + exchange=protocol.exchange_name, + routing_key=protocol.routing_key, + body=protocol.message, + properties=prop + ) + log.info(f"The message for inbound app {inbound_app_id} with message -- ({protocol.message!r}) is published to the exchange {protocol.exchange_name} with the routing key {protocol.routing_key}. Oh baby baby") + + except UnroutableError as e: + log.error(f"Producer could not publish message:{protocol.message!r} to the exchange {protocol.exchange_name} with a routing key {protocol.routing_key}: {e}", exc_info=True) + raise + except NackError as e: + log.error(f"Message NACKed by broker: {e}") + raise + except Exception as e: + log.error(f"Unexpected error while publishing message: {e}") + + def _publish_to_dlx_with_retry_cycle(self, method_frame, properties, body, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int): + """Publish message to DLX with retry cycle headers.""" + try: + # Use common logic from superclass + self._handle_dlx_with_retry_cycle( + method_frame, properties, body, processing_error, + original_exchange, original_routing_key, + enable_retry_cycles, retry_cycle_interval, max_retry_time_limit + ) + + # Acknowledge original message + self._channel.basic_ack(delivery_tag=method_frame.delivery_tag) + + except Exception as e: + log.error(f"Failed to send message to DLX: {e}") + self._channel.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True) + + def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): + """Blocking implementation of DLX publishing.""" + # Convert properties dict to pika.BasicProperties + pika_properties = pika.BasicProperties( + headers=properties.get('headers'), + delivery_mode=properties.get('delivery_mode', 2), + content_type=properties.get('content_type', 'application/json'), + expiration=properties.get('expiration') + ) + + self._channel.basic_publish( + exchange=dlx_exchange, + routing_key=routing_key, + body=body, + properties=pika_properties + ) class MrsalAsyncAMQP(Mrsal): - """Handles asynchronous connection with RabbitMQ using aio-pika.""" - async def setup_async_connection(self): - """Setup an asynchronous connection to RabbitMQ using aio-pika.""" - log.info(f"Establishing async connection to RabbitMQ on {self.host}:{self.port}") - try: - self._connection = await connect_robust( - host=self.host, - port=self.port, - login=self.credentials[0], - password=self.credentials[1], - virtualhost=self.virtual_host, - ssl=self.ssl, - ssl_context=self.get_ssl_context(), - heartbeat=self.heartbeat - ) - self._channel = await self._connection.channel() - await self._channel.set_qos(prefetch_count=self.prefetch_count) - log.info("Async connection established successfully.") - except (AMQPConnectionError, StreamLostError, ChannelClosedByBroker, ConnectionClosedByBroker) as e: - log.error(f"Error establishing async connection: {e}", exc_info=True) - raise - except Exception as e: - log.error(f'Oh my lordy lord! I caugth an unexpected exception while trying to connect: {e}', exc_info=True) - - @retry( - retry=retry_if_exception_type(( - AMQPConnectionError, - ChannelClosedByBroker, - ConnectionClosedByBroker, - StreamLostError, - )), - stop=stop_after_attempt(3), - wait=wait_fixed(2), - before_sleep=before_sleep_log(log, WARNING) - ) - async def start_consumer( - self, - queue_name: str, - callback: Callable | None = None, - callback_args: dict[str, str | int | float | bool] | None = None, - auto_ack: bool = False, - auto_declare: bool = True, - exchange_name: str | None = None, - exchange_type: str | None = None, - routing_key: str | None = None, - payload_model: Type | None = None, - requeue: bool = True, - dlx_enable: bool = True, - dlx_exchange_name: str | None = None, - dlx_routing_key: str | None = None, - use_quorum_queues: bool = True - ): - """Start the async consumer with the provided setup.""" - retry_counts = {} - - # Check if there's a connection; if not, create one - try: - asyncio.get_running_loop() - except RuntimeError: - raise MrsalNoAsyncioLoopError(f'Young grasshopper! You forget to add asyncio.run(mrsal.start_consumer(...))') - if not self._connection: - await self.setup_async_connection() - - - self._channel: AioChannel = await self._connection.channel() - await self._channel.set_qos(prefetch_count=self.prefetch_count) - - if auto_declare: - if None in (exchange_name, queue_name, exchange_type, routing_key): - raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') - - queue = await self._async_setup_exchange_and_queue( - exchange_name=exchange_name, - queue_name=queue_name, - exchange_type=exchange_type, - routing_key=routing_key, - dlx_enable=dlx_enable, - dlx_exchange_name=dlx_exchange_name, - dlx_routing_key=dlx_routing_key, - use_quorum_queues=use_quorum_queues - ) - - if not self.auto_declare_ok: - if self._connection: - await self._connection.close() - raise MrsalAbortedSetup('Auto declaration failed during setup.') - - log.info(f"Straight out of the swamps -- Consumer boi listening on queue: {queue_name}, exchange: {exchange_name}") - - # async with queue.iterator() as queue_iter: - async for message in queue.iterator(): - if message is None: - continue - - # Extract message metadata - delivery_tag = message.delivery_tag - app_id = message.app_id if hasattr(message, 'app_id') else 'NoAppID' - msg_id = message.app_id if hasattr(message, 'message_id') else 'NoMsgID' - - # add this so it is in line with Pikas awkawrdly old ways - properties = config.AioPikaAttributes(app_id=app_id, message_id=msg_id) - - if self.verbose: - log.info(f""" - Message received with: - - Redelivery: {message.redelivered} - - Exchange: {message.exchange} - - Routing Key: {message.routing_key} - - Delivery Tag: {message.delivery_tag} - - Requeue: {requeue} - - Auto Ack: {auto_ack} - """) - - if auto_ack: - await message.ack() - log.info(f'I successfully received a message from: {app_id} with messageID: {msg_id}') - - current_retry = retry_counts.get(delivery_tag, 0) - should_process = True - - if payload_model: - try: - self.validate_payload(message.body, payload_model) - except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: - log.error(f"Payload validation failed: {e}", exc_info=True) - should_process = False - - if callback and should_process: - try: - if callback_args: - await callback(*callback_args, message, properties, message.body) - else: - await callback(message, properties, message.body) - except Exception as e: - log.error(f"Splæt! Error processing message with callback: {e}", exc_info=True) - should_process = False - - if not should_process and not auto_ack: - if current_retry < self.max_retries and requeue: - # Increment retry count and requeue - retry_counts[delivery_tag] = current_retry + 1 - # Note: aio-pika doesn't allow modifying headers after receiving, - # so we rely on the broker's redelivery mechanism - await message.reject(requeue=True) - log.info(f"Message {msg_id} requeued (attempt {current_retry}/{self.max_retries})") - else: - # Max retries reached or requeue disabled - retry_counts.pop(delivery_tag, None) - if dlx_enable: - await message.reject(requeue=False) - log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") - else: - await message.reject(requeue=False) - log.warning(f"No dead letter exchange for {queue_name} declared, proceeding to drop the message -- Ponder you life choices! byebye") - log.info(f"Dropped message content: {message.body}") - continue - - if not auto_ack: - retry_counts.pop(delivery_tag, None) - await message.ack() - log.info(f'Young grasshopper! Message ({msg_id}) from {app_id} received and properly processed.') - + """Handles asynchronous connection with RabbitMQ using aio-pika.""" + async def setup_async_connection(self): + """Setup an asynchronous connection to RabbitMQ using aio-pika.""" + log.info(f"Establishing async connection to RabbitMQ on {self.host}:{self.port}") + try: + self._connection = await connect_robust( + host=self.host, + port=self.port, + login=self.credentials[0], + password=self.credentials[1], + virtualhost=self.virtual_host, + ssl=self.ssl, + ssl_context=self.get_ssl_context(), + heartbeat=self.heartbeat + ) + self._channel = await self._connection.channel() + await self._channel.set_qos(prefetch_count=self.prefetch_count) + log.info("Async connection established successfully.") + except (AMQPConnectionError, StreamLostError, ChannelClosedByBroker, ConnectionClosedByBroker) as e: + log.error(f"Error establishing async connection: {e}", exc_info=True) + raise + except Exception as e: + log.error(f'Oh my lordy lord! I caugth an unexpected exception while trying to connect: {e}', exc_info=True) + + @retry( + retry=retry_if_exception_type(( + AMQPConnectionError, + ChannelClosedByBroker, + ConnectionClosedByBroker, + StreamLostError, + )), + stop=stop_after_attempt(3), + wait=wait_fixed(2), + before_sleep=before_sleep_log(log, WARNING) + ) + async def start_consumer( + self, + queue_name: str, + callback: Callable | None = None, + callback_args: dict[str, str | int | float | bool] | None = None, + auto_ack: bool = False, + auto_declare: bool = True, + exchange_name: str | None = None, + exchange_type: str | None = None, + routing_key: str | None = None, + payload_model: Type | None = None, + requeue: bool = True, + dlx_enable: bool = True, + dlx_exchange_name: str | None = None, + dlx_routing_key: str | None = None, + use_quorum_queues: bool = True, + enable_retry_cycles: bool = True, + retry_cycle_interval: int = 10, # Minutes between cycles + max_retry_time_limit: int = 60, # Minutes total before permanent DLX + immediate_retry_delay: int = 4, # Seconds between immediate retries + ): + """Start the async consumer with the provided setup.""" + retry_counts = {} + + # Check if there's a connection; if not, create one + try: + asyncio.get_running_loop() + except RuntimeError: + raise MrsalNoAsyncioLoopError(f'Young grasshopper! You forget to add asyncio.run(mrsal.start_consumer(...))') + if not self._connection: + await self.setup_async_connection() + + + self._channel: AioChannel = await self._connection.channel() + await self._channel.set_qos(prefetch_count=self.prefetch_count) + + if auto_declare: + if None in (exchange_name, queue_name, exchange_type, routing_key): + raise TypeError('Make sure that you are passing in all the necessary args for auto_declare') + + queue = await self._async_setup_exchange_and_queue( + exchange_name=exchange_name, + queue_name=queue_name, + exchange_type=exchange_type, + routing_key=routing_key, + dlx_enable=dlx_enable, + dlx_exchange_name=dlx_exchange_name, + dlx_routing_key=dlx_routing_key, + use_quorum_queues=use_quorum_queues + ) + + if not self.auto_declare_ok: + if self._connection: + await self._connection.close() + raise MrsalAbortedSetup('Auto declaration failed during setup.') + + log.info(f"Straight out of the swamps -- Consumer boi listening on queue: {queue_name}, exchange: {exchange_name}") + + # async with queue.iterator() as queue_iter: + async for message in queue.iterator(): + if message is None: + continue + + # Extract message metadata + delivery_tag = message.delivery_tag + app_id = message.app_id if hasattr(message, 'app_id') else 'NoAppID' + msg_id = message.app_id if hasattr(message, 'message_id') else 'NoMsgID' + + # add this so it is in line with Pikas awkawrdly old ways + properties = config.AioPikaAttributes(app_id=app_id, message_id=msg_id) + properties.headers = message.headers + + if self.verbose: + log.info(f""" + Message received with: + - Redelivery: {message.redelivered} + - Exchange: {message.exchange} + - Routing Key: {message.routing_key} + - Delivery Tag: {message.delivery_tag} + - Requeue: {requeue} + - Auto Ack: {auto_ack} + """) + + if auto_ack: + await message.ack() + log.info(f'I successfully received a message from: {app_id} with messageID: {msg_id}') + + current_retry = retry_counts.get(delivery_tag, 0) + should_process = True + + if payload_model: + try: + self.validate_payload(message.body, payload_model) + except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e: + log.error(f"Payload validation failed: {e}", exc_info=True) + should_process = False + + if callback and should_process: + try: + if callback_args: + await callback(*callback_args, message, properties, message.body) + else: + await callback(message, properties, message.body) + except Exception as e: + log.error(f"Splæt! Error processing message with callback: {e}", exc_info=True) + should_process = False + + if not should_process and not auto_ack: + if current_retry < self.max_retries and requeue: + # Increment retry count and requeue + retry_counts[delivery_tag] = current_retry + 1 + # Note: aio-pika doesn't allow modifying headers after receiving, + # so we rely on the broker's redelivery mechanism + await message.reject(requeue=True) + log.info(f"Message {msg_id} requeued (attempt {current_retry}/{self.max_retries})") + # Add delay before next retry + if current_retry + 1 < self.max_retries: + await asyncio.sleep(immediate_retry_delay) + else: + # Max retries reached or requeue disabled + retry_counts.pop(delivery_tag, None) + if dlx_enable and enable_retry_cycles: + # Use retry cycle logic + await self._async_publish_to_dlx_with_retry_cycle( + message, properties, "Callback processing failed", + exchange_name, routing_key, enable_retry_cycles, + retry_cycle_interval, max_retry_time_limit + ) + elif dlx_enable: + # Original DLX behavior + await message.reject(requeue=False) + log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries") + else: + await message.reject(requeue=False) + log.warning(f"No dead letter exchange for {queue_name} declared, proceeding to drop the message -- Ponder you life choices! byebye") + log.info(f"Dropped message content: {message.body}") + continue + + if not auto_ack: + retry_counts.pop(delivery_tag, None) + await message.ack() + log.info(f'Young grasshopper! Message ({msg_id}) from {app_id} received and properly processed.') + + async def _async_publish_to_dlx_with_retry_cycle(self, message, properties, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int): + """Async publish message to DLX with retry cycle headers.""" + try: + # Use common logic from superclass + self._handle_dlx_with_retry_cycle( + message, properties, message.body, processing_error, + original_exchange, original_routing_key, + enable_retry_cycles, retry_cycle_interval, max_retry_time_limit + ) + + # Acknowledge original message + await message.ack() + + except Exception as e: + log.error(f"Failed to send message to DLX: {e}") + await message.reject(requeue=True) + + async def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): + """Async implementation of DLX publishing.""" + + # Create aio-pika message + message = Message( + body, + headers=properties.get('headers'), + content_type=properties.get('content_type', 'application/json'), + delivery_mode=properties.get('delivery_mode', 2) + ) + + # Set expiration if provided + if 'expiration' in properties: + message.expiration = int(properties['expiration']) + + # Get exchange and publish + exchange = await self._channel.get_exchange(dlx_exchange) + await exchange.publish(message, routing_key=routing_key) diff --git a/mrsal/config.py b/mrsal/config.py index 08f93bd..6fc9905 100644 --- a/mrsal/config.py +++ b/mrsal/config.py @@ -13,6 +13,7 @@ class ValidateTLS(BaseModel): class AioPikaAttributes(BaseModel): message_id: str | None app_id: str | None + headers: dict | None = None class MrsalNoAsyncioLoopFound(Exception): diff --git a/mrsal/superclass.py b/mrsal/superclass.py index 8ec1273..328c427 100644 --- a/mrsal/superclass.py +++ b/mrsal/superclass.py @@ -3,6 +3,7 @@ import ssl import pika import logging +from datetime import datetime, timezone from ssl import SSLContext from typing import Any, Type from pika.connection import SSLOptions @@ -58,7 +59,7 @@ def __post_init__(self) -> None: self.tls_dict = {cert: (env_var if env_var != '' else None) for cert, env_var in tls_dict.items()} config.ValidateTLS(**self.tls_dict) - def _setup_exchange_and_queue(self, + def _setup_exchange_and_queue(self, exchange_name: str, queue_name: str, exchange_type: str, routing_key: str, exch_args: dict[str, str] | None = None, queue_args: dict[str, str] | None = None, @@ -89,7 +90,7 @@ def _setup_exchange_and_queue(self, except MrsalSetupError as e: log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") - + if queue_args is None: queue_args = {} @@ -147,7 +148,7 @@ def _setup_exchange_and_queue(self, log.error(f'Splæt! I failed the declaration setup with {e}', exc_info=True) self.auto_declare_ok = False - async def _async_setup_exchange_and_queue(self, + async def _async_setup_exchange_and_queue(self, exchange_name: str, queue_name: str, routing_key: str, exchange_type: str, exch_args: dict[str, str] | None = None, @@ -185,14 +186,14 @@ async def _async_setup_exchange_and_queue(self, except MrsalSetupError as e: log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") - + if queue_args is None: queue_args = {} queue_args.update({ 'x-dead-letter-exchange': dlx_name, 'x-dead-letter-routing-key': dlx_routing - }) + }) if use_quorum_queues: if queue_args is None: @@ -246,7 +247,7 @@ async def _async_setup_exchange_and_queue(self, self.auto_declare_ok = False - def _declare_exchange(self, + def _declare_exchange(self, exchange: str, exchange_type: str, arguments: dict[str, str] | None, durable: bool, passive: bool, @@ -289,13 +290,13 @@ def _declare_exchange(self, if self.verbose: log.info("Exchange declared yo!") - async def _async_declare_exchange(self, - exchange: str, - exchange_type: AioExchangeType, - arguments: dict[str, str] | None = None, - durable: bool = True, - passive: bool = False, - internal: bool = False, + async def _async_declare_exchange(self, + exchange: str, + exchange_type: AioExchangeType, + arguments: dict[str, str] | None = None, + durable: bool = True, + passive: bool = False, + internal: bool = False, auto_delete: bool = False) -> AioExchange: """Declare a RabbitMQ exchange in async mode.""" exchange_declare_info = f""" @@ -312,11 +313,11 @@ async def _async_declare_exchange(self, try: exchange_obj = await self._channel.declare_exchange( - name=exchange, - type=exchange_type, - durable=durable, - auto_delete=auto_delete, - internal=internal, + name=exchange, + type=exchange_type, + durable=durable, + auto_delete=auto_delete, + internal=internal, arguments=arguments ) return exchange_obj @@ -361,11 +362,11 @@ def _declare_queue(self, if self.verbose: log.info(f"Queue declared yo") - async def _async_declare_queue(self, - queue_name: str, - durable: bool = True, - exclusive: bool = False, - auto_delete: bool = False, + async def _async_declare_queue(self, + queue_name: str, + durable: bool = True, + exclusive: bool = False, + auto_delete: bool = False, passive: bool = False, arguments: dict[str, Any] | None = None) -> AioQueue: """Declare a RabbitMQ queue asynchronously.""" @@ -381,10 +382,10 @@ async def _async_declare_queue(self, try: queue_obj = await self._channel.declare_queue( - name=queue_name, - durable=durable, - exclusive=exclusive, - auto_delete=auto_delete, + name=queue_name, + durable=durable, + exclusive=exclusive, + auto_delete=auto_delete, arguments=arguments, passive=passive ) @@ -392,7 +393,7 @@ async def _async_declare_queue(self, except Exception as e: raise MrsalSetupError(f"Failed to declare async queue: {e}") - def _declare_queue_binding(self, + def _declare_queue_binding(self, exchange: str, queue: str, routing_key: str | None, arguments: dict[str, str] | None @@ -419,10 +420,10 @@ def _declare_queue_binding(self, if self.verbose: log.info(f"Queue bound yo") - async def _async_declare_queue_binding(self, - queue: AioQueue, - exchange: AioExchange, - routing_key: str | None, + async def _async_declare_queue_binding(self, + queue: AioQueue, + exchange: AioExchange, + routing_key: str | None, arguments: dict[str, Any] | None = None) -> None: """Bind the queue to the exchange asynchronously.""" binding_info = f""" @@ -497,3 +498,108 @@ def _get_retry_count(self, properties) -> int: def _has_dlx_configured(self, queue_name: str) -> bool: """Check if the queue has a dead letter exchange configured.""" return self.dlx_enable + + def _get_retry_cycle_info(self, properties) -> dict: + """Extract retry cycle information from message headers.""" + if not hasattr(properties, 'headers') or not properties.headers: + return {'cycle_count': 0, 'first_failure': None, 'total_elapsed': 0} + + headers = properties.headers + return { + 'cycle_count': headers.get('x-cycle-count', 0), + 'first_failure': headers.get('x-first-failure'), + 'total_elapsed': headers.get('x-total-elapsed', 0) + } + + def _should_continue_retry_cycles(self, retry_info: dict, enable_retry_cycles: bool, + max_retry_time_limit: int) -> bool: + """Check if message should continue retry cycles or go to permanent DLX.""" + if not enable_retry_cycles or not self.dlx_enable: + return False + + max_time_ms = max_retry_time_limit * 60 * 1000 + return retry_info['total_elapsed'] < max_time_ms + + def _create_retry_cycle_headers(self, original_headers: dict, cycle_count: int, + first_failure: str, processing_error: str, + should_cycle: bool, original_exchange: str, + original_routing_key: str) -> dict: + """Create headers for DLX message with retry cycle info.""" + headers = original_headers.copy() if original_headers else {} + now = datetime.now(timezone.utc).isoformat() + + # Calculate elapsed time + if first_failure: + try: + first_time = datetime.fromisoformat(first_failure.replace('Z', '')) + elapsed_ms = int((datetime.now(timezone.utc) - first_time).total_seconds() * 1000) + except: + elapsed_ms = 0 + else: + first_failure = now + elapsed_ms = 0 + + # Update retry cycle tracking + headers.update({ + 'x-cycle-count': cycle_count + 1, + 'x-first-failure': first_failure, + 'x-total-elapsed': elapsed_ms, + 'x-processing-error': processing_error, + 'x-retry-exhausted': not should_cycle + }) + + # If cycling, set TTL and routing back to original queue + if should_cycle: + headers.update({ + 'x-dead-letter-exchange': original_exchange, + 'x-dead-letter-routing-key': original_routing_key + }) + + return headers + + def _handle_dlx_with_retry_cycle(self, method_frame, properties, body, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int): + """Base method for DLX handling with retry cycles.""" + # Get retry info + retry_info = self._get_retry_cycle_info(properties) + should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit) + + # Get DLX info + dlx_name = f"{method_frame.routing_key}.dlx" + dlx_routing = original_routing_key + + # Create enhanced headers + original_headers = getattr(properties, 'headers', {}) or {} + enhanced_headers = self._create_retry_cycle_headers( + original_headers, retry_info['cycle_count'], retry_info['first_failure'], + processing_error, should_cycle, original_exchange, original_routing_key + ) + + # Create properties for DLX message + dlx_properties = { + 'headers': enhanced_headers, + 'delivery_mode': 2, # Persistent + 'content_type': getattr(properties, 'content_type', 'application/json') + } + + # Set TTL if cycling + if should_cycle: + ttl_ms = retry_cycle_interval * 60 * 1000 + dlx_properties['expiration'] = str(ttl_ms) + + # Call subclass-specific publish method + self._publish_to_dlx(dlx_name, dlx_routing, body, dlx_properties) + + # Log result + if should_cycle: + log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} " + f"(next retry in {retry_cycle_interval}m)") + else: + log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles " + f"- staying in DLX for manual replay") + + def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): + """Abstract method - implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement _publish_to_dlx") diff --git a/pyproject.toml b/pyproject.toml index 0aecad4..94fab98 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ license = "" maintainers = ["Jon E Nesvold "] name = "mrsal" readme = "README.md" -version = "1.1.5" +version = "1.2.0" [tool.poetry.dependencies] colorlog = "^6.7.0" diff --git a/reports/coverage/.coverage b/reports/coverage/.coverage index 130f900..d90cf9a 100644 Binary files a/reports/coverage/.coverage and b/reports/coverage/.coverage differ diff --git a/reports/coverage/coverage.xml b/reports/coverage/coverage.xml index 6515f79..720ae75 100644 --- a/reports/coverage/coverage.xml +++ b/reports/coverage/coverage.xml @@ -1,6 +1,6 @@ - - + + /home/runner/work/mrsal/mrsal @@ -79,7 +79,7 @@ - + @@ -106,7 +106,8 @@ - + + @@ -117,7 +118,7 @@ - + @@ -129,13 +130,13 @@ - - + + - - + + - + @@ -148,68 +149,68 @@ - + - + - - + + - - - + + + - + - - + + - - - - - - - - + + + + + + + + - + - - - - + + + + - + - - - + + + - + - - + + - - - - - - - - - + + + + + + + + + @@ -217,100 +218,143 @@ - + - - - - - + + + + + - + - - - - - - - + + + + + + + - - + + - - - - - + + + + + - + - - - - - - - + + + + + + + - - + + - - - - + + + + - + - - - - - - - + + + + + + + - + - - + + - - + + - + - - - - + + + + - + - + - - - + + + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + - + @@ -321,216 +365,249 @@ - + - + - - + + - - - - + + + + - - + + - - - - - - - - - - - + + + + + + + + + - + - - + - - - - - + + + + + + - - + - + - - + + - + + - + - - - + - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + - - - - - - - + + + + + + + - - - - - - - + + + + + + + + + + - - - - - - - - - - - - + + + + - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + - - - - - - - + - - - - - - - - - - - - - - - + + + + + + + + + - + + - - + - - + - - - - - - - - - - + + + + - - + + - - - - - + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -763,135 +840,261 @@ - + - - + + + + - - + - + - + + + - - - - - - - - - + + + - + + + + + - - - - - - - - - + + + + + - - - - - - - + + + + + + + + - - + - - - - - - - - - - + + - + + + + + + + + - + + - - - - - + + + + + - + + - + - - - - - - - - + + + + + + + - - - - - + + + - - + + - + - + + - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/reports/junit/junit.xml b/reports/junit/junit.xml index ec062e2..53bc8c4 100644 --- a/reports/junit/junit.xml +++ b/reports/junit/junit.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/tests/test_mrsal_dlx_retry.py b/tests/test_mrsal_dlx_retry.py index 9105449..9a570d6 100644 --- a/tests/test_mrsal_dlx_retry.py +++ b/tests/test_mrsal_dlx_retry.py @@ -1,279 +1,593 @@ import pytest -from unittest.mock import Mock, MagicMock, patch, AsyncMock +from unittest.mock import Mock, MagicMock, AsyncMock, patch +from pydantic.dataclasses import dataclass +from pydantic import ValidationError + from mrsal.amqp.subclass import MrsalBlockingAMQP, MrsalAsyncAMQP -import mrsal.amqp.subclass -from tests.conftest import SETUP_ARGS, ExpectedPayload - - -class TestDLXConfiguration: - """Test Dead Letter Exchange configuration.""" - - def test_dlx_enabled_by_default(self): - """Test that DLX is enabled by default.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS) - assert consumer.dlx_enable is True - - def test_dlx_can_be_disabled(self): - """Test that DLX can be explicitly disabled.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS, dlx_enable=False) - assert consumer.dlx_enable is False - - @patch('mrsal.amqp.subclass.MrsalBlockingAMQP.setup_blocking_connection') - def test_dlx_setup_in_exchange_and_queue(self, mock_setup): - """Test that DLX exchange is created during setup.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS) - consumer._channel = MagicMock() - - # Mock the declare methods - consumer._declare_exchange = Mock() - consumer._declare_queue = Mock() - consumer._declare_queue_binding = Mock() - - consumer._setup_exchange_and_queue( - exchange_name="test_exchange", - queue_name="test_queue", - exchange_type="direct", - routing_key="test_key", - dlx_enable=True - ) - - # Should declare DLX exchange and main exchange - assert consumer._declare_exchange.call_count == 2 - - # Check DLX exchange was declared - dlx_call = consumer._declare_exchange.call_args_list[0] - assert dlx_call[1]['exchange'] == "test_queue.dlx" - - @patch('mrsal.amqp.subclass.MrsalBlockingAMQP.setup_blocking_connection') - def test_custom_dlx_name(self, mock_setup): - """Test custom DLX exchange name.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS) - consumer._channel = MagicMock() - - consumer._declare_exchange = Mock() - consumer._declare_queue = Mock() - consumer._declare_queue_binding = Mock() - - consumer._setup_exchange_and_queue( - exchange_name="test_exchange", - queue_name="test_queue", - exchange_type="direct", - routing_key="test_key", - dlx_enable=True, - dlx_exchange_name="custom_dlx" - ) - - # Check custom DLX name was used - dlx_call = consumer._declare_exchange.call_args_list[0] - assert dlx_call[1]['exchange'] == "custom_dlx" - - -class TestQuorumQueues: - """Test Quorum queue configuration.""" - - def test_quorum_enabled_by_default(self): - """Test that quorum queues are enabled by default.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS) - assert consumer.use_quorum_queues is True - - @patch('mrsal.amqp.subclass.MrsalBlockingAMQP.setup_blocking_connection') - def test_quorum_queue_arguments(self, mock_setup): - """Test that quorum queue arguments are added.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS) - consumer._channel = MagicMock() - - consumer._declare_exchange = Mock() - consumer._declare_queue = Mock() - consumer._declare_queue_binding = Mock() - - consumer._setup_exchange_and_queue( - exchange_name="test_exchange", - queue_name="test_queue", - exchange_type="direct", - routing_key="test_key", - use_quorum_queues=True - ) - - # Check quorum arguments were added to queue - queue_call = consumer._declare_queue.call_args - queue_args = queue_call[1]['arguments'] - - assert 'x-queue-type' in queue_args - assert queue_args['x-queue-type'] == 'quorum' - assert 'x-quorum-initial-group-size' in queue_args +from mrsal.exceptions import MrsalAbortedSetup + + +@dataclass +class ExpectedPayload: + id: int + name: str + active: bool + + +class AsyncIteratorMock: + """Mock async iterator for aio-pika queue.iterator()""" + def __init__(self, items): + self.items = iter(items) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.items) + except StopIteration: + raise StopAsyncIteration class TestRetryMechanism: - """Test message retry mechanism.""" - - @pytest.fixture - def mock_consumer(self): - """Create a mocked consumer for retry testing.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS, max_retries=2) # Low retry count for testing - consumer._channel = MagicMock() - consumer.setup_blocking_connection = Mock() - consumer._setup_exchange_and_queue = Mock() - consumer.auto_declare_ok = True - return consumer - - def test_max_retries_setting(self): - """Test that max_retries can be configured.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS, max_retries=5) - assert consumer.max_retries == 5 - - def test_successful_processing_no_retry(self, mock_consumer): - """Test that successful processing doesn't trigger retries.""" - # Mock a valid message - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 - mock_properties = MagicMock() - valid_body = b'{"id": 1, "name": "Test", "active": true}' - - mock_consumer._channel.consume.return_value = [(mock_method_frame, mock_properties, valid_body)] - mock_callback = Mock() - - mock_consumer.start_consumer( - queue_name="test_queue", - callback=mock_callback, - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload - ) - - # Should call ack, not nack - mock_consumer._channel.basic_ack.assert_called_once_with(delivery_tag=123) - mock_consumer._channel.basic_nack.assert_not_called() - - def test_validation_failure_with_dlx(self, mock_consumer): - """Test that validation failures eventually go to DLX after max retries.""" - # Mock an invalid message that will fail validation - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 - mock_properties = MagicMock() - invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' - - # Need to return the same message multiple times to simulate retries - mock_consumer._channel.consume.return_value = [ - (mock_method_frame, mock_properties, invalid_body) - ] * 3 - - mock_consumer.start_consumer( - queue_name="test_queue", - callback=Mock(), - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - dlx_enable=True # DLX enabled - ) - - # Should eventually nack with requeue=False (send to DLX) - final_nack_call = mock_consumer._channel.basic_nack.call_args_list[-1] - assert final_nack_call[1]['delivery_tag'] == 123 - assert final_nack_call[1]['requeue'] == False # Goes to DLX - - def test_validation_failure_without_dlx(self, mock_consumer): - """Test that validation failures are dropped when no DLX configured.""" - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 123 - mock_properties = MagicMock() - invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' - - mock_consumer._channel.consume.return_value = [ - (mock_method_frame, mock_properties, invalid_body) - ] * 10 - - with patch.object(mrsal.amqp.subclass.log, 'warning') as mock_log_warning: - mock_consumer.start_consumer( - queue_name="test_queue", - callback=Mock(), - auto_ack=False, - auto_declare=True, - exchange_name="test_exchange", - exchange_type="direct", - routing_key="test_key", - payload_model=ExpectedPayload, - dlx_enable=False # No DLX - ) - - # Should log the expected warning about dropping message - mock_log_warning.assert_called() - warning_message = mock_log_warning.call_args[0][0] - assert "No dead letter exchange declared for" in warning_message - assert "reflect on your life choices! byebye" in warning_message - - -class TestIntegration: - """Integration tests for the complete flow.""" - - @patch('mrsal.amqp.subclass.MrsalBlockingAMQP.setup_blocking_connection') - def test_complete_consumer_setup_with_defaults(self, mock_setup): - """Test complete consumer setup with all default features enabled.""" - consumer = MrsalBlockingAMQP(**SETUP_ARGS) - consumer._channel = MagicMock() - - # Mock all the declare methods - consumer._declare_exchange = Mock() - consumer._declare_queue = Mock() - consumer._declare_queue_binding = Mock() - - # Mock consume to return one message then stop - mock_method_frame = MagicMock() - mock_method_frame.delivery_tag = 789 - mock_properties = MagicMock() - valid_body = b'{"id": 1, "name": "Test", "active": true}' - - mock_consumer_iter = iter([(mock_method_frame, mock_properties, valid_body)]) - consumer._channel.consume.return_value = mock_consumer_iter - - mock_callback = Mock() - - consumer.start_consumer( - queue_name="integration_queue", - callback=mock_callback, - auto_ack=False, - auto_declare=True, - exchange_name="integration_exchange", - exchange_type="direct", - routing_key="integration_key" - # Using all defaults: dlx_enable=True, use_quorum_queues=True, max_retries=3 - ) - - # Verify all components were set up - assert consumer._declare_exchange.call_count == 2 # Main + DLX - assert consumer._declare_queue.call_count == 1 - assert consumer._declare_queue_binding.call_count == 1 - - # Verify DLX was configured - dlx_call = consumer._declare_exchange.call_args_list[0] - assert dlx_call[1]['exchange'] == "integration_queue.dlx" - - # Verify quorum queue was configured - queue_call = consumer._declare_queue.call_args - queue_args = queue_call[1]['arguments'] - assert queue_args['x-queue-type'] == 'quorum' - assert 'x-dead-letter-exchange' in queue_args - - # Verify message was processed successfully - mock_callback.assert_called_once() - consumer._channel.basic_ack.assert_called_once_with(delivery_tag=789) - - def test_minimal_setup_all_disabled(self): - """Test minimal setup with all enhanced features disabled.""" - consumer = MrsalBlockingAMQP( - **SETUP_ARGS, - dlx_enable=False, - use_quorum_queues=False, - max_retries=1 - ) - - # Verify settings - assert consumer.dlx_enable is False - assert consumer.use_quorum_queues is False - assert consumer.max_retries == 1 + """Test retry mechanism with DLX cycles""" + + @pytest.fixture + def mock_consumer(self): + """Create a mock consumer with mocked connection and channel""" + consumer = MrsalBlockingAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + ssl=False, + verbose=False, + prefetch_count=1, + heartbeat=60, + dlx_enable=True, + max_retries=2, + use_quorum_queues=True, + blocked_connection_timeout=60 + ) + + # Mock connection and channel + consumer._connection = MagicMock() + consumer._channel = MagicMock() + consumer.auto_declare_ok = True + + # Mock setup methods + consumer.setup_blocking_connection = MagicMock() + consumer._setup_exchange_and_queue = MagicMock() + + return consumer + + def test_immediate_retry_params_exist(self, mock_consumer): + """Test that start_consumer accepts immediate retry parameters""" + # Should not raise error when passing immediate retry parameters + mock_consumer._channel.consume.return_value = [] + + try: + mock_consumer.start_consumer( + queue_name="test_queue", + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + immediate_retry_delay=5, + enable_retry_cycles=True, + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + except Exception as e: + if "unexpected keyword argument" in str(e): + pytest.fail(f"start_consumer doesn't accept retry parameters: {e}") + + def test_retry_cycle_params_exist(self, mock_consumer): + """Test that start_consumer accepts retry cycle parameters""" + mock_consumer._channel.consume.return_value = [] + + try: + mock_consumer.start_consumer( + queue_name="test_queue", + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + enable_retry_cycles=True, + retry_cycle_interval=15, + max_retry_time_limit=90 + ) + except Exception as e: + if "unexpected keyword argument" in str(e): + pytest.fail(f"start_consumer doesn't accept retry cycle parameters: {e}") + + @patch('mrsal.amqp.subclass.time.sleep') + def test_immediate_retry_delay(self, mock_sleep, mock_consumer): + """Test that immediate retry delay is applied between retries""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 123 + mock_properties = MagicMock() + mock_properties.message_id = 'test_msg' + invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' + + # Create generator that yields same message multiple times (simulating redelivery) + def consume_generator(): + for _ in range(3): # 3 attempts (initial + 2 retries) + yield (mock_method_frame, mock_properties, invalid_body) + + mock_consumer._channel.consume.return_value = consume_generator() + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=Mock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + immediate_retry_delay=3 # 3 seconds delay + ) + + # Should have called sleep during retries + mock_sleep.assert_called_with(3) + + def test_validation_failure_with_retry_cycles_disabled(self, mock_consumer): + """Test that original DLX behavior is used when retry cycles disabled.""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 123 + mock_method_frame.routing_key = "test_key" + mock_properties = MagicMock() + mock_properties.message_id = 'test_msg' + mock_properties.app_id = 'test_app' + invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' + + # Create generator that simulates redelivery cycle + def consume_generator(): + # Initial delivery + max_retries (2) = 3 total attempts + for _ in range(3): + yield (mock_method_frame, mock_properties, invalid_body) + + mock_consumer._channel.consume.return_value = consume_generator() + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=Mock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=False # Disable retry cycles + ) + + # Should use original behavior (basic_nack with requeue=False) after max retries + nack_calls = mock_consumer._channel.basic_nack.call_args_list + + # First calls should be requeue=True (immediate retries) + for i in range(mock_consumer.max_retries): + assert nack_calls[i][1]['requeue'] == True + + # Final call should be requeue=False (goes to DLX) + final_nack_call = nack_calls[-1] + assert final_nack_call[1]['delivery_tag'] == 123 + assert final_nack_call[1]['requeue'] == False # Goes to DLX + + def test_validation_failure_with_retry_cycles_enabled(self, mock_consumer): + """Test that retry cycle DLX logic is used when enabled""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 123 + mock_method_frame.routing_key = "test_key" + mock_properties = MagicMock() + mock_properties.message_id = 'test_msg' + mock_properties.app_id = 'test_app' + mock_properties.headers = None + invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' + + # Create generator that simulates redelivery cycle + def consume_generator(): + # Initial delivery + max_retries (2) = 3 total attempts + for _ in range(3): + yield (mock_method_frame, mock_properties, invalid_body) + + mock_consumer._channel.consume.return_value = consume_generator() + + # Mock the DLX publishing method + mock_consumer._publish_to_dlx = MagicMock() + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=Mock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, # Enable retry cycles + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Should call the DLX retry cycle method instead of basic_nack + mock_consumer._publish_to_dlx.assert_called() + + def test_callback_failure_triggers_retry(self, mock_consumer): + """Test that callback failures trigger the retry mechanism""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 456 + mock_properties = MagicMock() + mock_properties.message_id = 'callback_test' + valid_body = b'{"id": 123, "name": "Test", "active": true}' + + def consume_generator(): + for _ in range(3): + yield (mock_method_frame, mock_properties, valid_body) + + mock_consumer._channel.consume.return_value = consume_generator() + + # Mock callback that raises exception + failing_callback = Mock(side_effect=Exception("Callback failed")) + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=failing_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + immediate_retry_delay=1 + ) + + # Callback should be called multiple times due to retries + assert failing_callback.call_count >= 2 + + # Should have nack calls with requeue + nack_calls = mock_consumer._channel.basic_nack.call_args_list + assert len(nack_calls) > 0 + assert nack_calls[0][1]['delivery_tag'] == 456 + + def test_successful_processing_acks_message(self, mock_consumer): + """Test that successful processing acknowledges the message""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 789 + mock_properties = MagicMock() + mock_properties.message_id = 'success_test' + mock_properties.app_id = 'test_app' + valid_body = b'{"id": 123, "name": "Test", "active": true}' + + mock_consumer._channel.consume.return_value = [ + (mock_method_frame, mock_properties, valid_body) + ] + + successful_callback = Mock() # No exception + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=successful_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload + ) + + # Should acknowledge the message + mock_consumer._channel.basic_ack.assert_called_with(delivery_tag=789) + + # Should not have any nack calls + mock_consumer._channel.basic_nack.assert_not_called() + + +class TestAsyncRetryCycles: + """Test async consumer retry cycle functionality""" + + @pytest.fixture + def mock_async_consumer(self): + """Create a mock async consumer""" + consumer = MrsalAsyncAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + ssl=False, + verbose=False, + prefetch_count=1, + heartbeat=60, + dlx_enable=True, + max_retries=2, + use_quorum_queues=True + ) + + # Mock connection and channel + consumer._connection = AsyncMock() + consumer._channel = AsyncMock() + consumer.auto_declare_ok = True + + # Mock setup methods + consumer.setup_async_connection = AsyncMock() + consumer._async_setup_exchange_and_queue = AsyncMock() + + return consumer + + @pytest.mark.asyncio + async def test_async_start_consumer_has_retry_cycle_params(self, mock_async_consumer): + """Test that async start_consumer accepts retry cycle parameters.""" + # Mock minimal setup + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + # FIX: Create proper async iterator mock + async_iterator = AsyncIteratorMock([]) # Empty iterator to avoid infinite loop + + # Mock the iterator method to return our async iterator (not a coroutine) + mock_queue.iterator = Mock(return_value=async_iterator) + + # Should not raise error with retry cycle parameters + await mock_async_consumer.start_consumer( + queue_name="test_queue", + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + enable_retry_cycles=True, + retry_cycle_interval=15, + max_retry_time_limit=90, + immediate_retry_delay=5 + ) + + @pytest.mark.asyncio + @patch('mrsal.amqp.subclass.asyncio.sleep') + async def test_async_immediate_retry_delay(self, mock_async_sleep, mock_async_consumer): + """Test that immediate retry delay is applied in async consumer.""" + # Mock message that will fail processing + mock_message = MagicMock() + mock_message.delivery_tag = 123 + mock_message.app_id = 'test_app' + mock_message.headers = None + mock_message.body = b'{"id": "wrong_type", "name": "Test", "active": true}' + mock_message.reject = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + # Mock queue iterator to return message multiple times (simulating redelivery) + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + # FIX: Create proper async iterator with multiple redeliveries + async_iterator = AsyncIteratorMock([mock_message] * 3) # 3 redeliveries + mock_queue.iterator = Mock(return_value=async_iterator) + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=AsyncMock(side_effect=Exception("Processing failed")), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + immediate_retry_delay=2 # 2 seconds delay + ) + + # Verify asyncio.sleep was called with the delay + mock_async_sleep.assert_called_with(2) + + @pytest.mark.asyncio + async def test_async_validation_failure_with_cycles_disabled(self, mock_async_consumer): + """Test async consumer with validation failure and retry cycles disabled""" + # Mock message with invalid payload + mock_message = MagicMock() + mock_message.delivery_tag = 123 + mock_message.app_id = 'test_app' + mock_message.headers = None + mock_message.body = b'{"id": "wrong_type", "name": "Test", "active": true}' + mock_message.reject = AsyncMock() + mock_message.ack = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + # Simulate multiple redeliveries + async_iterator = AsyncIteratorMock([mock_message] * 3) + mock_queue.iterator = Mock(return_value=async_iterator) + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=AsyncMock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=False + ) + + # Should have called reject with requeue=False eventually + mock_message.reject.assert_called() + + @pytest.mark.asyncio + async def test_async_successful_processing(self, mock_async_consumer): + """Test async consumer successful message processing""" + # Mock message with valid payload + mock_message = MagicMock() + mock_message.delivery_tag = 456 + mock_message.app_id = 'test_app' + mock_message.headers = None + mock_message.body = b'{"id": 123, "name": "Test", "active": true}' + mock_message.ack = AsyncMock() + mock_message.reject = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + # Single successful message + async_iterator = AsyncIteratorMock([mock_message]) + mock_queue.iterator = Mock(return_value=async_iterator) + + successful_callback = AsyncMock() + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=successful_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload + ) + + # Should acknowledge the message + mock_message.ack.assert_called() + + # Should not reject + mock_message.reject.assert_not_called() + + @pytest.mark.asyncio + async def test_async_callback_failure_retry_cycles(self, mock_async_consumer): + """Test async consumer callback failure with retry cycles enabled""" + mock_message = MagicMock() + mock_message.delivery_tag = 789 + mock_message.app_id = 'test_app' + mock_message.headers = None + mock_message.body = b'{"id": 123, "name": "Test", "active": true}' + mock_message.ack = AsyncMock() + mock_message.reject = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + # Multiple message deliveries for retry testing + async_iterator = AsyncIteratorMock([mock_message] * 3) + mock_queue.iterator = Mock(return_value=async_iterator) + + # Mock the DLX publishing method + mock_async_consumer._publish_to_dlx = AsyncMock() + + failing_callback = AsyncMock(side_effect=Exception("Async callback failed")) + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=failing_callback, + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, + retry_cycle_interval=5, + max_retry_time_limit=30 + ) + + # Callback should be called multiple times + assert failing_callback.call_count >= 2 + + +class TestDLXRetryHeaders: + """Test DLX retry cycle header management""" + + @pytest.fixture + def mock_consumer(self): + consumer = MrsalBlockingAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + dlx_enable=True, + max_retries=2 + ) + consumer._connection = MagicMock() + consumer._channel = MagicMock() + return consumer + + def test_retry_cycle_info_extraction(self, mock_consumer): + """Test extraction of retry cycle information from headers""" + # Mock properties with retry headers + mock_properties = MagicMock() + mock_properties.headers = { + 'x-cycle-count': 2, + 'x-first-failure': '2024-01-01T10:00:00Z', + 'x-total-elapsed': 120000 # 2 minutes in ms + } + + retry_info = mock_consumer._get_retry_cycle_info(mock_properties) + + assert retry_info['cycle_count'] == 2 + assert retry_info['first_failure'] == '2024-01-01T10:00:00Z' + assert retry_info['total_elapsed'] == 120000 + + def test_retry_cycle_info_defaults(self, mock_consumer): + """Test default values when no retry headers present""" + mock_properties = MagicMock() + mock_properties.headers = None + + retry_info = mock_consumer._get_retry_cycle_info(mock_properties) + + assert retry_info['cycle_count'] == 0 + assert retry_info['first_failure'] is None + assert retry_info['total_elapsed'] == 0 + + def test_should_continue_retry_cycles_time_limit(self, mock_consumer): + """Test retry cycle time limit checking""" + # Within time limit + retry_info = {'total_elapsed': 30000} # 30 seconds + should_continue = mock_consumer._should_continue_retry_cycles( + retry_info, enable_retry_cycles=True, max_retry_time_limit=1 # 1 minute + ) + assert should_continue is True + + # Exceeded time limit + retry_info = {'total_elapsed': 120000} # 2 minutes + should_continue = mock_consumer._should_continue_retry_cycles( + retry_info, enable_retry_cycles=True, max_retry_time_limit=1 # 1 minute + ) + assert should_continue is False + + def test_should_continue_retry_cycles_disabled(self, mock_consumer): + """Test retry cycle disabled""" + retry_info = {'total_elapsed': 0} + should_continue = mock_consumer._should_continue_retry_cycles( + retry_info, enable_retry_cycles=False, max_retry_time_limit=60 + ) + assert should_continue is False + + def test_create_retry_cycle_headers(self, mock_consumer): + """Test creation of retry cycle headers""" + original_headers = {'custom-header': 'value'} + + headers = mock_consumer._create_retry_cycle_headers( + original_headers=original_headers, + cycle_count=1, + first_failure='2024-01-01T10:00:00Z', + processing_error='Test error', + should_cycle=True, + original_exchange='test_exchange', + original_routing_key='test_key' + ) + assert headers['x-cycle-count'] == 2 # cycle_count + 1 + assert headers['x-first-failure'] == '2024-01-01T10:00:00Z' + assert headers['x-processing-error'] == 'Test error' + assert headers['x-retry-exhausted'] is False + assert headers['x-dead-letter-exchange'] == 'test_exchange' + assert headers['x-dead-letter-routing-key'] == 'test_key' + assert headers['custom-header'] == 'value' # Original header preserved