From 82adb93b69bff11dfcd6824a591a7d168f35c466 Mon Sep 17 00:00:00 2001 From: Hynek Urban Date: Thu, 29 Dec 2022 20:25:09 +0100 Subject: [PATCH] Initial commit. --- .gitignore | 2 + LICENSE | 18 +++ README.md | 44 ++++++ requirements.txt | 1 + rocketchat_async/.gitignore | 2 + rocketchat_async/__init__.py | 1 + rocketchat_async/constants.py | 26 ++++ rocketchat_async/core.py | 71 ++++++++++ rocketchat_async/dispatcher.py | 70 ++++++++++ rocketchat_async/methods.py | 248 +++++++++++++++++++++++++++++++++ 10 files changed, 483 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 requirements.txt create mode 100644 rocketchat_async/.gitignore create mode 100644 rocketchat_async/__init__.py create mode 100644 rocketchat_async/constants.py create mode 100644 rocketchat_async/core.py create mode 100644 rocketchat_async/dispatcher.py create mode 100644 rocketchat_async/methods.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..033df5f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.venv +__pycache__ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d7f27dc --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +Copyright (c) 2023 Hynek Urban + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..405883d --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@ +# rocketchat-async + +asyncio-based Python wrapper for the Rocket.Chat Realtime API. + +## When should you use this library? + +Use this library if you: + +- want to integrate with Rocket.Chat from Python +- are using [asyncio](https://docs.python.org/3/library/asyncio.html) to drive your application +- want to use Rocket.Chat's efficient websockets-based Realtime API + +## Example usage + +```python +import asyncio +from rocketchat_async import RocketChat + + +def handle_message(channel_id, sender_id, msg_id, thread_id, msg, qualifier): + """Simply print the message that arrived.""" + print(msg) + + +async def subscribe_to_messages(rc, channel): + """Subscribe to a channel message.""" + await rc.subscribe_to_channel_messages(channel, handle_message) + + +async def main(address, username, password): + rc = RocketChat() + await rc.start(address, username, password) + # One possible workflow consists of two steps: + # + # 1. Set up the desired callbacks... + for channel_id, channel_type in await rc.get_channels(): + await subscribe_to_messages(rc, channel_id) + # 2. ...and then simply wait for the registered events. + await rc.run_forever() + + +# Side note: Don't forget to use the wss:// scheme when TLS is used. +asyncio.run(main('ws://localhost:3000/websocket', 'username', 'password')) +``` diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2da5042 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +websockets==10.4 diff --git a/rocketchat_async/.gitignore b/rocketchat_async/.gitignore new file mode 100644 index 0000000..838d81c --- /dev/null +++ b/rocketchat_async/.gitignore @@ -0,0 +1,2 @@ +src/.venv +__pycache__ diff --git a/rocketchat_async/__init__.py b/rocketchat_async/__init__.py new file mode 100644 index 0000000..96e1285 --- /dev/null +++ b/rocketchat_async/__init__.py @@ -0,0 +1 @@ +from rocketchat_async.core import RocketChat diff --git a/rocketchat_async/constants.py b/rocketchat_async/constants.py new file mode 100644 index 0000000..2097741 --- /dev/null +++ b/rocketchat_async/constants.py @@ -0,0 +1,26 @@ +QUALIFIER_REMOVE_USER = "ru" +QUALIFIER_DIRECT_MESSAGE = "d" + +# Selection of emojis that are subjectively "most fun". +EMOJIS = [ + ':grin:', + ':sweat_smile:', + ':joy:', + ':heart_eyes:', + ':smiling_face_with_3_hearts:', + ':nerd:', + ':sunglasses:', + ':partying_face:', + ':sob:', + ':exploding_head:', + ':fearful:', + ':rolling_eyes:', + ':thumbsup:', + ':thumbsdown:', + ':fingers_crossed:', + ':metal:', + ':v:', + ':man_facepalming:', + ':point_up:', + ':penguin:', +] diff --git a/rocketchat_async/core.py b/rocketchat_async/core.py new file mode 100644 index 0000000..5911a82 --- /dev/null +++ b/rocketchat_async/core.py @@ -0,0 +1,71 @@ +import asyncio +import websockets + +from rocketchat_async.dispatcher import Dispatcher +from rocketchat_async.methods import Connect, Login, GetChannels, SendMessage,\ + SendReaction, SendTypingEvent, SubscribeToChannelMessages,\ + SubscribeToChannelChanges, Unsubscribe + + +class RocketChat: + """Represents a connection to RocketChat, exposing the API.""" + + def __init__(self): + self._dispatcher = Dispatcher(verbose=True) + self.user_id = None + + async def start(self, address, username, password): + ws_connected = asyncio.get_event_loop().create_future() + ws_connection = self._start(address, ws_connected) + self._ws_connection_task = asyncio.create_task(ws_connection) + await ws_connected + # Connect and login. + await self._connect() + self.user_id = await self._login(username, password) + + async def run_forever(self): + await self.dispatch_task + + async def _start(self, address, connected_fut): + try: + async with websockets.connect(address) as websocket: + self.dispatch_task = self._dispatcher.run(websocket) + # Notify the caller that login has succeeded. + connected_fut.set_result(True) + # Finally, create the ever-running dispatcher loop. + await self.dispatch_task + except Exception as e: + connected_fut.set_exception(e) + + async def _connect(self): + await Connect.call(self._dispatcher) + + async def _login(self, username, password): + return await Login.call(self._dispatcher, username, password) + + # --> Public API methods start here. <-- + + async def get_channels(self): + return await GetChannels.call(self._dispatcher) + + async def send_message(self, context, text): + await SendMessage.call(self._dispatcher, context, text) + + async def send_reaction(self, orig_msg_id, emoji): + await SendReaction.call(orig_msg_id, emoji) + + async def send_typing_event(self, context, is_typing): + await SendTypingEvent.call(self._dispatcher, context, False) + + async def subscribe_to_channel_messages(self, channel_id, callback): + sub_id = await SubscribeToChannelMessages.call(self._dispatcher, + channel_id, callback) + return sub_id + + async def subscribe_to_channel_changes(self, callback): + sub_id = await SubscribeToChannelChanges.call(self._dispatcher, + self.user_id, callback) + return sub_id + + async def unsubscribe(self, subscription_id): + await Unsubscribe.call(self._dispatcher, subscription_id) diff --git a/rocketchat_async/dispatcher.py b/rocketchat_async/dispatcher.py new file mode 100644 index 0000000..a1e2ad0 --- /dev/null +++ b/rocketchat_async/dispatcher.py @@ -0,0 +1,70 @@ +import asyncio +import json + + +class Dispatcher: + """Match websockets calls with responses and manage callbacks.""" + + def __init__(self, verbose=False): + self._websocket = None + self._futures = {} # ID -> asyncio Future (resolved with the response) + self._verbose = verbose + + # ID -> registered callback (called when the msg arrives) + self._callbacks = {} + + def run(self, websocket): + self._websocket = websocket + # Start listening to incoming messages, executing callbacks, + # if appropriate. + return asyncio.create_task(self._process_incoming()) + + async def call_method(self, msg, msg_id=None): + if self._verbose: + print(f'Outgoing: {msg}') + if (msg_id is not None): + fut = asyncio.get_event_loop().create_future() + self._futures[msg_id] = fut + await self._websocket.send(json.dumps(msg)) + if (msg_id is not None): + return await fut + + async def create_subscription(self, msg, msg_id, callback): + if self._verbose: + print(f'Outgoing: {msg}') + self._callbacks[msg['name']] = callback + await self._websocket.send(json.dumps(msg)) + + async def _process_incoming(self): + try: + while True: + await self._process_incoming_event() + except Exception as err: + for fut in self._futures.values(): + # Propagate the exception to all awaiters to not get stuck. + fut.set_exception(err) + raise err + + async def _process_incoming_event(self): + msg = await self._websocket.recv() + if self._verbose: + print(f'Incoming: {msg}') + parsed = json.loads(msg) + if parsed['msg'] == 'result': + msg_id = parsed['id'] + if msg_id in self._futures: + self._futures[msg_id].set_result(parsed) + del self._futures[msg_id] + elif parsed['msg'] == 'changed': # Subscription update. + stream_name = parsed['collection'] + if stream_name in self._callbacks: + self._callbacks[stream_name](parsed) + elif parsed['msg'] in ['ready', 'connected', 'added', 'updated', + 'nosub']: + return # Nothing to do. + elif parsed['msg'] == 'ping': + asyncio.create_task(self.call_method({'msg': 'pong'})) + elif parsed['msg'] == 'error': + raise Exception(msg) # TODO - more specific class. + else: + raise Exception(f'Unknown message: {msg}') diff --git a/rocketchat_async/methods.py b/rocketchat_async/methods.py new file mode 100644 index 0000000..e6bf959 --- /dev/null +++ b/rocketchat_async/methods.py @@ -0,0 +1,248 @@ +import hashlib +import time + + +class RealtimeRequest: + """Method call or subscription in the RocketChat realtime API.""" + _max_id = 0 + + @staticmethod + def _get_new_id(): + RealtimeRequest._max_id += 1 + return f'{RealtimeRequest._max_id}' + + +class Connect(RealtimeRequest): + """Initialize the connection.""" + + REQUEST_MSG = { + 'msg': 'connect', + 'version': '1', + 'support': ['1'], + } + + @classmethod + async def call(cls, dispatcher): + await dispatcher.call_method(cls.REQUEST_MSG) + + +class Login(RealtimeRequest): + """Log in to the service.""" + + @staticmethod + def _get_request_msg(msg_id, username, password): + pwd_digest = hashlib.sha256(password.encode()).hexdigest() + return { + "msg": "method", + "method": "login", + "id": msg_id, + "params": [ + { + "user": {"username": username}, + "password": { + "digest": pwd_digest, + "algorithm": "sha-256" + } + } + ] + } + + @staticmethod + def _parse(response): + return response['result']['id'] + + @classmethod + async def call(cls, dispatcher, username, password): + msg_id = cls._get_new_id() + msg = cls._get_request_msg(msg_id, username, password) + response = await dispatcher.call_method(msg, msg_id) + return cls._parse(response) + + +class GetChannels(RealtimeRequest): + """Get a list of channels Varel is currently member of.""" + + @staticmethod + def _get_request_msg(msg_id): + return { + 'msg': 'method', + 'method': 'rooms/get', + 'id': msg_id, + 'params': [], + } + + @staticmethod + def _parse(response): + # Return channel IDs and channel types. + return [(r['_id'], r['t']) for r in response['result']] + + @classmethod + async def call(cls, dispatcher): + msg_id = cls._get_new_id() + msg = cls._get_request_msg(msg_id) + response = await dispatcher.call_method(msg, msg_id) + return cls._parse(response) + + +class SendMessage(RealtimeRequest): + """Send a text message to a channel.""" + + @staticmethod + def _get_request_msg(msg_id, channel_id, msg_text, thread_id=None): + id_seed = f'{msg_id}:{time.time()}' + msg = { + "msg": "method", + "method": "sendMessage", + "id": msg_id, + "params": [ + { + "_id": hashlib.md5(id_seed.encode()).hexdigest()[:12], + "rid": channel_id, + "msg": msg_text + } + ] + } + if thread_id is not None: + msg["params"][0]["tmid"] = thread_id + return msg + + @classmethod + async def call(cls, dispatcher, context, msg_text): + msg_id = cls._get_new_id() + msg = cls._get_request_msg(msg_id, context.channel_id, msg_text, + context.thread_id) + await dispatcher.call_method(msg, msg_id) + + +class SendReaction(RealtimeRequest): + """Send a reaction to a specific message.""" + + @staticmethod + def _get_request_msg(msg_id, orig_msg_id, emoji): + return { + "msg": "method", + "method": "setReaction", + "id": msg_id, + "params": [ + emoji, + orig_msg_id, + ] + } + + @classmethod + async def call(cls, dispatcher, orig_msg_id, emoji): + msg_id = cls._get_new_id() + msg = cls._get_request_msg(msg_id, orig_msg_id, emoji) + await dispatcher.call_method(msg) + + +class SendTypingEvent(RealtimeRequest): + """Send the `typing` event to a channel.""" + + @staticmethod + def _get_request_msg(msg_id, channel_id, is_typing): + return { + "msg": "method", + "method": "stream-notify-room", + "id": msg_id, + "params": [ + f'{channel_id}/typing', + "varel.bot", # TODO? + is_typing + ] + } + + @classmethod + async def call(cls, dispatcher, context, is_typing): + msg_id = cls._get_new_id() + msg = cls._get_request_msg(msg_id, context.channel_id, is_typing) + await dispatcher.call_method(msg, msg_id) + + +class SubscribeToChannelMessages(RealtimeRequest): + """Subscribe to all messages in the given channel.""" + + @staticmethod + def _get_request_msg(msg_id, channel_id): + return { + "msg": "sub", + "id": msg_id, + "name": "stream-room-messages", + "params": [ + channel_id, + { + "useCollection": False, + "args": [] + } + ] + } + + @staticmethod + def _wrap(callback): + def fn(msg): + event = msg['fields']['args'][0] # TODO: This looks suspicious. + msg_id = event['_id'] + channel_id = event['rid'] + thread_id = event.get('tmid') + sender_id = event['u']['_id'] + msg = event['msg'] + qualifier = event.get('t') + return callback(channel_id, sender_id, msg_id, thread_id, msg, + qualifier) + return fn + + @classmethod + async def call(cls, dispatcher, channel_id, callback): + # TODO: document the expected interface of the callback. + msg_id = cls._get_new_id() + msg = cls._get_request_msg(msg_id, channel_id) + await dispatcher.create_subscription(msg, msg_id, cls._wrap(callback)) + return msg_id # Return the ID to allow for later unsubscription. + + +class SubscribeToChannelChanges(RealtimeRequest): + """Subscribe to all changes in channels.""" + + @staticmethod + def _get_request_msg(msg_id, user_id): + return { + "msg": "sub", + "id": msg_id, + "name": "stream-notify-user", + "params": [ + f'{user_id}/rooms-changed', + False + ] + } + + @staticmethod + def _wrap(callback): + def fn(msg): + channel_id = msg['fields']['args'][1]['_id'] + channel_type = msg['fields']['args'][1]['t'] + return callback(channel_id, channel_type) + return fn + + @classmethod + async def call(cls, dispatcher, user_id, callback): + # TODO: document the expected interface of the callback. + msg_id = cls._get_new_id() + msg = cls._get_request_msg(msg_id, user_id) + await dispatcher.create_subscription(msg, msg_id, cls._wrap(callback)) + return msg_id # Return the ID to allow for later unsubscription. + + +class Unsubscribe(RealtimeRequest): + """Cancel a subscription""" + + @staticmethod + def _get_request_msg(subscription_id): + return { + "msg": "unsub", + "id": subscription_id, + } + + @classmethod + async def call(cls, dispatcher, subscription_id): + msg = cls._get_request_msg(subscription_id) + await dispatcher.call_method(msg)