Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# MRSAL AMQP
[![Release](https://img.shields.io/badge/release-3.1.0-blue.svg)](https://pypi.org/project/mrsal/)
[![Release](https://img.shields.io/badge/release-3.2.0-blue.svg)](https://pypi.org/project/mrsal/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/)
[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)
[![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/)
Expand Down
248 changes: 141 additions & 107 deletions mrsal/amqp/subclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import pika
import json
import logging
import threading
from functools import partial
from mrsal.exceptions import MrsalAbortedSetup, MrsalNoAsyncioLoopError
from logging import WARNING
from pika.exceptions import (
Expand Down Expand Up @@ -82,6 +84,77 @@ def setup_blocking_connection(self) -> None:
except Exception as e:
log.error(f"Unexpected error caught: {e}")

def _schedule_threadsafe(self, func: Callable, threaded: bool, *args, **kwargs) -> None:
"""
Executes an AMQP operation safely based on the threading mode.
"""
if threaded:
cb = partial(func, *args, **kwargs)
self._connection.add_callback_threadsafe(cb)
else:
func(*args, **kwargs)

def _process_single_message(self, method_frame, properties, body, runtime_config: dict) -> None:
"""
Worker method to process a single message.
Accepts a config dict to avoid an explosion of arguments.
"""
auto_ack = runtime_config.get('auto_ack')
threaded = runtime_config.get('threaded')
callback = runtime_config.get('callback')
callback_args = runtime_config.get('callback_args')
payload_model = runtime_config.get('payload_model')
dlx_enable = runtime_config.get('dlx_enable')
enable_retry_cycles = runtime_config.get('enable_retry_cycles')

app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID'
msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no MsgID'
delivery_tag = method_frame.delivery_tag

current_retry = properties.headers.get('x-delivery-count', 0) if properties and properties.headers else 0

if self.verbose:
log.info(f"Processing message {msg_id} from {app_id} (Retry: {current_retry})")

should_process = True
if payload_model:
try:
self.validate_payload(body, payload_model)
except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
log.error(f"Payload validation failed for {msg_id}: {e}")
should_process = False

if callback and should_process:
try:
if callback_args:
callback(*callback_args, method_frame, properties, body)
else:
callback(method_frame, properties, body)
except Exception as e:
log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}")
should_process = False

if not should_process and not auto_ack:
if dlx_enable and enable_retry_cycles:
self._schedule_threadsafe(
self._publish_to_dlx_with_retry_cycle, threaded,
method_frame, properties, body, "Callback failed",
runtime_config['exchange_name'], runtime_config['routing_key'],
enable_retry_cycles, runtime_config['retry_cycle_interval'],
runtime_config['max_retry_time_limit'], runtime_config['dlx_exchange_name']
)
elif dlx_enable:
log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries")
self._schedule_threadsafe(self._channel.basic_nack, threaded, delivery_tag=delivery_tag, requeue=False)
else:
log.warning(f"No dead letter exchange declared for {runtime_config['queue_name']}, proceeding to drop the message -- reflect on your life choices! byebye")
log.info(f"Dropped message content: {body}")
self._schedule_threadsafe(self._channel.basic_nack, threaded, delivery_tag=delivery_tag, requeue=False)

elif not auto_ack and should_process:
log.info(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken')
self._schedule_threadsafe(self._channel.basic_ack, threaded, delivery_tag=delivery_tag)

@retry(
retry=retry_if_exception_type((
AMQPConnectionError,
Expand All @@ -93,30 +166,32 @@ def setup_blocking_connection(self) -> None:
wait=wait_fixed(2),
before_sleep=before_sleep_log(log, WARNING)
)
def start_consumer(self,
queue_name: str,
callback: Callable | None = None,
callback_args: dict[str, str | int | float | bool] | None = None,
auto_ack: bool = True,
inactivity_timeout: int | None = None, # just let conusmer wait patiently damn it
auto_declare: bool = True,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None,
payload_model: Type | None = None,
dlx_enable: bool = True,
dlx_exchange_name: str | None = None,
dlx_routing_key: str | None = None,
use_quorum_queues: bool = True,
enable_retry_cycles: bool = True,
retry_cycle_interval: int = 10, # Minutes between cycles
max_retry_time_limit: int = 60, # Minutes total before permanent DLX
max_queue_length: int | None = None,
max_queue_length_bytes: int | None = None,
queue_overflow: str | None = None,
single_active_consumer: bool | None = None,
lazy_queue: bool | None = None,
) -> None:
def start_consumer(
self,
queue_name: str,
callback: Callable | None = None,
callback_args: dict[str, str | int | float | bool] | None = None,
auto_ack: bool = True,
inactivity_timeout: int | None = None,
auto_declare: bool = True,
exchange_name: str | None = None,
exchange_type: str | None = None,
routing_key: str | None = None,
payload_model: Type | None = None,
dlx_enable: bool = True,
dlx_exchange_name: str | None = None,
dlx_routing_key: str | None = None,
use_quorum_queues: bool = True,
enable_retry_cycles: bool = True,
retry_cycle_interval: int = 10,
max_retry_time_limit: int = 60,
max_queue_length: int | None = None,
max_queue_length_bytes: int | None = None,
queue_overflow: str | None = None,
single_active_consumer: bool | None = None,
lazy_queue: bool | None = None,
threaded: bool = False
) -> None:
"""
Start the consumer using blocking setup.
:param str queue_name: The queue to consume from
Expand Down Expand Up @@ -144,7 +219,6 @@ def start_consumer(self,
:param bool single_active_consumer: Only one consumer processes at a time
:param bool lazy_queue: Store messages on disk to save memory
"""
# Connect and start the I/O loop
self.setup_blocking_connection()

if auto_declare:
Expand All @@ -165,95 +239,55 @@ def start_consumer(self,
queue_overflow=queue_overflow,
single_active_consumer=single_active_consumer,
lazy_queue=lazy_queue
)

)
if not self.auto_declare_ok:
raise MrsalAbortedSetup('Auto declaration for the connection setup failed and is aborted')

# Log consumer configuration
consumer_config = {
"queue": queue_name,
"exchange": exchange_name,
"max_length": max_queue_length or self.max_queue_length,
"overflow": queue_overflow or self.queue_overflow,
"single_consumer": single_active_consumer if single_active_consumer is not None else self.single_active_consumer,
"lazy": lazy_queue if lazy_queue is not None else self.lazy_queue
raise MrsalAbortedSetup('Auto declaration failed')

runtime_config = {
'callback': callback,
'callback_args': callback_args,
'auto_ack': auto_ack,
'payload_model': payload_model,
'threaded': threaded,
'dlx_enable': dlx_enable,
'enable_retry_cycles': enable_retry_cycles,
'retry_cycle_interval': retry_cycle_interval,
'max_retry_time_limit': max_retry_time_limit,
'exchange_name': exchange_name,
'routing_key': routing_key,
'dlx_exchange_name': dlx_exchange_name,
'queue_name': queue_name,
}

log.info(f"Straight out of the swamps -- consumer boi listening with config: {consumer_config}")

log.info(f"""
Straight out of the swamps -- consumer boi listening with config:
auto_ack: {auto_ack}
threaded: {threaded}
DLX: {dlx_enable}
retry cycles: {enable_retry_cycles}
retry interval: {retry_cycle_interval}
max retry time: {max_retry_time_limit}
DLX name: {dlx_exchange_name}
""")

try:
for method_frame, properties, body in self._channel.consume(
queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout):
queue=queue_name, auto_ack=auto_ack, inactivity_timeout=inactivity_timeout):

if method_frame:
if properties:
app_id = properties.app_id if hasattr(properties, 'app_id') else 'no AppID given'
msg_id = properties.message_id if hasattr(properties, 'message_id') else 'no msgID given'
delivery_tag = method_frame.delivery_tag

if self.verbose:
log.info(
f"""
Message received with:
- Method Frame: {method_frame}
- Redelivery: {method_frame.redelivered}
- Exchange: {method_frame.exchange}
- Routing Key: {method_frame.routing_key}
- Delivery Tag: {method_frame.delivery_tag}
- Properties: {properties}
- Auto Ack: {auto_ack}
"""
)
if auto_ack:
log.info(f'I successfully received a message with AutoAck from: {app_id} with messageID: {msg_id}')

current_retry = properties.headers.get('x-delivery-count', 0) if properties.headers else 0
log.info(f"Current retry is: {current_retry}")
should_process = True

if payload_model:
try:
self.validate_payload(body, payload_model)
except (ValidationError, json.JSONDecodeError, UnicodeDecodeError, TypeError) as e:
log.error(f"Oh lordy lord, payload validation failed for your specific model requirements: {e}")
should_process = False

if callback and should_process:
try:
if callback_args:
callback(*callback_args, method_frame, properties, body)
else:
callback(method_frame, properties, body)
except Exception as e:
log.error(f"Callback method failure: {e}")
log.error(f"Oh lordy lord message {msg_id} from {app_id} failed while running callback")
should_process = False

if not should_process and not auto_ack:
if dlx_enable and enable_retry_cycles:
# Use retry cycle logic
self._publish_to_dlx_with_retry_cycle(
method_frame, properties, body, "Callback processing failed",
exchange_name, routing_key, enable_retry_cycles,
retry_cycle_interval, max_retry_time_limit, dlx_exchange_name
)
elif dlx_enable:
# Original DLX behavior
self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False)
log.warning(f"Message {msg_id} sent to dead letter exchange after {current_retry} retries")
log.info(f"Its the fault of polictial ideology imposing! Strangle an influencer if it makes you feel better!")
else:
self._channel.basic_nack(delivery_tag=delivery_tag, requeue=False)
log.warning(f"No dead letter exchange declared for {queue_name}, proceeding to drop the message -- reflect on your life choices! byebye")
log.info(f"Dropped message content: {body}")
continue

if not auto_ack and should_process:
log.info(f'Message ({msg_id}) from {app_id} received and properly processed -- now dance the funky chicken')
self._channel.basic_ack(delivery_tag=delivery_tag)

if threaded:
log.info("Threaded processes started to ensure heartbeat during long processes -- sauber!")
t = threading.Thread(
target=self._process_single_message,
args=(method_frame, properties, body, runtime_config),
daemon=True
)
t.start()
else:
self._process_single_message(method_frame, properties, body, runtime_config)
except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e:
log.error(f"Ooooooopsie! I caught a connection error while consuming: {e}")
log.error(f"Ooooooopsie! I caught a connection error while consuming messaiges: {e}")
raise
except Exception as e:
log.error(f'Oh lordy lord! I failed consuming ze messaj with: {e}')
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license = "GPL-3.0-or-later"
maintainers = ["Jon E Nesvold <[email protected]>"]
name = "mrsal"
readme = "README.md"
version = "3.1.1"
version = "3.2.0"
homepage = "https://github.com/NeoMedSys/mrsal"
repository = "https://github.com/NeoMedSys/mrsal"
documentation = "https://neomedsys.github.io/mrsal/"
Expand Down
Binary file added reports/coverage/.coverage
Binary file not shown.
Loading