diff --git a/packages/rxjs/spec/Subject-spec.ts b/packages/rxjs/spec/Subject-spec.ts index 7c6e5b2b5c..38c08869db 100644 --- a/packages/rxjs/spec/Subject-spec.ts +++ b/packages/rxjs/spec/Subject-spec.ts @@ -1,6 +1,5 @@ import { expect } from 'chai'; import { Subject, Observable, AsyncSubject, Observer, of, config, Subscription, Subscriber, noop, operate } from 'rxjs'; -import { AnonymousSubject } from 'rxjs/internal/Subject'; import { delay } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from './helpers/observableMatcher'; @@ -448,104 +447,6 @@ describe('Subject', () => { expect(subject.observed).to.equal(false); }); - it('should have a static create function that works', () => { - expect(Subject.create).to.be.a('function'); - const source = of(1, 2, 3, 4, 5); - const nexts: number[] = []; - const output: any[] = []; - - let error: any; - let complete = false; - let outputComplete = false; - - const destination = { - closed: false, - next: function (x: number) { - nexts.push(x); - }, - error: function (err: any) { - error = err; - this.closed = true; - }, - complete: function () { - complete = true; - this.closed = true; - }, - }; - - const sub: Subject = Subject.create(destination, source); - - sub.subscribe({ - next: function (x: number) { - output.push(x); - }, - complete: () => { - outputComplete = true; - }, - }); - - sub.next('a'); - sub.next('b'); - sub.next('c'); - sub.complete(); - - expect(nexts).to.deep.equal(['a', 'b', 'c']); - expect(complete).to.be.true; - expect(error).to.be.a('undefined'); - - expect(output).to.deep.equal([1, 2, 3, 4, 5]); - expect(outputComplete).to.be.true; - }); - - it('should have a static create function that works also to raise errors', () => { - expect(Subject.create).to.be.a('function'); - const source = of(1, 2, 3, 4, 5); - const nexts: number[] = []; - const output: number[] = []; - - let error: any; - let complete = false; - let outputComplete = false; - - const destination = { - closed: false, - next: function (x: number) { - nexts.push(x); - }, - error: function (err: any) { - error = err; - this.closed = true; - }, - complete: function () { - complete = true; - this.closed = true; - }, - }; - - const sub: Subject = Subject.create(destination, source); - - sub.subscribe({ - next: function (x: number) { - output.push(x); - }, - complete: () => { - outputComplete = true; - }, - }); - - sub.next('a'); - sub.next('b'); - sub.next('c'); - sub.error('boom'); - - expect(nexts).to.deep.equal(['a', 'b', 'c']); - expect(complete).to.be.false; - expect(error).to.equal('boom'); - - expect(output).to.deep.equal([1, 2, 3, 4, 5]); - expect(outputComplete).to.be.true; - }); - it('should be an Observer which can be given to Observable.subscribe', (done) => { const source = of(1, 2, 3, 4, 5); const subject = new Subject(); @@ -781,30 +682,3 @@ describe('Subject', () => { expect(results).to.deep.equal([1, 1, 2, 2, 'complete']); }); }); - -describe('AnonymousSubject', () => { - it('should be exposed', () => { - expect(AnonymousSubject).to.be.a('function'); - }); - - it('should not be eager', () => { - let subscribed = false; - - const subject = Subject.create( - null, - new Observable((observer: Observer) => { - subscribed = true; - const subscription = of('x').subscribe(observer); - return () => { - subscription.unsubscribe(); - }; - }) - ); - - const observable = subject.asObservable(); - expect(subscribed).to.be.false; - - observable.subscribe(); - expect(subscribed).to.be.true; - }); -}); diff --git a/packages/rxjs/spec/observables/dom/webSocket-spec.ts b/packages/rxjs/spec/observables/dom/webSocket-spec.ts index 3aed1eb2f7..2023b6df7d 100644 --- a/packages/rxjs/spec/observables/dom/webSocket-spec.ts +++ b/packages/rxjs/spec/observables/dom/webSocket-spec.ts @@ -3,15 +3,13 @@ import * as sinon from 'sinon'; import { webSocket } from 'rxjs/webSocket'; import { map, retry, take, repeat, takeWhile } from 'rxjs/operators'; -const root: any = (typeof globalThis !== 'undefined' && globalThis) - || (typeof self !== 'undefined' && self) - || global; +const root: any = (typeof globalThis !== 'undefined' && globalThis) || (typeof self !== 'undefined' && self) || global; enum WebSocketState { CONNECTING = 0, OPEN = 1, CLOSING = 2, - CLOSED = 3 + CLOSED = 3, } /** @test {webSocket} */ @@ -43,7 +41,7 @@ describe('webSocket', () => { subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -64,10 +62,7 @@ describe('webSocket', () => { const subject = webSocket('ws://mysocket'); const results: any[] = []; - subject.pipe( - map(x => x + '!'), - ) - .subscribe(x => results.push(x)); + subject.pipe(map((x) => x + '!')).subscribe((x) => results.push(x)); MockWebSocket.lastSocket.triggerMessage(JSON.stringify('ngconf 2018 bug')); @@ -79,7 +74,7 @@ describe('webSocket', () => { const results: string[] = []; const subject = webSocket('ws://mysocket'); - subject.subscribe(x => { + subject.subscribe((x) => { results.push(x); }); @@ -87,7 +82,7 @@ describe('webSocket', () => { socket.open(); - expected.forEach(x => { + expected.forEach((x) => { socket.triggerMessage(JSON.stringify(x)); }); @@ -100,7 +95,7 @@ describe('webSocket', () => { const expected = ['make', 'him', 'walk', 'the', 'plank']; const subject = webSocket('ws://mysocket'); - expected.forEach(x => { + expected.forEach((x) => { subject.next(x); }); @@ -226,7 +221,7 @@ describe('webSocket', () => { // Close socket after socket2 has opened socket2.open(); expect(socket2.readyState).to.equal(WebSocketState.OPEN); - socket.triggerClose({wasClean: true}); + socket.triggerClose({ wasClean: true }); expect(socket.readyState).to.equal(WebSocketState.CLOSED); expect(socket2.close).have.not.been.called; @@ -247,7 +242,7 @@ describe('webSocket', () => { sinon.spy(socket, 'close'); expect(socket.close).not.have.been.called; - subject.error({ code: 1337, reason: 'Too bad, so sad :('}); + subject.error({ code: 1337, reason: 'Too bad, so sad :(' }); expect(socket.close).have.been.calledWith(1337, 'Too bad, so sad :('); subject.unsubscribe(); @@ -310,7 +305,6 @@ describe('webSocket', () => { }); describe('with a config object', () => { - beforeEach(() => { setupMockWebSocket(); }); @@ -325,7 +319,7 @@ describe('webSocket', () => { subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -345,7 +339,7 @@ describe('webSocket', () => { it('should take a protocol and set it properly on the web socket', () => { const subject = webSocket({ url: 'ws://mysocket', - protocol: 'someprotocol' + protocol: 'someprotocol', }); subject.subscribe(); @@ -359,7 +353,7 @@ describe('webSocket', () => { it('should take a binaryType and set it properly on the web socket', () => { const subject = webSocket({ url: 'ws://mysocket', - binaryType: 'blob' + binaryType: 'blob', }); subject.subscribe(); @@ -377,7 +371,7 @@ describe('webSocket', () => { url: 'ws://mysocket', deserializer: (e: any) => { return e.data + '!'; - } + }, }); subject.subscribe((x: any) => { @@ -400,14 +394,17 @@ describe('webSocket', () => { url: 'ws://mysocket', deserializer: (e: any) => { throw new Error('I am a bad error'); - } + }, }); - subject.subscribe({ next: (x: any) => { - expect(x).to.equal('this should not happen'); - }, error: (err: any) => { - expect(err).to.be.an('error', 'I am a bad error'); - } }); + subject.subscribe({ + next: (x: any) => { + expect(x).to.equal('this should not happen'); + }, + error: (err: any) => { + expect(err).to.be.an('error', 'I am a bad error'); + }, + }); const socket = MockWebSocket.lastSocket; socket.open(); @@ -424,8 +421,8 @@ describe('webSocket', () => { next(x: any) { calls++; expect(x).to.be.an('undefined'); - } - } + }, + }, }); subject.subscribe(); @@ -455,8 +452,8 @@ describe('webSocket', () => { closeObserver: { next(e: any) { closes.push(e); - } - } + }, + }, }); subject.subscribe(); @@ -468,9 +465,11 @@ describe('webSocket', () => { socket.triggerClose(expected[0]); expect(closes.length).to.equal(1); - subject.subscribe({ error: function (err) { - expect(err).to.equal(expected[1]); - } }); + subject.subscribe({ + error: function (err) { + expect(err).to.equal(expected[1]); + }, + }); socket = MockWebSocket.lastSocket; socket.open(); @@ -489,21 +488,23 @@ describe('webSocket', () => { url: 'bad_url', WebSocketCtor: (url: string, protocol?: string | string[]): WebSocket => { throw new Error(`connection refused`); - } + }, }); - subject.subscribe({ next: (x: any) => { - expect(x).to.equal('this should not happen'); - }, error: (err: any) => { - expect(err).to.be.an('error', 'connection refused'); - } }); + subject.subscribe({ + next: (x: any) => { + expect(x).to.equal('this should not happen'); + }, + error: (err: any) => { + expect(err).to.be.an('error', 'connection refused'); + }, + }); subject.unsubscribe(); }); }); describe('multiplex', () => { - beforeEach(() => { setupMockWebSocket(); }); @@ -514,20 +515,22 @@ describe('webSocket', () => { it('should be retryable', () => { const results = [] as string[]; - const subject = webSocket<{ name: string, value: string }>('ws://websocket'); + const subject = webSocket<{ sub: string } | { unsub: string }, { name: string; value: string }>('ws://websocket'); const source = subject.multiplex( () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); - source.pipe( - retry(1), - map(x => x.value), - take(2), - ).subscribe(x => { - results.push(x); - }); + source + .pipe( + retry(1), + map((x) => x.value), + take(2) + ) + .subscribe((x) => { + results.push(x); + }); const socket = MockWebSocket.lastSocket; socket.open(); @@ -549,19 +552,20 @@ describe('webSocket', () => { it('should be repeatable', () => { const results = [] as string[]; - const subject = webSocket<{ name: string, value: string }>('ws://websocket'); + const subject = webSocket<{ sub: string } | { unsub: string }, { name: string; value: string }>('ws://websocket'); + const source = subject.multiplex( () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); source .pipe( repeat(2), - map(x => x.value) + map((x) => x.value) ) - .subscribe(x => { + .subscribe((x) => { results.push(x); }); @@ -586,12 +590,12 @@ describe('webSocket', () => { }); it('should multiplex over the webSocket', () => { - const results = [] as Array<{ value: number, name: string }>; - const subject = webSocket<{ value: number, name: string }>('ws://websocket'); + const results = [] as Array<{ value: number; name: string }>; + const subject = webSocket<{ sub: string } | { unsub: string }, { value: number; name: string }>('ws://websocket'); const source = subject.multiplex( - () => ({ sub: 'foo'}), + () => ({ sub: 'foo' }), () => ({ unsub: 'foo' }), - value => value.name === 'foo' + (value) => value.name === 'foo' ); const sub = source.subscribe(function (x: any) { @@ -602,14 +606,16 @@ describe('webSocket', () => { expect(socket.lastMessageSent).to.deep.equal(JSON.stringify({ sub: 'foo' })); - [1, 2, 3, 4, 5].map((x: number) => { - return { - name: x % 3 === 0 ? 'bar' : 'foo', - value: x - }; - }).forEach((x: any) => { - socket.triggerMessage(JSON.stringify(x)); - }); + [1, 2, 3, 4, 5] + .map((x: number) => { + return { + name: x % 3 === 0 ? 'bar' : 'foo', + value: x, + }; + }) + .forEach((x: any) => { + socket.triggerMessage(JSON.stringify(x)); + }); expect(results).to.deep.equal([1, 2, 4, 5]); @@ -622,34 +628,34 @@ describe('webSocket', () => { }); it('should keep the same socket for multiple multiplex subscriptions', () => { - const socketSubject = webSocket({url: 'ws://mysocket'}); + const socketSubject = webSocket({ url: 'ws://mysocket' }); const results = [] as string[]; - const socketMessages = [ - {id: 'A'}, - {id: 'B'}, - {id: 'A'}, - {id: 'B'}, - {id: 'B'}, - ]; - - const sub1 = socketSubject.multiplex( - () => 'no-op', - () => results.push('A unsub'), - (req: any) => req.id === 'A' - ).pipe( - takeWhile((req: any) => !req.complete) - ) - .subscribe( - { next: () => results.push('A next'), error: (e) => results.push('A error ' + e), complete: () => results.push('A complete') } - ); + const socketMessages = [{ id: 'A' }, { id: 'B' }, { id: 'A' }, { id: 'B' }, { id: 'B' }]; + + const sub1 = socketSubject + .multiplex( + () => 'no-op', + () => (results.push('A unsub'), 'no-op'), + (req: any) => req.id === 'A' + ) + .pipe(takeWhile((req: any) => !req.complete)) + .subscribe({ + next: () => results.push('A next'), + error: (e) => results.push('A error ' + e), + complete: () => results.push('A complete'), + }); - socketSubject.multiplex( - () => 'no-op', - () => results.push('B unsub'), - (req: any) => req.id === 'B') - .subscribe( - { next: () => results.push('B next'), error: (e) => results.push('B error ' + e), complete: () => results.push('B complete') } - ); + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('B unsub'), 'no-op'), + (req: any) => req.id === 'B' + ) + .subscribe({ + next: () => results.push('B next'), + error: (e) => results.push('B error ' + e), + complete: () => results.push('B complete'), + }); // Setup socket and send messages const socket = MockWebSocket.lastSocket; @@ -663,47 +669,39 @@ describe('webSocket', () => { }); socket.triggerClose({ wasClean: true }); - expect(results).to.deep.equal([ - 'A next', - 'A unsub', - 'B next', - 'B next', - 'B next', - 'B complete', - 'B unsub', - ]); + expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete', 'B unsub']); }); it('should not close the socket until all subscriptions complete', () => { - const socketSubject = webSocket<{ id: string, complete: boolean }>({url: 'ws://mysocket'}); + const socketSubject = webSocket({ url: 'ws://mysocket' }); const results = [] as string[]; - const socketMessages = [ - {id: 'A'}, - {id: 'B'}, - {id: 'A', complete: true}, - {id: 'B'}, - {id: 'B', complete: true}, - ]; - - socketSubject.multiplex( - () => 'no-op', - () => results.push('A unsub'), - req => req.id === 'A' - ).pipe( - takeWhile(req => !req.complete) - ).subscribe( - { next: () => results.push('A next'), error: (e) => results.push('A error ' + e), complete: () => results.push('A complete') } - ); + const socketMessages = [{ id: 'A' }, { id: 'B' }, { id: 'A', complete: true }, { id: 'B' }, { id: 'B', complete: true }]; - socketSubject.multiplex( - () => 'no-op', - () => results.push('B unsub'), - req => req.id === 'B' - ).pipe( - takeWhile(req => !req.complete) - ).subscribe( - { next: () => results.push('B next'), error: (e) => results.push('B error ' + e), complete: () => results.push('B complete') } - ); + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('A unsub'), 'no-op'), + (req) => req.id === 'A' + ) + .pipe(takeWhile((req) => !req.complete)) + .subscribe({ + next: () => results.push('A next'), + error: (e) => results.push('A error ' + e), + complete: () => results.push('A complete'), + }); + + socketSubject + .multiplex( + () => 'no-op', + () => (results.push('B unsub'), 'no-op'), + (req) => req.id === 'B' + ) + .pipe(takeWhile((req) => !req.complete)) + .subscribe({ + next: () => results.push('B next'), + error: (e) => results.push('B error ' + e), + complete: () => results.push('B complete'), + }); // Setup socket and send messages const socket = MockWebSocket.lastSocket; @@ -712,15 +710,7 @@ describe('webSocket', () => { socket.triggerMessage(JSON.stringify(msg)); }); - expect(results).to.deep.equal([ - 'A next', - 'B next', - 'A unsub', - 'A complete', - 'B next', - 'B unsub', - 'B complete', - ]); + expect(results).to.deep.equal(['A next', 'B next', 'A unsub', 'A complete', 'B next', 'B unsub', 'B complete']); }); }); @@ -729,12 +719,12 @@ describe('webSocket', () => { let messageReceived = false; const subject = webSocket({ url: 'ws://mysocket', - WebSocketCtor: MockWebSocket + WebSocketCtor: MockWebSocket, }); subject.next('ping'); - subject.subscribe(x => { + subject.subscribe((x) => { expect(x).to.equal('pong'); messageReceived = true; }); @@ -752,16 +742,13 @@ describe('webSocket', () => { }); it('should handle constructor errors if no WebSocketCtor', () => { - expect(() => { const subject = webSocket({ - url: 'ws://mysocket' + url: 'ws://mysocket', }); }).to.throw('no WebSocket constructor can be found'); - }); }); - }); class MockWebSocket { diff --git a/packages/rxjs/src/internal/Subject.ts b/packages/rxjs/src/internal/Subject.ts index 90f153bd39..885981e0bc 100644 --- a/packages/rxjs/src/internal/Subject.ts +++ b/packages/rxjs/src/internal/Subject.ts @@ -40,15 +40,6 @@ export class Subject extends Observable implements SubscriptionLike { /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */ thrownError: any = null; - /** - * Creates a "subject" by basically gluing an observer to an observable. - * - * @deprecated Recommended you do not use. Will be removed at some point in the future. Plans for replacement still under discussion. - */ - static create: (...args: any[]) => any = (destination: Observer, source: Observable): AnonymousSubject => { - return new AnonymousSubject(destination, source); - }; - constructor() { // NOTE: This must be here to obscure Observable's constructor. super(); @@ -146,31 +137,3 @@ export class Subject extends Observable implements SubscriptionLike { return new Observable((subscriber) => this.subscribe(subscriber)); } } - -export class AnonymousSubject extends Subject { - constructor( - /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */ - public destination?: Observer, - /** @internal */ - protected _source?: Observable - ) { - super(); - } - - next(value: T) { - this.destination?.next?.(value); - } - - error(err: any) { - this.destination?.error?.(err); - } - - complete() { - this.destination?.complete?.(); - } - - /** @internal */ - protected _subscribe(subscriber: Subscriber): Subscription { - return this._source?.subscribe(subscriber) ?? Subscription.EMPTY; - } -} diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 5d7dbf9e77..187c04c63b 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -1,7 +1,5 @@ -import { Subject, AnonymousSubject } from '../../Subject.js'; -import { Subscriber, Observable, Subscription } from '../../Observable.js'; -import { ReplaySubject } from '../../ReplaySubject.js'; -import { Observer, NextObserver } from '../../types.js'; +import { Subscriber, Observable, Subscription, operate } from '../../Observable.js'; +import { NextObserver } from '../../types.js'; /** * WebSocketSubjectConfig is a plain Object that allows us to make our @@ -100,23 +98,23 @@ import { Observer, NextObserver } from '../../types.js'; * // Connection ok * ``` */ -export interface WebSocketSubjectConfig { +export interface WebSocketSubjectConfig { /** The url of the socket server to connect to */ url: string; /** The protocol to use to connect */ protocol?: string | Array; /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */ - resultSelector?: (e: MessageEvent) => T; + resultSelector?: (e: MessageEvent) => Out; /** * A serializer used to create messages from passed values before the * messages are sent to the server. Defaults to JSON.stringify. */ - serializer?: (value: T) => WebSocketMessage; + serializer?: (value: In) => WebSocketMessage; /** * A deserializer used for messages arriving on the socket from the * server. Defaults to JSON.parse. */ - deserializer?: (e: MessageEvent) => T; + deserializer?: (e: MessageEvent) => Out; /** * An Observer that watches when open events occur on the underlying web socket. */ @@ -140,8 +138,7 @@ export interface WebSocketSubjectConfig { binaryType?: 'blob' | 'arraybuffer'; } -const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig = { - url: '', +const DEFAULT_WEBSOCKET_CONFIG = { deserializer: (e: MessageEvent) => JSON.parse(e.data), serializer: (value: any) => JSON.stringify(value), }; @@ -151,47 +148,53 @@ const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; -export class WebSocketSubject extends AnonymousSubject { - private _config!: WebSocketSubjectConfig; - - /** @internal */ - _output!: Subject; +export class WebSocketSubject extends Observable { + private _config: WebSocketSubjectConfig & + Required, 'WebSocketCtor' | 'serializer' | 'deserializer'>>; private _socket: WebSocket | null = null; - constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { + private _inputBuffer: In[] = []; + + private _hasError = false; + + private _error: any; + + private _isComplete = false; + + private _subscriberCounter = 0; + + private _subscribers = new Map>(); + + get observed() { + return this._subscribers.size > 0; + } + + constructor(urlConfigOrSource: string | WebSocketSubjectConfig) { super(); - if (urlConfigOrSource instanceof Observable) { - this.destination = destination; - this._source = urlConfigOrSource as Observable; - } else { - const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG }); - this._output = new Subject(); - if (typeof urlConfigOrSource === 'string') { - config.url = urlConfigOrSource; - } else { - for (const key in urlConfigOrSource) { - if (urlConfigOrSource.hasOwnProperty(key)) { - (config as any)[key] = (urlConfigOrSource as any)[key]; - } - } - } + const userConfig = typeof urlConfigOrSource === 'string' ? { url: urlConfigOrSource } : urlConfigOrSource; + this._config = { + ...DEFAULT_WEBSOCKET_CONFIG, + // Setting this here because a previous version of this allowed + // WebSocket to be polyfilled later than DEFAULT_WEBSOCKET_CONFIG + // was defined. + WebSocketCtor: WebSocket, + ...userConfig, + }; - if (!config.WebSocketCtor && WebSocket) { - config.WebSocketCtor = WebSocket; - } else if (!config.WebSocketCtor) { - throw new Error('no WebSocket constructor can be found'); - } - this.destination = new ReplaySubject(); + if (!this._config.WebSocketCtor) { + throw new Error('no WebSocket constructor can be found'); } } private _resetState() { this._socket = null; - if (!this._source) { - this.destination = new ReplaySubject(); - } - this._output = new Subject(); + this._subscriberCounter = 0; + this._subscribers.clear(); + this._inputBuffer = []; + this._hasError = false; + this._isComplete = false; + this._error = null; } /** @@ -212,172 +215,176 @@ export class WebSocketSubject extends AnonymousSubject { * @param messageFilter A predicate for selecting the appropriate messages * from the server for the output stream. */ - multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { - return new Observable((observer: Observer) => { - try { - this.next(subMsg()); - } catch (err) { - observer.error(err); - } - - const subscription = this.subscribe({ - next: (x) => { - try { + multiplex(subMsg: () => In, unsubMsg: () => In, messageFilter: (value: Out) => boolean) { + return new Observable((destination) => { + this.next(subMsg()); + destination.add(() => { + this.next(unsubMsg()); + }); + this.subscribe( + operate({ + destination, + next: (x) => { if (messageFilter(x)) { - observer.next(x); + destination.next(x); } - } catch (err) { - observer.error(err); - } - }, - error: (err) => observer.error(err), - complete: () => observer.complete(), - }); - - return () => { - try { - this.next(unsubMsg()); - } catch (err) { - observer.error(err); - } - subscription.unsubscribe(); - }; + }, + }) + ); }); } + #outputNext(value: Out) { + for (const subscriber of Array.from(this._subscribers.values())) { + subscriber.next(value); + } + } + + #outputError(err: any) { + const subscribers = Array.from(this._subscribers.values()); + for (const subscriber of subscribers) { + subscriber.error(err); + } + } + + #outputComplete() { + const subscribers = Array.from(this._subscribers.values()); + for (const subscriber of subscribers) { + subscriber.complete(); + } + } + private _connectSocket() { - const { WebSocketCtor, protocol, url, binaryType } = this._config; - const observer = this._output; + const { WebSocketCtor, protocol, url, binaryType, deserializer, openObserver, closeObserver } = this._config; let socket: WebSocket | null = null; try { - socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url); + socket = protocol ? new WebSocketCtor(url, protocol) : new WebSocketCtor(url); this._socket = socket; if (binaryType) { this._socket.binaryType = binaryType; } - } catch (e) { - observer.error(e); + } catch (err) { + this.#outputError(err); return; } - const subscription = new Subscription(() => { - this._socket = null; - if (socket && socket.readyState === 1) { - socket.close(); - } - }); - - socket.onopen = (evt: Event) => { - const { _socket } = this; - if (!_socket) { - socket!.close(); - this._resetState(); + socket.onopen = (evt) => { + if (socket !== this._socket) { + socket?.close(); return; } - const { openObserver } = this._config; - if (openObserver) { - openObserver.next(evt); - } - const queue = this.destination; + openObserver?.next(evt); - this.destination = new Subscriber({ - next: (x: T) => { - if (socket!.readyState === 1) { - try { - const { serializer } = this._config; - socket!.send(serializer!(x!)); - } catch (e) { - this.destination!.error(e); - } - } - }, - error: (err: any) => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } - if (err && err.code) { - socket!.close(err.code, err.reason); - } else { - observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); - } - this._resetState(); - }, - complete: () => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } - socket!.close(); - this._resetState(); - }, - }); + while (this._inputBuffer.length > 0) { + this.next(this._inputBuffer.shift()!); + } - if (queue && queue instanceof ReplaySubject) { - subscription.add((queue as ReplaySubject).subscribe(this.destination)); + if (this._hasError) { + this.error(this._error); + } else if (this._isComplete) { + this.complete(); } }; socket.onerror = (e: Event) => { - this._resetState(); - observer.error(e); + if (socket !== this._socket) { + return; + } + + this.#outputError(e); }; socket.onclose = (e: CloseEvent) => { - if (socket === this._socket) { - this._resetState(); - } - const { closeObserver } = this._config; - if (closeObserver) { - closeObserver.next(e); + if (socket !== this._socket) { + return; } + + closeObserver?.next(e); + if (e.wasClean) { - observer.complete(); + this.#outputComplete(); } else { - observer.error(e); + this.#outputError(e); } }; socket.onmessage = (e: MessageEvent) => { try { - const { deserializer } = this._config; - observer.next(deserializer!(e)); + this.#outputNext(deserializer(e)); } catch (err) { - observer.error(err); + this.#outputError(err); } }; } - /** @internal */ - protected _subscribe(subscriber: Subscriber): Subscription { - const { _source } = this; - if (_source) { - return _source.subscribe(subscriber); + next(value: In) { + if (this._socket?.readyState !== 1) { + this._inputBuffer.push(value); + } else { + try { + this._socket.send(this._config.serializer(value)); + } catch (err: any) { + this.error(err); + } + } + } + + error(err: any) { + if (this._socket?.readyState === 1) { + this._config.closingObserver?.next(undefined); + if (err?.code) { + this._socket?.close(err.code, err.reason); + } else { + this.#outputError(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); + } + this._resetState(); + } else { + this._hasError = true; + this._error = err; + } + } + + complete() { + if (this._socket?.readyState === 1) { + this.#closeSocket(); + } else { + this._isComplete = true; + } + } + + #closeSocket() { + const { _socket } = this; + this._config.closingObserver?.next(undefined); + if (_socket && _socket.readyState <= 1) { + _socket.close(); } + this._resetState(); + } + + /** @internal */ + protected _subscribe(subscriber: Subscriber): Subscription { if (!this._socket) { this._connectSocket(); } - this._output.subscribe(subscriber); + const subscriberId = this._subscriberCounter++; + this._subscribers.set(subscriberId, subscriber); subscriber.add(() => { - const { _socket } = this; - if (this._output.observers.length === 0) { - if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) { - _socket.close(); - } - this._resetState(); + this._subscribers.delete(subscriberId); + if (!this.observed) { + this.#closeSocket(); } }); return subscriber; } unsubscribe() { - const { _socket } = this; - if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) { - _socket.close(); + const subscribers = Array.from(this._subscribers.values()); + this._subscribers.clear(); + for (const subscriber of subscribers) { + subscriber.unsubscribe(); } this._resetState(); - super.unsubscribe(); } } diff --git a/packages/rxjs/src/internal/observable/dom/webSocket.ts b/packages/rxjs/src/internal/observable/dom/webSocket.ts index b9631628f2..02bd79fedb 100644 --- a/packages/rxjs/src/internal/observable/dom/webSocket.ts +++ b/packages/rxjs/src/internal/observable/dom/webSocket.ts @@ -89,7 +89,7 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * const subject = webSocket('ws://localhost:8081'); * * subject.subscribe({ * next: msg => console.log('message received: ' + msg), // Called whenever there is a message from the server. @@ -103,7 +103,15 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * interface SendMsg { + * message: string; + * } + * + * interface RespMsg { + * data: any; + * } + * + * const subject = webSocket('ws://localhost:8081'); * * subject.subscribe(); * // Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent, @@ -123,7 +131,21 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * ```ts * import { webSocket } from 'rxjs/webSocket'; * - * const subject = webSocket('ws://localhost:8081'); + * interface SubMsg { + * subscribe: string; + * } + * + * interface UnsubMsg { + * unsubscribe: string; + * } + * + * interface RespMsg { + * type: string; + * data: any; + * } + * + * + * const subject = webSocket('ws://localhost:8081'); * * const observableA = subject.multiplex( * () => ({ subscribe: 'A' }), // When server gets this message, it will start sending messages for 'A'... @@ -156,6 +178,6 @@ import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject.js' * @param urlConfigOrSource The WebSocket endpoint as an url or an object with configuration and additional Observers. * @return Subject which allows to both send and receive messages via WebSocket connection. */ -export function webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { - return new WebSocketSubject(urlConfigOrSource); +export function webSocket(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { + return new WebSocketSubject(urlConfigOrSource); }