2 changes: 1 addition & 1 deletion README.md
@@ -1,5 +1,5 @@
# MRSAL AMQP
-[![Release](https://img.shields.io/badge/release-1.1.3-blue.svg)](https://pypi.org/project/mrsal/)
+[![Release](https://img.shields.io/badge/release-1.2.0-blue.svg)](https://pypi.org/project/mrsal/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/)
[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)
[![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/)
1,083 changes: 595 additions & 488 deletions mrsal/amqp/subclass.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions mrsal/config.py
@@ -13,6 +13,7 @@ class ValidateTLS(BaseModel):
class AioPikaAttributes(BaseModel):
message_id: str | None
app_id: str | None
+    headers: dict | None = None


class MrsalNoAsyncioLoopFound(Exception):
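The new optional `headers` field on `AioPikaAttributes` lets the validated message metadata carry the AMQP headers that the retry-cycle logic in `superclass.py` reads and writes. A minimal sketch of the extended model in use (the field values here are made up for illustration):

```python
from pydantic import BaseModel


class AioPikaAttributes(BaseModel):
    message_id: str | None
    app_id: str | None
    headers: dict | None = None


# Hypothetical values, e.g. taken from an incoming aio-pika message.
attrs = AioPikaAttributes(
    message_id="msg-42",
    app_id="billing-service",
    headers={"x-cycle-count": 1, "x-first-failure": "2025-01-01T00:00:00+00:00"},
)
print(attrs.headers)  # {'x-cycle-count': 1, 'x-first-failure': '...'}
```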
170 changes: 138 additions & 32 deletions mrsal/superclass.py
@@ -3,6 +3,7 @@
import ssl
import pika
import logging
+from datetime import datetime, timezone
from ssl import SSLContext
from typing import Any, Type
from pika.connection import SSLOptions
@@ -58,7 +59,7 @@ def __post_init__(self) -> None:
self.tls_dict = {cert: (env_var if env_var != '' else None) for cert, env_var in tls_dict.items()}
config.ValidateTLS(**self.tls_dict)

-    def _setup_exchange_and_queue(self,
+    def _setup_exchange_and_queue(self,
exchange_name: str, queue_name: str, exchange_type: str,
routing_key: str, exch_args: dict[str, str] | None = None,
queue_args: dict[str, str] | None = None,
@@ -89,7 +90,7 @@ def _setup_exchange_and_queue(self,

except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")

if queue_args is None:
queue_args = {}

@@ -147,7 +148,7 @@ def _setup_exchange_and_queue(self,
log.error(f'Splæt! I failed the declaration setup with {e}', exc_info=True)
self.auto_declare_ok = False

-    async def _async_setup_exchange_and_queue(self,
+    async def _async_setup_exchange_and_queue(self,
exchange_name: str, queue_name: str,
routing_key: str, exchange_type: str,
exch_args: dict[str, str] | None = None,
@@ -185,14 +186,14 @@ async def _async_setup_exchange_and_queue(self,

except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")

if queue_args is None:
queue_args = {}

queue_args.update({
'x-dead-letter-exchange': dlx_name,
'x-dead-letter-routing-key': dlx_routing
-        })
+        })

if use_quorum_queues:
if queue_args is None:
@@ -246,7 +247,7 @@ async def _async_setup_exchange_and_queue(self,
self.auto_declare_ok = False


-    def _declare_exchange(self,
+    def _declare_exchange(self,
exchange: str, exchange_type: str,
arguments: dict[str, str] | None,
durable: bool, passive: bool,
@@ -289,13 +290,13 @@ def _declare_exchange(self,
if self.verbose:
log.info("Exchange declared yo!")

-    async def _async_declare_exchange(self,
-                                      exchange: str,
-                                      exchange_type: AioExchangeType,
-                                      arguments: dict[str, str] | None = None,
-                                      durable: bool = True,
-                                      passive: bool = False,
-                                      internal: bool = False,
+    async def _async_declare_exchange(self,
+                                      exchange: str,
+                                      exchange_type: AioExchangeType,
+                                      arguments: dict[str, str] | None = None,
+                                      durable: bool = True,
+                                      passive: bool = False,
+                                      internal: bool = False,
auto_delete: bool = False) -> AioExchange:
"""Declare a RabbitMQ exchange in async mode."""
exchange_declare_info = f"""
@@ -312,11 +313,11 @@ async def _async_declare_exchange(self,

try:
exchange_obj = await self._channel.declare_exchange(
-            name=exchange,
-            type=exchange_type,
-            durable=durable,
-            auto_delete=auto_delete,
-            internal=internal,
+            name=exchange,
+            type=exchange_type,
+            durable=durable,
+            auto_delete=auto_delete,
+            internal=internal,
arguments=arguments
)
return exchange_obj
@@ -361,11 +362,11 @@ def _declare_queue(self,
if self.verbose:
log.info(f"Queue declared yo")

-    async def _async_declare_queue(self,
-                                   queue_name: str,
-                                   durable: bool = True,
-                                   exclusive: bool = False,
-                                   auto_delete: bool = False,
+    async def _async_declare_queue(self,
+                                   queue_name: str,
+                                   durable: bool = True,
+                                   exclusive: bool = False,
+                                   auto_delete: bool = False,
passive: bool = False,
arguments: dict[str, Any] | None = None) -> AioQueue:
"""Declare a RabbitMQ queue asynchronously."""
@@ -381,18 +382,18 @@ async def _async_declare_queue(self,

try:
queue_obj = await self._channel.declare_queue(
-            name=queue_name,
-            durable=durable,
-            exclusive=exclusive,
-            auto_delete=auto_delete,
+            name=queue_name,
+            durable=durable,
+            exclusive=exclusive,
+            auto_delete=auto_delete,
arguments=arguments,
passive=passive
)
return queue_obj
except Exception as e:
raise MrsalSetupError(f"Failed to declare async queue: {e}")

-    def _declare_queue_binding(self,
+    def _declare_queue_binding(self,
exchange: str, queue: str,
routing_key: str | None,
arguments: dict[str, str] | None
@@ -419,10 +420,10 @@ def _declare_queue_binding(self,
if self.verbose:
log.info(f"Queue bound yo")

-    async def _async_declare_queue_binding(self,
-                                           queue: AioQueue,
-                                           exchange: AioExchange,
-                                           routing_key: str | None,
+    async def _async_declare_queue_binding(self,
+                                           queue: AioQueue,
+                                           exchange: AioExchange,
+                                           routing_key: str | None,
arguments: dict[str, Any] | None = None) -> None:
"""Bind the queue to the exchange asynchronously."""
binding_info = f"""
@@ -497,3 +498,108 @@ def _get_retry_count(self, properties) -> int:
def _has_dlx_configured(self, queue_name: str) -> bool:
"""Check if the queue has a dead letter exchange configured."""
return self.dlx_enable

def _get_retry_cycle_info(self, properties) -> dict:
"""Extract retry cycle information from message headers."""
if not hasattr(properties, 'headers') or not properties.headers:
return {'cycle_count': 0, 'first_failure': None, 'total_elapsed': 0}

headers = properties.headers
return {
'cycle_count': headers.get('x-cycle-count', 0),
'first_failure': headers.get('x-first-failure'),
'total_elapsed': headers.get('x-total-elapsed', 0)
}

def _should_continue_retry_cycles(self, retry_info: dict, enable_retry_cycles: bool,
max_retry_time_limit: int) -> bool:
"""Check if message should continue retry cycles or go to permanent DLX."""
if not enable_retry_cycles or not self.dlx_enable:
return False

max_time_ms = max_retry_time_limit * 60 * 1000
return retry_info['total_elapsed'] < max_time_ms

def _create_retry_cycle_headers(self, original_headers: dict, cycle_count: int,
first_failure: str, processing_error: str,
should_cycle: bool, original_exchange: str,
original_routing_key: str) -> dict:
"""Create headers for DLX message with retry cycle info."""
headers = original_headers.copy() if original_headers else {}
now = datetime.now(timezone.utc).isoformat()

# Calculate elapsed time
if first_failure:
try:
first_time = datetime.fromisoformat(first_failure.replace('Z', ''))
elapsed_ms = int((datetime.now(timezone.utc) - first_time).total_seconds() * 1000)
        except Exception:
            elapsed_ms = 0
else:
first_failure = now
elapsed_ms = 0

# Update retry cycle tracking
headers.update({
'x-cycle-count': cycle_count + 1,
'x-first-failure': first_failure,
'x-total-elapsed': elapsed_ms,
'x-processing-error': processing_error,
'x-retry-exhausted': not should_cycle
})

# If cycling, set TTL and routing back to original queue
if should_cycle:
headers.update({
'x-dead-letter-exchange': original_exchange,
'x-dead-letter-routing-key': original_routing_key
})

return headers

def _handle_dlx_with_retry_cycle(self, method_frame, properties, body, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int):
"""Base method for DLX handling with retry cycles."""
# Get retry info
retry_info = self._get_retry_cycle_info(properties)
should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit)

# Get DLX info
dlx_name = f"{method_frame.routing_key}.dlx"
dlx_routing = original_routing_key

# Create enhanced headers
original_headers = getattr(properties, 'headers', {}) or {}
enhanced_headers = self._create_retry_cycle_headers(
original_headers, retry_info['cycle_count'], retry_info['first_failure'],
processing_error, should_cycle, original_exchange, original_routing_key
)

# Create properties for DLX message
dlx_properties = {
'headers': enhanced_headers,
'delivery_mode': 2, # Persistent
'content_type': getattr(properties, 'content_type', 'application/json')
}

# Set TTL if cycling
if should_cycle:
ttl_ms = retry_cycle_interval * 60 * 1000
dlx_properties['expiration'] = str(ttl_ms)

# Call subclass-specific publish method
self._publish_to_dlx(dlx_name, dlx_routing, body, dlx_properties)

# Log result
if should_cycle:
log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} "
f"(next retry in {retry_cycle_interval}m)")
else:
log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles "
f"- staying in DLX for manual replay")

def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict):
"""Abstract method - implemented by subclasses."""
raise NotImplementedError("Subclasses must implement _publish_to_dlx")
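`_publish_to_dlx` is the only piece a subclass must supply. A blocking (pika-based) subclass might map the plain `dlx_properties` dict built by `_handle_dlx_with_retry_cycle` onto `pika.BasicProperties` roughly like this — a sketch assuming a `self._channel` blocking channel and hypothetical class names, not the actual code from `mrsal/amqp/subclass.py` (that diff is not rendered above):

```python
import pika


class MrsalBlockingAMQP(Mrsal):  # hypothetical names; the real subclass lives in mrsal/amqp/subclass.py
    def _publish_to_dlx(self, dlx_exchange: str, routing_key: str,
                        body: bytes, properties: dict):
        # Translate the plain dict into pika's BasicProperties.
        props = pika.BasicProperties(
            headers=properties.get('headers'),
            delivery_mode=properties.get('delivery_mode', 2),  # 2 = persistent
            content_type=properties.get('content_type', 'application/json'),
            expiration=properties.get('expiration'),  # per-message TTL in ms, as a string
        )
        self._channel.basic_publish(
            exchange=dlx_exchange,
            routing_key=routing_key,
            body=body,
            properties=props,
        )
```

When `should_cycle` is true the message carries an `expiration`, so it waits in the DLX queue until the TTL lapses; actually flowing back to the original queue additionally requires the DLX queue itself to be declared with a dead-letter exchange pointing at the original exchange and routing key.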
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -6,7 +6,7 @@ license = ""
maintainers = ["Jon E Nesvold <[email protected]>"]
name = "mrsal"
readme = "README.md"
version = "1.1.5"
version = "1.2.0"

[tool.poetry.dependencies]
colorlog = "^6.7.0"
Binary file modified reports/coverage/.coverage
Binary file not shown.