diff --git a/links/gql_websocket_link/CHANGELOG.md b/links/gql_websocket_link/CHANGELOG.md index 7f595895..1b6e9361 100644 --- a/links/gql_websocket_link/CHANGELOG.md +++ b/links/gql_websocket_link/CHANGELOG.md @@ -1,3 +1,7 @@ +## 3.0.0 + +- BREAKING: graphql-transport-ws: streaming operations and single result operations are now multiplexed on a single connection, user has to manually resubscribe subscription when connection is broken + ## 2.0.1 - support uuid 4.0.0 diff --git a/links/gql_websocket_link/README.md b/links/gql_websocket_link/README.md index 0b0d476a..7e75bc99 100644 --- a/links/gql_websocket_link/README.md +++ b/links/gql_websocket_link/README.md @@ -84,7 +84,9 @@ The `WebSocketLink` class has some known issues, see: - https://github.com/gql-dart/gql/issues/430 +#### TransportWebSocketLink (`graphql-transport-ws`) +- Streaming operations and single result operations are now multiplexed on a single connection. This means you have to manually resubscribe on subscriptions. On IOS and Android, when you app is in background (lock screen, etc.), all open sockets will be closed to save battery, and your app is freezed, no code from your app will run. Thus there is no way to reconnect a websocket connection when it is broken because no code from your app will run, you have to manually resubscribe on app resume (use `WidgetsBindingObserver` or related packages). You can use old TransportWebSocketLink client to resubscribe, the underlying socket is newly acquired when you resubscribe. ## Features and bugs diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index dab9253c..532fc3d6 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -693,6 +693,7 @@ class _ConnectionState { // subscriptions might complete while waiting for retry if (locks == 0) { + // print("retry connecting = null"); connecting = null; return denied( LikeCloseEvent(code: 1000, reason: "All Subscriptions Gone"), @@ -736,6 +737,7 @@ class _ConnectionState { (Object event) => emitter.emit(TransportWsEvent.closed(event)); errorOrClosed((errOrEvent) { options.log?.call("errorOrClosed $errOrEvent"); + // print("errorOrClosed $errOrEvent connecting = null"); connecting = null; isOpen = false; connectionAckTimeout?.cancel(); @@ -904,7 +906,6 @@ class _ConnectionState { )); } catch (err) { // stop reading messages as soon as reading breaks once - print("_messageSubs.cancel()"); // ignore: unawaited_futures _messageSubs.cancel(); emitter.emit(TransportWsEvent.error(err)); @@ -944,7 +945,6 @@ class _ConnectionState { Future<_Connection> connect() async { connecting ??= _startConnecting(); final _connection = await connecting!; - options.log?.call("_connection"); final socket = _connection.socket; @@ -1017,6 +1017,7 @@ class _Client extends TransportWsClient { ) { final id = options.generateID(); options.log?.call("subscribe $id"); + // print("subscribe step 1 generate id $id ${state.hashCode}"); bool done = false; bool errored = false; @@ -1028,9 +1029,124 @@ class _Client extends TransportWsClient { (() async { state.locks++; - for (;;) { + + final serializedRequest = options.serializer.serializeRequest(payload); + + final query = serializedRequest["query"] as String; + + if (query.startsWith("subscription")) { + for (;;) { + try { + final _c = await state.connect(); + // print("subscribe step 2 connect $id ${state.hashCode} ${_c.socket.hashCode}"); + final socket = _c.socket; + final release = _c.release; + final waitForReleaseOrThrowOnClose = + _c.waitForReleaseOrThrowOnClose; + final waitForLikeCloseEvent = _c.waitForLikeCloseEvent; + // print("isolate debug name: ${Isolate.current.debugName}"); + // print(payload.operation.toString()); + // print(payload.variables.toString()); + // print(payload.context.toString()); + // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + // if done while waiting for connect, release the connection lock right away + final _subscribeMsg = await options.graphQLSocketMessageEncoder( + SubscribeMessage(id, serializedRequest), + ); + // print("subscribe step 3 operation $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + if (done) { + if (!release.isCompleted) release.complete(); + } + + final unlisten = emitter.onMessage(id, (message) { + if (message is NextMessage) { + sink.add(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + // print("subscribe step 5 receive NextMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is ErrorMessage) { + errored = true; + done = true; + sink.addError(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + releaser(); + // print("subscribe step 6 receive ErrorMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is CompleteMessage) { + done = true; + releaser(); // release completes the sink + // print("subscribe step 7 receive CompleteMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } + }); + + socket.sink.add(_subscribeMsg); + + state.nextOrErrorMsgWaitMap[id] = Completer(); + + releaser = () async { + final _completeMsg = await options + .graphQLSocketMessageEncoder(CompleteMessage(id)); + if (!done && state.isOpen) { + // if not completed already and socket is open, send complete message to server on release + socket.sink.add(_completeMsg); + } + state.locks--; + done = true; + if (!release.isCompleted) release.complete(); + // print("subscribe step 8 releaser $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }; + + // print("subscribe step 4 waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // either the releaser will be called, connection completed and + // the promise resolved or the socket closed and the promise rejected. + // whatever happens though, we want to stop listening for messages + final likeCloseEvent = await Future.any( + [waitForReleaseOrThrowOnClose, waitForLikeCloseEvent]) + .whenComplete(() async { + unlisten(); + // print( + // "subscribe step 9 complete waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }); + + // workground for dart linux bug: complete error not being caught by try..catch block + if (likeCloseEvent != null) { + // print("subscribe step 10 error likeCloseEvent $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + throw likeCloseEvent; + } + + // print("subscribe step 11 success $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + + return; // completed, shouldnt try again + } catch (errOrCloseEvent) { + if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return; + } + // final finish = await processMessage( + // id: id, + // serializedRequest: serializedRequest, + // done: done, + // errored: errored, + // releaser: releaser, + // setDone: (value) => done = value, + // setErrored: (value) => errored = value, + // isSubscription: true, + // sink: sink, + // ); + + // if (finish) { + // return; + // } + } + } else { try { final _c = await state.connect(); + // print("subscribe step 2 connect $id ${state.hashCode} ${_c.socket.hashCode}"); final socket = _c.socket; final release = _c.release; final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose; @@ -1042,8 +1158,9 @@ class _Client extends TransportWsClient { // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); // if done while waiting for connect, release the connection lock right away final _subscribeMsg = await options.graphQLSocketMessageEncoder( - SubscribeMessage(id, options.serializer.serializeRequest(payload)), + SubscribeMessage(id, serializedRequest), ); + // print("subscribe step 3 operation $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); // print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); if (done) { if (!release.isCompleted) release.complete(); @@ -1051,12 +1168,15 @@ class _Client extends TransportWsClient { final unlisten = emitter.onMessage(id, (message) { if (message is NextMessage) { + done = true; sink.add(message.payload); final completer = state.nextOrErrorMsgWaitMap[id]; if (completer != null && !completer.isCompleted) { completer.complete(); } state.nextOrErrorMsgWaitMap.remove(id); + releaser(); + // print("subscribe step 5 receive NextMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); } else if (message is ErrorMessage) { errored = true; done = true; @@ -1067,9 +1187,7 @@ class _Client extends TransportWsClient { } state.nextOrErrorMsgWaitMap.remove(id); releaser(); - } else if (message is CompleteMessage) { - done = true; - releaser(); // release completes the sink + // print("subscribe step 6 receive ErrorMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); } }); @@ -1087,23 +1205,44 @@ class _Client extends TransportWsClient { state.locks--; done = true; if (!release.isCompleted) release.complete(); + // print("subscribe step 8 releaser $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); }; + // print("subscribe step 4 waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); // either the releaser will be called, connection completed and // the promise resolved or the socket closed and the promise rejected. // whatever happens though, we want to stop listening for messages - await waitForReleaseOrThrowOnClose.whenComplete(unlisten); + final likeCloseEvent = await Future.any( + [waitForReleaseOrThrowOnClose, waitForLikeCloseEvent]) + .whenComplete(() async { + unlisten(); + // print( + // "subscribe step 9 complete waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }); // workground for dart linux bug: complete error not being caught by try..catch block - final likeCloseEvent = await waitForLikeCloseEvent; if (likeCloseEvent != null) { + // print("subscribe step 10 error likeCloseEvent $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); throw likeCloseEvent; } + // print("subscribe step 11 success $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + return; // completed, shouldnt try again } catch (errOrCloseEvent) { if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return; } + // await processMessage( + // id: id, + // serializedRequest: serializedRequest, + // done: done, + // errored: errored, + // releaser: releaser, + // setDone: (value) => done = value, + // setErrored: (value) => errored = value, + // isSubscription: false, + // sink: sink, + // ); } })() .then((_) { @@ -1120,6 +1259,129 @@ class _Client extends TransportWsClient { }; } + // TODO: there seems to be a bug either in dart or web_socket_channel that's causing this common function failing test + Future processMessage({ + required String id, + required Map serializedRequest, + required bool done, + required bool errored, + required Function() releaser, + required Function(bool) setDone, + required Function(bool) setErrored, + required bool isSubscription, + required EventSink sink, + }) async { + try { + bool localDone = done; + final _c = await state.connect(); + // print("subscribe step 2 connect $id ${state.hashCode} ${_c.socket.hashCode}"); + final socket = _c.socket; + final release = _c.release; + final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose; + final waitForLikeCloseEvent = _c.waitForLikeCloseEvent; + // print("isolate debug name: ${Isolate.current.debugName}"); + // print(payload.operation.toString()); + // print(payload.variables.toString()); + // print(payload.context.toString()); + // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + // if done while waiting for connect, release the connection lock right away + final _subscribeMsg = await options.graphQLSocketMessageEncoder( + SubscribeMessage(id, serializedRequest), + ); + // print("subscribe step 3 operation $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + if (localDone) { + if (!release.isCompleted) release.complete(); + } + + final unlisten = emitter.onMessage(id, (message) { + if (message is NextMessage) { + if (!isSubscription) { + setDone(true); + localDone = true; + } + sink.add(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + if (!isSubscription) { + releaser(); // release completes the sink + } + // print("subscribe step 5 receive NextMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is ErrorMessage) { + setErrored(true); + setDone(true); + localDone = true; + sink.addError(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); + releaser(); + // print("subscribe step 6 receive ErrorMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } else if (message is CompleteMessage) { + if (isSubscription) { + setDone(true); + localDone = true; + releaser(); // release completes the sink + } + // print( + // "subscribe step 7 receive CompleteMessage $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + } + }); + + socket.sink.add(_subscribeMsg); + + state.nextOrErrorMsgWaitMap[id] = Completer(); + + releaser = () async { + final _completeMsg = + await options.graphQLSocketMessageEncoder(CompleteMessage(id)); + if (!localDone && state.isOpen) { + // if not completed already and socket is open, send complete message to server on release + socket.sink.add(_completeMsg); + } + state.locks--; + setDone(true); + localDone = true; + if (!release.isCompleted) release.complete(); + // print( + // "subscribe step 8 releaser $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }; + + // print( + // "subscribe step 4 waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + // either the releaser will be called, connection completed and + // the promise resolved or the socket closed and the promise rejected. + // whatever happens though, we want to stop listening for messages + final likeCloseEvent = await Future.any( + [waitForReleaseOrThrowOnClose, waitForLikeCloseEvent]) + .whenComplete(() async { + unlisten(); + // print( + // "subscribe step 9 complete waitForReleaseOrThrowOnClose $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + }); + + // workground for dart linux bug: complete error not being caught by try..catch block + if (likeCloseEvent != null) { + // print( + // "subscribe step 10 error likeCloseEvent $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + throw likeCloseEvent; + } + + // print( + // "subscribe step 11 success $id ${state.hashCode} ${_c.socket.hashCode} ${serializedRequest["operationName"]}"); + + return true; // completed, shouldnt try again + } catch (errOrCloseEvent) { + if (!state.shouldRetryConnectOrThrow(errOrCloseEvent)) return true; + return false; + } + } + @override Future dispose() async { options.log?.call("dispose"); @@ -1319,6 +1581,7 @@ class TransportWebSocketLink extends Link { // TODO pass more data? ); } + return response; }, ); diff --git a/links/gql_websocket_link/pubspec.yaml b/links/gql_websocket_link/pubspec.yaml index bd5ea2de..6a85764a 100644 --- a/links/gql_websocket_link/pubspec.yaml +++ b/links/gql_websocket_link/pubspec.yaml @@ -1,7 +1,8 @@ name: gql_websocket_link -version: 2.0.1 +version: 3.0.0 description: GQL Websocket Link repository: https://github.com/gql-dart/gql +publish_to: none environment: sdk: '>=2.15.0 <4.0.0' dependencies: @@ -12,6 +13,7 @@ dependencies: rxdart: '>=0.26.0 <=0.28.0' uuid: '>=3.0.0 <5.0.0' web_socket_channel: ^3.0.3 + # path: ../../../http/pkgs/web_socket_channel dev_dependencies: gql_pedantic: ^1.0.2 mockito: ^5.0.0 diff --git a/links/gql_websocket_link/test/gql_websocket_link_test.dart b/links/gql_websocket_link/test/gql_websocket_link_test.dart index 5869ae0d..d2b319f2 100644 --- a/links/gql_websocket_link/test/gql_websocket_link_test.dart +++ b/links/gql_websocket_link/test/gql_websocket_link_test.dart @@ -335,7 +335,7 @@ void _testLinks( .map((dynamic s) => json.decode(s as String)) .listen( (dynamic message) { - print("message $message"); + // print("message $message"); if (message["type"] == "connection_init") { channel.sink.add( json.encode( @@ -343,7 +343,7 @@ void _testLinks( ), ); } else if (message["type"] == startMessageType) { - print("enter subscribe"); + // print("enter subscribe"); channel.sink.add( json.encode( { @@ -1553,114 +1553,119 @@ void _testLinks( ), ); - server1 = await HttpServer.bind("localhost", 0); - server1.transform(WebSocketTransformer()).listen( - expectAsync1( - (webSocket) async { - final channel = IOWebSocketChannel(webSocket); - var messageCount = 0; - channel.stream.listen( - expectAsync1( - (dynamic message) { - final map = json.decode(message as String) - as Map?; - if (messageCount == 0) { - expect(map!["type"], MessageTypes.connectionInit); - channel.sink.add( - json.encode( - ConnectionAck(), - ), - ); - } else if (messageCount == 1) { - expect(map!["id"], isA()); - expect(map["type"], startMessageType); - subId = map["id"] as String?; - // disconnect - webSocket.close(websocket_status.goingAway); - } - messageCount++; - }, - count: 2, - reason: - "server1 should only receive 2 messages, init and start", - id: "server1:websocket_messages", - ), - ); - }, - count: 1, - reason: "server 1 should only be connected once", - id: "server1:websocket_connections", - ), - ); + if (isApolloSubProtocol) { + server1 = await HttpServer.bind("localhost", 0); + server1.transform(WebSocketTransformer()).listen( + expectAsync1( + (webSocket) async { + final channel = IOWebSocketChannel(webSocket); + var messageCount = 0; + channel.stream.listen( + expectAsync1( + (dynamic message) { + final map = json.decode(message as String) + as Map?; + if (messageCount == 0) { + expect(map!["type"], MessageTypes.connectionInit); + channel.sink.add( + json.encode( + ConnectionAck(), + ), + ); + } else if (messageCount == 1) { + expect(map!["id"], isA()); + expect(map["type"], startMessageType); + subId = map["id"] as String?; + // disconnect + webSocket.close(websocket_status.goingAway); + } + messageCount++; + }, + count: 2, + reason: + "server1 should only receive 2 messages, init and start", + id: "server1:websocket_messages", + ), + ); + }, + count: 1, + reason: "server 1 should only be connected once", + id: "server1:websocket_connections", + ), + ); - server2 = await HttpServer.bind("localhost", 0); - server2.transform(WebSocketTransformer()).listen( - expectAsync1( - (webSocket) async { - final channel = IOWebSocketChannel(webSocket); - var messageCount = 0; - channel.stream.listen( - expectAsync1( - (dynamic message) { - final map = json.decode(message as String) - as Map?; - if (messageCount == 0) { - expect(map!["type"], MessageTypes.connectionInit); - channel.sink.add( - json.encode( - ConnectionAck(), - ), - ); - } else if (messageCount == 1) { - expect(map!["id"], isA()); - expect(map["type"], startMessageType); - expect(map["id"], subId); - completer.complete(); - } else { - expect(map!["id"], isA()); - expect( - map["type"], - isApolloSubProtocol - ? MessageTypes.stop - : MessageTypes.complete); - expect(map["id"], subId); - - stopReceivedCompleter.complete(); - } - messageCount++; - }, - count: 3, - id: "server2:websocket_messages", - reason: - "server 2 should receive init, subscription and complete/stop msg", - ), - ); - }, - count: 1, - reason: "server 2 should only receive one connection", - ), - ); + server2 = await HttpServer.bind("localhost", 0); + server2.transform(WebSocketTransformer()).listen( + expectAsync1( + (webSocket) async { + final channel = IOWebSocketChannel(webSocket); + var messageCount = 0; + channel.stream.listen( + expectAsync1( + (dynamic message) { + final map = json.decode(message as String) + as Map?; + if (messageCount == 0) { + expect(map!["type"], MessageTypes.connectionInit); + channel.sink.add( + json.encode( + ConnectionAck(), + ), + ); + } else if (messageCount == 1) { + expect(map!["id"], isA()); + expect(map["type"], startMessageType); + expect(map["id"], subId); + completer.complete(); + } else { + expect(map!["id"], isA()); + expect( + map["type"], + isApolloSubProtocol + ? MessageTypes.stop + : MessageTypes.complete); + expect(map["id"], subId); + + stopReceivedCompleter.complete(); + } + messageCount++; + }, + count: 3, + id: "server2:websocket_messages", + reason: + "server 2 should receive init, subscription and complete/stop msg", + ), + ); + }, + count: 1, + reason: "server 2 should only receive one connection", + ), + ); - link = makeLink( - null, - channelGenerator: () async { - if (connectToServer == 1) { - connectToServer++; - final webSocket = - await WebSocket.connect("ws://localhost:${server1.port}"); - return IOWebSocketChannel(webSocket); - } else { - final webSocket = - await WebSocket.connect("ws://localhost:${server2.port}"); - return IOWebSocketChannel(webSocket); - } - }, - reconnectInterval: Duration(milliseconds: 500), - ); - final sub = link.request(request).listen(print, onError: print); - await completer.future; - await sub.cancel(); - await stopReceivedCompleter.future; + link = makeLink( + null, + channelGenerator: () async { + if (connectToServer == 1) { + connectToServer++; + final webSocket = + await WebSocket.connect("ws://localhost:${server1.port}"); + return IOWebSocketChannel(webSocket); + } else { + final webSocket = + await WebSocket.connect("ws://localhost:${server2.port}"); + return IOWebSocketChannel(webSocket); + } + }, + reconnectInterval: Duration(milliseconds: 500), + ); + final sub = link.request(request).listen(print, onError: print); + await completer.future; + await sub.cancel(); + await stopReceivedCompleter.future; + } else { + // only test for transport ws sub-protocol + return; + } }, );