# Reconstructed, properly formatted Python for the collapsed patch hunks of
# mrsal/amqp/subclass.py (this chunk is a unified diff squashed onto a few
# physical lines).  The same patch also bumps the release badge / pyproject
# version 3.1.x -> 3.2.0 and adds `import threading` and
# `from functools import partial` to the module's import block.
#
# The defs below are methods of the blocking AMQP consumer class
# (MrsalBlockingAMQP); the class header lies outside this chunk, so they are
# shown at method granularity.

def _schedule_threadsafe(self, func: Callable, threaded: bool, *args, **kwargs) -> None:
    """Run an AMQP channel operation safely for the current threading mode.

    pika blocking connections/channels are not thread-safe, so when a message
    is being handled on a worker thread (``threaded=True``) the operation is
    marshalled back onto the connection's I/O thread via
    ``add_callback_threadsafe``.  Otherwise it is invoked inline.

    :param func: the channel/connection operation to execute
    :param threaded: True when called from a worker thread
    """
    if threaded:
        cb = partial(func, *args, **kwargs)
        self._connection.add_callback_threadsafe(cb)
    else:
        func(*args, **kwargs)

def _process_single_message(self, method_frame, properties, body, runtime_config: dict) -> None:
    """Validate, dispatch and ack/nack one delivered message.

    ``runtime_config`` bundles the consumer options (callback, auto_ack,
    threaded, DLX/retry settings, ...) to avoid an explosion of arguments.
    Runs either inline or on a daemon worker thread; every channel operation
    goes through ``_schedule_threadsafe`` so acks/nacks stay thread-safe.
    """
    auto_ack = runtime_config.get('auto_ack')
    threaded = runtime_config.get('threaded')
    callback = runtime_config.get('callback')
    callback_args = runtime_config.get('callback_args')
    payload_model = runtime_config.get('payload_model')
    dlx_enable = runtime_config.get('dlx_enable')
    enable_retry_cycles = runtime_config.get('enable_retry_cycles')

    app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID'
    msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no MsgID'
    delivery_tag = method_frame.delivery_tag

    # Redelivery count as tracked by the broker (x-delivery-count header);
    # defaults to 0 when no headers are present.
    current_retry = properties.headers.get('x-delivery-count', 0) if properties and properties.headers else 0

    if self.verbose:
        log.info(f"Processing message {msg_id} from {app_id} (Retry: {current_retry})")

    should_process = True
    if payload_model:
        try:
            self.validate_payload(body, payload_model)
        except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
            log.error(f"Payload validation failed for {msg_id}: {e}")
            should_process = False

    if callback and should_process:
        try:
            if callback_args:
                callback(*callback_args, method_frame, properties, body)
            else:
                callback(method_frame, properties, body)
        except Exception as e:
            # FIX: this handler previously logged the payload-validation
            # wording ("payload validation failed for your specific model
            # requirements"), hiding the fact that the *callback* raised.
            log.error(f"Oh lordy lord, message {msg_id} from {app_id} failed while running callback: {e}")
            should_process = False

    if not should_process and not auto_ack:
        if dlx_enable and enable_retry_cycles:
            # Retry-cycle path: republish to the DLX with cycle metadata;
            # the helper is responsible for settling the message.
            self._schedule_threadsafe(
                self._publish_to_dlx_with_retry_cycle, threaded,
                method_frame, properties, body, "Callback failed",
                runtime_config['exchange_name'], runtime_config['routing_key'],
                enable_retry_cycles, runtime_config['retry_cycle_interval'],
                runtime_config['max_retry_time_limit'], runtime_config['dlx_exchange_name']
            )
        elif dlx_enable:
            # Plain DLX: dead-letter via nack without requeue.
            log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries")
            self._schedule_threadsafe(self._channel.basic_nack, threaded,
                                      delivery_tag=delivery_tag, requeue=False)
        else:
            # No DLX configured: the message is dropped outright.
            log.warning(f"No dead letter exchange declared for {runtime_config['queue_name']}, proceeding to drop the message -- reflect on your life choices! byebye")
            log.info(f"Dropped message content: {body}")
            self._schedule_threadsafe(self._channel.basic_nack, threaded, delivery_tag=delivery_tag, requeue=False)

    elif not auto_ack and should_process:
        log.info(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken')
        self._schedule_threadsafe(self._channel.basic_ack, threaded, delivery_tag=delivery_tag)

@retry(
    retry=retry_if_exception_type((
        AMQPConnectionError,
        ConnectionClosedByBroker,
        StreamLostError,
    )),
    # NOTE(review): the decorator arguments between the exception tuple and
    # wait_fixed(2) are unchanged context hidden by the collapsed diff --
    # confirm against the full file.
    wait=wait_fixed(2),
    before_sleep=before_sleep_log(log, WARNING)
)
def start_consumer(
    self,
    queue_name: str,
    callback: Callable | None = None,
    callback_args: dict[str, str | int | float | bool] | None = None,
    auto_ack: bool = True,
    inactivity_timeout: int | None = None,
    auto_declare: bool = True,
    exchange_name: str | None = None,
    exchange_type: str | None = None,
    routing_key: str | None = None,
    payload_model: Type | None = None,
    dlx_enable: bool = True,
    dlx_exchange_name: str | None = None,
    dlx_routing_key: str | None = None,
    use_quorum_queues: bool = True,
    enable_retry_cycles: bool = True,
    retry_cycle_interval: int = 10,
    max_retry_time_limit: int = 60,
    max_queue_length: int | None = None,
    max_queue_length_bytes: int | None = None,
    queue_overflow: str | None = None,
    single_active_consumer: bool | None = None,
    lazy_queue: bool | None = None,
    threaded: bool = False
) -> None:
    """Start the consumer using the blocking connection setup.

    :param queue_name: the queue to consume from
    :param callback: user callback invoked as
        ``callback(*callback_args, method_frame, properties, body)``
    :param auto_ack: broker considers messages acked on delivery
    :param auto_declare: declare exchange/queue (and DLX) before consuming
    :param payload_model: optional model used to validate each body
    :param retry_cycle_interval: minutes between retry cycles
    :param max_retry_time_limit: minutes total before permanent DLX
    :param threaded: process each message on a daemon worker thread so the
        connection heartbeat keeps running during long callbacks; channel
        acks/nacks are then marshalled via ``add_callback_threadsafe``
    """
    self.setup_blocking_connection()

    if auto_declare:
        self._setup_exchange_and_queue(
            # NOTE(review): the leading kwargs of this call are unchanged
            # context not visible in the collapsed diff -- reconstructed
            # from the signature; verify against the full file.
            exchange_name=exchange_name,
            queue_name=queue_name,
            exchange_type=exchange_type,
            routing_key=routing_key,
            dlx_enable=dlx_enable,
            dlx_exchange_name=dlx_exchange_name,
            dlx_routing_key=dlx_routing_key,
            use_quorum_queues=use_quorum_queues,
            max_queue_length=max_queue_length,
            max_queue_length_bytes=max_queue_length_bytes,
            queue_overflow=queue_overflow,
            single_active_consumer=single_active_consumer,
            lazy_queue=lazy_queue
        )

    if not self.auto_declare_ok:
        raise MrsalAbortedSetup('Auto declaration failed')

    # Snapshot of all per-message options handed to the worker, so the
    # worker signature stays small.
    runtime_config = {
        'callback': callback,
        'callback_args': callback_args,
        'auto_ack': auto_ack,
        'payload_model': payload_model,
        'threaded': threaded,
        'dlx_enable': dlx_enable,
        'enable_retry_cycles': enable_retry_cycles,
        'retry_cycle_interval': retry_cycle_interval,
        'max_retry_time_limit': max_retry_time_limit,
        'exchange_name': exchange_name,
        'routing_key': routing_key,
        'dlx_exchange_name': dlx_exchange_name,
        'queue_name': queue_name,
    }

    log.info(f"""
    Straight out of the swamps -- consumer boi listening with config:
    auto_ack: {auto_ack}
    threaded: {threaded}
    DLX: {dlx_enable}
    retry cycles: {enable_retry_cycles}
    retry interval: {retry_cycle_interval}
    max retry time: {max_retry_time_limit}
    DLX name: {dlx_exchange_name}
    """)

    try:
        for method_frame, properties, body in self._channel.consume(
                queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout):

            if method_frame:
                if threaded:
                    log.info("Threaded processes started to ensure heartbeat during long processes -- sauber!")
                    t = threading.Thread(
                        target=self._process_single_message,
                        args=(method_frame, properties, body, runtime_config),
                        daemon=True
                    )
                    t.start()
                else:
                    self._process_single_message(method_frame, properties, body, runtime_config)
    except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e:
        log.error(f"Ooooooopsie! I caught a connection error while consuming messaiges: {e}")
        raise
    except Exception as e:
        log.error(f'Oh lordy lord! I failed consuming ze messaj with: {e}')
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/reports/junit/junit.xml b/reports/junit/junit.xml new file mode 100644 index 0000000..532b12f --- /dev/null +++ b/reports/junit/junit.xml @@ -0,0 +1 @@ + \ No newline at end of file diff 
# Reconstructed tests/test_mrsal_threaded_consumer.py (a new file introduced
# by this patch), formatted conventionally.  Behavior is identical to the
# patch contents: it exercises the `threaded` consumer flag and the
# thread-safe ack scheduling of MrsalBlockingAMQP.
import pytest
import threading
import time
from unittest.mock import MagicMock, patch, ANY
from mrsal.amqp.subclass import MrsalBlockingAMQP
from pydantic.dataclasses import dataclass


@dataclass
class ExpectedPayload:
    id: int
    name: str
    active: bool


# --- Fixtures ---

@pytest.fixture
def mock_consumer():
    """Create a mock consumer with mocked connection and channel."""
    consumer = MrsalBlockingAMQP(
        host="localhost",
        port=5672,
        credentials=("user", "password"),
        virtual_host="testboi",
        ssl=False,
        verbose=True,
        prefetch_count=1,
        heartbeat=60
    )

    # Replace the critical pika objects so nothing touches the network.
    consumer._connection = MagicMock()
    consumer._channel = MagicMock()
    consumer.auto_declare_ok = True

    # Stub out the setup paths as well.
    consumer.setup_blocking_connection = MagicMock()
    consumer._setup_exchange_and_queue = MagicMock()

    return consumer


# --- Tests ---

def test_threaded_flag_spawns_thread(mock_consumer):
    """Test that threaded=True actually starts a new thread."""
    # One fake delivery, then the generator is exhausted.
    mock_method = MagicMock()
    mock_method.delivery_tag = 1
    mock_props = MagicMock()
    mock_props.headers = {}
    mock_body = b'{"id": 1, "name": "ThreadTest", "active": true}'

    mock_consumer._channel.consume.return_value = [(mock_method, mock_props, mock_body)]

    with patch('threading.Thread') as mock_thread_cls:
        mock_thread_instance = MagicMock()
        mock_thread_cls.return_value = mock_thread_instance

        mock_consumer.start_consumer(
            queue_name="test_queue",
            callback=MagicMock(),
            threaded=True,  # <--- The Flag
            auto_ack=False,
            auto_declare=False,  # skip declaration for unit-test speed
            payload_model=None
        )

        # Exactly one worker thread must have been created and started.
        assert mock_thread_cls.call_count == 1
        mock_thread_instance.start.assert_called_once()

        # The thread target must be the internal worker, running as daemon.
        call_args = mock_thread_cls.call_args[1]  # kwargs
        assert call_args['target'] == mock_consumer._process_single_message
        assert call_args['daemon'] is True


def test_blocking_mode_does_not_spawn_thread(mock_consumer):
    """Test that threaded=False (default) runs in the main thread."""
    mock_method = MagicMock()
    mock_props = MagicMock()
    mock_body = b'test'
    mock_consumer._channel.consume.return_value = [(mock_method, mock_props, mock_body)]

    with patch('threading.Thread') as mock_thread_cls:
        mock_consumer.start_consumer(
            queue_name="test_queue",
            callback=MagicMock(),
            threaded=False,
            auto_ack=False,
            auto_declare=False
        )

        # No worker thread may be created in blocking mode.
        mock_thread_cls.assert_not_called()


def test_schedule_threadsafe_behavior(mock_consumer):
    """Test that _schedule_threadsafe chooses the right Pika method."""
    mock_func = MagicMock()
    arg1 = "test"

    # Case A: threaded=True -> must go through add_callback_threadsafe,
    # and the function itself must not run immediately.
    mock_consumer._schedule_threadsafe(mock_func, True, arg1)
    mock_consumer._connection.add_callback_threadsafe.assert_called_once()
    mock_func.assert_not_called()

    mock_consumer._connection.reset_mock()

    # Case B: threaded=False -> invoked inline, no marshalling.
    mock_consumer._schedule_threadsafe(mock_func, False, arg1)
    mock_consumer._connection.add_callback_threadsafe.assert_not_called()
    mock_func.assert_called_once_with(arg1)


def test_worker_logic_acks_threadsafe(mock_consumer):
    """
    Test that the worker logic (_process_single_message) correctly
    uses the threadsafe scheduling for ACKs when threaded=True.
    """
    mock_method = MagicMock()
    mock_method.delivery_tag = 99
    mock_props = MagicMock()
    mock_props.headers = {}
    mock_body = b'{}'

    runtime_config = {
        'auto_ack': False,
        'threaded': True,  # <--- Important
        'callback': MagicMock(),  # callback that succeeds
        'dlx_enable': False,
        'enable_retry_cycles': False
    }

    with patch.object(mock_consumer, '_schedule_threadsafe') as mock_schedule:
        mock_consumer._process_single_message(mock_method, mock_props, mock_body, runtime_config)

        # The ACK must be scheduled via the threadsafe helper with
        # threaded=True and the right delivery tag.
        mock_schedule.assert_called_with(
            mock_consumer._channel.basic_ack,
            True,
            delivery_tag=99
        )


def test_worker_logic_acks_blocking(mock_consumer):
    """Test that worker logic calls ACK directly when threaded=False."""
    mock_method = MagicMock()
    mock_method.delivery_tag = 99
    mock_props = MagicMock()
    mock_props.headers = {}

    runtime_config = {
        'auto_ack': False,
        'threaded': False,  # <--- Blocking
        'callback': MagicMock(),
        'dlx_enable': False
    }

    with patch.object(mock_consumer, '_schedule_threadsafe') as mock_schedule:
        mock_consumer._process_single_message(mock_method, mock_props, b'{}', runtime_config)

        # The helper must be asked to run inline (threaded=False).
        mock_schedule.assert_called_with(
            mock_consumer._channel.basic_ack,
            False,
            delivery_tag=99
        )