diff --git a/packages/rxjs/spec/observables/dom/webSocket-spec.ts b/packages/rxjs/spec/observables/dom/webSocket-spec.ts index 5f6f0cfcdc..2c52134a47 100644 --- a/packages/rxjs/spec/observables/dom/webSocket-spec.ts +++ b/packages/rxjs/spec/observables/dom/webSocket-spec.ts @@ -669,7 +669,7 @@ describe('webSocket', () => { }); socket.triggerClose({ wasClean: true }); - expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B unsub', 'B complete']); + expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete']); }); it('should not close the socket until all subscriptions complete', () => { diff --git a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts index 943e22a51e..a3f93888ee 100644 --- a/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ b/packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts @@ -1,4 +1,4 @@ -import type { Subscriber, Subscription} from '@rxjs/observable'; +import type { Subscriber, Subscription } from '@rxjs/observable'; import { Observable, operate } from '@rxjs/observable'; import type { NextObserver } from '../../types.js'; @@ -219,8 +219,11 @@ export class WebSocketSubject extends Observable { multiplex(subMsg: () => In, unsubMsg: () => In, messageFilter: (value: Out) => boolean) { return new Observable((destination) => { this.next(subMsg()); + let isUnsub = true; destination.add(() => { - this.next(unsubMsg()); + if (isUnsub) { + this.next(unsubMsg()); + } }); this.subscribe( operate({ @@ -230,6 +233,14 @@ export class WebSocketSubject extends Observable { destination.next(x); } }, + error: (err) => { + isUnsub = false; + destination.error(err); + }, + complete: () => { + isUnsub = false; + destination.complete(); + }, }) ); });