diff --git a/nats/aio/client.py b/nats/aio/client.py index c2bfd00b..f501117e 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -330,6 +330,7 @@ async def connect( inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX, pending_size: int = DEFAULT_PENDING_SIZE, flush_timeout: Optional[float] = None, + ws_client_options: Optional[dict] = None, ) -> None: """ Establishes a connection to NATS. @@ -450,6 +451,9 @@ async def subscribe_handler(msg): self._nkeys_seed = nkeys_seed self._nkeys_seed_str = nkeys_seed_str + # Options to customize aiohttp client in case of websocket transport + self._ws_client_options = ws_client_options + # Customizable options self.options["verbose"] = verbose self.options["pedantic"] = pedantic @@ -1348,7 +1352,9 @@ async def _select_next_server(self) -> None: s.last_attempt = time.monotonic() if not self._transport: if s.uri.scheme in ("ws", "wss"): - self._transport = WebSocketTransport() + self._transport = WebSocketTransport( + self._ws_client_options + ) else: # use TcpTransport as a fallback self._transport = TcpTransport() diff --git a/nats/aio/transport.py b/nats/aio/transport.py index 74a597ad..e8b0d2ea 100644 --- a/nats/aio/transport.py +++ b/nats/aio/transport.py @@ -192,13 +192,15 @@ def __bool__(self): class WebSocketTransport(Transport): - def __init__(self): + def __init__(self, client_options: Optional[dict] = None): if not aiohttp: raise ImportError( "Could not import aiohttp transport, please install it with `pip install aiohttp`" ) self._ws: Optional[aiohttp.ClientWebSocketResponse] = None - self._client: aiohttp.ClientSession = aiohttp.ClientSession() + self._client: aiohttp.ClientSession = aiohttp.ClientSession( + **client_options + ) self._pending = asyncio.Queue() self._close_task = asyncio.Future() self._using_tls: Optional[bool] = None