Skip to content

Commit e1ec617

Browse files
committed
Add initial (untested) implementation
1 parent 8282074 commit e1ec617

File tree

9 files changed

+1376
-0
lines changed

9 files changed

+1376
-0
lines changed

.metadata

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# This file tracks properties of this Flutter project.
2+
# Used by Flutter tool to assess capabilities and perform upgrades etc.
3+
#
4+
# This file should be version controlled and should not be manually edited.
5+
6+
version:
7+
revision: 27321ebbad34b0a3fafe99fac037102196d655ff
8+
channel: stable
9+
10+
project_type: package

lib/phoenix_socket.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
library phoenix_socket;
2+

lib/src/channel.dart

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
import 'dart:async';
2+
3+
import 'package:quiver/collection.dart';
4+
5+
import 'message.dart';
6+
import 'push.dart';
7+
import 'socket.dart';
8+
9+
class PhoenixChannelEvents {
10+
static String close = "phx_close";
11+
static String error = "phx_error";
12+
static String join = "phx_join";
13+
static String reply = "phx_reply";
14+
static String leave = "phx_leave";
15+
16+
static Set<String> statuses = {
17+
close,
18+
error,
19+
join,
20+
reply,
21+
leave,
22+
};
23+
}
24+
25+
enum PhoenixChannelState {
26+
closed,
27+
errored,
28+
joined,
29+
joining,
30+
leaving,
31+
}
32+
33+
class PhoenixChannel {
34+
Map<String, String> parameters;
35+
PhoenixSocket _socket;
36+
StreamController<Message> _controller;
37+
PhoenixChannelState _state = PhoenixChannelState.closed;
38+
Timer _rejoinTimer;
39+
bool _joinedOnce = false;
40+
ListMultimap<String, Completer<Message>> _waiters;
41+
List<StreamSubscription> _subscriptions;
42+
String _reference;
43+
Push _joinPush;
44+
45+
String topic;
46+
Duration timeout;
47+
List<Push> pushBuffer = List();
48+
49+
PhoenixChannel.fromSocket(
50+
this._socket, {
51+
this.topic,
52+
Map parameters,
53+
Duration timeout,
54+
}) : this.parameters = parameters ?? {},
55+
this._controller = StreamController.broadcast(),
56+
this._waiters = ListMultimap(),
57+
this.timeout = timeout ?? _socket.defaultTimeout {
58+
_joinPush = _prepareJoin();
59+
_subscriptions.add(messages.listen(_onMessage));
60+
_subscriptions.addAll(_subscribeToSocketStreams(this._socket));
61+
}
62+
63+
bool get isClosed => _state == PhoenixChannelState.closed;
64+
bool get isErrored => _state == PhoenixChannelState.errored;
65+
bool get isJoined => _state == PhoenixChannelState.joined;
66+
bool get isJoining => _state == PhoenixChannelState.joining;
67+
bool get isLeaving => _state == PhoenixChannelState.leaving;
68+
69+
bool get canPush => socket.isConnected && isJoined;
70+
71+
String get joinRef => _joinPush.ref;
72+
PhoenixSocket get socket => _socket;
73+
PhoenixChannelState get state => _state;
74+
Stream<Message> get messages => _controller.stream;
75+
76+
String get reference {
77+
_reference ??= _socket.makeRef();
78+
return _reference;
79+
}
80+
81+
Future<Message> onPushReply(replyRef) {
82+
Completer completer = Completer();
83+
_waiters[replyRef].add(completer);
84+
return completer.future;
85+
}
86+
87+
void removeWaiters(replyRef) {
88+
_waiters.removeAll(replyRef);
89+
}
90+
91+
dispose() {
92+
pushBuffer.forEach((push) => push.cancelTimeout());
93+
_joinPush?.cancelTimeout();
94+
95+
_subscriptions.forEach((sub) => sub.cancel());
96+
_controller.close();
97+
_waiters.clear();
98+
}
99+
100+
void trigger(Message message) => _controller.add(message);
101+
102+
void triggerError() {
103+
if (!(isErrored || isLeaving || isClosed)) {
104+
trigger(Message(event: PhoenixChannelEvents.error));
105+
_waiters.forEach((k, waiter) =>
106+
waiter.completeError(Message(event: PhoenixChannelEvents.error)));
107+
_waiters.clear();
108+
}
109+
}
110+
111+
Push leave([Duration timeout]) {
112+
_joinPush?.cancelTimeout();
113+
_rejoinTimer?.cancel();
114+
115+
_state = PhoenixChannelState.leaving;
116+
var leavePush = Push(
117+
this,
118+
event: PhoenixChannelEvents.leave,
119+
payload: () => {},
120+
timeout: timeout,
121+
);
122+
123+
leavePush
124+
..onReply('ok').then(_onClose)
125+
..onReply('timeout').then(_onClose)
126+
..send();
127+
128+
if (!socket.isConnected || !isJoined) {
129+
leavePush.trigger(PushResponse(status: 'ok'));
130+
}
131+
132+
return leavePush;
133+
}
134+
135+
Push join([Duration newTimeout]) {
136+
assert(!_joinedOnce);
137+
138+
if (newTimeout is Duration) {
139+
timeout = newTimeout;
140+
}
141+
142+
_joinedOnce = true;
143+
_attemptJoin();
144+
145+
return _joinPush;
146+
}
147+
148+
Push push(String event, dynamic payload, [Duration newTimeout]) {
149+
assert(_joinedOnce);
150+
151+
final pushEvent = Push(
152+
this,
153+
event: event,
154+
payload: () => payload,
155+
timeout: newTimeout ?? timeout,
156+
);
157+
158+
if (canPush) {
159+
pushEvent.send();
160+
} else {
161+
pushEvent.startTimeout();
162+
pushBuffer.add(pushEvent);
163+
}
164+
165+
return pushEvent;
166+
}
167+
168+
List<StreamSubscription> _subscribeToSocketStreams(PhoenixSocket socket) {
169+
return [
170+
socket.messageStream.where(_isMember).listen(_controller.add),
171+
socket.errorStream.listen((error) => _rejoinTimer?.cancel()),
172+
socket.openStream.listen((event) {
173+
_rejoinTimer?.cancel();
174+
if (isErrored) {
175+
_attemptJoin();
176+
}
177+
})
178+
];
179+
}
180+
181+
Push _prepareJoin([Duration providedTimeout]) {
182+
var push = Push(
183+
this,
184+
event: PhoenixChannelEvents.join,
185+
payload: () => this.parameters,
186+
timeout: providedTimeout ?? timeout,
187+
);
188+
push
189+
..onReply('ok').then((PushResponse response) {
190+
_state = PhoenixChannelState.joined;
191+
_rejoinTimer?.cancel();
192+
pushBuffer.forEach((push) => push.send());
193+
pushBuffer.clear();
194+
})
195+
..onReply('error').then((PushResponse response) {
196+
_state = PhoenixChannelState.errored;
197+
if (socket.isConnected) {
198+
_startRejoinTimer();
199+
}
200+
})
201+
..onReply('timeout').then((PushResponse response) {
202+
var leavePush = Push(
203+
this,
204+
event: PhoenixChannelEvents.leave,
205+
payload: () => {},
206+
timeout: timeout,
207+
);
208+
leavePush.send();
209+
_state = PhoenixChannelState.errored;
210+
_joinPush.reset();
211+
if (socket.isConnected) {
212+
_startRejoinTimer();
213+
}
214+
});
215+
216+
return push;
217+
}
218+
219+
void _startRejoinTimer() {
220+
_rejoinTimer?.cancel();
221+
_rejoinTimer = Timer(timeout, () {
222+
if (socket.isConnected) _attemptJoin();
223+
});
224+
}
225+
226+
void _attemptJoin() {
227+
if (!isLeaving) {
228+
_state = PhoenixChannelState.joining;
229+
_joinPush.resend(timeout);
230+
}
231+
}
232+
233+
bool _isMember(Message message) {
234+
if (topic != message.topic) return false;
235+
236+
if (message.joinRef != null &&
237+
message.joinRef != _joinPush.ref &&
238+
PhoenixChannelEvents.statuses.contains(message.event)) {
239+
return false;
240+
}
241+
return true;
242+
}
243+
244+
void _onMessage(Message message) {
245+
if (message.event == PhoenixChannelEvents.close) {
246+
_rejoinTimer?.cancel();
247+
_state = PhoenixChannelState.closed;
248+
this.socket.removeChannel(this);
249+
} else if (message.event == PhoenixChannelEvents.error) {
250+
if (isJoining) {
251+
_joinPush.reset();
252+
}
253+
_state = PhoenixChannelState.errored;
254+
if (socket.isConnected) {
255+
_rejoinTimer?.cancel();
256+
_startRejoinTimer();
257+
}
258+
} else if (message.event == PhoenixChannelEvents.reply) {
259+
_controller.add(message.asReplyEvent());
260+
}
261+
if (_waiters.containsKey(message.event)) {
262+
_waiters[message.event]
263+
.forEach((completer) => completer.complete(message));
264+
removeWaiters(message.event);
265+
}
266+
}
267+
268+
void _onClose(PushResponse response) {
269+
trigger(Message(
270+
event: PhoenixChannelEvents.close,
271+
payload: "leave",
272+
));
273+
}
274+
}

lib/src/message.dart

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import 'dart:convert';
2+
3+
import 'package:equatable/equatable.dart';
4+
5+
import 'push.dart';
6+
7+
class Message implements Equatable {
8+
final String joinRef;
9+
final String ref;
10+
final String topic;
11+
final String event;
12+
final dynamic payload;
13+
14+
factory Message.fromJson(List<dynamic> parts) {
15+
return Message(
16+
joinRef: parts[0],
17+
ref: parts[1],
18+
topic: parts[2],
19+
event: parts[3],
20+
payload: parts[4],
21+
);
22+
}
23+
24+
factory Message.heartbeat(String ref) {
25+
return Message(topic: "phoenix", event: "heartbeat", payload: {});
26+
}
27+
28+
Message({
29+
this.joinRef,
30+
this.ref,
31+
this.topic,
32+
this.event,
33+
this.payload,
34+
});
35+
36+
String encode() {
37+
return jsonEncode([
38+
joinRef,
39+
ref,
40+
topic,
41+
event,
42+
payload,
43+
]);
44+
}
45+
46+
@override
47+
List<Object> get props => [joinRef, ref, topic, event, payload];
48+
49+
Message asReplyEvent() {
50+
return Message(
51+
ref: ref,
52+
payload: payload,
53+
event: Push.replyEventName(ref),
54+
topic: topic,
55+
joinRef: joinRef,
56+
);
57+
}
58+
}
59+
60+
class MessageSerializer {
61+
static Message decode(rawData) {
62+
return Message.fromJson(jsonDecode(rawData));
63+
}
64+
65+
String encode(Message message) {
66+
return message.encode();
67+
}
68+
}

0 commit comments

Comments
 (0)