Skip to content

Commit 4651ebe

Browse files
committed
Initial commit for the project
1 parent dc051e2 commit 4651ebe

File tree

10 files changed

+282
-2
lines changed

10 files changed

+282
-2
lines changed

README.md

-2
This file was deleted.

README.rst

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
sage-utils-python
2+
####################
3+
SDK for Open Matchmaking microservices in Python
4+
5+
Features
6+
========
7+
- Framework agnostic solution
8+
- Easy to write a new extension and use it later with your code
9+
- Base class for implementing AMQP workers
10+
11+
Installation
12+
============
13+
This package should be installed using pip: ::
14+
15+
pip install sage-utils
16+
17+
Example
18+
=======
19+
.. code-block:: python
20+
21+
from sanic import Sanic
22+
from sage_utils.extension import BaseExtension
23+
24+
25+
class CustomExtension(BaseExtension):
26+
extension_name = app_attribute = 'custom'
27+
28+
def hello(self, user):
29+
print("Hello, {}!".format(user))
30+
31+
32+
app = Sanic(__name__)
33+
CustomExtension() # available via `app.custom` or `app.extensions['custom']`
34+
app.custom.hello('world') # Hello, world!
35+
36+
License
37+
=======
38+
The sage-utils-python is published under BSD license. For more details read LICENSE_ file.
39+
40+
.. _links:
41+
.. _LICENSE: https://github.com/OpenMatchmaking/sage-utils-python/blob/master/LICENSE

sage_utils/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
__title__ = 'sage-utils'
2+
__version__ = '0.1.0'
3+
__license__ = 'BSD'
4+
VERSION = __version__

sage_utils/amqp/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from sage_utils.amqp.base import AmqpWorker # NOQA
2+
from sage_utils.amqp.extension import AmqpExtension # NOQA
3+
from sage_utils.amqp.clients import RpcAmqpClient # NOQA

sage_utils/amqp/base.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
2+
3+
class AmqpWorker(object):
4+
5+
def __init__(self, app, *args, **kwargs):
6+
self.app = app
7+
self.protocol = None
8+
self.transport = None
9+
10+
async def run(self, *args, **kwargs):
11+
raise NotImplementedError('`run(*args, **kwargs)` method must be implemented.')
12+
13+
async def connect(self):
14+
self.transport, self.protocol = await self.app.amqp.connect()
15+
return self.transport, self.protocol
16+
17+
async def free_resources(self):
18+
if self.protocol:
19+
if not self.protocol.worker.cancelled():
20+
self.protocol.worker.cancel()
21+
22+
await self.protocol.close()
23+
24+
if self.transport:
25+
self.transport.close()
26+
27+
self.transport = None
28+
self.protocol = None

sage_utils/amqp/clients.py

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import asyncio
2+
import json
3+
from copy import deepcopy
4+
5+
6+
class RpcAmqpClient(object):
7+
CONTENT_TYPE = 'application/json'
8+
DEFAULT_PROPERTIES = {
9+
'content_type': CONTENT_TYPE,
10+
'delivery_mode': 2,
11+
'correlation_id': 'test-event-name'
12+
}
13+
14+
def __init__(self, app, routing_key, request_exchange='',
15+
response_queue=None, response_exchange=''):
16+
self.app = app
17+
self.routing_key = routing_key
18+
self.request_exchange = request_exchange
19+
self.response_queue = response_queue
20+
self.response_exchange = response_exchange
21+
self.transport = None
22+
self.protocol = None
23+
self.channel = None
24+
25+
self.waiter = asyncio.Event()
26+
self._response_queue_name = None
27+
self._response = None
28+
29+
@property
30+
def response_queue_name(self):
31+
return self._response_queue_name
32+
33+
async def connect(self):
34+
self.transport, self.protocol = await self.app.amqp.connect()
35+
self.channel = await self.protocol.channel()
36+
37+
if self.response_queue is not None:
38+
result = await self.channel.queue_declare(
39+
queue_name=self.response_queue,
40+
exclusive=True,
41+
durable=True,
42+
passive=False,
43+
auto_delete=True,
44+
)
45+
self._response_queue_name = result['queue']
46+
await self.channel.queue_bind(
47+
queue_name=self.response_queue_name,
48+
exchange_name=self.response_exchange,
49+
routing_key=self.response_queue_name
50+
)
51+
await self.channel.basic_qos(
52+
prefetch_count=1,
53+
prefetch_size=0,
54+
connection_global=False
55+
)
56+
await self.channel.basic_consume(
57+
self.on_response,
58+
queue_name=self._response_queue_name,
59+
)
60+
61+
async def on_response(self, _channel, body, _envelope, _properties):
62+
self._response = json.loads(body)
63+
self.waiter.set()
64+
65+
async def send(self, payload={}, properties={}, raw_data=False):
66+
if not self.protocol:
67+
await self.connect()
68+
69+
request_properties = deepcopy(self.DEFAULT_PROPERTIES)
70+
request_properties.update({'reply_to': self.response_queue_name})
71+
request_properties.update(properties)
72+
await self.channel.publish(
73+
payload if raw_data else json.dumps(payload),
74+
exchange_name=self.request_exchange,
75+
routing_key=self.routing_key,
76+
properties=request_properties
77+
)
78+
79+
response = None
80+
if self.response_queue_name is not None:
81+
await self.waiter.wait()
82+
response = self._response
83+
84+
await self.protocol.close()
85+
self.protocol = None
86+
self.transport = None
87+
return response

sage_utils/amqp/extension.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from asyncio import ensure_future
2+
3+
from aioamqp import connect as amqp_connect
4+
5+
from sage_utils.extension import BaseExtension
6+
7+
8+
class AmqpExtension(BaseExtension):
9+
app_attribute = 'amqp'
10+
workers = []
11+
active_tasks = []
12+
13+
def register_worker(self, worker):
14+
self.workers.append(worker)
15+
16+
def get_config(self, app):
17+
return {
18+
"login": app.config.get("AMQP_USERNAME", "guest"),
19+
"password": app.config.get("AMQP_PASSWORD", "guest"),
20+
"host": app.config.get("AMQP_HOST", "localhost"),
21+
"port": app.config.get("AMQP_PORT", 5672),
22+
"virtualhost": app.config.get("AMQP_VIRTUAL_HOST", "vhost"),
23+
"ssl": app.config.get("AMQP_USING_SSL", False),
24+
}
25+
26+
async def connect(self):
27+
config = self.get_config(self.app)
28+
transport, protocol = await amqp_connect(**config)
29+
return transport, protocol
30+
31+
async def init(self, loop):
32+
if not hasattr(self.app, 'extensions'):
33+
setattr(self.app, 'extensions', {})
34+
self.app.extensions[self.extension_name] = self
35+
36+
for worker in self.workers:
37+
task = ensure_future(worker.run(), loop=loop)
38+
self.active_tasks.append(task)
39+
40+
async def deinit(self, loop):
41+
for task in self.active_tasks:
42+
if not loop.is_closed and not task.cancelled():
43+
task.cancel()
44+
45+
for worker in self.workers:
46+
await worker.free_resources()
47+
48+
setattr(self.app, self.app_attribute, None)
49+
extensions = getattr(self.app, 'extensions', {})
50+
extensions.pop(self.extension_name, None)

sage_utils/constants.py

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
"""
2+
Descriptive error types, for code readability.
3+
"""
4+
AUTHORIZATION_ERROR = "AuthorizationError"
5+
NOT_FOUND_ERROR = "NotFoundError"
6+
TOKEN_ERROR = "TokenError"
7+
HEADER_ERROR = "HeaderError"
8+
VALIDATION_ERROR = "ValidationError"

sage_utils/extension.py

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
2+
3+
class BaseExtension(object):
4+
extension_name = None
5+
app_attribute = None
6+
7+
def __init__(self, app=None, app_attribute: str=None, *args, **kwargs):
8+
self.app = app
9+
self.app_attribute = app_attribute or self.app_attribute
10+
11+
if app:
12+
self._register_extension(app, *args, **kwargs)
13+
self.init_app(app, *args, **kwargs)
14+
15+
def _register_extension(self, app, *args, **kwargs):
16+
if not hasattr(app, 'extensions'):
17+
setattr(app, 'extensions', {})
18+
19+
app.extensions[self.extension_name] = self
20+
21+
def init_app(self, app, *args, **kwargs):
22+
setattr(app, self.app_attribute, self)
23+
24+
def get_from_app_config(self, app, parameter, default=None):
25+
return getattr(app.config, parameter, default)
26+
27+
async def init(self, loop):
28+
pass
29+
30+
async def deinit(self, loop):
31+
pass

sage_utils/wrappers.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
2+
3+
class Response(object):
4+
"""
5+
The response object that is used by default for microservices.
6+
"""
7+
CONTENT_FIELD_NAME = 'content'
8+
ERROR_FIELD_NAME = 'error'
9+
EVENT_FIELD_NAME = 'event-name'
10+
11+
def append_extra_fields(self, event_name, *args, **kwargs):
12+
return {self.EVENT_FIELD_NAME: event_name}
13+
14+
def from_error(self, error_type, message, event_name=None):
15+
if isinstance(message, str) and not message.endswith('.'):
16+
message = message + '.'
17+
18+
response = {
19+
self.ERROR_FIELD_NAME: {
20+
"type": error_type,
21+
"message": message
22+
}
23+
}
24+
response.update(self.append_extra_fields(event_name))
25+
return response
26+
27+
def wrap_content(self, data, event_name=None):
28+
response = {self.CONTENT_FIELD_NAME: data}
29+
response.update(self.append_extra_fields(event_name))
30+
return response

0 commit comments

Comments
 (0)