diff --git a/README.md b/README.md index fb5a194..831b45b 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # MRSAL AMQP -[![Release](https://img.shields.io/badge/release-1.2.0-blue.svg)](https://pypi.org/project/mrsal/) +[![Release](https://img.shields.io/badge/release-2.0.1-blue.svg)](https://pypi.org/project/mrsal/) [![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/) [![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml) [![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/) diff --git a/mrsal/amqp/subclass.py b/mrsal/amqp/subclass.py index 81f9c2d..0690498 100644 --- a/mrsal/amqp/subclass.py +++ b/mrsal/amqp/subclass.py @@ -211,7 +211,7 @@ def start_consumer(self, self._publish_to_dlx_with_retry_cycle( method_frame, properties, body, "Callback processing failed", exchange_name, routing_key, enable_retry_cycles, - retry_cycle_interval, max_retry_time_limit + retry_cycle_interval, max_retry_time_limit, dlx_exchange_name ) elif dlx_enable: # Original DLX behavior @@ -361,17 +361,26 @@ def publish_messages( except Exception as e: log.error(f"Unexpected error while publishing message: {e}") - def _publish_to_dlx_with_retry_cycle(self, method_frame, properties, body, processing_error: str, - original_exchange: str, original_routing_key: str, - enable_retry_cycles: bool, retry_cycle_interval: int, - max_retry_time_limit: int): + def _publish_to_dlx_with_retry_cycle( + self, + method_frame, properties, body, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int, dlx_exchange_name: str | None): """Publish message to DLX with retry cycle headers.""" try: # Use common logic from superclass - self._handle_dlx_with_retry_cycle( - method_frame, properties, body, processing_error, - original_exchange, original_routing_key, - enable_retry_cycles, retry_cycle_interval, max_retry_time_limit + self._handle_dlx_with_retry_cycle_sync( + method_frame=method_frame, + properties=properties, + body=body, + processing_error=processing_error, + original_exchange=original_exchange, + original_routing_key=original_routing_key, + enable_retry_cycles=enable_retry_cycles, + retry_cycle_interval=retry_cycle_interval, + max_retry_time_limit=max_retry_time_limit, + dlx_exchange_name=dlx_exchange_name ) # Acknowledge original message @@ -390,7 +399,7 @@ def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, prop content_type=properties.get('content_type', 'application/json'), expiration=properties.get('expiration') ) - + self._channel.basic_publish( exchange=dlx_exchange, routing_key=routing_key, @@ -561,7 +570,7 @@ async def start_consumer( await self._async_publish_to_dlx_with_retry_cycle( message, properties, "Callback processing failed", exchange_name, routing_key, enable_retry_cycles, - retry_cycle_interval, max_retry_time_limit + retry_cycle_interval, max_retry_time_limit, dlx_exchange_name ) elif dlx_enable: # Original DLX behavior @@ -581,14 +590,20 @@ async def start_consumer( async def _async_publish_to_dlx_with_retry_cycle(self, message, properties, processing_error: str, original_exchange: str, original_routing_key: str, enable_retry_cycles: bool, retry_cycle_interval: int, - max_retry_time_limit: int): + max_retry_time_limit: int, dlx_exchange_name: str | None): """Async publish message to DLX with retry cycle headers.""" try: # Use common logic from superclass - self._handle_dlx_with_retry_cycle( - message, properties, message.body, processing_error, - original_exchange, original_routing_key, - enable_retry_cycles, retry_cycle_interval, max_retry_time_limit + await self._handle_dlx_with_retry_cycle_async( + message=message, + properties=properties, + processing_error=processing_error, + original_exchange=original_exchange, + original_routing_key=original_routing_key, + enable_retry_cycles=enable_retry_cycles, + retry_cycle_interval=retry_cycle_interval, + max_retry_time_limit=max_retry_time_limit, + dlx_exchange_name=dlx_exchange_name ) # Acknowledge original message diff --git a/mrsal/superclass.py b/mrsal/superclass.py index 328c427..b6bd013 100644 --- a/mrsal/superclass.py +++ b/mrsal/superclass.py @@ -1,4 +1,4 @@ -# external + # external import os import ssl import pika @@ -43,6 +43,7 @@ class Mrsal: prefetch_count: int = 5 heartbeat: int = 60 # sec dlx_enable: bool = True + dlx_exchange_name = None max_retries: int = 3 use_quorum_queues: bool = True _connection = None @@ -73,7 +74,7 @@ def _setup_exchange_and_queue(self, if dlx_enable: - dlx_name = dlx_exchange_name or f"{queue_name}.dlx" + dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" dlx_routing = dlx_routing_key or routing_key try: self._declare_exchange( @@ -167,7 +168,7 @@ async def _async_setup_exchange_and_queue(self, raise MrsalAbortedSetup("Oh my Oh my! Connection not found when trying to run the setup!") if dlx_enable: - dlx_name = dlx_exchange_name or f"{queue_name}.dlx" + dlx_name = dlx_exchange_name or f"{exchange_name}.dlx" dlx_routing = dlx_routing_key or routing_key try: @@ -557,17 +558,18 @@ def _create_retry_cycle_headers(self, original_headers: dict, cycle_count: int, return headers - def _handle_dlx_with_retry_cycle(self, method_frame, properties, body, processing_error: str, - original_exchange: str, original_routing_key: str, - enable_retry_cycles: bool, retry_cycle_interval: int, - max_retry_time_limit: int): + def _handle_dlx_with_retry_cycle_sync( + self, method_frame, properties, body, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int, dlx_exchange_name: str | None): """Base method for DLX handling with retry cycles.""" # Get retry info retry_info = self._get_retry_cycle_info(properties) should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit) # Get DLX info - dlx_name = f"{method_frame.routing_key}.dlx" + dlx_name = dlx_exchange_name or f"{original_exchange}.dlx" dlx_routing = original_routing_key # Create enhanced headers @@ -600,6 +602,50 @@ def _handle_dlx_with_retry_cycle(self, method_frame, properties, body, processin log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles " f"- staying in DLX for manual replay") + async def _handle_dlx_with_retry_cycle_async( + self, message, properties, processing_error: str, + original_exchange: str, original_routing_key: str, + enable_retry_cycles: bool, retry_cycle_interval: int, + max_retry_time_limit: int, dlx_exchange_name: str | None): + """Base method for DLX handling with retry cycles.""" + # Get retry info + retry_info = self._get_retry_cycle_info(properties) + should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit) + + # Get DLX info + dlx_name = dlx_exchange_name or f"{original_exchange}.dlx" + dlx_routing = original_routing_key + + # Create enhanced headers + original_headers = getattr(properties, 'headers', {}) or {} + enhanced_headers = self._create_retry_cycle_headers( + original_headers, retry_info['cycle_count'], retry_info['first_failure'], + processing_error, should_cycle, original_exchange, original_routing_key + ) + + # Create properties for DLX message + dlx_properties = { + 'headers': enhanced_headers, + 'delivery_mode': 2, # Persistent + 'content_type': getattr(properties, 'content_type', 'application/json') + } + + # Set TTL if cycling + if should_cycle: + ttl_ms = retry_cycle_interval * 60 * 1000 + dlx_properties['expiration'] = str(ttl_ms) + + # Call subclass-specific publish method + await self._publish_to_dlx(dlx_name, dlx_routing, message.body, dlx_properties) + + # Log result + if should_cycle: + log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} " + f"(next retry in {retry_cycle_interval}m)") + else: + log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles " + f"- staying in DLX for manual replay") + def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict): """Abstract method - implemented by subclasses.""" raise NotImplementedError("Subclasses must implement _publish_to_dlx") diff --git a/pyproject.toml b/pyproject.toml index 01f1c61..3f18ce0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ license = "" maintainers = ["Jon E Nesvold "] name = "mrsal" readme = "README.md" -version = "2.0.0" +version = "2.0.1" [tool.poetry.dependencies] colorlog = "^6.7.0" diff --git a/reports/coverage/.coverage b/reports/coverage/.coverage index d90cf9a..12e46cb 100644 Binary files a/reports/coverage/.coverage and b/reports/coverage/.coverage differ diff --git a/reports/coverage/coverage.xml b/reports/coverage/coverage.xml index 720ae75..9f79c59 100644 --- a/reports/coverage/coverage.xml +++ b/reports/coverage/coverage.xml @@ -1,5 +1,5 @@ - + @@ -79,7 +79,7 @@ - + @@ -118,7 +118,7 @@ - + @@ -150,68 +150,68 @@ - + - + - - + + - - - + + + - + - - + + - - - - - - - - + + + + + + + + - + - - - - + + + + - + - - - + + + - + - - + + - - - - - - - - - + + + + + + + + + @@ -219,142 +219,158 @@ - + - - - - - + + + + + - + - - - - - - - + + + + + + + - - + + - - - - - + + + + + - + - - - - - - - + + + + + + + - - + + - - - - + + + + - + - - - - - - - + + + + + + + - + - - + + - - + + - + - - - - + + + + - + - + - - - + + + - - - - + + + + - + - - + + - + - - + + - + - + - - + + - - - - - - - - - - - + + + + + + + + + + - - - - - - + + + + + + + + + + + + + + + + + + + + + + + - + - + @@ -498,116 +514,116 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - + + + + + + + + + + + - + + - + - - - - - - - - - - + + + + + + + + - + - - + + - + - + + + - - + + - + + + - - - - - - + + - - + + + + + + + - - - - - - - - + + + + + + + + + + - + @@ -840,7 +856,7 @@ - + @@ -848,253 +864,417 @@ - + - - + + - + - + - - + + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + - - + + + - - + - - + + + - - - + + + - - + + - - + + - - + - + - - - - + + + + + - + - - + - + - + - + + - + - - + - - + + + - + - - + - - - - - + + + + + - + + - - - + + - - - - - - - + + + + + + + + - - + + - - - - - - + + + + + - + - - + + + - - + - - - + + + - + - - + + + - - + - - - + + + - - + + - + + - - + - - - - + + + + - - - + + + + - - + + - - - - + + + + - - + + - - - - - - - + + + + + + - - + + - + + - - diff --git a/reports/junit/junit.xml b/reports/junit/junit.xml index 53bc8c4..106fc0b 100644 --- a/reports/junit/junit.xml +++ b/reports/junit/junit.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/tests/test_mrsal_custom_dlx.py b/tests/test_mrsal_custom_dlx.py new file mode 100644 index 0000000..b46f124 --- /dev/null +++ b/tests/test_mrsal_custom_dlx.py @@ -0,0 +1,361 @@ +import pytest +from unittest.mock import Mock, MagicMock, AsyncMock, patch +from mrsal.amqp.subclass import MrsalBlockingAMQP, MrsalAsyncAMQP +from pydantic.dataclasses import dataclass + +from mrsal.exceptions import MrsalAbortedSetup + + +@dataclass +class ExpectedPayload: + id: int + name: str + active: bool + + +class AsyncIteratorMock: + """Mock async iterator for aio-pika queue.iterator()""" + def __init__(self, items): + self.items = iter(items) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.items) + except StopIteration: + raise StopAsyncIteration + + +class TestDLXExchangeNameConfiguration: + """Test DLX exchange name configuration and fallback behavior""" + + @pytest.fixture + def mock_consumer(self): + """Create a mock consumer with mocked connection and channel""" + consumer = MrsalBlockingAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + ssl=False, + verbose=False, + prefetch_count=1, + heartbeat=60, + dlx_enable=True, + max_retries=2, + use_quorum_queues=True, + blocked_connection_timeout=60 + ) + + # Mock connection and channel + consumer._connection = MagicMock() + consumer._channel = MagicMock() + consumer.auto_declare_ok = True + + # Mock setup methods + consumer.setup_blocking_connection = MagicMock() + consumer._setup_exchange_and_queue = MagicMock() + + return consumer + + @pytest.fixture + def mock_async_consumer(self): + """Create a mock async consumer""" + consumer = MrsalAsyncAMQP( + host="localhost", + port=5672, + credentials=("user", "password"), + virtual_host="testboi", + ssl=False, + verbose=False, + prefetch_count=1, + heartbeat=60, + dlx_enable=True, + max_retries=2, + use_quorum_queues=True + ) + + # Mock connection and channel + consumer._connection = AsyncMock() + consumer._channel = AsyncMock() + consumer.auto_declare_ok = True + + # Mock setup methods + consumer.setup_async_connection = AsyncMock() + consumer._async_setup_exchange_and_queue = AsyncMock() + + return consumer + + def test_dlx_fallback_naming_sync(self, mock_consumer): + """Test that DLX exchange name falls back to {exchange}.dlx when not specified""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 123 + mock_method_frame.routing_key = "test_key" + mock_properties = MagicMock() + mock_properties.message_id = 'test_msg' + mock_properties.app_id = 'test_app' + mock_properties.headers = None + invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' + + def debug_consume_generator(): + print("DEBUG: Starting consume generator") + for i in range(3): + print(f"DEBUG: Yielding message {i+1}") + yield (mock_method_frame, mock_properties, invalid_body) + print("DEBUG: Generator exhausted") + + # Create generator that simulates redelivery cycle + def consume_generator(): + for _ in range(3): # Max retries + 1 + yield (mock_method_frame, mock_properties, invalid_body) + + mock_consumer._channel.consume.return_value = consume_generator() + + # Mock the DLX publishing method to capture calls + mock_consumer._publish_to_dlx = MagicMock() + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=Mock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, + # dlx_exchange_name NOT specified - should use fallback + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Verify _publish_to_dlx was called with fallback name "test_exchange.dlx" + mock_consumer._publish_to_dlx.assert_called() + call_args = mock_consumer._publish_to_dlx.call_args + dlx_exchange_used = call_args[0][0] # First positional argument + assert dlx_exchange_used == "test_exchange.dlx" + + def test_dlx_custom_naming_sync(self, mock_consumer): + """Test that custom DLX exchange name is used when specified""" + mock_method_frame = MagicMock() + mock_method_frame.delivery_tag = 123 + mock_method_frame.routing_key = "test_key" + mock_properties = MagicMock() + mock_properties.message_id = 'test_msg' + mock_properties.app_id = 'test_app' + mock_properties.headers = None + invalid_body = b'{"id": "wrong_type", "name": "Test", "active": true}' + + def consume_generator(): + for _ in range(3): + yield (mock_method_frame, mock_properties, invalid_body) + + mock_consumer._channel.consume.return_value = consume_generator() + mock_consumer._publish_to_dlx = MagicMock() + + mock_consumer.start_consumer( + queue_name="test_queue", + callback=Mock(), + auto_ack=False, + auto_declare=True, + exchange_name="test_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, + dlx_exchange_name="custom_dlx_exchange", # Custom DLX name + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Verify _publish_to_dlx was called with custom name + mock_consumer._publish_to_dlx.assert_called() + call_args = mock_consumer._publish_to_dlx.call_args + print(f"DEBUG - All call args: {call_args}") + print(f"DEBUG - Positional args: {call_args[0] if call_args[0] else 'None'}") + + dlx_exchange_used = call_args[0][0] + assert dlx_exchange_used == "custom_dlx_exchange" + + @pytest.mark.asyncio + async def test_dlx_fallback_naming_async(self, mock_async_consumer): + """Test async DLX exchange name fallback behavior""" + mock_message = MagicMock() + mock_message.delivery_tag = 123 + mock_message.app_id = 'test_app' + mock_message.headers = None + mock_message.body = b'{"id": "wrong_type", "name": "Test", "active": true}' + mock_message.ack = AsyncMock() + mock_message.reject = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + # Multiple redeliveries to trigger DLX + async_iterator = AsyncIteratorMock([mock_message] * 3) + mock_queue.iterator = Mock(return_value=async_iterator) + + # Mock the DLX publishing method + mock_async_consumer._publish_to_dlx = AsyncMock() + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=AsyncMock(side_effect=Exception("Processing failed")), + auto_ack=False, + auto_declare=True, + exchange_name="async_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, + # dlx_exchange_name NOT specified - should use fallback + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Verify _publish_to_dlx was called with fallback name + mock_async_consumer._publish_to_dlx.assert_called() + print(f"DEBUG: _publish_to_dlx call_args: {mock_async_consumer._publish_to_dlx.call_args}") + print(f"DEBUG: All calls: {mock_async_consumer._publish_to_dlx.call_args_list}") + call_args = mock_async_consumer._publish_to_dlx.call_args + dlx_exchange_used = call_args[0][0] + assert dlx_exchange_used == "async_exchange.dlx" + + @pytest.mark.asyncio + async def test_dlx_custom_naming_async(self, mock_async_consumer): + """Test async custom DLX exchange name usage""" + mock_message = MagicMock() + mock_message.delivery_tag = 123 + mock_message.app_id = 'test_app' + mock_message.headers = None + mock_message.body = b'{"id": "wrong_type", "name": "Test", "active": true}' + mock_message.ack = AsyncMock() + mock_message.reject = AsyncMock() + + mock_properties = MagicMock() + mock_properties.headers = None + + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + async_iterator = AsyncIteratorMock([mock_message] * 3) + mock_queue.iterator = Mock(return_value=async_iterator) + + mock_async_consumer._publish_to_dlx = AsyncMock() + + await mock_async_consumer.start_consumer( + queue_name="test_queue", + callback=AsyncMock(side_effect=Exception("Processing failed")), + auto_ack=False, + auto_declare=True, + exchange_name="async_exchange", + exchange_type="direct", + routing_key="test_key", + payload_model=ExpectedPayload, + dlx_enable=True, + enable_retry_cycles=True, + dlx_exchange_name="async_custom_dlx", # Custom DLX name + retry_cycle_interval=10, + max_retry_time_limit=60 + ) + + # Verify custom DLX name was used + mock_async_consumer._publish_to_dlx.assert_called() + call_args = mock_async_consumer._publish_to_dlx.call_args + dlx_exchange_used = call_args[0][0] + assert dlx_exchange_used == "async_custom_dlx" + + def test_dlx_exchange_setup_fallback_sync(self, mock_consumer): + """Test that DLX exchange setup uses fallback naming""" + mock_consumer.start_consumer( + queue_name="setup_test_queue", + auto_declare=True, + exchange_name="setup_exchange", + exchange_type="direct", + routing_key="setup_key", + dlx_enable=True, + # dlx_exchange_name not specified + ) + + # Verify _setup_exchange_and_queue was called + mock_consumer._setup_exchange_and_queue.assert_called() + call_args = mock_consumer._setup_exchange_and_queue.call_args[1] # kwargs + + # Check that dlx_exchange_name parameter is None (will use fallback) + assert call_args.get('dlx_exchange_name') is None + assert call_args.get('exchange_name') == "setup_exchange" + + def test_dlx_exchange_setup_custom_sync(self, mock_consumer): + """Test that DLX exchange setup uses custom naming when provided""" + mock_consumer.start_consumer( + queue_name="setup_test_queue", + auto_declare=True, + exchange_name="setup_exchange", + exchange_type="direct", + routing_key="setup_key", + dlx_enable=True, + dlx_exchange_name="custom_setup_dlx" + ) + + mock_consumer._setup_exchange_and_queue.assert_called() + call_args = mock_consumer._setup_exchange_and_queue.call_args[1] + + # Check that custom dlx_exchange_name was passed + assert call_args.get('dlx_exchange_name') == "custom_setup_dlx" + + @pytest.mark.asyncio + async def test_dlx_exchange_setup_fallback_async(self, mock_async_consumer): + """Test async DLX exchange setup with fallback naming""" + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + async_iterator = AsyncIteratorMock([]) # Empty to avoid processing + mock_queue.iterator = Mock(return_value=async_iterator) + + await mock_async_consumer.start_consumer( + queue_name="async_setup_queue", + auto_declare=True, + exchange_name="async_setup_exchange", + exchange_type="direct", + routing_key="async_setup_key", + dlx_enable=True, + # dlx_exchange_name not specified + ) + + mock_async_consumer._async_setup_exchange_and_queue.assert_called() + call_args = mock_async_consumer._async_setup_exchange_and_queue.call_args[1] + + assert call_args.get('dlx_exchange_name') is None + assert call_args.get('exchange_name') == "async_setup_exchange" + + @pytest.mark.asyncio + async def test_dlx_exchange_setup_custom_async(self, mock_async_consumer): + """Test async DLX exchange setup with custom naming""" + mock_queue = AsyncMock() + mock_async_consumer._async_setup_exchange_and_queue.return_value = mock_queue + + async_iterator = AsyncIteratorMock([]) + mock_queue.iterator = Mock(return_value=async_iterator) + + await mock_async_consumer.start_consumer( + queue_name="async_setup_queue", + auto_declare=True, + exchange_name="async_setup_exchange", + exchange_type="direct", + routing_key="async_setup_key", + dlx_enable=True, + dlx_exchange_name="async_custom_setup_dlx" + ) + + mock_async_consumer._async_setup_exchange_and_queue.assert_called() + call_args = mock_async_consumer._async_setup_exchange_and_queue.call_args[1] + + assert call_args.get('dlx_exchange_name') == "async_custom_setup_dlx" diff --git a/tests/test_mrsal_dlx_retry.py b/tests/test_mrsal_dlx_retry.py index 9a570d6..bcea1e7 100644 --- a/tests/test_mrsal_dlx_retry.py +++ b/tests/test_mrsal_dlx_retry.py @@ -1,10 +1,8 @@ import pytest from unittest.mock import Mock, MagicMock, AsyncMock, patch from pydantic.dataclasses import dataclass -from pydantic import ValidationError from mrsal.amqp.subclass import MrsalBlockingAMQP, MrsalAsyncAMQP -from mrsal.exceptions import MrsalAbortedSetup @dataclass