From 04a2a2e57ed94832a71b9bd9fb33a0328da95d8d Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:10:26 -0600 Subject: [PATCH 01/15] refactor(WebSocketSubject): modernize the configuration approach. --- .../observable/dom/WebSocketSubject.ts | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 5d7dbf9e77..61828e3350 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -1,5 +1,5 @@ import { Subject, AnonymousSubject } from '../../Subject.js'; -import { Subscriber, Observable, Subscription } from '../../Observable.js'; +import { Subscriber, Observable, Subscription, operate } from '../../Observable.js'; import { ReplaySubject } from '../../ReplaySubject.js'; import { Observer, NextObserver } from '../../types.js'; @@ -165,23 +165,21 @@ export class WebSocketSubject extends AnonymousSubject { this.destination = destination; this._source = urlConfigOrSource as Observable; } else { - const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG }); - this._output = new Subject(); - if (typeof urlConfigOrSource === 'string') { - config.url = urlConfigOrSource; - } else { - for (const key in urlConfigOrSource) { - if (urlConfigOrSource.hasOwnProperty(key)) { - (config as any)[key] = (urlConfigOrSource as any)[key]; - } - } - } + const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; + this._config = { + ...DEFAULT_WEBSOCKET_CONFIG, + // Setting this here because a previous version of this allowed + // WebSocket to be polyfilled later than DEFAULT_WEBSOCKET_CONFIG + // was defined. + WebSocketCtor: WebSocket, + ...userConfig, + }; - if (!config.WebSocketCtor && WebSocket) { - config.WebSocketCtor = WebSocket; - } else if (!config.WebSocketCtor) { + if (!this._config.WebSocketCtor) { throw new Error('no WebSocket constructor can be found'); } + + this._output = new Subject(); this.destination = new ReplaySubject(); } } From 045875bdb90e5c73f0e8d90a3d28177ba5eaa987 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:12:23 -0600 Subject: [PATCH 02/15] refactor(WebSocketSubject): remove unnecessary try/catching --- .../observable/dom/WebSocketSubject.ts | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 61828e3350..8615b9aaec 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -211,33 +211,25 @@ export class WebSocketSubject extends AnonymousSubject { * from the server for the output stream. */ multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { - return new Observable((observer: Observer) => { - try { - this.next(subMsg()); - } catch (err) { - observer.error(err); - } + return new Observable((destination) => { + this.next(subMsg()); const subscription = this.subscribe({ next: (x) => { try { if (messageFilter(x)) { - observer.next(x); + destination.next(x); } } catch (err) { - observer.error(err); + destination.error(err); } }, - error: (err) => observer.error(err), - complete: () => observer.complete(), + error: (err) => destination.error(err), + complete: () => destination.complete(), }); return () => { - try { - this.next(unsubMsg()); - } catch (err) { - observer.error(err); - } + this.next(unsubMsg()); subscription.unsubscribe(); }; }); From e5bb4ba06535ac03936846f91a97fe53db03213e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:14:03 -0600 Subject: [PATCH 03/15] refactor(WebSocketSubject): simplify multiplex --- .../observable/dom/WebSocketSubject.ts | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 8615b9aaec..731b28c266 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -213,25 +213,19 @@ export class WebSocketSubject extends AnonymousSubject { multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { return new Observable((destination) => { this.next(subMsg()); - - const subscription = this.subscribe({ - next: (x) => { - try { + destination.add(() => { + this.next(unsubMsg()); + }); + this.subscribe( + operate({ + destination, + next: (x) => { if (messageFilter(x)) { destination.next(x); } - } catch (err) { - destination.error(err); - } - }, - error: (err) => destination.error(err), - complete: () => destination.complete(), - }); - - return () => { - this.next(unsubMsg()); - subscription.unsubscribe(); - }; + }, + }) + ); }); } From 55139efddcfe1d75aef877ca695088dad91e7705 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:18:59 -0600 Subject: [PATCH 04/15] refactor(WebSocketSubject): directly inherit Subject --- .../observable/dom/WebSocketSubject.ts | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 731b28c266..d74fa9493e 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -1,4 +1,4 @@ -import { Subject, AnonymousSubject } from '../../Subject.js'; +import { Subject } from '../../Subject.js'; import { Subscriber, Observable, Subscription, operate } from '../../Observable.js'; import { ReplaySubject } from '../../ReplaySubject.js'; import { Observer, NextObserver } from '../../types.js'; @@ -151,7 +151,7 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; -export class WebSocketSubject extends AnonymousSubject { +export class WebSocketSubject extends Subject { private _config!: WebSocketSubjectConfig; /** @internal */ @@ -159,6 +159,10 @@ export class WebSocketSubject extends AnonymousSubject { private _socket: WebSocket | null = null; + private destination: Observer | undefined = undefined; + + private _source: Observable | undefined = undefined; + constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { super(); if (urlConfigOrSource instanceof Observable) { @@ -334,6 +338,18 @@ export class WebSocketSubject extends AnonymousSubject { }; } + next(value: T) { + this.destination?.next?.(value); + } + + error(err: any) { + this.destination?.error?.(err); + } + + complete() { + this.destination?.complete?.(); + } + /** @internal */ protected _subscribe(subscriber: Subscriber): Subscription { const { _source } = this; From a8c98b6cb5b2bee388b597c55582e493ea014106 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:25:12 -0600 Subject: [PATCH 05/15] feat(Subject.create): Removed the deprecated `Subject.create` method. BREAKING CHANGE: Removed the deprecated `Subject.create` method. If you need to create an object that is "half Observable, half Observer", you'll need to either bolt `next`, `error`, and `complete` handlers onto an `Observable` and property type the return... or you'll need to create your own class that is backed by an `Observable`. In any case, if the `Observer` and the `Observable` are so unrelated that you have to bolt them together, you're probably better off with those two objects separately. This is why `Subject.create` has been deprecated for so long. --- packages/rxjs/spec/Subject-spec.ts | 126 -------------------------- packages/rxjs/src/internal/Subject.ts | 37 -------- 2 files changed, 163 deletions(-) diff --git a/packages/rxjs/spec/Subject-spec.ts b/packages/rxjs/spec/Subject-spec.ts index 7c6e5b2b5c..38c08869db 100644 --- a/packages/rxjs/spec/Subject-spec.ts +++ b/packages/rxjs/spec/Subject-spec.ts @@ -1,6 +1,5 @@ import { expect } from 'chai'; import { Subject, Observable, AsyncSubject, Observer, of, config, Subscription, Subscriber, noop, operate } from 'rxjs'; -import { AnonymousSubject } from 'rxjs/internal/Subject'; import { delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from './helpers/observableMatcher'; @@ -448,104 +447,6 @@ describe('Subject', () => { expect(subject.observed).to.equal(false); }); - it('should have a static create function that works', () => { - expect(Subject.create).to.be.a('function'); - const source = of(1, 2, 3, 4, 5); - const nexts: number[] = []; - const output: any[] = []; - - let error: any; - let complete = false; - let outputComplete = false; - - const destination = { - closed: false, - next: function (x: number) { - nexts.push(x); - }, - error: function (err: any) { - error = err; - this.closed = true; - }, - complete: function () { - complete = true; - this.closed = true; - }, - }; - - const sub: Subject = Subject.create(destination, source); - - sub.subscribe({ - next: function (x: number) { - output.push(x); - }, - complete: () => { - outputComplete = true; - }, - }); - - sub.next('a'); - sub.next('b'); - sub.next('c'); - sub.complete(); - - expect(nexts).to.deep.equal(['a', 'b', 'c']); - expect(complete).to.be.true; - expect(error).to.be.a('undefined'); - - expect(output).to.deep.equal([1, 2, 3, 4, 5]); - expect(outputComplete).to.be.true; - }); - - it('should have a static create function that works also to raise errors', () => { - expect(Subject.create).to.be.a('function'); - const source = of(1, 2, 3, 4, 5); - const nexts: number[] = []; - const output: number[] = []; - - let error: any; - let complete = false; - let outputComplete = false; - - const destination = { - closed: false, - next: function (x: number) { - nexts.push(x); - }, - error: function (err: any) { - error = err; - this.closed = true; - }, - complete: function () { - complete = true; - this.closed = true; - }, - }; - - const sub: Subject = Subject.create(destination, source); - - sub.subscribe({ - next: function (x: number) { - output.push(x); - }, - complete: () => { - outputComplete = true; - }, - }); - - sub.next('a'); - sub.next('b'); - sub.next('c'); - sub.error('boom'); - - expect(nexts).to.deep.equal(['a', 'b', 'c']); - expect(complete).to.be.false; - expect(error).to.equal('boom'); - - expect(output).to.deep.equal([1, 2, 3, 4, 5]); - expect(outputComplete).to.be.true; - }); - it('should be an Observer which can be given to Observable.subscribe', (done) => { const source = of(1, 2, 3, 4, 5); const subject = new Subject(); @@ -781,30 +682,3 @@ describe('Subject', () => { expect(results).to.deep.equal([1, 1, 2, 2, 'complete']); }); }); - -describe('AnonymousSubject', () => { - it('should be exposed', () => { - expect(AnonymousSubject).to.be.a('function'); - }); - - it('should not be eager', () => { - let subscribed = false; - - const subject = Subject.create( - null, - new Observable((observer: Observer) => { - subscribed = true; - const subscription = of('x').subscribe(observer); - return () => { - subscription.unsubscribe(); - }; - }) - ); - - const observable = subject.asObservable(); - expect(subscribed).to.be.false; - - observable.subscribe(); - expect(subscribed).to.be.true; - }); -}); diff --git a/packages/rxjs/src/internal/Subject.ts b/packages/rxjs/src/internal/Subject.ts index 90f153bd39..885981e0bc 100644 --- a/packages/rxjs/src/internal/Subject.ts +++ b/packages/rxjs/src/internal/Subject.ts @@ -40,15 +40,6 @@ export class Subject extends Observable implements SubscriptionLike { /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */ thrownError: any = null; - /** - * Creates a "subject" by basically gluing an observer to an observable. - * - * @deprecated Recommended you do not use. Will be removed at some point in the future. Plans for replacement still under discussion. - */ - static create: (...args: any[]) => any = (destination: Observer, source: Observable): AnonymousSubject => { - return new AnonymousSubject(destination, source); - }; - constructor() { // NOTE: This must be here to obscure Observable's constructor. super(); @@ -146,31 +137,3 @@ export class Subject extends Observable implements SubscriptionLike { return new Observable((subscriber) => this.subscribe(subscriber)); } } - -export class AnonymousSubject extends Subject { - constructor( - /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */ - public destination?: Observer, - /** @internal */ - protected _source?: Observable - ) { - super(); - } - - next(value: T) { - this.destination?.next?.(value); - } - - error(err: any) { - this.destination?.error?.(err); - } - - complete() { - this.destination?.complete?.(); - } - - /** @internal */ - protected _subscribe(subscriber: Subscriber): Subscription { - return this._source?.subscribe(subscriber) ?? Subscription.EMPTY; - } -} From 40e47b2b8e1f20159f9e428eeda698f37e2d0404 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:32:48 -0600 Subject: [PATCH 06/15] refactor: Removing AnonymousSubject lift remnants --- .../observable/dom/WebSocketSubject.ts | 46 ++++++++----------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index d74fa9493e..6365e6aad7 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -154,38 +154,32 @@ export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; export class WebSocketSubject extends Subject { private _config!: WebSocketSubjectConfig; - /** @internal */ - _output!: Subject; + private _output: Subject; private _socket: WebSocket | null = null; - private destination: Observer | undefined = undefined; + private destination: Observer; private _source: Observable | undefined = undefined; - constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { + constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { super(); - if (urlConfigOrSource instanceof Observable) { - this.destination = destination; - this._source = urlConfigOrSource as Observable; - } else { - const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; - this._config = { - ...DEFAULT_WEBSOCKET_CONFIG, - // Setting this here because a previous version of this allowed - // WebSocket to be polyfilled later than DEFAULT_WEBSOCKET_CONFIG - // was defined. - WebSocketCtor: WebSocket, - ...userConfig, - }; - - if (!this._config.WebSocketCtor) { - throw new Error('no WebSocket constructor can be found'); - } + const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; + this._config = { + ...DEFAULT_WEBSOCKET_CONFIG, + // Setting this here because a previous version of this allowed + // WebSocket to be polyfilled later than DEFAULT_WEBSOCKET_CONFIG + // was defined. + WebSocketCtor: WebSocket, + ...userConfig, + }; - this._output = new Subject(); - this.destination = new ReplaySubject(); + if (!this._config.WebSocketCtor) { + throw new Error('no WebSocket constructor can be found'); } + + this._output = new Subject(); + this.destination = new ReplaySubject(); } private _resetState() { @@ -339,15 +333,15 @@ export class WebSocketSubject extends Subject { } next(value: T) { - this.destination?.next?.(value); + this.destination.next(value); } error(err: any) { - this.destination?.error?.(err); + this.destination.error(err); } complete() { - this.destination?.complete?.(); + this.destination.complete(); } /** @internal */ From 516735fa24e6c14aa5d517e1ce69b8d3d757a118 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:35:18 -0600 Subject: [PATCH 07/15] refactor(WebSocketSubject): rename inbound and outbound subjects to `input` and `output`. --- .../observable/dom/WebSocketSubject.ts | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 6365e6aad7..0a259a1be9 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -158,7 +158,7 @@ export class WebSocketSubject extends Subject { private _socket: WebSocket | null = null; - private destination: Observer; + private _input: Observer; private _source: Observable | undefined = undefined; @@ -179,13 +179,13 @@ export class WebSocketSubject extends Subject { } this._output = new Subject(); - this.destination = new ReplaySubject(); + this._input = new ReplaySubject(); } private _resetState() { this._socket = null; if (!this._source) { - this.destination = new ReplaySubject(); + this._input = new ReplaySubject(); } this._output = new Subject(); } @@ -229,7 +229,7 @@ export class WebSocketSubject extends Subject { private _connectSocket() { const { WebSocketCtor, protocol, url, binaryType } = this._config; - const observer = this._output; + const { _output } = this; let socket: WebSocket | null = null; try { @@ -239,7 +239,7 @@ export class WebSocketSubject extends Subject { this._socket.binaryType = binaryType; } } catch (e) { - observer.error(e); + _output.error(e); return; } @@ -262,16 +262,16 @@ export class WebSocketSubject extends Subject { openObserver.next(evt); } - const queue = this.destination; + const queue = this._input; - this.destination = new Subscriber({ + this._input = new Subscriber({ next: (x: T) => { if (socket!.readyState === 1) { try { const { serializer } = this._config; socket!.send(serializer!(x!)); } catch (e) { - this.destination!.error(e); + this._input!.error(e); } } }, @@ -283,7 +283,7 @@ export class WebSocketSubject extends Subject { if (err && err.code) { socket!.close(err.code, err.reason); } else { - observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); + _output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); } this._resetState(); }, @@ -298,13 +298,13 @@ export class WebSocketSubject extends Subject { }); if (queue && queue instanceof ReplaySubject) { - subscription.add((queue as ReplaySubject).subscribe(this.destination)); + subscription.add((queue as ReplaySubject).subscribe(this._input)); } }; socket.onerror = (e: Event) => { this._resetState(); - observer.error(e); + _output.error(e); }; socket.onclose = (e: CloseEvent) => { @@ -316,32 +316,32 @@ export class WebSocketSubject extends Subject { closeObserver.next(e); } if (e.wasClean) { - observer.complete(); + _output.complete(); } else { - observer.error(e); + _output.error(e); } }; socket.onmessage = (e: MessageEvent) => { try { const { deserializer } = this._config; - observer.next(deserializer!(e)); + _output.next(deserializer!(e)); } catch (err) { - observer.error(err); + _output.error(err); } }; } next(value: T) { - this.destination.next(value); + this._input.next(value); } error(err: any) { - this.destination.error(err); + this._input.error(err); } complete() { - this.destination.complete(); + this._input.complete(); } /** @internal */ From 023f4752aec822723fe4c84a8035cb42813e17bb Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:43:53 -0600 Subject: [PATCH 08/15] refactor(WebSocketSubject): remove unused subscription, add comments, improve names --- .../observable/dom/WebSocketSubject.ts | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 0a259a1be9..8d73c19f18 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -243,13 +243,6 @@ export class WebSocketSubject extends Subject { return; } - const subscription = new Subscription(() => { - this._socket = null; - if (socket && socket.readyState === 1) { - socket.close(); - } - }); - socket.onopen = (evt: Event) => { const { _socket } = this; if (!_socket) { @@ -262,8 +255,11 @@ export class WebSocketSubject extends Subject { openObserver.next(evt); } - const queue = this._input; + const previousInput = this._input; + // We switch over now to passthrough all messages directly to the + // to the socket, where previously we were queuing them up with + // a ReplaySubject. this._input = new Subscriber({ next: (x: T) => { if (socket!.readyState === 1) { @@ -297,8 +293,11 @@ export class WebSocketSubject extends Subject { }, }); - if (queue && queue instanceof ReplaySubject) { - subscription.add((queue as ReplaySubject).subscribe(this._input)); + // If the _input was a ReplaySubject before, when we + // subscribe right now, it will synchronously emit all + // of the buffered values. + if (previousInput instanceof ReplaySubject) { + previousInput.subscribe(this._input); } }; From 55955931f39e714a2db3d07b8757376674e8feb7 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:46:19 -0600 Subject: [PATCH 09/15] refactor(WebSocketSubject): remove _source lift remenant --- .../src/internal/observable/dom/WebSocketSubject.ts | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 8d73c19f18..13014584bf 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -160,8 +160,6 @@ export class WebSocketSubject extends Subject { private _input: Observer; - private _source: Observable | undefined = undefined; - constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { super(); const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; @@ -184,9 +182,7 @@ export class WebSocketSubject extends Subject { private _resetState() { this._socket = null; - if (!this._source) { - this._input = new ReplaySubject(); - } + this._input = new ReplaySubject(); this._output = new Subject(); } @@ -345,10 +341,6 @@ export class WebSocketSubject extends Subject { /** @internal */ protected _subscribe(subscriber: Subscriber): Subscription { - const { _source } = this; - if (_source) { - return _source.subscribe(subscriber); - } if (!this._socket) { this._connectSocket(); } From a82c348ace63ac66223ff55a939d8d5e2994c5a2 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 14:55:20 -0600 Subject: [PATCH 10/15] refactor(WebSocketSubject): cleanup types and modernize syntax --- .../observable/dom/WebSocketSubject.ts | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 13014584bf..fb49efbd98 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -239,40 +239,35 @@ export class WebSocketSubject extends Subject { return; } - socket.onopen = (evt: Event) => { + socket.onopen = (evt) => { const { _socket } = this; + if (!_socket) { socket!.close(); this._resetState(); return; } - const { openObserver } = this._config; - if (openObserver) { - openObserver.next(evt); - } + + this._config.openObserver?.next(evt); const previousInput = this._input; // We switch over now to passthrough all messages directly to the // to the socket, where previously we were queuing them up with // a ReplaySubject. - this._input = new Subscriber({ - next: (x: T) => { + this._input = new Subscriber({ + next: (x) => { if (socket!.readyState === 1) { try { - const { serializer } = this._config; - socket!.send(serializer!(x!)); + socket!.send(this._config.serializer!(x)); } catch (e) { - this._input!.error(e); + this._input.error(e); } } }, error: (err: any) => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } - if (err && err.code) { + this._config.closingObserver?.next(undefined); + if (err?.code) { socket!.close(err.code, err.reason); } else { _output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); @@ -280,10 +275,7 @@ export class WebSocketSubject extends Subject { this._resetState(); }, complete: () => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } + this._config.closingObserver?.next(undefined); socket!.close(); this._resetState(); }, @@ -293,6 +285,8 @@ export class WebSocketSubject extends Subject { // subscribe right now, it will synchronously emit all // of the buffered values. if (previousInput instanceof ReplaySubject) { + // Note that since `_input` is a `Subscriber`, this will + // automatically wire up the subscription. previousInput.subscribe(this._input); } }; From c0d00a98dc1577b18569d7817818e0eeb1dd531a Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 15:10:14 -0600 Subject: [PATCH 11/15] refactor(WebSocketSubject): no longer relies on `ReplaySubject`. --- .../observable/dom/WebSocketSubject.ts | 98 ++++++++++--------- 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index fb49efbd98..26e9291f87 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -1,7 +1,6 @@ import { Subject } from '../../Subject.js'; import { Subscriber, Observable, Subscription, operate } from '../../Observable.js'; -import { ReplaySubject } from '../../ReplaySubject.js'; -import { Observer, NextObserver } from '../../types.js'; +import { NextObserver } from '../../types.js'; /** * WebSocketSubjectConfig is a plain Object that allows us to make our @@ -158,7 +157,13 @@ export class WebSocketSubject extends Subject { private _socket: WebSocket | null = null; - private _input: Observer; + private _inputBuffer: T[] = []; + + private _hasError = false; + + private _error: any; + + private _isComplete = false; constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { super(); @@ -177,13 +182,15 @@ export class WebSocketSubject extends Subject { } this._output = new Subject(); - this._input = new ReplaySubject(); } private _resetState() { this._socket = null; - this._input = new ReplaySubject(); this._output = new Subject(); + this._inputBuffer = []; + this._hasError = false; + this._isComplete = false; + this._error = null; } /** @@ -250,44 +257,14 @@ export class WebSocketSubject extends Subject { this._config.openObserver?.next(evt); - const previousInput = this._input; - - // We switch over now to passthrough all messages directly to the - // to the socket, where previously we were queuing them up with - // a ReplaySubject. - this._input = new Subscriber({ - next: (x) => { - if (socket!.readyState === 1) { - try { - socket!.send(this._config.serializer!(x)); - } catch (e) { - this._input.error(e); - } - } - }, - error: (err: any) => { - this._config.closingObserver?.next(undefined); - if (err?.code) { - socket!.close(err.code, err.reason); - } else { - _output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); - } - this._resetState(); - }, - complete: () => { - this._config.closingObserver?.next(undefined); - socket!.close(); - this._resetState(); - }, - }); + while (this._inputBuffer.length > 0) { + this.next(this._inputBuffer.shift()!); + } - // If the _input was a ReplaySubject before, when we - // subscribe right now, it will synchronously emit all - // of the buffered values. - if (previousInput instanceof ReplaySubject) { - // Note that since `_input` is a `Subscriber`, this will - // automatically wire up the subscription. - previousInput.subscribe(this._input); + if (this._hasError) { + this.error(this._error); + } else if (this._isComplete) { + this.complete(); } }; @@ -322,15 +299,46 @@ export class WebSocketSubject extends Subject { } next(value: T) { - this._input.next(value); + if (this._socket?.readyState !== 1) { + this._inputBuffer.push(value); + } else { + try { + this._socket.send(this._config.serializer!(value)); + } catch (err: any) { + this._config.closingObserver?.next(undefined); + if (err?.code) { + this._socket.close(err.code, err.reason); + } else { + this._output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); + } + this._resetState(); + } + } } error(err: any) { - this._input.error(err); + if (this._socket?.readyState === 1) { + this._config.closingObserver?.next(undefined); + if (err?.code) { + this._socket?.close(err.code, err.reason); + } else { + this._output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); + } + this._resetState(); + } else { + this._hasError = true; + this._error = err; + } } complete() { - this._input.complete(); + if (this._socket?.readyState === 1) { + this._config.closingObserver?.next(undefined); + this._socket.close(); + this._resetState(); + } else { + this._isComplete = true; + } } /** @internal */ From 22ca51550a5e062b34d95e92b4f1bfa650654304 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 15:14:00 -0600 Subject: [PATCH 12/15] feat(WebSocketSubject): no longer extends `Subject`. BREAKING CHANGE: `WebSocketSubject` is no longer `instanceof Subject`. Check for `instanceof WebSocketSubject` instead. --- packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 26e9291f87..956e0884fe 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -150,7 +150,7 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; -export class WebSocketSubject extends Subject { +export class WebSocketSubject extends Observable { private _config!: WebSocketSubjectConfig; private _output: Subject; @@ -365,6 +365,5 @@ export class WebSocketSubject extends Subject { _socket.close(); } this._resetState(); - super.unsubscribe(); } } From 809ef067f27cb6d454c51862bd796d37b709c1cf Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 16:15:41 -0600 Subject: [PATCH 13/15] refactor(WebSocketSubject): end reliance on Subject. --- .../observable/dom/WebSocketSubject.ts | 116 +++++++++++------- 1 file changed, 69 insertions(+), 47 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 956e0884fe..29630e1c23 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -1,4 +1,3 @@ -import { Subject } from '../../Subject.js'; import { Subscriber, Observable, Subscription, operate } from '../../Observable.js'; import { NextObserver } from '../../types.js'; @@ -153,8 +152,6 @@ export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; export class WebSocketSubject extends Observable { private _config!: WebSocketSubjectConfig; - private _output: Subject; - private _socket: WebSocket | null = null; private _inputBuffer: T[] = []; @@ -165,6 +162,14 @@ export class WebSocketSubject extends Observable { private _isComplete = false; + private _subscriberCounter = 0; + + private _subscribers = new Map>(); + + get observed() { + return this._subscribers.size > 0; + } + constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { super(); const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; @@ -180,13 +185,12 @@ export class WebSocketSubject extends Observable { if (!this._config.WebSocketCtor) { throw new Error('no WebSocket constructor can be found'); } - - this._output = new Subject(); } private _resetState() { this._socket = null; - this._output = new Subject(); + this._subscriberCounter = 0; + this._subscribers.clear(); this._inputBuffer = []; this._hasError = false; this._isComplete = false; @@ -230,9 +234,28 @@ export class WebSocketSubject extends Observable { }); } + #outputNext(value: T) { + for (const subscriber of Array.from(this._subscribers.values())) { + subscriber.next(value); + } + } + + #outputError(err: any) { + const subscribers = Array.from(this._subscribers.values()); + for (const subscriber of subscribers) { + subscriber.error(err); + } + } + + #outputComplete() { + const subscribers = Array.from(this._subscribers.values()); + for (const subscriber of subscribers) { + subscriber.complete(); + } + } + private _connectSocket() { const { WebSocketCtor, protocol, url, binaryType } = this._config; - const { _output } = this; let socket: WebSocket | null = null; try { @@ -241,17 +264,14 @@ export class WebSocketSubject extends Observable { if (binaryType) { this._socket.binaryType = binaryType; } - } catch (e) { - _output.error(e); + } catch (err) { + this.#outputError(err); return; } socket.onopen = (evt) => { - const { _socket } = this; - - if (!_socket) { - socket!.close(); - this._resetState(); + if (socket !== this._socket) { + socket?.close(); return; } @@ -269,31 +289,33 @@ export class WebSocketSubject extends Observable { }; socket.onerror = (e: Event) => { - this._resetState(); - _output.error(e); + if (socket !== this._socket) { + return; + } + + this.#outputError(e); }; socket.onclose = (e: CloseEvent) => { - if (socket === this._socket) { - this._resetState(); - } - const { closeObserver } = this._config; - if (closeObserver) { - closeObserver.next(e); + if (socket !== this._socket) { + return; } + + this._config.closeObserver?.next(e); + if (e.wasClean) { - _output.complete(); + this.#outputComplete(); } else { - _output.error(e); + this.#outputError(e); } }; socket.onmessage = (e: MessageEvent) => { try { const { deserializer } = this._config; - _output.next(deserializer!(e)); + this.#outputNext(deserializer!(e)); } catch (err) { - _output.error(err); + this.#outputError(err); } }; } @@ -305,13 +327,7 @@ export class WebSocketSubject extends Observable { try { this._socket.send(this._config.serializer!(value)); } catch (err: any) { - this._config.closingObserver?.next(undefined); - if (err?.code) { - this._socket.close(err.code, err.reason); - } else { - this._output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); - } - this._resetState(); + this.error(err); } } } @@ -322,7 +338,7 @@ export class WebSocketSubject extends Observable { if (err?.code) { this._socket?.close(err.code, err.reason); } else { - this._output.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); + this.#outputError(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); } this._resetState(); } else { @@ -333,36 +349,42 @@ export class WebSocketSubject extends Observable { complete() { if (this._socket?.readyState === 1) { - this._config.closingObserver?.next(undefined); - this._socket.close(); - this._resetState(); + this.#closeSocket(); } else { this._isComplete = true; } } + #closeSocket() { + const { _socket } = this; + this._config.closingObserver?.next(undefined); + if (_socket && _socket.readyState <= 1) { + _socket.close(); + } + this._resetState(); + } + /** @internal */ protected _subscribe(subscriber: Subscriber): Subscription { if (!this._socket) { this._connectSocket(); } - this._output.subscribe(subscriber); + const subscriberId = this._subscriberCounter++; + this._subscribers.set(subscriberId, subscriber); subscriber.add(() => { - const { _socket } = this; - if (this._output.observers.length === 0) { - if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) { - _socket.close(); - } - this._resetState(); + this._subscribers.delete(subscriberId); + if (!this.observed) { + this.#closeSocket(); } }); return subscriber; } unsubscribe() { - const { _socket } = this; - if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) { - _socket.close(); + const subscribers = Array.from(this._subscribers.values()); + this._subscribers.clear(); + for (const subscriber of subscribers) { + subscriber.unsubscribe(); } this._resetState(); } From c67a35a203637bb84b27fb55af0eb2ba5211cda8 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 20 Nov 2023 16:35:06 -0600 Subject: [PATCH 14/15] refactor(WebSocketSubject): clean up config a bit --- .../observable/dom/WebSocketSubject.ts | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 29630e1c23..c9e827ed2e 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -138,8 +138,7 @@ export interface WebSocketSubjectConfig { binaryType?: 'blob' | 'arraybuffer'; } -const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig = { - url: '', +const DEFAULT_WEBSOCKET_CONFIG = { deserializer: (e: MessageEvent) => JSON.parse(e.data), serializer: (value: any) => JSON.stringify(value), }; @@ -150,7 +149,7 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; export class WebSocketSubject extends Observable { - private _config!: WebSocketSubjectConfig; + private _config: WebSocketSubjectConfig & Required, 'WebSocketCtor' | 'serializer' | 'deserializer'>>; private _socket: WebSocket | null = null; @@ -255,11 +254,11 @@ export class WebSocketSubject extends Observable { } private _connectSocket() { - const { WebSocketCtor, protocol, url, binaryType } = this._config; + const { WebSocketCtor, protocol, url, binaryType, deserializer, openObserver, closeObserver } = this._config; let socket: WebSocket | null = null; try { - socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url); + socket = protocol ? new WebSocketCtor(url, protocol) : new WebSocketCtor(url); this._socket = socket; if (binaryType) { this._socket.binaryType = binaryType; @@ -275,7 +274,7 @@ export class WebSocketSubject extends Observable { return; } - this._config.openObserver?.next(evt); + openObserver?.next(evt); while (this._inputBuffer.length > 0) { this.next(this._inputBuffer.shift()!); @@ -301,7 +300,7 @@ export class WebSocketSubject extends Observable { return; } - this._config.closeObserver?.next(e); + closeObserver?.next(e); if (e.wasClean) { this.#outputComplete(); @@ -312,8 +311,7 @@ export class WebSocketSubject extends Observable { socket.onmessage = (e: MessageEvent) => { try { - const { deserializer } = this._config; - this.#outputNext(deserializer!(e)); + this.#outputNext(deserializer(e)); } catch (err) { this.#outputError(err); } @@ -325,7 +323,7 @@ export class WebSocketSubject extends Observable { this._inputBuffer.push(value); } else { try { - this._socket.send(this._config.serializer!(value)); + this._socket.send(this._config.serializer(value)); } catch (err: any) { this.error(err); } From bbe08e29c02f1d8b32208b5971dac8bfae971e6c Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 21 Nov 2023 14:02:51 -0600 Subject: [PATCH 15/15] feat(webSocket): now allows input and output typing to differ --- .../spec/observables/dom/webSocket-spec.ts | 269 +++++++++--------- .../observable/dom/WebSocketSubject.ts | 29 +- .../src/internal/observable/dom/webSocket.ts | 32 ++- 3 files changed, 170 insertions(+), 160 deletions(-) diff --git a/packages/rxjs/spec/observables/dom/webSocket-spec.ts b/packages/rxjs/spec/observables/dom/webSocket-spec.ts index 3aed1eb2f7..2023b6df7d 100644 --- a/packages/rxjs/spec/observables/dom/webSocket-spec.ts +++ b/packages/rxjs/spec/observables/dom/webSocket-spec.ts @@ -3,15 +3,13 @@ import * as sinon from 'sinon'; import { webSocket } from 'rxjs/webSocket'; import { map, retry, take, repeat, takeWhile } from 'rxjs/operators'; -const root: any = (typeof globalThis !== 'undefined' && globalThis) - || (typeof self !== 'undefined' && self) - || global; +const root: any = (typeof globalThis !== 'undefined' && globalThis) || (typeof self !== 'undefined' && self) || global; enum WebSocketState { CONNECTING = 0, OPEN = 1, CLOSING = 2, - CLOSED = 3 + CLOSED = 3, } /** @test {webSocket} */ @@ -43,7 +41,7 @@ describe('webSocket', () => { subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -64,10 +62,7 @@ describe('webSocket', () => { const subject = webSocket('ws://mysocket'); const results: any[] = []; - subject.pipe( - map(x => x + '!'), - ) - .subscribe(x => results.push(x)); + subject.pipe(map((x) => x + '!')).subscribe((x) => results.push(x)); MockWebSocket.lastSocket.triggerMessage(JSON.stringify('ngconf 2018 bug')); @@ -79,7 +74,7 @@ describe('webSocket', () => { const results: string[] = []; const subject = webSocket('ws://mysocket'); - subject.subscribe(x => { + subject.subscribe((x) => { results.push(x); }); @@ -87,7 +82,7 @@ describe('webSocket', () => { socket.open(); - expected.forEach(x => { + expected.forEach((x) => { socket.triggerMessage(JSON.stringify(x)); }); @@ -100,7 +95,7 @@ describe('webSocket', () => { const expected = ['make', 'him', 'walk', 'the', 'plank']; const subject = webSocket('ws://mysocket'); - expected.forEach(x => { + expected.forEach((x) => { subject.next(x); }); @@ -226,7 +221,7 @@ describe('webSocket', () => { // Close socket after socket2 has opened socket2.open(); expect(socket2.readyState).to.equal(WebSocketState.OPEN); - socket.triggerClose({wasClean: true}); + socket.triggerClose({ wasClean: true }); expect(socket.readyState).to.equal(WebSocketState.CLOSED); expect(socket2.close).have.not.been.called; @@ -247,7 +242,7 @@ describe('webSocket', () => { sinon.spy(socket, 'close'); expect(socket.close).not.have.been.called; - subject.error({ code: 1337, reason: 'Too bad, so sad :('}); + subject.error({ code: 1337, reason: 'Too bad, so sad :(' }); expect(socket.close).have.been.calledWith(1337, 'Too bad, so sad :('); subject.unsubscribe(); @@ -310,7 +305,6 @@ describe('webSocket', () => { }); describe('with a config object', () => { - beforeEach(() => { setupMockWebSocket(); }); @@ -325,7 +319,7 @@ describe('webSocket', () => { subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -345,7 +339,7 @@ describe('webSocket', () => { it('should take a protocol and set it properly on the web socket', () => { const subject = webSocket({ url: 'ws://mysocket', - protocol: 'someprotocol' + protocol: 'someprotocol', }); subject.subscribe(); @@ -359,7 +353,7 @@ describe('webSocket', () => { it('should take a binaryType and set it properly on the web socket', () => { const subject = webSocket({ url: 'ws://mysocket', - binaryType: 'blob' + binaryType: 'blob', }); subject.subscribe(); @@ -377,7 +371,7 @@ describe('webSocket', () => { url: 'ws://mysocket', deserializer: (e: any) => { return e.data + '!'; - } + }, }); subject.subscribe((x: any) => { @@ -400,14 +394,17 @@ describe('webSocket', () => { url: 'ws://mysocket', deserializer: (e: any) => { throw new Error('I am a bad error'); - } + }, }); - subject.subscribe({ next: (x: any) => { - expect(x).to.equal('this should not happen'); - }, error: (err: any) => { - expect(err).to.be.an('error', 'I am a bad error'); - } }); + subject.subscribe({ + next: (x: any) => { + expect(x).to.equal('this should not happen'); + }, + error: (err: any) => { + expect(err).to.be.an('error', 'I am a bad error'); + }, + }); const socket = MockWebSocket.lastSocket; socket.open(); @@ -424,8 +421,8 @@ describe('webSocket', () => { next(x: any) { calls++; expect(x).to.be.an('undefined'); - } - } + }, + }, }); subject.subscribe(); @@ -455,8 +452,8 @@ describe('webSocket', () => { closeObserver: { next(e: any) { closes.push(e); - } - } + }, + }, }); subject.subscribe(); @@ -468,9 +465,11 @@ describe('webSocket', () => { socket.triggerClose(expected[0]); expect(closes.length).to.equal(1); - subject.subscribe({ error: function (err) { - expect(err).to.equal(expected[1]); - } }); + subject.subscribe({ + error: function (err) { + expect(err).to.equal(expected[1]); + }, + }); socket = MockWebSocket.lastSocket; socket.open(); @@ -489,21 +488,23 @@ describe('webSocket', () => { url: 'bad_url', WebSocketCtor: (url: string, protocol?: string | string[]): WebSocket => { throw new Error(`connection refused`); - } + }, }); - subject.subscribe({ next: (x: any) => { - expect(x).to.equal('this should not happen'); - }, error: (err: any) => { - expect(err).to.be.an('error', 'connection refused'); - } }); + subject.subscribe({ + next: (x: any) => { + expect(x).to.equal('this should not happen'); + }, + error: (err: any) => { + expect(err).to.be.an('error', 'connection refused'); + }, + }); subject.unsubscribe(); }); }); describe('multiplex', () => { - beforeEach(() => { setupMockWebSocket(); }); @@ -514,20 +515,22 @@ describe('webSocket', () => { it('should be retryable', () => { const results = [] as string[]; - const subject = webSocket<{ name: string, value: string }>('ws://websocket'); + const subject = webSocket<{ sub: string } | { unsub: string }, { name: string; value: string }>('ws://websocket'); const source = subject.multiplex( () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); - source.pipe( - retry(1), - map(x => x.value), - take(2), - ).subscribe(x => { - results.push(x); - }); + source + .pipe( + retry(1), + map((x) => x.value), + take(2) + ) + .subscribe((x) => { + results.push(x); + }); const socket = MockWebSocket.lastSocket; socket.open(); @@ -549,19 +552,20 @@ describe('webSocket', () => { it('should be repeatable', () => { const results = [] as string[]; - const subject = webSocket<{ name: string, value: string }>('ws://websocket'); + const subject = webSocket<{ sub: string } | { unsub: string }, { name: string; value: string }>('ws://websocket'); + const source = subject.multiplex( () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); source .pipe( repeat(2), - map(x => x.value) + map((x) => x.value) ) - .subscribe(x => { + .subscribe((x) => { results.push(x); }); @@ -586,12 +590,12 @@ describe('webSocket', () => { }); it('should multiplex over the webSocket', () => { - const results = [] as Array<{ value: number, name: string }>; - const subject = webSocket<{ value: number, name: string }>('ws://websocket'); + const results = [] as Array<{ value: number; name: string }>; + const subject = webSocket<{ sub: string } | { unsub: string }, { value: number; name: string }>('ws://websocket'); const source = subject.multiplex( - () => ({ sub: 'foo'}), + () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); const sub = source.subscribe(function (x: any) { @@ -602,14 +606,16 @@ describe('webSocket', () => { expect(socket.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' })); - [1, 2, 3, 4, 5].map((x: number) => { - return { - name: x % 3 === 0 ? 'bar' : 'foo', - value: x - }; - }).forEach((x: any) => { - socket.triggerMessage(JSON.stringify(x)); - }); + [1, 2, 3, 4, 5] + .map((x: number) => { + return { + name: x % 3 === 0 ? 'bar' : 'foo', + value: x, + }; + }) + .forEach((x: any) => { + socket.triggerMessage(JSON.stringify(x)); + }); expect(results).to.deep.equal([1, 2, 4, 5]); @@ -622,34 +628,34 @@ describe('webSocket', () => { }); it('should keep the same socket for multiple multiplex subscriptions', () => { - const socketSubject = webSocket({url: 'ws://mysocket'}); + const socketSubject = webSocket({ url: 'ws://mysocket' }); const results = [] as string[]; - const socketMessages = [ - {id: 'A'}, - {id: 'B'}, - {id: 'A'}, - {id: 'B'}, - {id: 'B'}, - ]; - - const sub1 = socketSubject.multiplex( - () => 'no-op', - () => results.push('A unsub'), - (req: any) => req.id === 'A' - ).pipe( - takeWhile((req: any) => !req.complete) - ) - .subscribe( - { next: () => results.push('A next'), error: (e) => results.push('A error ' + e), complete: () => results.push('A complete') } - ); + const socketMessages = [{ id: 'A' }, { id: 'B' }, { id: 'A' }, { id: 'B' }, { id: 'B' }]; + + const sub1 = socketSubject + .multiplex( + () => 'no-op', + () => (results.push('A unsub'), 'no-op'), + (req: any) => req.id === 'A' + ) + .pipe(takeWhile((req: any) => !req.complete)) + .subscribe({ + next: () => results.push('A next'), + error: (e) => results.push('A error ' + e), + complete: () => results.push('A complete'), + }); - socketSubject.multiplex( - () => 'no-op', - () => results.push('B unsub'), - (req: any) => req.id === 'B') - .subscribe( - { next: () => results.push('B next'), error: (e) => results.push('B error ' + e), complete: () => results.push('B complete') } - ); + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('B unsub'), 'no-op'), + (req: any) => req.id === 'B' + ) + .subscribe({ + next: () => results.push('B next'), + error: (e) => results.push('B error ' + e), + complete: () => results.push('B complete'), + }); // Setup socket and send messages const socket = MockWebSocket.lastSocket; @@ -663,47 +669,39 @@ describe('webSocket', () => { }); socket.triggerClose({ wasClean: true }); - expect(results).to.deep.equal([ - 'A next', - 'A unsub', - 'B next', - 'B next', - 'B next', - 'B complete', - 'B unsub', - ]); + expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete', 'B unsub']); }); it('should not close the socket until all subscriptions complete', () => { - const socketSubject = webSocket<{ id: string, complete: boolean }>({url: 'ws://mysocket'}); + const socketSubject = webSocket({ url: 'ws://mysocket' }); const results = [] as string[]; - const socketMessages = [ - {id: 'A'}, - {id: 'B'}, - {id: 'A', complete: true}, - {id: 'B'}, - {id: 'B', complete: true}, - ]; - - socketSubject.multiplex( - () => 'no-op', - () => results.push('A unsub'), - req => req.id === 'A' - ).pipe( - takeWhile(req => !req.complete) - ).subscribe( - { next: () => results.push('A next'), error: (e) => results.push('A error ' + e), complete: () => results.push('A complete') } - ); + const socketMessages = [{ id: 'A' }, { id: 'B' }, { id: 'A', complete: true }, { id: 'B' }, { id: 'B', complete: true }]; - socketSubject.multiplex( - () => 'no-op', - () => results.push('B unsub'), - req => req.id === 'B' - ).pipe( - takeWhile(req => !req.complete) - ).subscribe( - { next: () => results.push('B next'), error: (e) => results.push('B error ' + e), complete: () => results.push('B complete') } - ); + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('A unsub'), 'no-op'), + (req) => req.id === 'A' + ) + .pipe(takeWhile((req) => !req.complete)) + .subscribe({ + next: () => results.push('A next'), + error: (e) => results.push('A error ' + e), + complete: () => results.push('A complete'), + }); + + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('B unsub'), 'no-op'), + (req) => req.id === 'B' + ) + .pipe(takeWhile((req) => !req.complete)) + .subscribe({ + next: () => results.push('B next'), + error: (e) => results.push('B error ' + e), + complete: () => results.push('B complete'), + }); // Setup socket and send messages const socket = MockWebSocket.lastSocket; @@ -712,15 +710,7 @@ describe('webSocket', () => { socket.triggerMessage(JSON.stringify(msg)); }); - expect(results).to.deep.equal([ - 'A next', - 'B next', - 'A unsub', - 'A complete', - 'B next', - 'B unsub', - 'B complete', - ]); + expect(results).to.deep.equal(['A next', 'B next', 'A unsub', 'A complete', 'B next', 'B unsub', 'B complete']); }); }); @@ -729,12 +719,12 @@ describe('webSocket', () => { let messageReceived = false; const subject = webSocket({ url: 'ws://mysocket', - WebSocketCtor: MockWebSocket + WebSocketCtor: MockWebSocket, }); subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -752,16 +742,13 @@ describe('webSocket', () => { }); it('should handle constructor errors if no WebSocketCtor', () => { - expect(() => { const subject = webSocket({ - url: 'ws://mysocket' + url: 'ws://mysocket', }); }).to.throw('no WebSocket constructor can be found'); - }); }); - }); class MockWebSocket { diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index c9e827ed2e..187c04c63b 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -98,23 +98,23 @@ import { NextObserver } from '../../types.js'; * // Connection ok * ``` */ -export interface WebSocketSubjectConfig { +export interface WebSocketSubjectConfig { /** The url of the socket server to connect to */ url: string; /** The protocol to use to connect */ protocol?: string | Array; /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */ - resultSelector?: (e: MessageEvent) => T; + resultSelector?: (e: MessageEvent) => Out; /** * A serializer used to create messages from passed values before the * messages are sent to the server. Defaults to JSON.stringify. */ - serializer?: (value: T) => WebSocketMessage; + serializer?: (value: In) => WebSocketMessage; /** * A deserializer used for messages arriving on the socket from the * server. Defaults to JSON.parse. */ - deserializer?: (e: MessageEvent) => T; + deserializer?: (e: MessageEvent) => Out; /** * An Observer that watches when open events occur on the underlying web socket. */ @@ -148,12 +148,13 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; -export class WebSocketSubject extends Observable { - private _config: WebSocketSubjectConfig & Required, 'WebSocketCtor' | 'serializer' | 'deserializer'>>; +export class WebSocketSubject extends Observable { + private _config: WebSocketSubjectConfig & + Required, 'WebSocketCtor' | 'serializer' | 'deserializer'>>; private _socket: WebSocket | null = null; - private _inputBuffer: T[] = []; + private _inputBuffer: In[] = []; private _hasError = false; @@ -163,13 +164,13 @@ export class WebSocketSubject extends Observable { private _subscriberCounter = 0; - private _subscribers = new Map>(); + private _subscribers = new Map>(); get observed() { return this._subscribers.size > 0; } - constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { + constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { super(); const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; this._config = { @@ -214,8 +215,8 @@ export class WebSocketSubject extends Observable { * @param messageFilter A predicate for selecting the appropriate messages * from the server for the output stream. */ - multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { - return new Observable((destination) => { + multiplex(subMsg: () => In, unsubMsg: () => In, messageFilter: (value: Out) => boolean) { + return new Observable((destination) => { this.next(subMsg()); destination.add(() => { this.next(unsubMsg()); @@ -233,7 +234,7 @@ export class WebSocketSubject extends Observable { }); } - #outputNext(value: T) { + #outputNext(value: Out) { for (const subscriber of Array.from(this._subscribers.values())) { subscriber.next(value); } @@ -318,7 +319,7 @@ export class WebSocketSubject extends Observable { }; } - next(value: T) { + next(value: In) { if (this._socket?.readyState !== 1) { this._inputBuffer.push(value); } else { @@ -363,7 +364,7 @@ export class WebSocketSubject extends Observable { } /** @internal */ - protected _subscribe(subscriber: Subscriber): Subscription { + protected _subscribe(subscriber: Subscriber): Subscription { if (!this._socket) { this._connectSocket(); } diff --git a/packages/rxjs/src/internal/observable/dom/webSocket.ts b/packages/rxjs/src/internal/observable/dom/webSocket.ts index b9631628f2..02bd79fedb 100644 --- a/packages/rxjs/src/internal/observable/dom/webSocket.ts +++ b/packages/rxjs/src/internal/observable/dom/webSocket.ts @@ -89,7 +89,7 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * const subject = webSocket('ws://localhost:8081'); * * subject.subscribe({ * next: msg => console.log('message received: ' + msg), // Called whenever there is a message from the server. @@ -103,7 +103,15 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * interface SendMsg { + * message: string; + * } + * + * interface RespMsg { + * data: any; + * } + * + * const subject = webSocket('ws://localhost:8081'); * * subject.subscribe(); * // Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent, @@ -123,7 +131,21 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * interface SubMsg { + * subscribe: string; + * } + * + * interface UnsubMsg { + * unsubscribe: string; + * } + * + * interface RespMsg { + * type: string; + * data: any; + * } + * + * + * const subject = webSocket('ws://localhost:8081'); * * const observableA = subject.multiplex( * () => ({ subscribe: 'A' }), // When server gets this message, it will start sending messages for 'A'... @@ -156,6 +178,6 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * @param urlConfigOrSource The WebSocket endpoint as an url or an object with configuration and additional Observers. * @return Subject which allows to both send and receive messages via WebSocket connection. */ -export function webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { - return new WebSocketSubject(urlConfigOrSource); +export function webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { + return new WebSocketSubject(urlConfigOrSource); }