Skip to content
Merged

fuck #55

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# MRSAL AMQP
[![Release](https://img.shields.io/badge/release-1.2.0-blue.svg)](https://pypi.org/project/mrsal/)
[![Release](https://img.shields.io/badge/release-2.0.1-blue.svg)](https://pypi.org/project/mrsal/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/)
[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)
[![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/)
Expand Down
47 changes: 31 additions & 16 deletions mrsal/amqp/subclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def start_consumer(self,
self._publish_to_dlx_with_retry_cycle(
method_frame, properties, body, "Callback processing failed",
exchange_name, routing_key, enable_retry_cycles,
retry_cycle_interval, max_retry_time_limit
retry_cycle_interval, max_retry_time_limit, dlx_exchange_name
)
elif dlx_enable:
# Original DLX behavior
Expand Down Expand Up @@ -361,17 +361,26 @@ def publish_messages(
except Exception as e:
log.error(f"Unexpected error while publishing message: {e}")

def _publish_to_dlx_with_retry_cycle(self, method_frame, properties, body, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int):
def _publish_to_dlx_with_retry_cycle(
self,
method_frame, properties, body, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int, dlx_exchange_name: str | None):
"""Publish message to DLX with retry cycle headers."""
try:
# Use common logic from superclass
self._handle_dlx_with_retry_cycle(
method_frame, properties, body, processing_error,
original_exchange, original_routing_key,
enable_retry_cycles, retry_cycle_interval, max_retry_time_limit
self._handle_dlx_with_retry_cycle_sync(
method_frame=method_frame,
properties=properties,
body=body,
processing_error=processing_error,
original_exchange=original_exchange,
original_routing_key=original_routing_key,
enable_retry_cycles=enable_retry_cycles,
retry_cycle_interval=retry_cycle_interval,
max_retry_time_limit=max_retry_time_limit,
dlx_exchange_name=dlx_exchange_name
)

# Acknowledge original message
Expand All @@ -390,7 +399,7 @@ def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, prop
content_type=properties.get('content_type', 'application/json'),
expiration=properties.get('expiration')
)

self._channel.basic_publish(
exchange=dlx_exchange,
routing_key=routing_key,
Expand Down Expand Up @@ -561,7 +570,7 @@ async def start_consumer(
await self._async_publish_to_dlx_with_retry_cycle(
message, properties, "Callback processing failed",
exchange_name, routing_key, enable_retry_cycles,
retry_cycle_interval, max_retry_time_limit
retry_cycle_interval, max_retry_time_limit, dlx_exchange_name
)
elif dlx_enable:
# Original DLX behavior
Expand All @@ -581,14 +590,20 @@ async def start_consumer(
async def _async_publish_to_dlx_with_retry_cycle(self, message, properties, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int):
max_retry_time_limit: int, dlx_exchange_name: str | None):
"""Async publish message to DLX with retry cycle headers."""
try:
# Use common logic from superclass
self._handle_dlx_with_retry_cycle(
message, properties, message.body, processing_error,
original_exchange, original_routing_key,
enable_retry_cycles, retry_cycle_interval, max_retry_time_limit
await self._handle_dlx_with_retry_cycle_async(
message=message,
properties=properties,
processing_error=processing_error,
original_exchange=original_exchange,
original_routing_key=original_routing_key,
enable_retry_cycles=enable_retry_cycles,
retry_cycle_interval=retry_cycle_interval,
max_retry_time_limit=max_retry_time_limit,
dlx_exchange_name=dlx_exchange_name
)

# Acknowledge original message
Expand Down
62 changes: 54 additions & 8 deletions mrsal/superclass.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# external
# external
import os
import ssl
import pika
Expand Down Expand Up @@ -43,6 +43,7 @@ class Mrsal:
prefetch_count: int = 5
heartbeat: int = 60 # sec
dlx_enable: bool = True
dlx_exchange_name = None
max_retries: int = 3
use_quorum_queues: bool = True
_connection = None
Expand Down Expand Up @@ -73,7 +74,7 @@ def _setup_exchange_and_queue(self,


if dlx_enable:
dlx_name = dlx_exchange_name or f"{queue_name}.dlx"
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
dlx_routing = dlx_routing_key or routing_key
try:
self._declare_exchange(
Expand Down Expand Up @@ -167,7 +168,7 @@ async def _async_setup_exchange_and_queue(self,
raise MrsalAbortedSetup("Oh my Oh my! Connection not found when trying to run the setup!")

if dlx_enable:
dlx_name = dlx_exchange_name or f"{queue_name}.dlx"
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
dlx_routing = dlx_routing_key or routing_key

try:
Expand Down Expand Up @@ -557,17 +558,18 @@ def _create_retry_cycle_headers(self, original_headers: dict, cycle_count: int,

return headers

def _handle_dlx_with_retry_cycle(self, method_frame, properties, body, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int):
def _handle_dlx_with_retry_cycle_sync(
self, method_frame, properties, body, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int, dlx_exchange_name: str | None):
"""Base method for DLX handling with retry cycles."""
# Get retry info
retry_info = self._get_retry_cycle_info(properties)
should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit)

# Get DLX info
dlx_name = f"{method_frame.routing_key}.dlx"
dlx_name = dlx_exchange_name or f"{original_exchange}.dlx"
dlx_routing = original_routing_key

# Create enhanced headers
Expand Down Expand Up @@ -600,6 +602,50 @@ def _handle_dlx_with_retry_cycle(self, method_frame, properties, body, processin
log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles "
f"- staying in DLX for manual replay")

async def _handle_dlx_with_retry_cycle_async(
self, message, properties, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int, dlx_exchange_name: str | None):
"""Base method for DLX handling with retry cycles."""
# Get retry info
retry_info = self._get_retry_cycle_info(properties)
should_cycle = self._should_continue_retry_cycles(retry_info, enable_retry_cycles, max_retry_time_limit)

# Get DLX info
dlx_name = dlx_exchange_name or f"{original_exchange}.dlx"
dlx_routing = original_routing_key

# Create enhanced headers
original_headers = getattr(properties, 'headers', {}) or {}
enhanced_headers = self._create_retry_cycle_headers(
original_headers, retry_info['cycle_count'], retry_info['first_failure'],
processing_error, should_cycle, original_exchange, original_routing_key
)

# Create properties for DLX message
dlx_properties = {
'headers': enhanced_headers,
'delivery_mode': 2, # Persistent
'content_type': getattr(properties, 'content_type', 'application/json')
}

# Set TTL if cycling
if should_cycle:
ttl_ms = retry_cycle_interval * 60 * 1000
dlx_properties['expiration'] = str(ttl_ms)

# Call subclass-specific publish method
await self._publish_to_dlx(dlx_name, dlx_routing, message.body, dlx_properties)

# Log result
if should_cycle:
log.info(f"Message sent to DLX for retry cycle {retry_info['cycle_count'] + 1} "
f"(next retry in {retry_cycle_interval}m)")
else:
log.error(f"Message permanently failed after {retry_info['cycle_count']} cycles "
f"- staying in DLX for manual replay")

def _publish_to_dlx(self, dlx_exchange: str, routing_key: str, body: bytes, properties: dict):
"""Abstract method - implemented by subclasses."""
raise NotImplementedError("Subclasses must implement _publish_to_dlx")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license = ""
maintainers = ["Jon E Nesvold <jnesvold@pm.me>"]
name = "mrsal"
readme = "README.md"
version = "2.0.0"
version = "2.0.1"

[tool.poetry.dependencies]
colorlog = "^6.7.0"
Expand Down
Binary file modified reports/coverage/.coverage
Binary file not shown.
Loading