Skip to content

Commit 611558b

Browse files
committed
backend: add support for subscriptions
Referencing implementation at https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent and graphql-python/graphql-ws#11
1 parent af5ba5e commit 611558b

File tree

5 files changed

+383
-7
lines changed

5 files changed

+383
-7
lines changed

backend/app.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import time
22
from flask import Flask
33
from flask_graphql import GraphQLView
4+
from flask_sockets import Sockets
5+
from graphql_ws.gevent import GeventSubscriptionServer
46

57
from models import db_session
68
from schema import schema
79

810
app = Flask(__name__)
911
app.debug = True
12+
sockets = Sockets(app)
1013

1114
app.add_url_rule(
1215
'/graphql',
@@ -17,9 +20,20 @@
1720
)
1821
)
1922

23+
subscription_server = GeventSubscriptionServer(schema)
24+
app.app_protocol = lambda environ_path_info: 'graphql-ws'
25+
26+
@sockets.route('/subscriptions')
27+
def echo_socket(ws):
28+
subscription_server.handle(ws)
29+
return []
30+
2031
@app.teardown_appcontext
2132
def shutdown_session(exception=None):
2233
db_session.remove()
2334

2435
if __name__ == '__main__':
25-
app.run()
36+
from gevent import pywsgi
37+
from geventwebsocket.handler import WebSocketHandler
38+
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
39+
server.serve_forever()

backend/pubsub.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Code taken from https://github.com/hballard/graphql-ws/tree/pubsub under MIT license.
2+
# Redis version is also available from the above branch.
3+
4+
from rx.subjects import Subject
5+
from rx import config
6+
7+
8+
class SubjectObserversWrapper(object):
9+
def __init__(self, pubsub, channel):
10+
self.pubsub = pubsub
11+
self.channel = channel
12+
self.observers = []
13+
14+
self.lock = config["concurrency"].RLock()
15+
16+
def __getitem__(self, key):
17+
return self.observers[key]
18+
19+
def __getattr__(self, attr):
20+
return getattr(self.observers, attr)
21+
22+
def remove(self, observer):
23+
with self.lock:
24+
self.observers.remove(observer)
25+
if not self.observers:
26+
self.pubsub.unsubscribe(self.channel)
27+
28+
29+
class GeventRxPubsub(object):
30+
31+
def __init__(self):
32+
self.subscriptions = {}
33+
34+
def publish(self, channel, payload):
35+
if channel in self.subscriptions:
36+
self.subscriptions[channel].on_next(payload)
37+
38+
def subscribe_to_channel(self, channel):
39+
if channel in self.subscriptions:
40+
return self.subscriptions[channel]
41+
else:
42+
subject = Subject()
43+
# monkeypatch Subject to unsubscribe pubsub on observable
44+
# subscription.dispose()
45+
subject.observers = SubjectObserversWrapper(self, channel)
46+
self.subscriptions[channel] = subject
47+
return subject
48+
49+
def unsubscribe(self, channel):
50+
if channel in self.subscriptions:
51+
del self.subscriptions[channel]

backend/relay_helper.py

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# Add-ons for graphene-relay for subscriptions
2+
3+
import re
4+
5+
from graphene.utils.get_unbound_function import get_unbound_function
6+
from graphene.utils.props import props
7+
from graphene.types.field import Field
8+
from graphene.types.objecttype import ObjectType, ObjectTypeOptions
9+
from graphene.types.utils import yank_fields_from_attrs
10+
from graphene.types.interface import Interface
11+
12+
from graphene.types import Field, InputObjectType, String
13+
14+
15+
class SubscriptionOptions(ObjectTypeOptions):
16+
arguments = None # type: Dict[str, Argument]
17+
output = None # type: Type[ObjectType]
18+
resolver = None # type: Callable
19+
interfaces = () # type: Iterable[Type[Interface]]
20+
21+
22+
class Subscription(ObjectType):
23+
24+
@classmethod
25+
def __init_subclass_with_meta__(
26+
cls,
27+
interfaces=(),
28+
resolver=None,
29+
output=None,
30+
arguments=None,
31+
_meta=None,
32+
**options,
33+
):
34+
if not _meta:
35+
_meta = SubscriptionOptions(cls)
36+
37+
output = output or getattr(cls, "Output", None)
38+
fields = {}
39+
40+
for interface in interfaces:
41+
assert issubclass(
42+
interface, Interface
43+
), f'All interfaces of {cls.__name__} must be a subclass of Interface. Received "{interface}".'
44+
fields.update(interface._meta.fields)
45+
46+
if not output:
47+
# If output is defined, we don't need to get the fields
48+
fields = {}
49+
for base in reversed(cls.__mro__):
50+
fields.update(yank_fields_from_attrs(base.__dict__, _as=Field))
51+
output = cls
52+
53+
if not arguments:
54+
input_class = getattr(cls, "Arguments", None)
55+
if not input_class:
56+
input_class = getattr(cls, "Input", None)
57+
58+
if input_class:
59+
arguments = props(input_class)
60+
else:
61+
arguments = {}
62+
63+
if not resolver:
64+
subscribe = getattr(cls, "subscribe", None)
65+
assert subscribe, "All subscriptions must define a subscribe method in it"
66+
resolver = get_unbound_function(subscribe)
67+
68+
if _meta.fields:
69+
_meta.fields.update(fields)
70+
else:
71+
_meta.fields = fields
72+
73+
_meta.interfaces = interfaces
74+
_meta.output = output
75+
_meta.resolver = resolver
76+
_meta.arguments = arguments
77+
78+
super(Subscription, cls).__init_subclass_with_meta__(_meta=_meta, **options)
79+
80+
@classmethod
81+
def Field(
82+
cls, name=None, description=None, deprecation_reason=None, required=False
83+
):
84+
""" Mount instance of subscription Field. """
85+
return Field(
86+
cls._meta.output,
87+
args=cls._meta.arguments,
88+
resolver=cls._meta.resolver,
89+
name=name,
90+
description=description or cls._meta.description,
91+
deprecation_reason=deprecation_reason,
92+
required=required,
93+
)
94+
95+
96+
class ClientIDSubscription(Subscription):
97+
class Meta:
98+
abstract = True
99+
100+
@classmethod
101+
def __init_subclass_with_meta__(
102+
cls, output=None, input_fields=None, arguments=None, name=None, **options
103+
):
104+
input_class = getattr(cls, "Input", None)
105+
base_name = re.sub("Payload$", "", name or cls.__name__)
106+
107+
assert not output, "Can't specify any output"
108+
assert not arguments, "Can't specify any arguments"
109+
110+
bases = (InputObjectType,)
111+
if input_class:
112+
bases += (input_class,)
113+
114+
if not input_fields:
115+
input_fields = {}
116+
117+
cls.Input = type(
118+
f"{base_name}Input",
119+
bases,
120+
dict(input_fields, client_subscription_id=String(name="clientSubscriptionId")),
121+
)
122+
123+
arguments = dict(
124+
input=cls.Input(required=True)
125+
# 'client_subscription_id': String(name='clientSubscriptionId')
126+
)
127+
subscribe_and_get_payload = getattr(cls, "subscribe_and_get_payload", None)
128+
if cls.subscribe and cls.subscribe.__func__ == ClientIDSubscription.subscribe.__func__:
129+
assert subscribe_and_get_payload, (
130+
f"{name or cls.__name__}.subscribe_and_get_payload method is required"
131+
" in a ClientIDSubscription."
132+
)
133+
134+
if not name:
135+
name = f"{base_name}Payload"
136+
137+
super(ClientIDSubscription, cls).__init_subclass_with_meta__(
138+
output=None, arguments=arguments, name=name, **options
139+
)
140+
cls._meta.fields["client_subscription_id"] = Field(String, name="clientSubscriptionId")
141+
142+
@classmethod
143+
def subscribe(cls, root, info, input):
144+
def on_resolve(payload):
145+
def set_client_subscription_id(item):
146+
try:
147+
item.client_subscription_id = input.get("client_subscription_id")
148+
except Exception:
149+
raise Exception(
150+
f"Cannot set client_subscription_id in the payload object {repr(payload)}"
151+
)
152+
return item
153+
return payload.map(set_client_subscription_id)
154+
155+
result = cls.subscribe_and_get_payload(root, info, **input)
156+
return on_resolve(result)

backend/requirements.txt

+7
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ attrs==19.3.0
33
click==7.1.2
44
Flask==1.1.2
55
Flask-GraphQL==2.0.1
6+
Flask-Sockets==0.2.1
7+
gevent==20.9.0
8+
gevent-websocket==0.10.1
69
graphene==2.1.8
710
graphene-sqlalchemy==2.3.0
811
graphql-core==2.3.2
912
graphql-relay==2.0.1
1013
graphql-server-core==1.2.0
14+
graphql-ws==0.3.1
15+
greenlet==0.4.17
1116
importlib-metadata==1.7.0
1217
install==1.3.3
1318
itsdangerous==1.1.0
@@ -28,3 +33,5 @@ SQLAlchemy==1.3.18
2833
wcwidth==0.2.5
2934
Werkzeug==1.0.1
3035
zipp==3.1.0
36+
zope.event==4.5.0
37+
zope.interface==5.1.2

0 commit comments

Comments
 (0)