@@ -610,6 +610,7 @@ async def connect(self, force=False):
610
610
try :
611
611
await asyncio .wait_for (self ._cancel (), timeout = 10.0 )
612
612
except asyncio .TimeoutError :
613
+ logger .debug (f"Timed out waiting for cancellation" )
613
614
pass
614
615
self .ws = await asyncio .wait_for (
615
616
connect (self .ws_url , ** self ._options ), timeout = 10.0
@@ -618,8 +619,9 @@ async def connect(self, force=False):
618
619
self ._send_recv_task = asyncio .get_running_loop ().create_task (
619
620
self ._handler (self .ws )
620
621
)
622
+ logger .debug ("Websocket handler attached." )
621
623
622
- async def _handler (self , ws : ClientConnection ) -> None :
624
+ async def _handler (self , ws : ClientConnection ) -> Union [ None , Exception ] :
623
625
recv_task = asyncio .create_task (self ._start_receiving (ws ))
624
626
send_task = asyncio .create_task (self ._start_sending (ws ))
625
627
done , pending = await asyncio .wait (
@@ -652,6 +654,7 @@ async def _handler(self, ws: ClientConnection) -> None:
652
654
)
653
655
await self .connect (True )
654
656
await self ._handler (ws = self .ws )
657
+ return None
655
658
elif isinstance (e := recv_task .result (), Exception ):
656
659
return e
657
660
elif isinstance (e := send_task .result (), Exception ):
@@ -834,8 +837,10 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
834
837
except asyncio .QueueEmpty :
835
838
pass
836
839
if self ._send_recv_task is not None and self ._send_recv_task .done ():
837
- if isinstance (e := self ._send_recv_task .result (), Exception ):
838
- raise e
840
+ if not self ._send_recv_task .cancelled ():
841
+ if isinstance ((e := self ._send_recv_task .exception ()), Exception ):
842
+ logger .exception (f"Websocket sending exception: { e } " )
843
+ raise e
839
844
await asyncio .sleep (0.1 )
840
845
return None
841
846
0 commit comments