44import asyncio
55import json
66import logging
7- import weakref
87import aiohttp
98import aiohttp_session
109from aiohttp import WSMessage , web
2019log = logging .getLogger (__name__ )
2120
2221_SEND_CONCURRENCY = 200
23- # Keep one shared send semaphore per event loop.
24- # A plain module-level asyncio.Semaphore can become bound to the first loop that
25- # waits on it and then raise "bound to a different event loop" in later tests
26- # (for example IsolatedAsyncioTestCase creates a fresh loop per test).
27- # Weak refs let loop entries disappear automatically when loops are closed.
28- _send_semaphores : weakref .WeakKeyDictionary [asyncio .AbstractEventLoop , asyncio .Semaphore ] = (
29- weakref .WeakKeyDictionary ()
30- )
31-
32-
33- def _get_send_semaphore () -> asyncio .Semaphore :
34- # Reuse the same semaphore inside a loop to enforce a process-wide send cap
35- # in production (single aiohttp loop) without cross-loop binding errors.
36- loop = asyncio .get_running_loop ()
37- sem = _send_semaphores .get (loop )
38- if sem is None :
39- sem = asyncio .Semaphore (_SEND_CONCURRENCY )
40- _send_semaphores [loop ] = sem
41- return sem
22+ _SEND_TIMEOUT_SECS = 2.0
4223
4324
4425async def get_user (session : aiohttp_session .Session , request : web .Request ) -> User :
@@ -150,9 +131,9 @@ async def process_ws(
150131
151132async def ws_send_str (ws : WebSocketResponse , msg : str ) -> bool :
152133 try :
153- await ws .send_str (msg )
134+ await asyncio . wait_for ( ws .send_str (msg ), timeout = _SEND_TIMEOUT_SECS )
154135 return True
155- except (ConnectionResetError , ClientConnectionResetError ):
136+ except (ConnectionResetError , ClientConnectionResetError , RuntimeError , asyncio . TimeoutError ):
156137 # Peer disconnected between scheduling and actual send.
157138 return False
158139
@@ -162,14 +143,20 @@ async def ws_send_str_many(ws_set: Iterable[WebSocketResponse | None], msg: str)
162143 if len (sockets ) == 0 :
163144 return 0
164145
165- sem = _get_send_semaphore ()
146+ # Cap fan-out inside one broadcast without coupling unrelated broadcasts.
147+ sem = asyncio .Semaphore (min (_SEND_CONCURRENCY , len (sockets )))
166148
167149 async def one (ws : WebSocketResponse ) -> bool :
168150 async with sem :
169151 try :
170- await ws .send_str (msg )
152+ await asyncio . wait_for ( ws .send_str (msg ), timeout = _SEND_TIMEOUT_SECS )
171153 return True
172- except (ConnectionResetError , ClientConnectionResetError ):
154+ except (
155+ ConnectionResetError ,
156+ ClientConnectionResetError ,
157+ RuntimeError ,
158+ asyncio .TimeoutError ,
159+ ):
173160 return False
174161 except Exception :
175162 return False
@@ -183,9 +170,9 @@ async def ws_send_json(ws: WebSocketResponse | None, msg: Mapping[str, object] |
183170 log .error ("ws_send_json: ws is None" )
184171 return False
185172 try :
186- await ws .send_json (msg )
173+ await asyncio . wait_for ( ws .send_json (msg ), timeout = _SEND_TIMEOUT_SECS )
187174 return True
188- except (ConnectionResetError , ClientConnectionResetError ):
175+ except (ConnectionResetError , ClientConnectionResetError , RuntimeError , asyncio . TimeoutError ):
189176 # Peer disconnected between scheduling and actual send.
190177 return False
191178 except Exception :
0 commit comments