From 02eeca0461cdc805263575e337a93ff5a4a43941 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 20 May 2026 01:12:53 +0200 Subject: [PATCH 1/3] fix(jobs): ack MqWatcher messages so prefetched buffer is not logged as lost on shutdown MqWatcher.consumes signals from RabbitMQ purely to wake up a DB-driven processor; data lives in pending_txs / pending_messages, not in the message body. Using no_ack=True meant aio_pika logged a warning per prefetched message still in the iterator buffer at shutdown ("Message ... lost for consumer with no_ack ..."), which was alarming on Ctrl-C even though nothing was actually lost. Switch to no_ack=False with manual ack. Acked messages are gone; any prefetched-but-not-yet-iterated messages are nack'd with requeue=True by aio_pika's iterator close path, so they reappear on next startup and trigger the watcher. --- src/aleph/jobs/job_utils.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py index ca7eaabf6..5b254910e 100644 --- a/src/aleph/jobs/job_utils.py +++ b/src/aleph/jobs/job_utils.py @@ -157,8 +157,12 @@ def __init__(self, mq_queue: aio_pika.abc.AbstractQueue): self._event = asyncio.Event() async def _check_for_message(self): - async with self.mq_queue.iterator(no_ack=True) as queue_iter: - async for _ in queue_iter: + # Manual ack (no_ack=False) so that prefetched messages still in the + # iterator buffer on shutdown are nack'd with requeue=True by aio_pika, + # instead of being logged as "lost for consumer with no_ack". + async with self.mq_queue.iterator(no_ack=False) as queue_iter: + async for message in queue_iter: + await message.ack() self._event.set() async def __aenter__(self): From 0a2189dd3ef556278dd404f44fc0e84613dab05e Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 20 May 2026 01:13:01 +0200 Subject: [PATCH 2/3] fix(chains): close Web3 provider session on shutdown via context manager web3.AsyncHTTPProvider caches an aiohttp.ClientSession in its HTTPSessionManager. The session was only closed in EthereumConnector.packer's finally block, which never runs on reader-only nodes (no packing_node), so Ctrl-C produced "Unclosed client session" / "Unclosed connector" warnings from aiohttp's garbage collector. Make EthereumConnector an async context manager whose __aexit__ calls provider.disconnect(). Make ChainConnector an async context manager that delegates to every registered chain connector via an internal AsyncExitStack (dedup'd by id() since EthereumConnector is in both readers and writers). Register chain_connector with the main AsyncExitStack in commands.py, next to the other shutdown callbacks. Drop the duplicate provider.disconnect() from packer's finally now that the context manager owns the cleanup. --- src/aleph/chains/connector.py | 18 ++++++ src/aleph/chains/ethereum.py | 118 +++++++++++++++++----------------- src/aleph/commands.py | 1 + 3 files changed, 78 insertions(+), 59 deletions(-) diff --git a/src/aleph/chains/connector.py b/src/aleph/chains/connector.py index 9d5c2aff3..8b9b8f495 100644 --- a/src/aleph/chains/connector.py +++ b/src/aleph/chains/connector.py @@ -1,5 +1,6 @@ import asyncio import logging +from contextlib import AsyncExitStack from typing import Dict, Self, Union from aleph_message.models import Chain @@ -39,6 +40,23 @@ def __init__( self.readers = {} self.writers = {} + self._exit_stack = AsyncExitStack() + + async def __aenter__(self) -> Self: + await self._exit_stack.__aenter__() + # A connector can be both a reader and a writer; dedupe by identity so + # __aexit__ runs at most once per instance. + seen: set[int] = set() + for connector in (*self.readers.values(), *self.writers.values()): + if id(connector) in seen: + continue + seen.add(id(connector)) + if hasattr(connector, "__aexit__"): + await self._exit_stack.enter_async_context(connector) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) @classmethod async def new( diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index 218b31bc0..41aa18e9a 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -108,6 +108,13 @@ def __init__( pending_tx_publisher=pending_tx_publisher, ) + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + # Closes the aiohttp ClientSession cached by AsyncHTTPProvider. + await self.web3_client.provider.disconnect() + @classmethod async def new( cls, @@ -377,68 +384,61 @@ async def broadcast_messages( ) async def packer(self, config: Config): - try: - pri_key = HexBytes(config.ethereum.private_key.value) - account = Account.from_key(pri_key) - address = account.address + pri_key = HexBytes(config.ethereum.private_key.value) + account = Account.from_key(pri_key) + address = account.address - LOGGER.info("Ethereum Connector set up with address %s" % address) - i = 0 - while True: - with self.session_factory() as session: - # Wait for sync operations to complete - if (count_pending_txs(session=session, chain=Chain.ETH)) or ( - count_pending_messages(session=session, chain=Chain.ETH) - ) > 1000: - await asyncio.sleep(30) - continue - - if i >= 100: - await asyncio.sleep(30) # wait three (!!) blocks - i = 0 - - nonce = await self.web3_client.eth.get_transaction_count( - account.address - ) + LOGGER.info("Ethereum Connector set up with address %s" % address) + i = 0 + while True: + with self.session_factory() as session: + # Wait for sync operations to complete + if (count_pending_txs(session=session, chain=Chain.ETH)) or ( + count_pending_messages(session=session, chain=Chain.ETH) + ) > 1000: + await asyncio.sleep(30) + continue + + if i >= 100: + await asyncio.sleep(30) # wait three (!!) blocks + i = 0 + + nonce = await self.web3_client.eth.get_transaction_count( + account.address + ) - # Collect all unconfirmed messages using pagination - max_unconfirmed = config.aleph.jobs.max_unconfirmed_messages.value - all_messages = [] - offset = 0 - while True: - batch = list( - get_unconfirmed_messages( - session=session, - limit=500, - offset=offset, - ) + # Collect all unconfirmed messages using pagination + max_unconfirmed = config.aleph.jobs.max_unconfirmed_messages.value + all_messages = [] + offset = 0 + while True: + batch = list( + get_unconfirmed_messages( + session=session, + limit=500, + offset=offset, ) - if not batch: - break - all_messages.extend(batch) - offset += len(batch) - if len(batch) < 500 or len(all_messages) >= max_unconfirmed: - break - all_messages = all_messages[:max_unconfirmed] - - if all_messages: - LOGGER.info( - "Chain sync: %d unconfirmed messages" % len(all_messages) ) + if not batch: + break + all_messages.extend(batch) + offset += len(batch) + if len(batch) < 500 or len(all_messages) >= max_unconfirmed: + break + all_messages = all_messages[:max_unconfirmed] - try: - response = await self.broadcast_messages( - account=account, - messages=all_messages, - nonce=nonce, - ) - LOGGER.info("Broadcast %r on %s" % (response, Chain.ETH.value)) - except Exception: - LOGGER.exception( - "Error while broadcasting messages to Ethereum" - ) + if all_messages: + LOGGER.info("Chain sync: %d unconfirmed messages" % len(all_messages)) + + try: + response = await self.broadcast_messages( + account=account, + messages=all_messages, + nonce=nonce, + ) + LOGGER.info("Broadcast %r on %s" % (response, Chain.ETH.value)) + except Exception: + LOGGER.exception("Error while broadcasting messages to Ethereum") - await asyncio.sleep(config.ethereum.commit_delay.value) - i += 1 - finally: - await self.web3_client.provider.disconnect() + await asyncio.sleep(config.ethereum.commit_delay.value) + i += 1 diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 4cee56357..79f247d0b 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -184,6 +184,7 @@ async def main(args: List[str]) -> None: pending_tx_publisher=pending_tx_publisher, chain_data_service=chain_data_service, ) + await stack.enter_async_context(chain_connector) await repair_node( storage_service=storage_service, session_factory=session_factory From 967c603103c614b1c33f26bc5b1bcb6c80b5d40e Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 20 May 2026 01:20:08 +0200 Subject: [PATCH 3/3] fix(chains): use isinstance(AbstractAsyncContextManager) so mypy narrows the type hasattr("__aexit__") is opaque to mypy, which then rejected the ChainReader being passed to AsyncExitStack.enter_async_context. AbstractAsyncContextManager has a __subclasshook__ that duck-types on __aenter__/__aexit__, so isinstance() works at runtime and gives mypy the type narrowing it needs. --- src/aleph/chains/connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aleph/chains/connector.py b/src/aleph/chains/connector.py index 8b9b8f495..dd11be6e8 100644 --- a/src/aleph/chains/connector.py +++ b/src/aleph/chains/connector.py @@ -1,6 +1,6 @@ import asyncio import logging -from contextlib import AsyncExitStack +from contextlib import AbstractAsyncContextManager, AsyncExitStack from typing import Dict, Self, Union from aleph_message.models import Chain @@ -51,7 +51,7 @@ async def __aenter__(self) -> Self: if id(connector) in seen: continue seen.add(id(connector)) - if hasattr(connector, "__aexit__"): + if isinstance(connector, AbstractAsyncContextManager): await self._exit_stack.enter_async_context(connector) return self