From c289688f5e1f33ec21306b4d2f5539dd19f963f2 Mon Sep 17 00:00:00 2001 From: Naiwei Zheng <naiweizheng@gmail.com> Date: Mon, 31 Aug 2020 14:30:41 -0700 Subject: [PATCH] fix: `switchMap` and `exhaustMap` behave correctly with re-entrant code. - switchMap should unsubscribe previous inner sub when getting synchronously reentrance during subscribing the inner sub - exhaustMap should ignore subsequent synchronous reentrances during subscribing the inner sub --- spec/operators/exhaustMap-spec.ts | 17 ++++++++++++++++- spec/operators/switchMap-spec.ts | 17 ++++++++++++++++- src/internal/operators/exhaustMap.ts | 3 ++- src/internal/operators/switchMap.ts | 3 ++- 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/spec/operators/exhaustMap-spec.ts b/spec/operators/exhaustMap-spec.ts index 5a878801fe..8255f2cc79 100644 --- a/spec/operators/exhaustMap-spec.ts +++ b/spec/operators/exhaustMap-spec.ts @@ -1,5 +1,5 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { concat, defer, Observable, of } from 'rxjs'; +import { concat, defer, Observable, of, BehaviorSubject } from 'rxjs'; import { exhaustMap, mergeMap, takeWhile, map, take } from 'rxjs/operators'; import { expect } from 'chai'; import { asInteropObservable } from '../helpers/interop-helper'; @@ -451,4 +451,19 @@ describe('exhaustMap', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + + it('should ignore subsequent synchronous reentrances during subscribing the inner sub', () => { + const e = new BehaviorSubject(1); + const results: Array<number> = []; + + e.pipe( + take(3), + exhaustMap(value => new Observable<number>(subscriber => { + e.next(value+1); + subscriber.next(value); + })), + ).subscribe(value => results.push(value)); + + expect(results).to.deep.equal([1]); + }); }); diff --git a/spec/operators/switchMap-spec.ts b/spec/operators/switchMap-spec.ts index 0ede016ffe..58d1f29907 100644 --- a/spec/operators/switchMap-spec.ts +++ b/spec/operators/switchMap-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { switchMap, mergeMap, map, takeWhile, take } from 'rxjs/operators'; -import { concat, defer, of, Observable } from 'rxjs'; +import { concat, defer, of, Observable, BehaviorSubject } from 'rxjs'; import { asInteropObservable } from '../helpers/interop-helper'; /** @test {switchMap} */ @@ -460,4 +460,19 @@ describe('switchMap', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + + it('should unsubscribe previous inner sub when getting synchronously reentrance during subscribing the inner sub', () => { + const e = new BehaviorSubject(1); + const results: Array<number> = []; + + e.pipe( + take(3), + switchMap(value => new Observable<number>(subscriber => { + e.next(value+1); + subscriber.next(value); + })), + ).subscribe(value => results.push(value)); + + expect(results).to.deep.equal([3]); + }); }); diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index ac3ad16124..931d7c8a19 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -113,7 +113,8 @@ class ExhaustMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> { const innerSubscriber = new SimpleInnerSubscriber(this); const destination = this.destination; destination.add(innerSubscriber); - this.innerSubscription = innerSubscribe(result, innerSubscriber); + this.innerSubscription = innerSubscriber; + innerSubscribe(result, innerSubscriber); } } diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index b0b873f228..7beec5afa7 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -129,7 +129,8 @@ class SwitchMapSubscriber<T, R> extends SimpleOuterSubscriber<T, R> { } const innerSubscriber = new SimpleInnerSubscriber(this); this.destination.add(innerSubscriber); - this.innerSubscription = innerSubscribe(result, innerSubscriber); + this.innerSubscription = innerSubscriber; + innerSubscribe(result, innerSubscriber); } protected _complete(): void {