Skip to content

Commit 02eeca0

Browse files
committed
fix(jobs): ack MqWatcher messages so prefetched buffer is not logged as lost on shutdown
MqWatcher.consumes signals from RabbitMQ purely to wake up a DB-driven processor; data lives in pending_txs / pending_messages, not in the message body. Using no_ack=True meant aio_pika logged a warning per prefetched message still in the iterator buffer at shutdown ("Message ... lost for consumer with no_ack ..."), which was alarming on Ctrl-C even though nothing was actually lost. Switch to no_ack=False with manual ack. Acked messages are gone; any prefetched-but-not-yet-iterated messages are nack'd with requeue=True by aio_pika's iterator close path, so they reappear on next startup and trigger the watcher.
1 parent c9320fc commit 02eeca0

1 file changed

Lines changed: 6 additions & 2 deletions

File tree

src/aleph/jobs/job_utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,12 @@ def __init__(self, mq_queue: aio_pika.abc.AbstractQueue):
157157
self._event = asyncio.Event()
158158

159159
async def _check_for_message(self):
160-
async with self.mq_queue.iterator(no_ack=True) as queue_iter:
161-
async for _ in queue_iter:
160+
# Manual ack (no_ack=False) so that prefetched messages still in the
161+
# iterator buffer on shutdown are nack'd with requeue=True by aio_pika,
162+
# instead of being logged as "lost for consumer with no_ack".
163+
async with self.mq_queue.iterator(no_ack=False) as queue_iter:
164+
async for message in queue_iter:
165+
await message.ack()
162166
self._event.set()
163167

164168
async def __aenter__(self):

0 commit comments

Comments
 (0)