283 changes: 176 additions & 107 deletions README.md

Large diffs are not rendered by default.

103 changes: 86 additions & 17 deletions mrsal/amqp/subclass.py
@@ -114,14 +114,39 @@ def start_consumer(self,
retry_cycle_interval: int = 10, # Minutes between cycles
max_retry_time_limit: int = 60, # Minutes total before permanent DLX
immediate_retry_delay: int = 4, # Seconds between immediate retries
max_queue_length: int | None = None,
max_queue_length_bytes: int | None = None,
queue_overflow: str | None = None,
single_active_consumer: bool | None = None,
lazy_queue: bool | None = None,
) -> None:
"""
Start the consumer using blocking setup.
:param queue: The queue to consume from.
:param auto_ack: If True, messages are automatically acknowledged.
:param inactivity_timeout: Timeout for inactivity in the consumer loop.
:param callback: The callback function to process messages.
:param callback_args: Optional arguments to pass to the callback.
:param str queue_name: The queue to consume from
:param Callable callback: The callback function to process messages
:param dict callback_args: Optional arguments to pass to the callback
:param bool auto_ack: If True, messages are automatically acknowledged
:param int inactivity_timeout: Timeout for inactivity in the consumer loop
:param bool auto_declare: If True, will declare exchange/queue before consuming
:param bool passive: If True, only check if exchange/queue exists (False for consumers)
:param str exchange_name: Exchange name for auto_declare
:param str exchange_type: Exchange type for auto_declare
:param str routing_key: Routing key for auto_declare
:param Type payload_model: Pydantic model for payload validation
:param bool requeue: Whether to requeue failed messages
:param bool dlx_enable: Enable dead letter exchange
:param str dlx_exchange_name: Custom DLX exchange name
:param str dlx_routing_key: Custom DLX routing key
:param bool use_quorum_queues: Use quorum queues for durability
:param bool enable_retry_cycles: Enable DLX retry cycles
:param int retry_cycle_interval: Minutes between retry cycles
:param int max_retry_time_limit: Minutes total before permanent DLX
:param int immediate_retry_delay: Seconds between immediate retries
:param int max_queue_length: Maximum number of messages in queue
:param int max_queue_length_bytes: Maximum queue size in bytes
:param str queue_overflow: "drop-head" or "reject-publish"
:param bool single_active_consumer: Only one consumer processes at a time
:param bool lazy_queue: Store messages on disk to save memory
"""
# Connect and start the I/O loop
self.setup_blocking_connection()
@@ -139,13 +164,28 @@ def start_consumer(self,
dlx_enable=dlx_enable,
dlx_exchange_name=dlx_exchange_name,
dlx_routing_key=dlx_routing_key,
use_quorum_queues=use_quorum_queues
use_quorum_queues=use_quorum_queues,
max_queue_length=max_queue_length,
max_queue_length_bytes=max_queue_length_bytes,
queue_overflow=queue_overflow,
single_active_consumer=single_active_consumer,
lazy_queue=lazy_queue
)

if not self.auto_declare_ok:
raise MrsalAbortedSetup('Auto declaration for the connection setup failed and is aborted')

log.info(f"Straigh out of the swamps -- consumer boi listening on queue: {queue_name} to the exchange {exchange_name}. Waiting for messages...")
# Log consumer configuration
consumer_config = {
"queue": queue_name,
"exchange": exchange_name,
"max_length": max_queue_length or self.max_queue_length,
"overflow": queue_overflow or self.queue_overflow,
"single_consumer": single_active_consumer if single_active_consumer is not None else self.single_active_consumer,
"lazy": lazy_queue if lazy_queue is not None else self.lazy_queue
}

log.info(f"Straight out of the swamps -- consumer boi listening with config: {consumer_config}")

try:
for method_frame, properties, body in self._channel.consume(
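
For orientation, a minimal usage sketch of the extended sync consumer. The start_consumer keyword arguments are taken from this diff; the MrsalBlockingAMQP class name, import path, constructor arguments, and callback signature are assumptions not shown in this change:

from mrsal.amqp.subclass import MrsalBlockingAMQP  # class name and path assumed

def handle_order(method_frame, properties, body):  # callback signature assumed
    # Process one message; an unhandled exception routes it into the retry/DLX path.
    print(body)

consumer = MrsalBlockingAMQP(
    host="localhost",
    port=5672,
    credentials=("user", "password"),  # placeholder connection details
    virtual_host="/",
)
consumer.start_consumer(
    queue_name="orders",
    callback=handle_order,
    exchange_name="orders_ex",
    exchange_type="direct",
    routing_key="orders",
    max_queue_length=50_000,          # cap the queue depth
    queue_overflow="reject-publish",  # push back on publishers instead of dropping old messages
    single_active_consumer=True,      # preserve ordering: one consumer active at a time
)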
@@ -252,6 +292,7 @@ def publish_message(
exchange_type: str,
queue_name: str,
auto_declare: bool = True,
passive: bool = True,
prop: pika.BasicProperties | None = None,
) -> None:
"""Publish message to the exchange specifying routing key and properties.
@@ -281,7 +322,8 @@
exchange_name=exchange_name,
queue_name=queue_name,
exchange_type=exchange_type,
routing_key=routing_key
routing_key=routing_key,
passive=passive
)
try:
# Publish the message by serializing it in json dump
@@ -311,6 +353,8 @@ def publish_messages(
self,
mrsal_protocol_collection: dict[str, dict[str, str | bytes]],
prop: pika.BasicProperties | None = None,
auto_declare: bool = True,
passive: bool = True
) -> None:
"""Publish message to the exchange specifying routing key and properties.

@@ -333,14 +377,18 @@

if not isinstance(protocol.message, (str, bytes)):
raise MrsalAbortedSetup(f'Your message body needs to be string or bytes or serialized dict')
# connect and use only blocking

# connect and use only blocking
self.setup_blocking_connection()
self._setup_exchange_and_queue(
exchange_name=protocol.exchange_name,
queue_name=protocol.queue_name,
exchange_type=protocol.exchange_type,
routing_key=protocol.routing_key
)

if auto_declare:
self._setup_exchange_and_queue(
exchange_name=protocol.exchange_name,
queue_name=protocol.queue_name,
exchange_type=protocol.exchange_type,
routing_key=protocol.routing_key,
passive=passive
)
try:
# Publish the message by serializing it in json dump
# NOTE! we are not dumping a json anymore here! This allows for more flexibility
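
The new passive flag defaults to True for publishers: with auto_declare enabled, setup only verifies that the exchange and queue already exist rather than re-declaring them, so a publisher cannot silently create topology with the wrong arguments. A short sketch, reusing the consumer object from the sketch above; the message parameter name is an assumption, since the start of the signature sits outside this hunk:

consumer.publish_message(
    exchange_name="orders_ex",
    exchange_type="direct",
    routing_key="orders",
    queue_name="orders",
    message=b'{"order_id": 1}',  # parameter name assumed; the body must be str or bytes
    auto_declare=True,
    passive=True,  # verify the topology only; fail fast if it is missing
)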
@@ -464,6 +512,12 @@ async def start_consumer(
retry_cycle_interval: int = 10, # Minutes between cycles
max_retry_time_limit: int = 60, # Minutes total before permanent DLX
immediate_retry_delay: int = 4, # Seconds between immediate retries
max_queue_length: int | None = None,
max_queue_length_bytes: int | None = None,
queue_overflow: str | None = None,
single_active_consumer: bool | None = None,
lazy_queue: bool | None = None,

):
"""Start the async consumer with the provided setup."""
retry_counts = {}
@@ -492,15 +546,30 @@
dlx_enable=dlx_enable,
dlx_exchange_name=dlx_exchange_name,
dlx_routing_key=dlx_routing_key,
use_quorum_queues=use_quorum_queues
use_quorum_queues=use_quorum_queues,
max_queue_length=max_queue_length,
max_queue_length_bytes=max_queue_length_bytes,
queue_overflow=queue_overflow,
single_active_consumer=single_active_consumer,
lazy_queue=lazy_queue
)

if not self.auto_declare_ok:
if self._connection:
await self._connection.close()
raise MrsalAbortedSetup('Auto declaration failed during setup.')

log.info(f"Straight out of the swamps -- Consumer boi listening on queue: {queue_name}, exchange: {exchange_name}")
# Log consumer configuration
consumer_config = {
"queue": queue_name,
"exchange": exchange_name,
"max_length": max_queue_length or self.max_queue_length,
"overflow": queue_overflow or self.queue_overflow,
"single_consumer": single_active_consumer if single_active_consumer is not None else self.single_active_consumer,
"lazy": lazy_queue if lazy_queue is not None else self.lazy_queue
}

log.info(f"Straight out of the swamps -- consumer boi listening with config: {consumer_config}")

# async with queue.iterator() as queue_iter:
async for message in queue.iterator():
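The async consumer gains the same queue options alongside the existing DLX retry-cycle knobs. A sketch under the same assumptions as the sync example (class name, constructor, and the aio-pika style callback contract are not shown in this diff):

import asyncio

from mrsal.amqp.subclass import MrsalAsyncAMQP  # class name and path assumed

async def handle_order(message, **kwargs):  # callback contract assumed
    print(message.body)

async def main():
    consumer = MrsalAsyncAMQP(
        host="localhost",
        port=5672,
        credentials=("user", "password"),  # placeholder connection details
        virtual_host="/",
    )
    await consumer.start_consumer(
        queue_name="orders",
        callback=handle_order,
        exchange_name="orders_ex",
        exchange_type="direct",
        routing_key="orders",
        enable_retry_cycles=True,
        retry_cycle_interval=10,   # minutes between DLX retry cycles
        max_retry_time_limit=60,   # minutes before a message is parked permanently
        max_queue_length=50_000,
        lazy_queue=True,           # page message bodies to disk
    )

asyncio.run(main())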
89 changes: 72 additions & 17 deletions mrsal/superclass.py
@@ -32,6 +32,11 @@ class Mrsal:
:param int heartbeat: Controls RabbitMQ's server heartbeat timeout negotiation
:param int prefetch_count: Specifies a prefetch window in terms of whole messages.
:param bool ssl: Set this flag to true if you want to connect externally to the rabbit server.
:param int max_queue_length: Maximum number of messages in queue before overflow behavior triggers
:param int max_queue_length_bytes: Maximum queue size in bytes (optional)
:param str queue_overflow: Behavior when queue is full - "drop-head" or "reject-publish"
:param bool single_active_consumer: If True, only one consumer processes messages at a time
:param bool lazy_queue: If True, messages are stored on disk to save memory
"""

host: str
@@ -46,6 +51,11 @@ class Mrsal:
dlx_exchange_name = None
max_retries: int = 3
use_quorum_queues: bool = True
max_queue_length: int = 10000 # Good default for most use cases
max_queue_length_bytes: int | None = None # Optional memory limit
queue_overflow: str = "drop-head" # Drop old messages by default
single_active_consumer: bool = False # Allow parallel processing
lazy_queue: bool = False # Keep messages in RAM for speed
_connection = None
_channel = None

@@ -69,9 +79,16 @@ def _setup_exchange_and_queue(self,
passive: bool = False, internal: bool = False,
auto_delete: bool = False, exclusive: bool = False,
dlx_enable: bool = True, dlx_exchange_name: str | None = None,
dlx_routing_key: str | None = None, use_quorum_queues: bool = True
dlx_routing_key: str | None = None, use_quorum_queues: bool = True,
max_queue_length: int | None = None,
max_queue_length_bytes: int | None = None,
queue_overflow: str | None = None,
single_active_consumer: bool | None = None,
lazy_queue: bool | None = None
) -> None:

if queue_args is None:
queue_args = {}

if dlx_enable:
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
@@ -92,26 +109,41 @@
except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")

if queue_args is None:
queue_args = {}

queue_args.update({
'x-dead-letter-exchange': dlx_name,
'x-dead-letter-routing-key': dlx_routing
})

if use_quorum_queues:
if queue_args is None:
queue_args = {}

queue_args.update({
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3 # Good default for 3+ node clusters
'x-quorum-initial-group-size': 3
})

if self.verbose:
log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")

# Add max length settings
if max_queue_length and max_queue_length > 0:
queue_args['x-max-length'] = max_queue_length

if max_queue_length_bytes and max_queue_length_bytes > 0:
queue_args['x-max-length-bytes'] = max_queue_length_bytes

# Add overflow behavior
if queue_overflow in ["drop-head", "reject-publish"]:
queue_args['x-overflow'] = queue_overflow

# Add single active consumer
if single_active_consumer:
queue_args['x-single-active-consumer'] = True

# Add lazy queue setting
if lazy_queue:
queue_args['x-queue-mode'] = 'lazy'

if self.verbose and queue_args:
log.info(f"Queue {queue_name} configured with arguments: {queue_args}")

declare_exhange_dict = {
'exchange': exchange_name,
@@ -161,12 +193,20 @@ async def _async_setup_exchange_and_queue(self,
dlx_enable: bool = True,
dlx_exchange_name: str | None = None,
dlx_routing_key: str | None = None,
use_quorum_queues: bool = True
use_quorum_queues: bool = True,
max_queue_length: int | None = None,
max_queue_length_bytes: int | None = None,
queue_overflow: str | None = None,
single_active_consumer: bool | None = None,
lazy_queue: bool | None = None
) -> AioQueue | None:
"""Setup exchange and queue with bindings asynchronously."""
if not self._connection:
raise MrsalAbortedSetup("Oh my Oh my! Connection not found when trying to run the setup!")

if queue_args is None:
queue_args = {}

if dlx_enable:
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
dlx_routing = dlx_routing_key or routing_key
@@ -188,18 +228,12 @@
except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")

if queue_args is None:
queue_args = {}

queue_args.update({
'x-dead-letter-exchange': dlx_name,
'x-dead-letter-routing-key': dlx_routing
})

if use_quorum_queues:
if queue_args is None:
queue_args = {}

queue_args.update({
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3 # Good default for 3+ node clusters
@@ -208,6 +242,27 @@
if self.verbose:
log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")

if max_queue_length and max_queue_length > 0:
queue_args['x-max-length'] = max_queue_length

if max_queue_length_bytes and max_queue_length_bytes > 0:
queue_args['x-max-length-bytes'] = max_queue_length_bytes

# Add overflow behavior
if queue_overflow and queue_overflow in ["drop-head", "reject-publish"]:
queue_args['x-overflow'] = queue_overflow

# Add single active consumer
if single_active_consumer:
queue_args['x-single-active-consumer'] = True

# Add lazy queue setting
if lazy_queue:
queue_args['x-queue-mode'] = 'lazy'

if self.verbose and queue_args:
log.info(f"Queue {queue_name} configured with arguments: {queue_args}")

async_declare_exhange_dict = {
'exchange': exchange_name,
'exchange_type': exchange_type,
@@ -562,7 +617,7 @@ def _handle_dlx_with_retry_cycle_sync(
self, method_frame, properties, body, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int, dlx_exchange_name: str | None):
"""Base method for DLX handling with retry cycles."""
# Get retry info
retry_info = self._get_retry_cycle_info(properties)
@@ -606,7 +661,7 @@ async def _handle_dlx_with_retry_cycle_async(
self, message, properties, processing_error: str,
original_exchange: str, original_routing_key: str,
enable_retry_cycles: bool, retry_cycle_interval: int,
max_retry_time_limit: int, dlx_exchange_name: str | None):
"""Base method for DLX handling with retry cycles."""
# Get retry info
retry_info = self._get_retry_cycle_info(properties)
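For reference, what the new options amount to on the wire: the sketch below declares an equivalent queue with plain pika, using the class defaults from this diff (x-max-length 10000, drop-head overflow, quorum type, DLX bound to <exchange>.dlx). x-single-active-consumer and x-queue-mode: 'lazy' are added only when those flags are set, and lazy mode is a classic-queue setting that quorum queues do not use. Queue and exchange names are placeholders:

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = conn.channel()
channel.queue_declare(
    queue="orders",
    durable=True,  # quorum queues must be durable
    arguments={
        "x-dead-letter-exchange": "orders_ex.dlx",  # dlx_enable default: <exchange_name>.dlx
        "x-dead-letter-routing-key": "orders",
        "x-queue-type": "quorum",                   # use_quorum_queues default
        "x-quorum-initial-group-size": 3,
        "x-max-length": 10000,                      # max_queue_length default
        "x-overflow": "drop-head",                  # queue_overflow default
    },
)
conn.close()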
33 changes: 22 additions & 11 deletions pyproject.toml
@@ -1,12 +1,28 @@
[tool.poetry]
authors = ["Jon E. Nesvold <jnesvold@pm.me>"]
description = "Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika."
keywords = ["message broker", "RabbitMQ", "Pika", "Mrsal"]
license = ""
maintainers = ["Jon E Nesvold <jnesvold@pm.me>"]
authors = ["Jon E. Nesvold <jnesvold@neomedsys.io>"]
description = "Production-ready AMQP message broker abstraction with advanced retry logic, dead letter exchanges, and high availability features."
keywords = ["message broker", "RabbitMQ", "Pika", "Mrsal", "AMQP", "async", "dead letter exchange"]
license = "GPL-3.0-or-later"
maintainers = ["Jon E Nesvold <jnesvold@neomedsys.io>"]
name = "mrsal"
readme = "README.md"
version = "2.0.2"
version = "2.1.0"
homepage = "https://github.com/NeoMedSys/mrsal"
repository = "https://github.com/NeoMedSys/mrsal"
documentation = "https://neomedsys.github.io/mrsal/"

classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: System :: Networking",
"Topic :: Communications",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Operating System :: POSIX :: Linux"
]

[tool.poetry.dependencies]
colorlog = "^6.7.0"
@@ -29,8 +45,3 @@ pytest-asyncio = "^0.24.0"
[build-system]
build-backend = "poetry.core.masonry.api"
requires = ["poetry-core>=1.0.0"]

[[tool.poetry.source]]
name = "neomedsys"
url = "https://pypi.neomedsys.dev/simple"
priority = "supplemental"
Binary file modified reports/coverage/.coverage
Binary file not shown.