Skip to content

Commit d023bb0

Browse files
authored
Merge pull request #2 from OpenMatchmaking/feature-retry-send-request
Implemented sending the message with retries and timeouts
2 parents b7dfefd + 652eb0e commit d023bb0

File tree

3 files changed

+48
-11
lines changed

3 files changed

+48
-11
lines changed

sage_utils/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
__title__ = 'sage-utils'
2-
__version__ = '0.3.1'
2+
__version__ = '0.4.0'
33
__license__ = 'BSD'
44
VERSION = __version__

sage_utils/amqp/clients.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(self, app, routing_key, request_exchange='',
3030
def response_queue_name(self):
3131
return self._response_queue_name
3232

33-
async def connect(self):
33+
async def connect(self, consume_timeout=None):
3434
self.transport, self.protocol = await self.app.amqp.connect()
3535
self.channel = await self.protocol.channel()
3636

@@ -53,18 +53,22 @@ async def connect(self):
5353
prefetch_size=0,
5454
connection_global=False
5555
)
56-
await self.channel.basic_consume(
57-
self.on_response,
58-
queue_name=self._response_queue_name,
56+
await asyncio.wait_for(
57+
self.channel.basic_consume(
58+
self.on_response,
59+
queue_name=self._response_queue_name,
60+
),
61+
timeout=consume_timeout,
62+
loop=self.app.loop
5963
)
6064

6165
async def on_response(self, _channel, body, _envelope, _properties):
6266
self._response = json.loads(body)
6367
self.waiter.set()
6468

65-
async def send(self, payload={}, properties={}, raw_data=False):
69+
async def send(self, payload={}, properties={}, raw_data=False, consume_timeout=None):
6670
if not self.protocol:
67-
await self.connect()
71+
await self.connect(consume_timeout=consume_timeout)
6872

6973
request_properties = deepcopy(self.DEFAULT_PROPERTIES)
7074
request_properties.update({'reply_to': self.response_queue_name})

sage_utils/amqp/workers.py

+37-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
1+
import asyncio
2+
import logging
3+
4+
from aioamqp.exceptions import AioamqpException
15
from sage_utils.amqp.base import AmqpWorker
26
from sage_utils.amqp.clients import RpcAmqpClient
37

48

9+
LOGGER = logging.getLogger(__name__)
10+
11+
512
class BaseRegisterWorker(AmqpWorker):
613
"""
714
Base class for implementing worker which is registering a new
815
microservice in Open Matchmaking platform.
916
"""
17+
MAX_RETRIES = 5
18+
RETRY_TIMEOUT = 10
19+
1020
REQUEST_QUEUE_NAME = "auth.microservices.register"
1121
REQUEST_EXCHANGE_NAME = "open-matchmaking.direct"
1222
RESPONSE_EXCHANGE_NAME = "open-matchmaking.responses.direct"
@@ -24,8 +34,31 @@ async def run(self, *args, **kwargs):
2434
response_queue='',
2535
response_exchange=self.RESPONSE_EXCHANGE_NAME
2636
)
27-
response = await client.send(self.get_microservice_data(self.app))
2837

29-
assert 'error' not in response.keys(), response['error']
30-
assert 'content' in response.keys()
31-
assert response['content'] == 'OK'
38+
is_registered = False
39+
microservice_data = self.get_microservice_data(self.app)
40+
for _ in range(self.MAX_RETRIES):
41+
try:
42+
response = await client.send(
43+
payload=microservice_data,
44+
consume_timeout=self.RETRY_TIMEOUT
45+
)
46+
47+
if 'error' in response.keys():
48+
LOGGER.error("Received validation errors: {}".format(response['error']))
49+
else:
50+
assert 'content' in response.keys()
51+
assert response['content'] == 'OK'
52+
is_registered = True
53+
54+
break
55+
except (AioamqpException, TimeoutError):
56+
LOGGER.error(
57+
"Can't receive a response because the Auth/Auth microservice isn't "
58+
"responding or the required queues and the exchanges aren't created. "
59+
"Retry after {} second(s).".format(self.RETRY_TIMEOUT)
60+
)
61+
await asyncio.sleep(self.RETRY_TIMEOUT, loop=self.app.loop)
62+
63+
if not is_registered:
64+
raise ConnectionError('Occurred an error during registering microservice.')

0 commit comments

Comments
 (0)