Skip to content

Commit 0a2189d

Browse files
committed
fix(chains): close Web3 provider session on shutdown via context manager
web3.AsyncHTTPProvider caches an aiohttp.ClientSession in its HTTPSessionManager. The session was only closed in EthereumConnector.packer's finally block, which never runs on reader-only nodes (no packing_node), so Ctrl-C produced "Unclosed client session" / "Unclosed connector" warnings from aiohttp's garbage collector. Make EthereumConnector an async context manager whose __aexit__ calls provider.disconnect(). Make ChainConnector an async context manager that delegates to every registered chain connector via an internal AsyncExitStack (dedup'd by id() since EthereumConnector is in both readers and writers). Register chain_connector with the main AsyncExitStack in commands.py, next to the other shutdown callbacks. Drop the duplicate provider.disconnect() from packer's finally now that the context manager owns the cleanup.
1 parent 02eeca0 commit 0a2189d

3 files changed

Lines changed: 78 additions & 59 deletions

File tree

src/aleph/chains/connector.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
from contextlib import AsyncExitStack
34
from typing import Dict, Self, Union
45

56
from aleph_message.models import Chain
@@ -39,6 +40,23 @@ def __init__(
3940

4041
self.readers = {}
4142
self.writers = {}
43+
self._exit_stack = AsyncExitStack()
44+
45+
async def __aenter__(self) -> Self:
46+
await self._exit_stack.__aenter__()
47+
# A connector can be both a reader and a writer; dedupe by identity so
48+
# __aexit__ runs at most once per instance.
49+
seen: set[int] = set()
50+
for connector in (*self.readers.values(), *self.writers.values()):
51+
if id(connector) in seen:
52+
continue
53+
seen.add(id(connector))
54+
if hasattr(connector, "__aexit__"):
55+
await self._exit_stack.enter_async_context(connector)
56+
return self
57+
58+
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
59+
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
4260

4361
@classmethod
4462
async def new(

src/aleph/chains/ethereum.py

Lines changed: 59 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ def __init__(
108108
pending_tx_publisher=pending_tx_publisher,
109109
)
110110

111+
async def __aenter__(self) -> Self:
112+
return self
113+
114+
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
115+
# Closes the aiohttp ClientSession cached by AsyncHTTPProvider.
116+
await self.web3_client.provider.disconnect()
117+
111118
@classmethod
112119
async def new(
113120
cls,
@@ -377,68 +384,61 @@ async def broadcast_messages(
377384
)
378385

379386
async def packer(self, config: Config):
380-
try:
381-
pri_key = HexBytes(config.ethereum.private_key.value)
382-
account = Account.from_key(pri_key)
383-
address = account.address
387+
pri_key = HexBytes(config.ethereum.private_key.value)
388+
account = Account.from_key(pri_key)
389+
address = account.address
384390

385-
LOGGER.info("Ethereum Connector set up with address %s" % address)
386-
i = 0
387-
while True:
388-
with self.session_factory() as session:
389-
# Wait for sync operations to complete
390-
if (count_pending_txs(session=session, chain=Chain.ETH)) or (
391-
count_pending_messages(session=session, chain=Chain.ETH)
392-
) > 1000:
393-
await asyncio.sleep(30)
394-
continue
395-
396-
if i >= 100:
397-
await asyncio.sleep(30) # wait three (!!) blocks
398-
i = 0
399-
400-
nonce = await self.web3_client.eth.get_transaction_count(
401-
account.address
402-
)
391+
LOGGER.info("Ethereum Connector set up with address %s" % address)
392+
i = 0
393+
while True:
394+
with self.session_factory() as session:
395+
# Wait for sync operations to complete
396+
if (count_pending_txs(session=session, chain=Chain.ETH)) or (
397+
count_pending_messages(session=session, chain=Chain.ETH)
398+
) > 1000:
399+
await asyncio.sleep(30)
400+
continue
401+
402+
if i >= 100:
403+
await asyncio.sleep(30) # wait three (!!) blocks
404+
i = 0
405+
406+
nonce = await self.web3_client.eth.get_transaction_count(
407+
account.address
408+
)
403409

404-
# Collect all unconfirmed messages using pagination
405-
max_unconfirmed = config.aleph.jobs.max_unconfirmed_messages.value
406-
all_messages = []
407-
offset = 0
408-
while True:
409-
batch = list(
410-
get_unconfirmed_messages(
411-
session=session,
412-
limit=500,
413-
offset=offset,
414-
)
410+
# Collect all unconfirmed messages using pagination
411+
max_unconfirmed = config.aleph.jobs.max_unconfirmed_messages.value
412+
all_messages = []
413+
offset = 0
414+
while True:
415+
batch = list(
416+
get_unconfirmed_messages(
417+
session=session,
418+
limit=500,
419+
offset=offset,
415420
)
416-
if not batch:
417-
break
418-
all_messages.extend(batch)
419-
offset += len(batch)
420-
if len(batch) < 500 or len(all_messages) >= max_unconfirmed:
421-
break
422-
all_messages = all_messages[:max_unconfirmed]
423-
424-
if all_messages:
425-
LOGGER.info(
426-
"Chain sync: %d unconfirmed messages" % len(all_messages)
427421
)
422+
if not batch:
423+
break
424+
all_messages.extend(batch)
425+
offset += len(batch)
426+
if len(batch) < 500 or len(all_messages) >= max_unconfirmed:
427+
break
428+
all_messages = all_messages[:max_unconfirmed]
428429

429-
try:
430-
response = await self.broadcast_messages(
431-
account=account,
432-
messages=all_messages,
433-
nonce=nonce,
434-
)
435-
LOGGER.info("Broadcast %r on %s" % (response, Chain.ETH.value))
436-
except Exception:
437-
LOGGER.exception(
438-
"Error while broadcasting messages to Ethereum"
439-
)
430+
if all_messages:
431+
LOGGER.info("Chain sync: %d unconfirmed messages" % len(all_messages))
432+
433+
try:
434+
response = await self.broadcast_messages(
435+
account=account,
436+
messages=all_messages,
437+
nonce=nonce,
438+
)
439+
LOGGER.info("Broadcast %r on %s" % (response, Chain.ETH.value))
440+
except Exception:
441+
LOGGER.exception("Error while broadcasting messages to Ethereum")
440442

441-
await asyncio.sleep(config.ethereum.commit_delay.value)
442-
i += 1
443-
finally:
444-
await self.web3_client.provider.disconnect()
443+
await asyncio.sleep(config.ethereum.commit_delay.value)
444+
i += 1

src/aleph/commands.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ async def main(args: List[str]) -> None:
184184
pending_tx_publisher=pending_tx_publisher,
185185
chain_data_service=chain_data_service,
186186
)
187+
await stack.enter_async_context(chain_connector)
187188

188189
await repair_node(
189190
storage_service=storage_service, session_factory=session_factory

0 commit comments

Comments
 (0)