From 6e1a6fbc7d0d1dc563631d33f590e747dff804f9 Mon Sep 17 00:00:00 2001 From: JonNesvold Date: Sat, 11 Oct 2025 15:58:50 +0200 Subject: [PATCH 1/4] banan --- mrsal/superclass.py | 1449 ++++++++++++++++++++++--------------------- 1 file changed, 747 insertions(+), 702 deletions(-) diff --git a/mrsal/superclass.py b/mrsal/superclass.py index cf53230..087f01b 100644 --- a/mrsal/superclass.py +++ b/mrsal/superclass.py @@ -20,705 +20,750 @@ @dataclass class Mrsal: - """ - Mrsal creates a layer on top of Pika's core, providing methods to setup a RabbitMQ broker with multiple functionalities. - - Properties: - :param str host: Hostname or IP Address to connect to - :param int port: TCP port to connect to - :param pika.credentials.Credentials credentials: auth credentials - :param str virtual_host: RabbitMQ virtual host to use - :param bool verbose: If True then more INFO logs will be printed - :param int heartbeat: Controls RabbitMQ's server heartbeat timeout negotiation - :param int prefetch_count: Specifies a prefetch window in terms of whole messages. - :param bool ssl: Set this flag to true if you want to connect externally to the rabbit server. - :param int max_queue_length: Maximum number of messages in queue before overflow behavior triggers - :param int max_queue_length_bytes: Maximum queue size in bytes (optional) - :param str queue_overflow: Behavior when queue is full - "drop-head" or "reject-publish" - :param bool single_active_consumer: If True, only one consumer processes messages at a time - :param bool lazy_queue: If True, messages are stored on disk to save memory - """ - - host: str - port: int - credentials: tuple[str, str] - virtual_host: str - ssl: bool = False - verbose: bool = False - prefetch_count: int = 5 - heartbeat: int = 60 # sec - dlx_enable: bool = True - dlx_exchange_name = None - use_quorum_queues: bool = True - max_queue_length: int = 10000 # Good default for most use cases - max_queue_length_bytes: int | None = None # Optional memory limit - queue_overflow: str = "drop-head" # Drop old messages by default - single_active_consumer: bool = False # Allow parallel processing - lazy_queue: bool = False # Keep messages in RAM for speed - _connection = None - _channel = None - - def __post_init__(self) -> None: - if self.ssl: - tls_dict = { - 'crt': os.environ.get('RABBITMQ_CERT'), - 'key': os.environ.get('RABBITMQ_KEY'), - 'ca': os.environ.get('RABBITMQ_CAFILE') - } - # empty string handling - self.tls_dict = {cert: (env_var if env_var != '' else None) for cert, env_var in tls_dict.items()} - config.ValidateTLS(**self.tls_dict) - - def _setup_exchange_and_queue(self, - exchange_name: str, queue_name: str, exchange_type: str, - routing_key: str, exch_args: dict[str, str] | None = None, - queue_args: dict[str, str] | None = None, - bind_args: dict[str, str] | None = None, - exch_durable: bool = True, queue_durable: bool =True, - passive: bool = False, internal: bool = False, - auto_delete: bool = False, exclusive: bool = False, - dlx_enable: bool = True, dlx_exchange_name: str | None = None, - dlx_routing_key: str | None = None, use_quorum_queues: bool = True, - max_queue_length: int | None = None, - max_queue_length_bytes: int | None = None, - queue_overflow: str | None = None, - single_active_consumer: bool | None = None, - lazy_queue: bool | None = None - ) -> None: - - if queue_args is None: - queue_args = {} - if not passive: - if dlx_enable: - dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" - dlx_routing = dlx_routing_key or routing_key - try: - 
self._declare_exchange( - exchange=dlx_name, - exchange_type=exchange_type, - arguments=None, - durable=exch_durable, - passive=passive, - internal=internal, - auto_delete=auto_delete - ) - if self.verbose: - log.info(f"Dead letter exchange {dlx_name} declared successfully") - - except MrsalSetupError as e: - log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") - - queue_args.update({ - 'x-dead-letter-exchange': dlx_name, - 'x-dead-letter-routing-key': dlx_routing - }) - - if use_quorum_queues: - queue_args.update({ - 'x-queue-type': 'quorum', - 'x-quorum-initial-group-size': 3 - }) - - if self.verbose: - log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability") - - # Add max length settings - if max_queue_length and max_queue_length > 0: - queue_args['x-max-length'] = max_queue_length - - if max_queue_length_bytes and max_queue_length_bytes > 0: - queue_args['x-max-length-bytes'] = max_queue_length_bytes - - # Add overflow behavior - if queue_overflow in ["drop-head", "reject-publish"]: - queue_args['x-overflow'] = queue_overflow - - # Add single active consumer - if single_active_consumer: - queue_args['x-single-active-consumer'] = True - - # Add lazy queue setting - if lazy_queue: - queue_args['x-queue-mode'] = 'lazy' - - if self.verbose and queue_args: - log.info(f"Queue {queue_name} configured with arguments: {queue_args}") - else: - queue_args = {} - if self.verbose: - log.info(f"Passive mode: checking existence of queue {queue_name} without configuration") - - - declare_exhange_dict = { - 'exchange': exchange_name, - 'exchange_type': exchange_type, - 'arguments': exch_args if not passive else None, - 'durable': exch_durable, - 'passive': passive, - 'internal': internal, - 'auto_delete': auto_delete - } - - declare_queue_dict = { - 'queue': queue_name, - 'arguments': queue_args, - 'durable': queue_durable, - 'passive': passive, - 'exclusive': exclusive, - 'auto_delete': auto_delete - } - - declare_queue_binding_dict = { - 'exchange': exchange_name, - 'queue': queue_name, - 'routing_key': routing_key, - 'arguments': bind_args - - } - try: - self._declare_exchange(**declare_exhange_dict) - self._declare_queue(**declare_queue_dict) - if not passive: - self._declare_queue_binding(**declare_queue_binding_dict) - self.auto_declare_ok = True - if not passive: - log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") - else: - log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") - except MrsalSetupError as e: - log.error(f'Splæt! 
I failed the declaration setup with {e}', exc_info=True) - self.auto_declare_ok = False - - async def _async_setup_exchange_and_queue(self, - exchange_name: str, queue_name: str, - routing_key: str, exchange_type: str, - exch_args: dict[str, str] | None = None, - queue_args: dict[str, str] | None = None, - bind_args: dict[str, str] | None = None, - exch_durable: bool = True, queue_durable: bool = True, - passive: bool = False, internal: bool = False, - auto_delete: bool = False, exclusive: bool = False, - dlx_enable: bool = True, - dlx_exchange_name: str | None = None, - dlx_routing_key: str | None = None, - use_quorum_queues: bool = True, - max_queue_length: int | None = None, - max_queue_length_bytes: int | None = None, - queue_overflow: str | None = None, - single_active_consumer: bool | None = None, - lazy_queue: bool | None = None - ) -> AioQueue | None: - """Setup exchange and queue with bindings asynchronously.""" - if not self._connection: - raise MrsalAbortedSetup("Oh my Oh my! Connection not found when trying to run the setup!") - - if queue_args is None: - queue_args = {} - - if not passive: - if dlx_enable: - dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" - dlx_routing = dlx_routing_key or routing_key - - try: - await self._async_declare_exchange( - exchange=dlx_name, - exchange_type=exchange_type, - arguments=None, - durable=exch_durable, - passive=passive, - internal=internal, - auto_delete=auto_delete - ) - - if self.verbose: - log.info(f"Dead letter exchange {dlx_name} declared successfully") - - except MrsalSetupError as e: - log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") - - queue_args.update({ - 'x-dead-letter-exchange': dlx_name, - 'x-dead-letter-routing-key': dlx_routing - }) - - if use_quorum_queues: - queue_args.update({ - 'x-queue-type': 'quorum', - 'x-quorum-initial-group-size': 3 # Good default for 3+ node clusters - }) - - if self.verbose: - log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability") - - if max_queue_length and max_queue_length > 0: - queue_args['x-max-length'] = max_queue_length - - if max_queue_length_bytes and max_queue_length_bytes > 0: - queue_args['x-max-length-bytes'] = max_queue_length_bytes - - # Add overflow behavior - if queue_overflow and queue_overflow in ["drop-head", "reject-publish"]: - queue_args['x-overflow'] = queue_overflow - - # Add single active consumer - if single_active_consumer: - queue_args['x-single-active-consumer'] = True - - # Add lazy queue setting - if lazy_queue: - queue_args['x-queue-mode'] = 'lazy' - - if self.verbose and queue_args: - log.info(f"Queue {queue_name} configured with arguments: {queue_args}") - else: - queue_args = {} - if self.verbose: - log.info(f"Passive mode: checking existence of queue {queue_name} without configuration") - - - async_declare_exhange_dict = { - 'exchange': exchange_name, - 'exchange_type': exchange_type, - 'arguments': exch_args if not passive else None, - 'durable': exch_durable, - 'passive': passive, - 'internal': internal, - 'auto_delete': auto_delete - } - - async_declare_queue_dict = { - 'queue_name': queue_name, - 'arguments': queue_args, - 'durable': queue_durable, - 'exclusive': exclusive, - 'auto_delete': auto_delete, - 'passive': passive - } - - async_declare_queue_binding_dict = { - 'routing_key': routing_key, - 'arguments': bind_args - - } - - try: - # Declare exchange and queue - exchange = await self._async_declare_exchange(**async_declare_exhange_dict) - queue = await 
self._async_declare_queue(**async_declare_queue_dict) - if not passive: - await self._async_declare_queue_binding(queue=queue, exchange=exchange, **async_declare_queue_binding_dict) - self.auto_declare_ok = True - if not passive: - log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") - else: - log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.") - if dlx_enable: - log.info(f"You have a dead letter exhange {dlx_name} for fault tolerance -- use it well young grasshopper!") - return queue - except MrsalSetupError as e: - log.error(f'Splæt! I failed the declaration setup with {e}', exc_info=True) - self.auto_declare_ok = False - - - def _declare_exchange(self, - exchange: str, exchange_type: str, - arguments: dict[str, str] | None, - durable: bool, passive: bool, - internal: bool, auto_delete: bool - ) -> None: - """This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class. - - If passive set, the server will reply with Declare-Ok if the exchange already exists with the same name, - and raise an error if not and if the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found). - - :param str exchange: The exchange name - :param str exchange_type: The exchange type to use - :param bool passive: Perform a declare or just check to see if it exists - :param bool durable: Survive a reboot of RabbitMQ - :param bool auto_delete: Remove when no more queues are bound to it - :param bool internal: Can only be published to by other exchanges - :param dict arguments: Custom key/value pair arguments for the exchange - :rtype: `pika.frame.Method` having `method` attribute of type `spec.Exchange.DeclareOk` - """ - exchange_declare_info = f""" - exchange={exchange}, - exchange_type={exchange_type}, - durable={durable}, - passive={passive}, - internal={internal}, - auto_delete={auto_delete}, - arguments={arguments} - """ - if self.verbose: - log.info(f"Declaring exchange with: {exchange_declare_info}") - try: - self._channel.exchange_declare( - exchange=exchange, exchange_type=exchange_type, - arguments=arguments, durable=durable, - passive=passive, internal=internal, - auto_delete=auto_delete - ) - except Exception as e: - raise MrsalSetupError(f'Oooopise! 
I failed declaring the exchange with : {e}') - if self.verbose: - log.info("Exchange declared yo!") - - async def _async_declare_exchange(self, - exchange: str, - exchange_type: AioExchangeType, - arguments: dict[str, str] | None = None, - durable: bool = True, - passive: bool = False, - internal: bool = False, - auto_delete: bool = False) -> AioExchange: - """Declare a RabbitMQ exchange in async mode.""" - exchange_declare_info = f""" - exchange={exchange}, - exchange_type={exchange_type}, - durable={durable}, - passive={passive}, - internal={internal}, - auto_delete={auto_delete}, - arguments={arguments} - """ - if self.verbose: - print(f"Declaring exchange with: {exchange_declare_info}") - - try: - exchange_obj = await self._channel.declare_exchange( - name=exchange, - type=exchange_type, - durable=durable, - auto_delete=auto_delete, - internal=internal, - arguments=arguments - ) - return exchange_obj - except Exception as e: - raise MrsalSetupError(f"Failed to declare async exchange: {e}") - - def _declare_queue(self, - queue: str, arguments: dict[str, str] | None, - durable: bool, exclusive: bool, - auto_delete: bool, passive: bool - ) -> None: - """Declare queue, create if needed. This method creates or checks a queue. - When creating a new queue the client can specify various properties that control the durability of the queue and its contents, - and the level of sharing for the queue. - - Use an empty string as the queue name for the broker to auto-generate one. - Retrieve this auto-generated queue name from the returned `spec.Queue.DeclareOk` method frame. - - :param str queue: The queue name; if empty string, the broker will create a unique queue name - :param bool passive: Only check to see if the queue exists and raise `ChannelClosed` if it doesn't - :param bool durable: Survive reboots of the broker - :param bool exclusive: Only allow access by the current connection - :param bool auto_delete: Delete after consumer cancels or disconnects - :param dict arguments: Custom key/value arguments for the queue - :returns: Method frame from the Queue.Declare-ok response - :rtype: `pika.frame.Method` having `method` attribute of type `spec.Queue.DeclareOk` - """ - queue_declare_info = f""" - queue={queue}, - durable={durable}, - exclusive={exclusive}, - auto_delete={auto_delete}, - arguments={arguments} - """ - if self.verbose: - log.info(f"Declaring queue with: {queue_declare_info}") - - try: - self._channel.queue_declare(queue=queue, arguments=arguments, durable=durable, exclusive=exclusive, auto_delete=auto_delete, passive=passive) - except Exception as e: - raise MrsalSetupError(f'Oooopise! 
I failed declaring the queue with : {e}') - if self.verbose: - log.info(f"Queue declared yo") - - async def _async_declare_queue(self, - queue_name: str, - durable: bool = True, - exclusive: bool = False, - auto_delete: bool = False, - passive: bool = False, - arguments: dict[str, Any] | None = None) -> AioQueue: - """Declare a RabbitMQ queue asynchronously.""" - queue_declare_info = f""" - queue={queue_name}, - durable={durable}, - exclusive={exclusive}, - auto_delete={auto_delete}, - arguments={arguments} - """ - if self.verbose: - log.info(f"Declaring queue with: {queue_declare_info}") - - try: - queue_obj = await self._channel.declare_queue( - name=queue_name, - durable=durable, - exclusive=exclusive, - auto_delete=auto_delete, - arguments=arguments, - passive=passive - ) - return queue_obj - except Exception as e: - raise MrsalSetupError(f"Failed to declare async queue: {e}") - - def _declare_queue_binding(self, - exchange: str, queue: str, - routing_key: str | None, - arguments: dict[str, str] | None - ) -> None: - """Bind queue to exchange. - - :param str queue: The queue to bind to the exchange - :param str exchange: The source exchange to bind to - :param str routing_key: The routing key to bind on - :param dict arguments: Custom key/value pair arguments for the binding - - :returns: Method frame from the Queue.Bind-ok response - :rtype: `pika.frame.Method` having `method` attribute of type `spec.Queue.BindOk` - """ - if self.verbose: - log.info(f"Binding queue to exchange: queue={queue}, exchange={exchange}, routing_key={routing_key}") - - try: - self._channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key, arguments=arguments) - if self.verbose: - log.info(f"The queue is bound to exchange successfully: queue={queue}, exchange={exchange}, routing_key={routing_key}") - except Exception as e: - raise MrsalSetupError(f'I failed binding the queue with : {e}') - if self.verbose: - log.info(f"Queue bound yo") - - async def _async_declare_queue_binding(self, - queue: AioQueue, - exchange: AioExchange, - routing_key: str | None, - arguments: dict[str, Any] | None = None) -> None: - """Bind the queue to the exchange asynchronously.""" - binding_info = f""" - queue={queue.name}, - exchange={exchange.name}, - routing_key={routing_key}, - arguments={arguments} - """ - if self.verbose: - log.info(f"Binding queue to exchange with: {binding_info}") - - try: - await queue.bind(exchange, routing_key=routing_key, arguments=arguments) - except Exception as e: - raise MrsalSetupError(f"Failed to bind async queue: {e}") - - def _ssl_setup(self) -> SSLContext: - """_ssl_setup is private method we are using to connect with rabbit server via signed certificates and some TLS settings. - - Parameters - ---------- - - Returns - ------- - SSLContext - - """ - context = ssl.create_default_context(cafile=self.tls_dict['ca']) - context.load_cert_chain(certfile=self.tls_dict['crt'], keyfile=self.tls_dict['key']) - return context - - def get_ssl_context(self, async_conn: bool = True) -> SSLOptions | SSLContext | None: - if self.ssl: - log.info("Setting up TLS connection") - context = self._ssl_setup() - # use_blocking is the same as sync - if not async_conn: - ssl_options = pika.SSLOptions(context, self.host) - return ssl_options - else: - return context - else: - return None - - def validate_payload(self, payload: Any, model: Type) -> None: - """ - Parses and validates the incoming message payload using the provided dataclass model. 
- :param payload: The message payload which could be of any type (str, bytes, dict, etc.). - :param model: The pydantic dataclass model class to validate against. - :return: An instance of the model if validation is successful, otherwise None. - """ - # If payload is bytes, decode it to a string - if isinstance(payload, bytes): - payload = payload.decode('utf-8') - - # If payload is a string, attempt to load it as JSON - if isinstance(payload, str): - payload = json.loads(payload) # Converts JSON string to a dictionary - - # Validate the payload against the provided model - if isinstance(payload, dict): - model(**payload) - else: - raise TypeError("Fool, we aint supporting this type yet {type(payload)}.. Bytes or str -- get it straight") - - def _get_retry_count(self, properties) -> int: - """Extract retry count from message headers.""" - if hasattr(properties, 'headers') and properties.headers: - return properties.headers.get('x-retry-count', 0) - return 0 - - def _has_dlx_configured(self, queue_name: str) -> bool: - """Check if the queue has a dead letter exchange configured.""" - return self.dlx_enable - - def _get_retry_cycle_info(self, properties) -> dict: - """Extract retry cycle information from message headers.""" - if not hasattr(properties, 'headers') or not properties.headers: - return {'cycle_count': 0, 'first_failure': None, 'total_elapsed': 0} - - headers = properties.headers - return { - 'cycle_count': headers.get('x-cycle-count', 0), - 'first_failure': headers.get('x-first-failure'), - 'total_elapsed': headers.get('x-total-elapsed', 0) - } - - def _should_continue_retry_cycles(self, retry_info: dict, enable_retry_cycles: bool, - max_retry_time_limit: int) -> bool: - """Check if message should continue retry cycles or go to permanent DLX.""" - if not enable_retry_cycles or not self.dlx_enable: - return False - - max_time_ms = max_retry_time_limit * 60 * 1000 - return retry_info['total_elapsed'] < max_time_ms - - def _create_retry_cycle_headers(self, original_headers: dict, cycle_count: int, - first_failure: str, processing_error: str, - should_cycle: bool, original_exchange: str, - original_routing_key: str) -> dict: - """Create headers for DLX message with retry cycle info.""" - headers = original_headers.copy() if original_headers else {} - now = datetime.now(timezone.utc).isoformat() - - # Calculate elapsed time - if first_failure: - try: - first_time = datetime.fromisoformat(first_failure.replace('Z', '')) - elapsed_ms = int((datetime.now(timezone.utc) - first_time).total_seconds() * 1000) - except: - elapsed_ms = 0 - else: - first_failure = now - elapsed_ms = 0 - - # Update retry cycle tracking - headers.update({ - 'x-cycle-count': cycle_count + 1, - 'x-first-failure': first_failure, - 'x-total-elapsed': elapsed_ms, - 'x-processing-error': processing_error, - 'x-retry-exhausted': not should_cycle - }) - - # If cycling, set TTL and routing back to original queue - if should_cycle: - headers.update({ - 'x-dead-letter-exchange': original_exchange, - 'x-dead-letter-routing-key': original_routing_key - }) - - return headers - - def _handle_dlx_with_retry_cycle_sync( - self, method_frame, properties, body, processing_error: str, - original_exchange: str, original_routing_key: str, - enable_retry_cycles: bool, retry_cycle_interval: int, - max_retry_time_limit: int, dlx_exchange_name: str | None): - """Base method for DLX handling with retry cycles.""" - # Get retry info - retry_info = self._get_retry_cycle_info(properties) - should_cycle = 
self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit) - - # Get DLX info - dlx_name = dlx_exchange_name or f"{original_exchange}.dlx" - dlx_routing = original_routing_key - - # Create enhanced headers - original_headers = getattr(properties, 'headers', {}) or {} - enhanced_headers = self._create_retry_cycle_headers( - original_headers, retry_info['cycle_count'], retry_info['first_failure'], - processing_error, should_cycle, original_exchange, original_routing_key - ) - - # Create properties for DLX message - dlx_properties = { - 'headers': enhanced_headers, - 'delivery_mode': 2, # Persistent - 'content_type': getattr(properties, 'content_type', 'application/json') - } - - # Set TTL if cycling - if should_cycle: - ttl_ms = retry_cycle_interval * 60 * 1000 - dlx_properties['expiration'] = str(ttl_ms) - - # Call subclass-specific publish method - self._publish_to_dlx(dlx_name, dlx_routing, body, dlx_properties) - - # Log result - if should_cycle: - log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} " - f"(next retry in {retry_cycle_interval}m)") - else: - log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles " - f"- staying in DLX for manual replay") - - async def _handle_dlx_with_retry_cycle_async( - self, message, properties, processing_error: str, - original_exchange: str, original_routing_key: str, - enable_retry_cycles: bool, retry_cycle_interval: int, - max_retry_time_limit: int, dlx_exchange_name: str | None): - """Base method for DLX handling with retry cycles.""" - # Get retry info - retry_info = self._get_retry_cycle_info(properties) - should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit) - - # Get DLX info - dlx_name = dlx_exchange_name or f"{original_exchange}.dlx" - dlx_routing = original_routing_key - - # Create enhanced headers - original_headers = getattr(properties, 'headers', {}) or {} - enhanced_headers = self._create_retry_cycle_headers( - original_headers, retry_info['cycle_count'], retry_info['first_failure'], - processing_error, should_cycle, original_exchange, original_routing_key - ) - - # Create properties for DLX message - dlx_properties = { - 'headers': enhanced_headers, - 'delivery_mode': 2, # Persistent - 'content_type': getattr(properties, 'content_type', 'application/json') - } - - # Set TTL if cycling - if should_cycle: - ttl_ms = retry_cycle_interval * 60 * 1000 - dlx_properties['expiration'] = str(ttl_ms) - - # Call subclass-specific publish method - await self._publish_to_dlx(dlx_name, dlx_routing, message.body, dlx_properties) - - # Log result - if should_cycle: - log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} " - f"(next retry in {retry_cycle_interval}m)") - else: - log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles " - f"- staying in DLX for manual replay") - - def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): - """Abstract method - implemented by subclasses.""" - raise NotImplementedError("Subclasses must implement _publish_to_dlx") + """ + Mrsal creates a layer on top of Pika's core, providing methods to setup a RabbitMQ broker with multiple functionalities. 
+ + Properties: + :param str host: Hostname or IP Address to connect to + :param int port: TCP port to connect to + :param pika.credentials.Credentials credentials: auth credentials + :param str virtual_host: RabbitMQ virtual host to use + :param bool verbose: If True then more INFO logs will be printed + :param int heartbeat: Controls RabbitMQ's server heartbeat timeout negotiation + :param int prefetch_count: Specifies a prefetch window in terms of whole messages. + :param bool ssl: Set this flag to true if you want to connect externally to the rabbit server. + :param int max_queue_length: Maximum number of messages in queue before overflow behavior triggers + :param int max_queue_length_bytes: Maximum queue size in bytes (optional) + :param str queue_overflow: Behavior when queue is full - "drop-head" or "reject-publish" + :param bool single_active_consumer: If True, only one consumer processes messages at a time + :param bool lazy_queue: If True, messages are stored on disk to save memory + """ + + host: str + port: int + credentials: tuple[str, str] + virtual_host: str + ssl: bool = False + verbose: bool = False + prefetch_count: int = 5 + heartbeat: int = 60 # sec + dlx_enable: bool = True + dlx_exchange_name = None + use_quorum_queues: bool = True + max_queue_length: int = 10000 # Good default for most use cases + max_queue_length_bytes: int | None = None # Optional memory limit + queue_overflow: str = "drop-head" # Drop old messages by default + single_active_consumer: bool = False # Allow parallel processing + lazy_queue: bool = False # Keep messages in RAM for speed + _connection = None + _channel = None + + def __post_init__(self) -> None: + if self.ssl: + tls_dict = { + 'crt': os.environ.get('RABBITMQ_CERT'), + 'key': os.environ.get('RABBITMQ_KEY'), + 'ca': os.environ.get('RABBITMQ_CAFILE') + } + # empty string handling + self.tls_dict = {cert: (env_var if env_var != '' else None) for cert, env_var in tls_dict.items()} + config.ValidateTLS(**self.tls_dict) + + def _setup_exchange_and_queue(self, + exchange_name: str, queue_name: str, exchange_type: str, + routing_key: str, exch_args: dict[str, str] | None = None, + queue_args: dict[str, str] | None = None, + bind_args: dict[str, str] | None = None, + exch_durable: bool = True, queue_durable: bool =True, + passive: bool = False, internal: bool = False, + auto_delete: bool = False, exclusive: bool = False, + dlx_enable: bool = True, dlx_exchange_name: str | None = None, + dlx_routing_key: str | None = None, use_quorum_queues: bool = True, + max_queue_length: int | None = None, + max_queue_length_bytes: int | None = None, + queue_overflow: str | None = None, + single_active_consumer: bool | None = None, + lazy_queue: bool | None = None + ) -> None: + + if queue_args is None: + queue_args = {} + if not passive: + if dlx_enable: + dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" + dlx_routing = dlx_routing_key or routing_key + try: + self._declare_exchange( + exchange=dlx_name, + exchange_type=exchange_type, + arguments=None, + durable=exch_durable, + passive=passive, + internal=internal, + auto_delete=auto_delete + ) + if self.verbose: + log.info(f"Dead letter exchange {dlx_name} declared successfully") + + except MrsalSetupError as e: + log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}") + + dlx_queue_name = f"{dlx_name}.queue" + try: + self._declare_queue( + queue=dlx_queue_name, + arguments=None, + durable=exch_durable, + passive=False, + exclusive=False, + auto_delete=False + ) + 
self._declare_queue_binding(
+                        exchange=dlx_name,
+                        queue=dlx_queue_name,
+                        routing_key=dlx_routing,
+                        arguments=None
+                    )
+                    if self.verbose:
+                        log.info(f"DLX queue {dlx_queue_name} declared and bound successfully")
+                except MrsalSetupError as e:
+                    log.warning(f"DLX queue {dlx_queue_name} setup failed: {e}")
+
+                queue_args.update({
+                    'x-dead-letter-exchange': dlx_name,
+                    'x-dead-letter-routing-key': dlx_routing
+                })
+
+            if use_quorum_queues:
+                queue_args.update({
+                    'x-queue-type': 'quorum',
+                    'x-quorum-initial-group-size': 3
+                })
+
+                if self.verbose:
+                    log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")
+
+            # Add max length settings
+            if max_queue_length and max_queue_length > 0:
+                queue_args['x-max-length'] = max_queue_length
+
+            if max_queue_length_bytes and max_queue_length_bytes > 0:
+                queue_args['x-max-length-bytes'] = max_queue_length_bytes
+
+            # Add overflow behavior
+            if queue_overflow in ["drop-head", "reject-publish"]:
+                queue_args['x-overflow'] = queue_overflow
+
+            # Add single active consumer
+            if single_active_consumer:
+                queue_args['x-single-active-consumer'] = True
+
+            # Add lazy queue setting
+            if lazy_queue:
+                queue_args['x-queue-mode'] = 'lazy'
+
+            if self.verbose and queue_args:
+                log.info(f"Queue {queue_name} configured with arguments: {queue_args}")
+        else:
+            queue_args = {}
+            if self.verbose:
+                log.info(f"Passive mode: checking existence of queue {queue_name} without configuration")
+
+        declare_exchange_dict = {
+            'exchange': exchange_name,
+            'exchange_type': exchange_type,
+            'arguments': exch_args if not passive else None,
+            'durable': exch_durable,
+            'passive': passive,
+            'internal': internal,
+            'auto_delete': auto_delete
+        }
+
+        declare_queue_dict = {
+            'queue': queue_name,
+            'arguments': queue_args,
+            'durable': queue_durable,
+            'passive': passive,
+            'exclusive': exclusive,
+            'auto_delete': auto_delete
+        }
+
+        declare_queue_binding_dict = {
+            'exchange': exchange_name,
+            'queue': queue_name,
+            'routing_key': routing_key,
+            'arguments': bind_args
+        }
+
+        try:
+            self._declare_exchange(**declare_exchange_dict)
+            self._declare_queue(**declare_queue_dict)
+            if not passive:
+                self._declare_queue_binding(**declare_queue_binding_dict)
+            self.auto_declare_ok = True
+            log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
+        except MrsalSetupError as e:
+            log.error(f'Splæt! I failed the declaration setup with {e}', exc_info=True)
+            self.auto_declare_ok = False
+
+    async def _async_setup_exchange_and_queue(self,
+                                              exchange_name: str, queue_name: str,
+                                              routing_key: str, exchange_type: str,
+                                              exch_args: dict[str, str] | None = None,
+                                              queue_args: dict[str, str] | None = None,
+                                              bind_args: dict[str, str] | None = None,
+                                              exch_durable: bool = True, queue_durable: bool = True,
+                                              passive: bool = False, internal: bool = False,
+                                              auto_delete: bool = False, exclusive: bool = False,
+                                              dlx_enable: bool = True,
+                                              dlx_exchange_name: str | None = None,
+                                              dlx_routing_key: str | None = None,
+                                              use_quorum_queues: bool = True,
+                                              max_queue_length: int | None = None,
+                                              max_queue_length_bytes: int | None = None,
+                                              queue_overflow: str | None = None,
+                                              single_active_consumer: bool | None = None,
+                                              lazy_queue: bool | None = None
+                                              ) -> AioQueue | None:
+        """Set up exchange and queue with bindings asynchronously."""
+        if not self._connection:
+            raise MrsalAbortedSetup("Oh my Oh my! Connection not found when trying to run the setup!")
+
+        if queue_args is None:
+            queue_args = {}
+
+        if not passive:
+            if dlx_enable:
+                dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
+                dlx_routing = dlx_routing_key or routing_key
+
+                try:
+                    await self._async_declare_exchange(
+                        exchange=dlx_name,
+                        exchange_type=exchange_type,
+                        arguments=None,
+                        durable=exch_durable,
+                        passive=passive,
+                        internal=internal,
+                        auto_delete=auto_delete
+                    )
+
+                    if self.verbose:
+                        log.info(f"Dead letter exchange {dlx_name} declared successfully")
+
+                except MrsalSetupError as e:
+                    log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")
+
+                dlx_queue_name = f"{dlx_name}.queue"
+                try:
+                    dlx_queue = await self._async_declare_queue(
+                        queue_name=dlx_queue_name,
+                        arguments=None,
+                        durable=exch_durable,
+                        passive=False,
+                        exclusive=False,
+                        auto_delete=False
+                    )
+
+                    dlx_exchange_obj = await self._channel.get_exchange(dlx_name)
+
+                    await self._async_declare_queue_binding(
+                        exchange=dlx_exchange_obj,
+                        queue=dlx_queue_name,
+                        routing_key=dlx_routing,
+                        arguments=None
+                    )
+                    if self.verbose:
+                        log.info(f"DLX queue {dlx_queue_name} declared and bound successfully")
+                except MrsalSetupError as e:
+                    log.warning(f"DLX queue {dlx_queue_name} setup failed: {e}")
+
+                queue_args.update({
+                    'x-dead-letter-exchange': dlx_name,
+                    'x-dead-letter-routing-key': dlx_routing
+                })
+
+            if use_quorum_queues:
+                queue_args.update({
+                    'x-queue-type': 'quorum',
+                    'x-quorum-initial-group-size': 3  # Good default for 3+ node clusters
+                })
+
+                if self.verbose:
+                    log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")
+
+            if max_queue_length and max_queue_length > 0:
+                queue_args['x-max-length'] = max_queue_length
+
+            if max_queue_length_bytes and max_queue_length_bytes > 0:
+                queue_args['x-max-length-bytes'] = max_queue_length_bytes
+
+            # Add overflow behavior
+            if queue_overflow and queue_overflow in ["drop-head", "reject-publish"]:
+                queue_args['x-overflow'] = queue_overflow
+
+            # Add single active consumer
+            if single_active_consumer:
+                queue_args['x-single-active-consumer'] = True
+
+            # Add lazy queue setting
+            if lazy_queue:
+                queue_args['x-queue-mode'] = 'lazy'
+
+            if self.verbose and queue_args:
+                log.info(f"Queue {queue_name} configured with arguments: {queue_args}")
+        else:
+            queue_args = {}
+            if self.verbose:
+                log.info(f"Passive mode: checking existence of queue {queue_name} without configuration")
+
+        async_declare_exchange_dict = {
+            'exchange': exchange_name,
+            'exchange_type': exchange_type,
+            'arguments': exch_args if not passive else None,
+            'durable': exch_durable,
+            'passive': passive,
+            'internal': internal,
+            'auto_delete': auto_delete
+        }
+
+        async_declare_queue_dict = {
+            'queue_name': queue_name,
+            'arguments': queue_args,
+            'durable': queue_durable,
+            'exclusive': exclusive,
+            'auto_delete': auto_delete,
+            'passive': passive
+        }
+
+        async_declare_queue_binding_dict = {
+            'routing_key': routing_key,
+            'arguments': bind_args
+        }
+
+        try:
+            # Declare exchange and queue
+            exchange = await self._async_declare_exchange(**async_declare_exchange_dict)
+            queue = await self._async_declare_queue(**async_declare_queue_dict)
+            if not passive:
+                await self._async_declare_queue_binding(queue=queue, exchange=exchange, **async_declare_queue_binding_dict)
+            self.auto_declare_ok = True
+            log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
+            if dlx_enable:
+                log.info(f"You have a dead letter exchange {dlx_name} for fault tolerance -- use it well young grasshopper!")
+            return queue
+        except MrsalSetupError as e:
+            log.error(f'Splæt! I failed the declaration setup with {e}', exc_info=True)
+            self.auto_declare_ok = False
+
+    def _declare_exchange(self,
+                          exchange: str, exchange_type: str,
+                          arguments: dict[str, str] | None,
+                          durable: bool, passive: bool,
+                          internal: bool, auto_delete: bool
+                          ) -> None:
+        """This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.
+
+        If passive is set, the server will reply with Declare-Ok if the exchange already exists with the same name;
+        if the exchange does not already exist, the server raises a channel exception with reply code 404 (not found).
+
+        :param str exchange: The exchange name
+        :param str exchange_type: The exchange type to use
+        :param bool passive: Perform a declare or just check to see if it exists
+        :param bool durable: Survive a reboot of RabbitMQ
+        :param bool auto_delete: Remove when no more queues are bound to it
+        :param bool internal: Can only be published to by other exchanges
+        :param dict arguments: Custom key/value pair arguments for the exchange
+        :rtype: `pika.frame.Method` having `method` attribute of type `spec.Exchange.DeclareOk`
+        """
+        exchange_declare_info = f"""
+            exchange={exchange},
+            exchange_type={exchange_type},
+            durable={durable},
+            passive={passive},
+            internal={internal},
+            auto_delete={auto_delete},
+            arguments={arguments}
+            """
+        if self.verbose:
+            log.info(f"Declaring exchange with: {exchange_declare_info}")
+        try:
+            self._channel.exchange_declare(
+                exchange=exchange, exchange_type=exchange_type,
+                arguments=arguments, durable=durable,
+                passive=passive, internal=internal,
+                auto_delete=auto_delete
+            )
+        except Exception as e:
+            raise MrsalSetupError(f'Oooopise! I failed declaring the exchange with : {e}')
+        if self.verbose:
+            log.info("Exchange declared yo!")
+
+    async def _async_declare_exchange(self,
+                                      exchange: str,
+                                      exchange_type: AioExchangeType,
+                                      arguments: dict[str, str] | None = None,
+                                      durable: bool = True,
+                                      passive: bool = False,
+                                      internal: bool = False,
+                                      auto_delete: bool = False) -> AioExchange:
+        """Declare a RabbitMQ exchange in async mode."""
+        exchange_declare_info = f"""
+            exchange={exchange},
+            exchange_type={exchange_type},
+            durable={durable},
+            passive={passive},
+            internal={internal},
+            auto_delete={auto_delete},
+            arguments={arguments}
+            """
+        if self.verbose:
+            log.info(f"Declaring exchange with: {exchange_declare_info}")
+
+        try:
+            exchange_obj = await self._channel.declare_exchange(
+                name=exchange,
+                type=exchange_type,
+                durable=durable,
+                auto_delete=auto_delete,
+                internal=internal,
+                arguments=arguments
+            )
+            return exchange_obj
+        except Exception as e:
+            raise MrsalSetupError(f"Failed to declare async exchange: {e}")
+
+    def _declare_queue(self,
+                       queue: str, arguments: dict[str, str] | None,
+                       durable: bool, exclusive: bool,
+                       auto_delete: bool, passive: bool
+                       ) -> None:
+        """Declare queue, create if needed. This method creates or checks a queue.
+        When creating a new queue the client can specify various properties that control the durability of the queue and its contents,
+        and the level of sharing for the queue.
+
+        Use an empty string as the queue name for the broker to auto-generate one.
+        Retrieve this auto-generated queue name from the returned `spec.Queue.DeclareOk` method frame.
+ + :param str queue: The queue name; if empty string, the broker will create a unique queue name + :param bool passive: Only check to see if the queue exists and raise `ChannelClosed` if it doesn't + :param bool durable: Survive reboots of the broker + :param bool exclusive: Only allow access by the current connection + :param bool auto_delete: Delete after consumer cancels or disconnects + :param dict arguments: Custom key/value arguments for the queue + :returns: Method frame from the Queue.Declare-ok response + :rtype: `pika.frame.Method` having `method` attribute of type `spec.Queue.DeclareOk` + """ + queue_declare_info = f""" + queue={queue}, + durable={durable}, + exclusive={exclusive}, + auto_delete={auto_delete}, + arguments={arguments} + """ + if self.verbose: + log.info(f"Declaring queue with: {queue_declare_info}") + + try: + self._channel.queue_declare(queue=queue, arguments=arguments, durable=durable, exclusive=exclusive, auto_delete=auto_delete, passive=passive) + except Exception as e: + raise MrsalSetupError(f'Oooopise! I failed declaring the queue with : {e}') + if self.verbose: + log.info(f"Queue declared yo") + + async def _async_declare_queue(self, + queue_name: str, + durable: bool = True, + exclusive: bool = False, + auto_delete: bool = False, + passive: bool = False, + arguments: dict[str, Any] | None = None) -> AioQueue: + """Declare a RabbitMQ queue asynchronously.""" + queue_declare_info = f""" + queue={queue_name}, + durable={durable}, + exclusive={exclusive}, + auto_delete={auto_delete}, + arguments={arguments} + """ + if self.verbose: + log.info(f"Declaring queue with: {queue_declare_info}") + + try: + queue_obj = await self._channel.declare_queue( + name=queue_name, + durable=durable, + exclusive=exclusive, + auto_delete=auto_delete, + arguments=arguments, + passive=passive + ) + return queue_obj + except Exception as e: + raise MrsalSetupError(f"Failed to declare async queue: {e}") + + def _declare_queue_binding(self, + exchange: str, queue: str, + routing_key: str | None, + arguments: dict[str, str] | None + ) -> None: + """Bind queue to exchange. 
+
+        :param str queue: The queue to bind to the exchange
+        :param str exchange: The source exchange to bind to
+        :param str routing_key: The routing key to bind on
+        :param dict arguments: Custom key/value pair arguments for the binding
+
+        :returns: Method frame from the Queue.Bind-ok response
+        :rtype: `pika.frame.Method` having `method` attribute of type `spec.Queue.BindOk`
+        """
+        if self.verbose:
+            log.info(f"Binding queue to exchange: queue={queue}, exchange={exchange}, routing_key={routing_key}")
+
+        try:
+            self._channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key, arguments=arguments)
+            if self.verbose:
+                log.info(f"The queue is bound to exchange successfully: queue={queue}, exchange={exchange}, routing_key={routing_key}")
+        except Exception as e:
+            raise MrsalSetupError(f'I failed binding the queue with : {e}')
+        if self.verbose:
+            log.info("Queue bound yo")
+
+    async def _async_declare_queue_binding(self,
+                                           queue: AioQueue,
+                                           exchange: AioExchange,
+                                           routing_key: str | None,
+                                           arguments: dict[str, Any] | None = None) -> None:
+        """Bind the queue to the exchange asynchronously."""
+        binding_info = f"""
+            queue={queue.name},
+            exchange={exchange.name},
+            routing_key={routing_key},
+            arguments={arguments}
+            """
+        if self.verbose:
+            log.info(f"Binding queue to exchange with: {binding_info}")
+
+        try:
+            await queue.bind(exchange, routing_key=routing_key, arguments=arguments)
+        except Exception as e:
+            raise MrsalSetupError(f"Failed to bind async queue: {e}")
+
+    def _ssl_setup(self) -> SSLContext:
+        """Create the SSLContext used to connect to the rabbit server with signed certificates and TLS settings.
+
+        Returns
+        -------
+        SSLContext
+        """
+        context = ssl.create_default_context(cafile=self.tls_dict['ca'])
+        context.load_cert_chain(certfile=self.tls_dict['crt'], keyfile=self.tls_dict['key'])
+        return context
+
+    def get_ssl_context(self, async_conn: bool = True) -> SSLOptions | SSLContext | None:
+        if self.ssl:
+            log.info("Setting up TLS connection")
+            context = self._ssl_setup()
+            # Blocking (sync) connections take pika.SSLOptions; async connections take the raw SSLContext
+            if not async_conn:
+                ssl_options = pika.SSLOptions(context, self.host)
+                return ssl_options
+            else:
+                return context
+        else:
+            return None
+
+    def validate_payload(self, payload: Any, model: Type) -> None:
+        """
+        Parse and validate the incoming message payload using the provided dataclass model.
+        :param payload: The message payload, which may be bytes, str or dict.
+        :param model: The pydantic dataclass model class to validate against.
+        :return: None. Raises if the payload cannot be parsed or does not validate against the model.
+        """
+        # If payload is bytes, decode it to a string
+        if isinstance(payload, bytes):
+            payload = payload.decode('utf-8')
+
+        # If payload is a string, attempt to load it as JSON
+        if isinstance(payload, str):
+            payload = json.loads(payload)  # Converts JSON string to a dictionary
+
+        # Validate the payload against the provided model
+        if isinstance(payload, dict):
+            model(**payload)
+        else:
+            raise TypeError(f"Fool, we aint supporting this type yet {type(payload)}.. Bytes or str -- get it straight")
+
+    def _get_retry_count(self, properties) -> int:
+        """Extract retry count from message headers."""
+        if hasattr(properties, 'headers') and properties.headers:
+            return properties.headers.get('x-retry-count', 0)
+        return 0
+
+    def _has_dlx_configured(self, queue_name: str) -> bool:
+        """Check if the queue has a dead letter exchange configured."""
+        return self.dlx_enable
+
+    def _get_retry_cycle_info(self, properties) -> dict:
+        """Extract retry cycle information from message headers."""
+        if not hasattr(properties, 'headers') or not properties.headers:
+            return {'cycle_count': 0, 'first_failure': None, 'total_elapsed': 0}
+
+        headers = properties.headers
+        return {
+            'cycle_count': headers.get('x-cycle-count', 0),
+            'first_failure': headers.get('x-first-failure'),
+            'total_elapsed': headers.get('x-total-elapsed', 0)
+        }
+
+    def _should_continue_retry_cycles(self, retry_info: dict, enable_retry_cycles: bool,
+                                      max_retry_time_limit: int) -> bool:
+        """Check if message should continue retry cycles or go to permanent DLX."""
+        if not enable_retry_cycles or not self.dlx_enable:
+            return False
+
+        max_time_ms = max_retry_time_limit * 60 * 1000
+        return retry_info['total_elapsed'] < max_time_ms
+
+    def _create_retry_cycle_headers(self, original_headers: dict, cycle_count: int,
+                                    first_failure: str, processing_error: str,
+                                    should_cycle: bool, original_exchange: str,
+                                    original_routing_key: str) -> dict:
+        """Create headers for DLX message with retry cycle info."""
+        headers = original_headers.copy() if original_headers else {}
+        now = datetime.now(timezone.utc).isoformat()
+
+        # Calculate elapsed time
+        if first_failure:
+            try:
+                first_time = datetime.fromisoformat(first_failure.replace('Z', ''))
+                elapsed_ms = int((datetime.now(timezone.utc) - first_time).total_seconds() * 1000)
+            except Exception:
+                elapsed_ms = 0
+        else:
+            first_failure = now
+            elapsed_ms = 0
+
+        # Update retry cycle tracking
+        headers.update({
+            'x-cycle-count': cycle_count + 1,
+            'x-first-failure': first_failure,
+            'x-total-elapsed': elapsed_ms,
+            'x-processing-error': processing_error,
+            'x-retry-exhausted': not should_cycle
+        })
+
+        # If cycling, set TTL and routing back to original queue
+        if should_cycle:
+            headers.update({
+                'x-dead-letter-exchange': original_exchange,
+                'x-dead-letter-routing-key': original_routing_key
+            })
+
+        return headers
+
+    def _handle_dlx_with_retry_cycle_sync(
+            self, method_frame, properties, body, processing_error: str,
+            original_exchange: str, original_routing_key: str,
+            enable_retry_cycles: bool, retry_cycle_interval: int,
+            max_retry_time_limit: int, dlx_exchange_name: str | None):
+        """Base method for DLX handling with retry cycles."""
+        # Get retry info
+        retry_info = self._get_retry_cycle_info(properties)
+        should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit)
+
+        # Get DLX info
+        dlx_name = dlx_exchange_name or f"{original_exchange}.dlx"
+        dlx_routing = original_routing_key
+
+        # Create enhanced headers
+        original_headers = getattr(properties, 'headers', {}) or {}
+        enhanced_headers = self._create_retry_cycle_headers(
+            original_headers, retry_info['cycle_count'], retry_info['first_failure'],
+            processing_error, should_cycle, original_exchange, original_routing_key
+        )
+
+        # Create properties for DLX message
+        dlx_properties = {
+            'headers': enhanced_headers,
+            'delivery_mode': 2,  # Persistent
+            'content_type': getattr(properties, 'content_type', 'application/json')
+        }
+
+        # Set TTL if cycling
+        if should_cycle:
+            ttl_ms = retry_cycle_interval * 60 * 1000
+            dlx_properties['expiration'] = str(ttl_ms)
+
+        # Call subclass-specific publish method
+        self._publish_to_dlx(dlx_name, dlx_routing, body, dlx_properties)
+
+        # Log result
+        if should_cycle:
+            log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} "
+                     f"(next retry in {retry_cycle_interval}m)")
+        else:
+            log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles "
+                      f"- staying in DLX for manual replay")
+
+    async def _handle_dlx_with_retry_cycle_async(
+            self, message, properties, processing_error: str,
+            original_exchange: str, original_routing_key: str,
+            enable_retry_cycles: bool, retry_cycle_interval: int,
+            max_retry_time_limit: int, dlx_exchange_name: str | None):
+        """Base method for DLX handling with retry cycles."""
+        # Get retry info
+        retry_info = self._get_retry_cycle_info(properties)
+        should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit)
+
+        # Get DLX info
+        dlx_name = dlx_exchange_name or f"{original_exchange}.dlx"
+        dlx_routing = original_routing_key
+
+        # Create enhanced headers
+        original_headers = getattr(properties, 'headers', {}) or {}
+        enhanced_headers = self._create_retry_cycle_headers(
+            original_headers, retry_info['cycle_count'], retry_info['first_failure'],
+            processing_error, should_cycle, original_exchange, original_routing_key
+        )
+
+        # Create properties for DLX message
+        dlx_properties = {
+            'headers': enhanced_headers,
+            'delivery_mode': 2,  # Persistent
+            'content_type': getattr(properties, 'content_type', 'application/json')
+        }
+
+        # Set TTL if cycling
+        if should_cycle:
+            ttl_ms = retry_cycle_interval * 60 * 1000
+            dlx_properties['expiration'] = str(ttl_ms)
+
+        # Call subclass-specific publish method
+        await self._publish_to_dlx(dlx_name, dlx_routing, message.body, dlx_properties)
+
+        # Log result
+        if should_cycle:
+            log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} "
+                     f"(next retry in {retry_cycle_interval}m)")
+        else:
+            log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles "
+                      f"- staying in DLX for manual replay")
+
+    def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict):
+        """Abstract method - implemented by subclasses."""
+        raise NotImplementedError("Subclasses must implement _publish_to_dlx")
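Patch 1 above teaches _setup_exchange_and_queue to give every non-passive queue a dead-letter companion (a {exchange_name}.dlx exchange with a matching .queue bound to it) and to default new queues to the quorum type. A minimal sketch of how that could be driven end to end follows; note that Mrsal leaves connecting and DLX publishing to its subclasses, so the MrsalBlockingAMQP class, its connection handling, and the direct call into the private setup method are illustrative assumptions, not part of the patch:

    import os

    import pika
    from mrsal.superclass import Mrsal


    class MrsalBlockingAMQP(Mrsal):
        """Hypothetical sync subclass; only the abstract DLX hook is filled in."""

        def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict):
            # Map the plain dict built by the retry-cycle helpers onto pika's BasicProperties.
            self._channel.basic_publish(
                exchange=dlx_exchange,
                routing_key=routing_key,
                body=body,
                properties=pika.BasicProperties(**properties),
            )


    mrsal = MrsalBlockingAMQP(
        host=os.environ.get("RABBITMQ_HOST", "localhost"),
        port=5672,
        credentials=("user", "password"),
        virtual_host="/",
    )

    # Assumes the subclass has already opened self._connection and self._channel.
    # Declares orders_exchange and orders_queue, plus orders_exchange.dlx and
    # orders_exchange.dlx.queue, with orders_queue created as a quorum queue
    # that drops its oldest messages once it holds 10000.
    mrsal._setup_exchange_and_queue(
        exchange_name="orders_exchange",
        queue_name="orders_queue",
        exchange_type="direct",
        routing_key="orders",
        dlx_enable=True,
        use_quorum_queues=True,
        max_queue_length=10000,
        queue_overflow="drop-head",
    )

The straight **properties unpacking works because pika's BasicProperties accepts exactly the headers, delivery_mode, content_type and expiration keys that the retry-cycle helpers put into the dict.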
From 82dfba3e4fb5211118b56265cab5e27ababe6c3b Mon Sep 17 00:00:00 2001
From: JonNesvold
Date: Sat, 11 Oct 2025 15:59:12 +0200
Subject: [PATCH 2/4] vbump

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 6a2db74..ec7e0da 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,7 +6,7 @@ license = "GPL-3.0-or-later"
 maintainers = ["Jon E Nesvold "]
 name = "mrsal"
 readme = "README.md"
-version = "3.0.0"
+version = "3.1.0"
 homepage = "https://github.com/NeoMedSys/mrsal"
 repository = "https://github.com/NeoMedSys/mrsal"
 documentation = "https://neomedsys.github.io/mrsal/"

From ebcf1783150dbc2a6de3994fce9df02fb691fdaa Mon Sep 17 00:00:00 2001
From: JonNesvold
Date: Sat, 11 Oct 2025 16:11:55 +0200
Subject: [PATCH 3/4] dlx queue setup

---
 mrsal/superclass.py           |    2 +-
 reports/coverage/.coverage    |  Bin 53248 -> 0 bytes
 reports/coverage/coverage.xml | 1265 ---------------------------------
 reports/junit/junit.xml       |    1 -
 4 files changed, 1 insertion(+), 1267 deletions(-)
 delete mode 100644 reports/coverage/.coverage
 delete mode 100644 reports/coverage/coverage.xml
 delete mode 100644 reports/junit/junit.xml

diff --git a/mrsal/superclass.py b/mrsal/superclass.py
index 087f01b..f058289 100644
--- a/mrsal/superclass.py
+++ b/mrsal/superclass.py
@@ -273,7 +273,7 @@ async def _async_setup_exchange_and_queue(self,
                     await self._async_declare_queue_binding(
                         exchange=dlx_exchange_obj,
-                        queue=dlx_queue_name,
+                        queue=dlx_queue,
                         routing_key=dlx_routing,
                         arguments=None
                     )

diff --git a/reports/coverage/.coverage b/reports/coverage/.coverage
deleted file mode 100644
index e5b5b515f9d7d5e251bd1ee16a25c23235721b9b..0000000000000000000000000000000000000000
GIT binary patch
[binary data omitted: reports/coverage/.coverage, 53248 -> 0 bytes]

diff --git a/reports/coverage/coverage.xml b/reports/coverage/coverage.xml
deleted file mode 100644
[generated coverage report omitted: 1265 lines of XML, sources under /home/runner/work/mrsal/mrsal]

diff --git a/reports/junit/junit.xml b/reports/junit/junit.xml
deleted file mode 100644
[generated JUnit report omitted: 1 line]
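The one-line change in PATCH 3/4 is more than cosmetic: _async_declare_queue_binding takes the AioQueue object returned by _async_declare_queue and calls queue.bind(...) on it, so passing the dlx_queue_name string would raise an AttributeError the first time a DLX queue is bound. For reference, this is the DLX block from patch 1 as amended here, with the names as they appear inside _async_setup_exchange_and_queue:

    dlx_queue = await self._async_declare_queue(
        queue_name=dlx_queue_name,   # e.g. "orders_exchange.dlx.queue"
        arguments=None,
        durable=exch_durable,
        passive=False,
        exclusive=False,
        auto_delete=False
    )

    dlx_exchange_obj = await self._channel.get_exchange(dlx_name)

    # queue.bind(...) lives on the AioQueue object, so pass the object, not its name.
    await self._async_declare_queue_binding(
        exchange=dlx_exchange_obj,
        queue=dlx_queue,
        routing_key=dlx_routing,
        arguments=None
    )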
From 54c99f3d6f0205d3e8769e5eba1ccea845f865ca Mon Sep 17 00:00:00 2001
From: JonNesvold
Date: Sat, 11 Oct 2025 14:14:18 +0000
Subject: [PATCH 4/4] Apply automatic changes

---
 README.md                     |    2 +-
 reports/coverage/.coverage    |  Bin 0 -> 53248 bytes
 reports/coverage/coverage.xml | 1282 +++++++++++++++++++++++++++++++++
 reports/junit/junit.xml       |    1 +
 4 files changed, 1284 insertions(+), 1 deletion(-)
 create mode 100644 reports/coverage/.coverage
 create mode 100644 reports/coverage/coverage.xml
 create mode 100644 reports/junit/junit.xml

diff --git a/README.md b/README.md
index 6e604cc..5f32fc8 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
 # MRSAL AMQP
-[![Release](https://img.shields.io/badge/release-3.0.0-blue.svg)](https://pypi.org/project/mrsal/)
+[![Release](https://img.shields.io/badge/release-3.1.0-blue.svg)](https://pypi.org/project/mrsal/)
 [![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/)
 [![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)
 [![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/)

diff --git a/reports/coverage/.coverage b/reports/coverage/.coverage
new file mode 100644
index 0000000000000000000000000000000000000000..0213a6713a56591d336234a903a08c0c5a9c7a73
GIT binary patch
[binary data omitted: reports/coverage/.coverage, 0 -> 53248 bytes]

diff --git a/reports/coverage/coverage.xml b/reports/coverage/coverage.xml
new file mode 100644
[generated coverage report omitted: 1282 lines of XML, sources under /home/runner/work/mrsal/mrsal]

diff --git a/reports/junit/junit.xml b/reports/junit/junit.xml
new file mode 100644
[generated JUnit report omitted: 1 line, no newline at end of file]
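Taken together, the retry-cycle helpers added in PATCH 1/4 implement a TTL-driven requeue loop: a failed message is republished to the DLX with expiration set to retry_cycle_interval * 60 * 1000 milliseconds and x-dead-letter-* headers aimed back at the original exchange, so the broker dead-letters it onto the original queue when the TTL fires; once more than max_retry_time_limit minutes have passed since the first failure, x-retry-exhausted is set and the message stays in the DLX for manual replay. A small sketch of that header bookkeeping, exercising the helpers directly; the _Demo subclass and the SimpleNamespace stand-in for pika message properties are assumptions for illustration only:

    from types import SimpleNamespace

    from mrsal.superclass import Mrsal


    class _Demo(Mrsal):
        def _publish_to_dlx(self, dlx_exchange, routing_key, body, properties):
            pass  # no-op stand-in; a real subclass publishes to the DLX here


    m = _Demo(host="localhost", port=5672, credentials=("user", "password"), virtual_host="/")

    # First failure: no retry headers on the message yet.
    props = SimpleNamespace(headers=None, content_type="application/json")
    info = m._get_retry_cycle_info(props)
    # info == {'cycle_count': 0, 'first_failure': None, 'total_elapsed': 0}

    # 0 ms elapsed is below a 60-minute limit, so the message should cycle again.
    assert m._should_continue_retry_cycles(info, enable_retry_cycles=True, max_retry_time_limit=60)

    headers = m._create_retry_cycle_headers(
        original_headers={}, cycle_count=info['cycle_count'],
        first_failure=info['first_failure'], processing_error="boom",
        should_cycle=True, original_exchange="orders_exchange",
        original_routing_key="orders",
    )
    # headers now carries x-cycle-count == 1, x-retry-exhausted == False, and
    # x-dead-letter-exchange / x-dead-letter-routing-key aimed back at the source.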