Skip to content

Commit

Permalink
Initial commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
hynek-urban committed Dec 29, 2022
0 parents commit 82adb93
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.venv
__pycache__
18 changes: 18 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Copyright (c) 2023 Hynek Urban <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# rocketchat-async

asyncio-based Python wrapper for the Rocket.Chat Realtime API.

## When should you use this library?

Use this library if you:

- want to integrate with Rocket.Chat from Python
- are using [asyncio](https://docs.python.org/3/library/asyncio.html) to drive your application
- want to use Rocket.Chat's efficient websockets-based Realtime API

## Example usage

```python
import asyncio
from rocketchat_async import RocketChat


def handle_message(channel_id, sender_id, msg_id, thread_id, msg, qualifier):
"""Simply print the message that arrived."""
print(msg)


async def subscribe_to_messages(rc, channel):
"""Subscribe to a channel message."""
await rc.subscribe_to_channel_messages(channel, handle_message)


async def main(address, username, password):
rc = RocketChat()
await rc.start(address, username, password)
# One possible workflow consists of two steps:
#
# 1. Set up the desired callbacks...
for channel_id, channel_type in await rc.get_channels():
await subscribe_to_messages(rc, channel_id)
# 2. ...and then simply wait for the registered events.
await rc.run_forever()


# Side note: Don't forget to use the wss:// scheme when TLS is used.
asyncio.run(main('ws://localhost:3000/websocket', 'username', 'password'))
```
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
websockets==10.4
2 changes: 2 additions & 0 deletions rocketchat_async/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
src/.venv
__pycache__
1 change: 1 addition & 0 deletions rocketchat_async/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from rocketchat_async.core import RocketChat
26 changes: 26 additions & 0 deletions rocketchat_async/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
QUALIFIER_REMOVE_USER = "ru"
QUALIFIER_DIRECT_MESSAGE = "d"

# Selection of emojis that are subjectively "most fun".
EMOJIS = [
':grin:',
':sweat_smile:',
':joy:',
':heart_eyes:',
':smiling_face_with_3_hearts:',
':nerd:',
':sunglasses:',
':partying_face:',
':sob:',
':exploding_head:',
':fearful:',
':rolling_eyes:',
':thumbsup:',
':thumbsdown:',
':fingers_crossed:',
':metal:',
':v:',
':man_facepalming:',
':point_up:',
':penguin:',
]
71 changes: 71 additions & 0 deletions rocketchat_async/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
import websockets

from rocketchat_async.dispatcher import Dispatcher
from rocketchat_async.methods import Connect, Login, GetChannels, SendMessage,\
SendReaction, SendTypingEvent, SubscribeToChannelMessages,\
SubscribeToChannelChanges, Unsubscribe


class RocketChat:
"""Represents a connection to RocketChat, exposing the API."""

def __init__(self):
self._dispatcher = Dispatcher(verbose=True)
self.user_id = None

async def start(self, address, username, password):
ws_connected = asyncio.get_event_loop().create_future()
ws_connection = self._start(address, ws_connected)
self._ws_connection_task = asyncio.create_task(ws_connection)
await ws_connected
# Connect and login.
await self._connect()
self.user_id = await self._login(username, password)

async def run_forever(self):
await self.dispatch_task

async def _start(self, address, connected_fut):
try:
async with websockets.connect(address) as websocket:
self.dispatch_task = self._dispatcher.run(websocket)
# Notify the caller that login has succeeded.
connected_fut.set_result(True)
# Finally, create the ever-running dispatcher loop.
await self.dispatch_task
except Exception as e:
connected_fut.set_exception(e)

async def _connect(self):
await Connect.call(self._dispatcher)

async def _login(self, username, password):
return await Login.call(self._dispatcher, username, password)

# --> Public API methods start here. <--

async def get_channels(self):
return await GetChannels.call(self._dispatcher)

async def send_message(self, context, text):
await SendMessage.call(self._dispatcher, context, text)

async def send_reaction(self, orig_msg_id, emoji):
await SendReaction.call(orig_msg_id, emoji)

async def send_typing_event(self, context, is_typing):
await SendTypingEvent.call(self._dispatcher, context, False)

async def subscribe_to_channel_messages(self, channel_id, callback):
sub_id = await SubscribeToChannelMessages.call(self._dispatcher,
channel_id, callback)
return sub_id

async def subscribe_to_channel_changes(self, callback):
sub_id = await SubscribeToChannelChanges.call(self._dispatcher,
self.user_id, callback)
return sub_id

async def unsubscribe(self, subscription_id):
await Unsubscribe.call(self._dispatcher, subscription_id)
70 changes: 70 additions & 0 deletions rocketchat_async/dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import json


class Dispatcher:
"""Match websockets calls with responses and manage callbacks."""

def __init__(self, verbose=False):
self._websocket = None
self._futures = {} # ID -> asyncio Future (resolved with the response)
self._verbose = verbose

# ID -> registered callback (called when the msg arrives)
self._callbacks = {}

def run(self, websocket):
self._websocket = websocket
# Start listening to incoming messages, executing callbacks,
# if appropriate.
return asyncio.create_task(self._process_incoming())

async def call_method(self, msg, msg_id=None):
if self._verbose:
print(f'Outgoing: {msg}')
if (msg_id is not None):
fut = asyncio.get_event_loop().create_future()
self._futures[msg_id] = fut
await self._websocket.send(json.dumps(msg))
if (msg_id is not None):
return await fut

async def create_subscription(self, msg, msg_id, callback):
if self._verbose:
print(f'Outgoing: {msg}')
self._callbacks[msg['name']] = callback
await self._websocket.send(json.dumps(msg))

async def _process_incoming(self):
try:
while True:
await self._process_incoming_event()
except Exception as err:
for fut in self._futures.values():
# Propagate the exception to all awaiters to not get stuck.
fut.set_exception(err)
raise err

async def _process_incoming_event(self):
msg = await self._websocket.recv()
if self._verbose:
print(f'Incoming: {msg}')
parsed = json.loads(msg)
if parsed['msg'] == 'result':
msg_id = parsed['id']
if msg_id in self._futures:
self._futures[msg_id].set_result(parsed)
del self._futures[msg_id]
elif parsed['msg'] == 'changed': # Subscription update.
stream_name = parsed['collection']
if stream_name in self._callbacks:
self._callbacks[stream_name](parsed)
elif parsed['msg'] in ['ready', 'connected', 'added', 'updated',
'nosub']:
return # Nothing to do.
elif parsed['msg'] == 'ping':
asyncio.create_task(self.call_method({'msg': 'pong'}))
elif parsed['msg'] == 'error':
raise Exception(msg) # TODO - more specific class.
else:
raise Exception(f'Unknown message: {msg}')
Loading

0 comments on commit 82adb93

Please sign in to comment.