diff --git a/spec/operators/retryWhen-spec.ts b/spec/operators/retryWhen-spec.ts index d145d6e4c0..227a595919 100644 --- a/spec/operators/retryWhen-spec.ts +++ b/spec/operators/retryWhen-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { retryWhen, map, mergeMap, takeUntil } from 'rxjs/operators'; -import { of, EMPTY } from 'rxjs'; +import { of, EMPTY, Observable, throwError } from 'rxjs'; /** @test {retryWhen} */ describe('retryWhen operator', () => { @@ -329,4 +329,25 @@ describe('retryWhen operator', () => { expectObservable(result).toBe(expected); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should always teardown before starting the next cycle, even when synchronous', () => { + const results: any[] = []; + const source = new Observable(subscriber => { + subscriber.next(1); + subscriber.next(2); + subscriber.error('bad'); + return () => { + results.push('teardown'); + } + }); + const subscription = source.pipe(retryWhen(errors$ => errors$.pipe( + mergeMap((err, i) => i < 3 ? of(true) : throwError(err)) + ))).subscribe({ + next: value => results.push(value), + error: (err) => results.push(err) + }); + + expect(subscription.closed).to.be.true; + expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'bad', 'teardown']) + }); }); diff --git a/src/internal/operators/retryWhen.ts b/src/internal/operators/retryWhen.ts index 5368ea8611..c5d1d05626 100644 --- a/src/internal/operators/retryWhen.ts +++ b/src/internal/operators/retryWhen.ts @@ -1,13 +1,11 @@ -import { Operator } from '../Operator'; +/** @prettier */ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Subscription } from '../Subscription'; - -import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { MonoTypeOperatorFunction } from '../types'; import { lift } from '../util/lift'; -import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable @@ -62,88 +60,84 @@ import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '.. * @name retryWhen */ export function retryWhen(notifier: (errors: Observable) => Observable): MonoTypeOperatorFunction { - return (source: Observable) => lift(source, new RetryWhenOperator(notifier, source)); -} - -class RetryWhenOperator implements Operator { - constructor(protected notifier: (errors: Observable) => Observable, - protected source: Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class RetryWhenSubscriber extends SimpleOuterSubscriber { - - private errors: Subject | null = null; - private retries: Observable | null = null; - private retriesSubscription: Subscription | null | undefined = null; - - constructor(destination: Subscriber, - private notifier: (errors: Observable) => Observable, - private source: Observable) { - super(destination); - } - - error(err: any) { - if (!this.isStopped) { - - let errors = this.errors; - let retries = this.retries; - let retriesSubscription = this.retriesSubscription; - - if (!retries) { - errors = new Subject(); - try { - const { notifier } = this; - retries = notifier(errors); - } catch (e) { - return super.error(e); + return (source: Observable) => + lift(source, function (this: Subscriber, source: Observable) { + const subscriber = this; + const subscription = new Subscription(); + let innerSub: Subscription | null; + let syncResub = false; + let errors$: Subject; + + /** + * Gets the subject to send errors through. If it doesn't exist, + * we know we need to setup the notifier. + */ + const getErrorSubject = () => { + if (!errors$) { + errors$ = new Subject(); + let notifier$: Observable; + // The notifier is a user-provided function, so we need to do + // some error handling. + try { + notifier$ = notifier(errors$); + } catch (err) { + subscriber.error(err); + // Returning null here will cause the code below to + // notice there's been a problem and skip error notification. + return null; + } + subscription.add( + notifier$.subscribe({ + next: () => { + if (innerSub) { + subscribeNext(); + } else { + // If we don't have an innerSub yet, that's because the inner subscription + // call hasn't even returned yet. We've arrived here synchronously. + // So we flag that we want to resub, such that we can ensure teardown + // happens before we resubscribe. + syncResub = true; + } + }, + error: (err) => subscriber.error(err), + complete: () => subscriber.complete(), + }) + ); } - retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this)); - } else { - this.errors = null; - this.retriesSubscription = null; - } - - this._unsubscribeAndRecycle(); - - this.errors = errors; - this.retries = retries; - this.retriesSubscription = retriesSubscription; - - errors!.next(err); - } - } - - /** @deprecated This is an internal implementation detail, do not use. */ - _unsubscribe() { - const { errors, retriesSubscription } = this; - if (errors) { - errors.unsubscribe(); - this.errors = null; - } - if (retriesSubscription) { - retriesSubscription.unsubscribe(); - this.retriesSubscription = null; - } - this.retries = null; - } - - notifyNext(): void { - const { _unsubscribe } = this; + return errors$; + }; + + const subscribeNext = () => { + innerSub = source.subscribe({ + next: (value) => subscriber.next(value), + error: (err) => { + const errors$ = getErrorSubject(); + if (errors$) { + // We have set up the notifier without error. + errors$.next(err); + } + }, + complete: () => subscriber.complete(), + }); + if (syncResub) { + // Ensure that the inner subscription is torn down before + // moving on to the next subscription in the synchronous case. + // If we don't do this here, all inner subscriptions will not be + // torn down until the entire observable is done. + innerSub.unsubscribe(); + innerSub = null; + // We may need to do this multiple times, so reset the flag. + syncResub = false; + // Resubscribe + subscribeNext(); + } else { + subscription.add(innerSub); + } + }; - this._unsubscribe = null!; - this._unsubscribeAndRecycle(); - this._unsubscribe = _unsubscribe; + // Start the subscription + subscribeNext(); - this.source.subscribe(this); - } + return subscription; + }); }