Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# MRSAL AMQP
[![Release](https://img.shields.io/badge/release-2.1.0-blue.svg)](https://pypi.org/project/mrsal/)
[![Release](https://img.shields.io/badge/release-2.1.1-blue.svg)](https://pypi.org/project/mrsal/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg)](https://www.python.org/downloads/)
[![Mrsal Workflow](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml/badge.svg?branch=main)](https://github.com/NeoMedSys/mrsal/actions/workflows/mrsal.yaml)
[![Coverage](https://neomedsys.github.io/mrsal/reports/badges/coverage-badge.svg)](https://neomedsys.github.io/mrsal/reports/coverage/htmlcov/)
Expand Down
3 changes: 1 addition & 2 deletions mrsal/amqp/subclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def publish_message(
exchange_type: str,
queue_name: str,
auto_declare: bool = True,
passive: bool = True,
passive: bool = True,
prop: pika.BasicProperties | None = None,
) -> None:
"""Publish message to the exchange specifying routing key and properties.
Expand Down Expand Up @@ -380,7 +380,6 @@ def publish_messages(

# connect and use only blocking
self.setup_blocking_connection()

if auto_declare:
self._setup_exchange_and_queue(
exchange_name=protocol.exchange_name,
Expand Down
219 changes: 119 additions & 100 deletions mrsal/superclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,66 +89,71 @@ def _setup_exchange_and_queue(self,

if queue_args is None:
queue_args = {}
if not passive:
if dlx_enable:
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
dlx_routing = dlx_routing_key or routing_key
try:
self._declare_exchange(
exchange=dlx_name,
exchange_type=exchange_type,
arguments=None,
durable=exch_durable,
passive=passive,
internal=internal,
auto_delete=auto_delete
)
if self.verbose:
log.info(f"Dead letter exchange {dlx_name} declared successfully")

except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")

queue_args.update({
'x-dead-letter-exchange': dlx_name,
'x-dead-letter-routing-key': dlx_routing
})

if use_quorum_queues:
queue_args.update({
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3
})

if dlx_enable:
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
dlx_routing = dlx_routing_key or routing_key
try:
self._declare_exchange(
exchange=dlx_name,
exchange_type=exchange_type,
arguments=None,
durable=exch_durable,
passive=False,
internal=internal,
auto_delete=auto_delete
)
if self.verbose:
log.info(f"Dead letter exchange {dlx_name} declared successfully")

except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")

queue_args.update({
'x-dead-letter-exchange': dlx_name,
'x-dead-letter-routing-key': dlx_routing
})

if use_quorum_queues:
queue_args.update({
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3
})
log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")

if self.verbose:
log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")
# Add max length settings
if max_queue_length and max_queue_length > 0:
queue_args['x-max-length'] = max_queue_length

if max_queue_length_bytes and max_queue_length_bytes > 0:
queue_args['x-max-length-bytes'] = max_queue_length_bytes

# Add max length settings
if max_queue_length and max_queue_length > 0:
queue_args['x-max-length'] = max_queue_length

if max_queue_length_bytes and max_queue_length_bytes > 0:
queue_args['x-max-length-bytes'] = max_queue_length_bytes
# Add overflow behavior
if queue_overflow in ["drop-head", "reject-publish"]:
queue_args['x-overflow'] = queue_overflow

# Add overflow behavior
if queue_overflow in ["drop-head", "reject-publish"]:
queue_args['x-overflow'] = queue_overflow
# Add single active consumer
if single_active_consumer:
queue_args['x-single-active-consumer'] = True

# Add single active consumer
if single_active_consumer:
queue_args['x-single-active-consumer'] = True
# Add lazy queue setting
if lazy_queue:
queue_args['x-queue-mode'] = 'lazy'

# Add lazy queue setting
if lazy_queue:
queue_args['x-queue-mode'] = 'lazy'
if self.verbose and queue_args:
log.info(f"Queue {queue_name} configured with arguments: {queue_args}")
else:
queue_args = {}
if self.verbose:
log.info(f"Passive mode: checking existence of queue {queue_name} without configuration")

if self.verbose and queue_args:
log.info(f"Queue {queue_name} configured with arguments: {queue_args}")

declare_exhange_dict = {
'exchange': exchange_name,
'exchange_type': exchange_type,
'arguments': exch_args,
'arguments': exch_args if not passive else None,
'durable': exch_durable,
'passive': passive,
'internal': internal,
Expand All @@ -174,9 +179,13 @@ def _setup_exchange_and_queue(self,
try:
self._declare_exchange(**declare_exhange_dict)
self._declare_queue(**declare_queue_dict)
self._declare_queue_binding(**declare_queue_binding_dict)
if not passive:
self._declare_queue_binding(**declare_queue_binding_dict)
self.auto_declare_ok = True
log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
if not passive:
log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
else:
log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
except MrsalSetupError as e:
log.error(f'Splæt! I failed the declaration setup with {e}', exc_info=True)
self.auto_declare_ok = False
Expand Down Expand Up @@ -207,66 +216,72 @@ async def _async_setup_exchange_and_queue(self,
if queue_args is None:
queue_args = {}

if dlx_enable:
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
dlx_routing = dlx_routing_key or routing_key

try:
await self._async_declare_exchange(
exchange=dlx_name,
exchange_type=exchange_type,
arguments=None,
durable=exch_durable,
passive=False,
internal=internal,
auto_delete=auto_delete
)
if not passive:
if dlx_enable:
dlx_name = dlx_exchange_name or f"{exchange_name}.dlx"
dlx_routing = dlx_routing_key or routing_key

try:
await self._async_declare_exchange(
exchange=dlx_name,
exchange_type=exchange_type,
arguments=None,
durable=exch_durable,
passive=passive,
internal=internal,
auto_delete=auto_delete
)

if self.verbose:
log.info(f"Dead letter exchange {dlx_name} declared successfully")

except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")

queue_args.update({
'x-dead-letter-exchange': dlx_name,
'x-dead-letter-routing-key': dlx_routing
})

if use_quorum_queues:
queue_args.update({
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3 # Good default for 3+ node clusters
})

if self.verbose:
log.info(f"Dead letter exchange {dlx_name} declared successfully")
log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")

except MrsalSetupError as e:
log.warning(f"DLX {dlx_name} might already exist or failed to create: {e}")
if max_queue_length and max_queue_length > 0:
queue_args['x-max-length'] = max_queue_length

if max_queue_length_bytes and max_queue_length_bytes > 0:
queue_args['x-max-length-bytes'] = max_queue_length_bytes

queue_args.update({
'x-dead-letter-exchange': dlx_name,
'x-dead-letter-routing-key': dlx_routing
})
# Add overflow behavior
if queue_overflow and queue_overflow in ["drop-head", "reject-publish"]:
queue_args['x-overflow'] = queue_overflow

if use_quorum_queues:
queue_args.update({
'x-queue-type': 'quorum',
'x-quorum-initial-group-size': 3 # Good default for 3+ node clusters
})

if self.verbose:
log.info(f"Queue {queue_name} configured as quorum queue for enhanced reliability")
# Add single active consumer
if single_active_consumer:
queue_args['x-single-active-consumer'] = True

if max_queue_length and max_queue_length > 0:
queue_args['x-max-length'] = max_queue_length

if max_queue_length_bytes and max_queue_length_bytes > 0:
queue_args['x-max-length-bytes'] = max_queue_length_bytes
# Add lazy queue setting
if lazy_queue:
queue_args['x-queue-mode'] = 'lazy'

# Add overflow behavior
if queue_overflow and queue_overflow in ["drop-head", "reject-publish"]:
queue_args['x-overflow'] = queue_overflow

# Add single active consumer
if single_active_consumer:
queue_args['x-single-active-consumer'] = True

# Add lazy queue setting
if lazy_queue:
queue_args['x-queue-mode'] = 'lazy'
if self.verbose and queue_args:
log.info(f"Queue {queue_name} configured with arguments: {queue_args}")
else:
queue_args = {}
if self.verbose:
log.info(f"Passive mode: checking existence of queue {queue_name} without configuration")

if self.verbose and queue_args:
log.info(f"Queue {queue_name} configured with arguments: {queue_args}")

async_declare_exhange_dict = {
'exchange': exchange_name,
'exchange_type': exchange_type,
'arguments': exch_args,
'arguments': exch_args if not passive else None,
'durable': exch_durable,
'passive': passive,
'internal': internal,
Expand All @@ -292,9 +307,13 @@ async def _async_setup_exchange_and_queue(self,
# Declare exchange and queue
exchange = await self._async_declare_exchange(**async_declare_exhange_dict)
queue = await self._async_declare_queue(**async_declare_queue_dict)
await self._async_declare_queue_binding(queue=queue, exchange=exchange, **async_declare_queue_binding_dict)
if not passive:
await self._async_declare_queue_binding(queue=queue, exchange=exchange, **async_declare_queue_binding_dict)
self.auto_declare_ok = True
log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
if not passive:
log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
else:
log.info(f"Exchange {exchange_name} and Queue {queue_name} set up successfully.")
if dlx_enable:
log.info(f"You have a dead letter exhange {dlx_name} for fault tolerance -- use it well young grasshopper!")
return queue
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license = "GPL-3.0-or-later"
maintainers = ["Jon E Nesvold <[email protected]>"]
name = "mrsal"
readme = "README.md"
version = "2.1.0"
version = "2.1.1"
homepage = "https://github.com/NeoMedSys/mrsal"
repository = "https://github.com/NeoMedSys/mrsal"
documentation = "https://neomedsys.github.io/mrsal/"
Expand Down
Binary file modified reports/coverage/.coverage
Binary file not shown.
Loading