fix: silence shutdown noise from MqWatcher and Web3 provider#1154
fix: silence shutdown noise from MqWatcher and Web3 provider#1154odesenfans wants to merge 3 commits into
Conversation
…as lost on shutdown
MqWatcher.consumes signals from RabbitMQ purely to wake up a DB-driven
processor; data lives in pending_txs / pending_messages, not in the
message body. Using no_ack=True meant aio_pika logged a warning per
prefetched message still in the iterator buffer at shutdown
("Message ... lost for consumer with no_ack ..."), which was alarming
on Ctrl-C even though nothing was actually lost.
Switch to no_ack=False with manual ack. Acked messages are gone; any
prefetched-but-not-yet-iterated messages are nack'd with requeue=True
by aio_pika's iterator close path, so they reappear on next startup
and trigger the watcher.
web3.AsyncHTTPProvider caches an aiohttp.ClientSession in its HTTPSessionManager. The session was only closed in EthereumConnector.packer's finally block, which never runs on reader-only nodes (no packing_node), so Ctrl-C produced "Unclosed client session" / "Unclosed connector" warnings from aiohttp's garbage collector. Make EthereumConnector an async context manager whose __aexit__ calls provider.disconnect(). Make ChainConnector an async context manager that delegates to every registered chain connector via an internal AsyncExitStack (dedup'd by id() since EthereumConnector is in both readers and writers). Register chain_connector with the main AsyncExitStack in commands.py, next to the other shutdown callbacks. Drop the duplicate provider.disconnect() from packer's finally now that the context manager owns the cleanup.
foxpatch-aleph
left a comment
There was a problem hiding this comment.
Clean follow-up PR that eliminates two sources of shutdown noise: MqWatcher warning from aio_pika on prefetched unacked messages, and unclosed aiohttp session from the Web3 provider. Both fixes are correct and minimal. The ChainConnector async context manager properly deduplicates by identity and skips connectors without cleanup needs. No logic errors or regressions found.
src/aleph/chains/connector.py (line 50): The id() dedup is safe here since connectors are long-lived singletons, but worth noting that this relies on object identity stability. If a connector were ever recreated with a new identity but the same logical instance, the dedup could break. Consider using the connector object itself as a dict key instead of id(connector) for slightly more robustness, though this is a nit.
…ows the type
hasattr("__aexit__") is opaque to mypy, which then rejected the
ChainReader being passed to AsyncExitStack.enter_async_context.
AbstractAsyncContextManager has a __subclasshook__ that duck-types
on __aenter__/__aexit__, so isinstance() works at runtime and gives
mypy the type narrowing it needs.
foxpatch-aleph
left a comment
There was a problem hiding this comment.
Well-targeted PR that fixes two sources of noisy shutdown warnings. The MqWatcher change (no_ack=False with explicit ack) correctly avoids aio_pika's 'message lost' warning while preserving message delivery via requeue. The EthereumConnector/ChainConnector async context manager refactor ensures the aiohttp ClientSession is properly closed on shutdown even for reader-only nodes, removing the 'Unclosed client session' error. The deduplication by id() in ChainConnector.aenter correctly handles connectors registered as both reader and writer. The removal of the redundant provider.disconnect() from packer's finally block is a good cleanup since the context manager handles it now. All changes are well-reasoned and correctly implemented.
Summary
Two small follow-ups to the earlier shutdown-cleanup PRs (#1139 - #1145) that eliminate two remaining classes of noisy log output observed when Ctrl-C'ing pyaleph during a fresh sync.
1. MqWatcher prefetched-message warnings
Every Ctrl-C produced a flurry of:
MqWatcher._check_for_messageopened the queue iterator withno_ack=Trueeven though it only uses messages as level-triggered signals (the actual data lives inpending_txs/pending_messages). aio_pika unconditionally logs a warning per prefetched message still in the iterator buffer at close time whenno_ack=True.Switched to
no_ack=Falsewith explicitawait message.ack(). Buffered messages are now nack'd withrequeue=Trueon shutdown, so nothing is lost and the warning disappears.2. Unclosed aiohttp session from the ETH Web3 provider
After SIGTERM:
web3.AsyncHTTPProvidercaches anaiohttp.ClientSessionper endpoint in itsHTTPSessionManager. The provider exposesdisconnect()to close it, but the only call site was insideEthereumConnector.packer'sfinally. Reader-only nodes (nopacking_node) never reach it, so the session leaks at shutdown.EthereumConnectoris now an async context manager whose__aexit__callsprovider.disconnect().ChainConnectoris also an async context manager. It owns an internalAsyncExitStackand enters every chain connector that supports the protocol, dedup'd byid()sinceEthereumConnectoris registered as both reader and writer.commands.pyregisterschain_connectorwith the mainAsyncExitStack, alongside the other shutdown callbacks added in the previous PRs.provider.disconnect()inpacker'sfinallyis removed (cleanup now lives on the context manager).Test plan
hatch run testing:test tests/chains tests/message_processing(104 passed, 3 skipped)just start-dev-env && just run, wait for ETH messages to arrive, Ctrl-C, confirm the two warning families are gone