Skip to content

Commit

Permalink
Added reconnection in case websocket unexpectedly closes (#37)
Browse files Browse the repository at this point in the history
- A few bugfixes

Co-authored-by: Anon Ray <[email protected]>
  • Loading branch information
AleMuzzi and ecthiender authored May 27, 2023
1 parent a453417 commit b5dca94
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions graphql_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(self, url):
# map of queues for each subscriber
self._subscriber_queues = {}
self._shutdown_receiver = False
self._subscriptions = []
self.connect()

def connect(self) -> None:
Expand All @@ -71,6 +72,16 @@ def connect(self) -> None:
self._recevier_thread = threading.Thread(target=self._receiver_task)
self._recevier_thread.start()

def _reconnect(self):
subscriptions = self._subscriptions
self.__init__(self.ws_url)

for subscription in subscriptions:
self.subscribe(query=subscription['query'],
variables=subscription['variables'],
headers=subscription['headers'],
callback=subscription['callback'])

def __dump_queues(self):
logger.debug('[GQL_CLIENT] => Dump of all the internal queues')
logger.debug('[GQL_CLIENT] => Global queue => \n %s', self._queue.queue)
Expand All @@ -81,13 +92,21 @@ def __dump_queues(self):
def _receiver_task(self):
"""the recieve function of the client. Which validates response from the
server and queues data """
while not self._shutdown_receiver:
reconnected = False
while not self._shutdown_receiver and not reconnected:
self.__dump_queues()
res = self._connection.recv()
try:
res = self._connection.recv()
except websocket._exceptions.WebSocketConnectionClosedException as e:
self._reconnect()
reconnected = True
continue

try:
msg = json.loads(res)
except json.JSONDecodeError as err:
logger.warning('Ignoring. Server sent invalid JSON data: %s \n %s', res, err)
continue

# ignore messages which are GQL_CONNECTION_KEEP_ALIVE
if msg['type'] != GQL_CONNECTION_KEEP_ALIVE:
Expand Down Expand Up @@ -122,7 +141,6 @@ def _receiver_task(self):
else:
self._queue.put(msg)


def _insert_subscriber(self, op_id, callback_fn):
self._subscriber_callbacks[op_id] = callback_fn

Expand Down Expand Up @@ -213,12 +231,18 @@ def subscribe(self, query: str, variables: dict = None, headers: dict = None,
"""

# sanity check that the user passed a valid function
if not callback and not callable(callback):
if not callback or not callable(callback):
raise TypeError('the argument `callback` is mandatory and it should be a function')

self._connection_init(headers)
payload = {'headers': headers, 'query': query, 'variables': variables}
op_id = self._start(payload, callback)
self._subscriptions.append({
'query': query,
'variables': variables,
'headers': headers,
'callback': callback
})
return op_id

def stop_subscribe(self, op_id: str) -> None:
Expand Down

0 comments on commit b5dca94

Please sign in to comment.