@@ -610,6 +610,7 @@ async def connect(self, force=False):
610
610
try :
611
611
await asyncio .wait_for (self ._cancel (), timeout = 10.0 )
612
612
except asyncio .TimeoutError :
613
+ logger .debug (f"Timed out waiting for cancellation" )
613
614
pass
614
615
self .ws = await asyncio .wait_for (
615
616
connect (self .ws_url , ** self ._options ), timeout = 10.0
@@ -618,8 +619,9 @@ async def connect(self, force=False):
618
619
self ._send_recv_task = asyncio .get_running_loop ().create_task (
619
620
self ._handler (self .ws )
620
621
)
622
+ logger .debug ("Websocket handler attached." )
621
623
622
- async def _handler (self , ws : ClientConnection ) -> None :
624
+ async def _handler (self , ws : ClientConnection ) -> Union [ None , Exception ] :
623
625
recv_task = asyncio .create_task (self ._start_receiving (ws ))
624
626
send_task = asyncio .create_task (self ._start_sending (ws ))
625
627
done , pending = await asyncio .wait (
@@ -652,6 +654,7 @@ async def _handler(self, ws: ClientConnection) -> None:
652
654
)
653
655
await self .connect (True )
654
656
await self ._handler (ws = self .ws )
657
+ return None
655
658
elif isinstance (e := recv_task .result (), Exception ):
656
659
return e
657
660
elif isinstance (e := send_task .result (), Exception ):
@@ -834,8 +837,10 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
834
837
except asyncio .QueueEmpty :
835
838
pass
836
839
if self ._send_recv_task is not None and self ._send_recv_task .done ():
837
- if isinstance (e := self ._send_recv_task .result (), Exception ):
838
- raise e
840
+ if not self ._send_recv_task .cancelled ():
841
+ if isinstance ((e := self ._send_recv_task .exception ()), Exception ):
842
+ logger .exception (f"Websocket sending exception: { e } " )
843
+ raise e
839
844
await asyncio .sleep (0.1 )
840
845
return None
841
846
@@ -2377,8 +2382,13 @@ async def _make_rpc_request(
2377
2382
for payload in payloads :
2378
2383
item_id = await ws .send (payload ["payload" ])
2379
2384
request_manager .add_request (item_id , payload ["id" ])
2385
+ # truncate to 2000 chars for debug logging
2386
+ if len (stringified_payload := str (payload )) < 2_000 :
2387
+ output_payload = stringified_payload
2388
+ else :
2389
+ output_payload = f"{ stringified_payload [:2_000 ]} (truncated)"
2380
2390
logger .debug (
2381
- f"Submitted payload ID { payload ['id' ]} with websocket ID { item_id } : { payload } "
2391
+ f"Submitted payload ID { payload ['id' ]} with websocket ID { item_id } : { output_payload } "
2382
2392
)
2383
2393
2384
2394
while True :
@@ -2420,6 +2430,7 @@ async def _make_rpc_request(
2420
2430
request_manager .add_response (
2421
2431
item_id , decoded_response , complete
2422
2432
)
2433
+ # truncate to 2000 chars for debug logging
2423
2434
if (
2424
2435
len (stringified_response := str (decoded_response ))
2425
2436
< 2_000
0 commit comments