Skip to content

Commit

Permalink
fix(retryWhen): Ensure subscription tears down between retries
Browse files Browse the repository at this point in the history
Fixes an issue where subscriptions would not teardown until all retries were exhausted in the synchronous case.
  • Loading branch information
benlesh committed Aug 3, 2020
1 parent d9881c1 commit c82f241
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 86 deletions.
23 changes: 22 additions & 1 deletion spec/operators/retryWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { retryWhen, map, mergeMap, takeUntil } from 'rxjs/operators';
import { of, EMPTY } from 'rxjs';
import { of, EMPTY, Observable, throwError } from 'rxjs';

/** @test {retryWhen} */
describe('retryWhen operator', () => {
Expand Down Expand Up @@ -329,4 +329,25 @@ describe('retryWhen operator', () => {
expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should always teardown before starting the next cycle, even when synchronous', () => {
const results: any[] = [];
const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.error('bad');
return () => {
results.push('teardown');
}
});
const subscription = source.pipe(retryWhen(errors$ => errors$.pipe(
mergeMap((err, i) => i < 3 ? of(true) : throwError(err))
))).subscribe({
next: value => results.push(value),
error: (err) => results.push(err)
});

expect(subscription.closed).to.be.true;
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'bad', 'teardown'])
});
});
164 changes: 79 additions & 85 deletions src/internal/operators/retryWhen.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { Operator } from '../Operator';
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';


import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { MonoTypeOperatorFunction } from '../types';
import { lift } from '../util/lift';
import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';

/**
* Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable
Expand Down Expand Up @@ -62,88 +60,84 @@ import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '..
* @name retryWhen
*/
export function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => lift(source, new RetryWhenOperator(notifier, source));
}

class RetryWhenOperator<T> implements Operator<T, T> {
constructor(protected notifier: (errors: Observable<any>) => Observable<any>,
protected source: Observable<T>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RetryWhenSubscriber(subscriber, this.notifier, this.source));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RetryWhenSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {

private errors: Subject<any> | null = null;
private retries: Observable<any> | null = null;
private retriesSubscription: Subscription | null | undefined = null;

constructor(destination: Subscriber<R>,
private notifier: (errors: Observable<any>) => Observable<any>,
private source: Observable<T>) {
super(destination);
}

error(err: any) {
if (!this.isStopped) {

let errors = this.errors;
let retries = this.retries;
let retriesSubscription = this.retriesSubscription;

if (!retries) {
errors = new Subject();
try {
const { notifier } = this;
retries = notifier(errors);
} catch (e) {
return super.error(e);
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const subscription = new Subscription();
let innerSub: Subscription | null;
let syncResub = false;
let errors$: Subject<any>;

/**
* Gets the subject to send errors through. If it doesn't exist,
* we know we need to setup the notifier.
*/
const getErrorSubject = () => {
if (!errors$) {
errors$ = new Subject();
let notifier$: Observable<any>;
// The notifier is a user-provided function, so we need to do
// some error handling.
try {
notifier$ = notifier(errors$);
} catch (err) {
subscriber.error(err);
// Returning null here will cause the code below to
// notice there's been a problem and skip error notification.
return null;
}
subscription.add(
notifier$.subscribe({
next: () => {
if (innerSub) {
subscribeNext();
} else {
// If we don't have an innerSub yet, that's because the inner subscription
// call hasn't even returned yet. We've arrived here synchronously.
// So we flag that we want to resub, such that we can ensure teardown
// happens before we resubscribe.
syncResub = true;
}
},
error: (err) => subscriber.error(err),
complete: () => subscriber.complete(),
})
);
}
retriesSubscription = innerSubscribe(retries, new SimpleInnerSubscriber(this));
} else {
this.errors = null;
this.retriesSubscription = null;
}

this._unsubscribeAndRecycle();

this.errors = errors;
this.retries = retries;
this.retriesSubscription = retriesSubscription;

errors!.next(err);
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
const { errors, retriesSubscription } = this;
if (errors) {
errors.unsubscribe();
this.errors = null;
}
if (retriesSubscription) {
retriesSubscription.unsubscribe();
this.retriesSubscription = null;
}
this.retries = null;
}

notifyNext(): void {
const { _unsubscribe } = this;
return errors$;
};

const subscribeNext = () => {
innerSub = source.subscribe({
next: (value) => subscriber.next(value),
error: (err) => {
const errors$ = getErrorSubject();
if (errors$) {
// We have set up the notifier without error.
errors$.next(err);
}
},
complete: () => subscriber.complete(),
});
if (syncResub) {
// Ensure that the inner subscription is torn down before
// moving on to the next subscription in the synchronous case.
// If we don't do this here, all inner subscriptions will not be
// torn down until the entire observable is done.
innerSub.unsubscribe();
innerSub = null;
// We may need to do this multiple times, so reset the flag.
syncResub = false;
// Resubscribe
subscribeNext();
} else {
subscription.add(innerSub);
}
};

this._unsubscribe = null!;
this._unsubscribeAndRecycle();
this._unsubscribe = _unsubscribe;
// Start the subscription
subscribeNext();

this.source.subscribe(this);
}
return subscription;
});
}

0 comments on commit c82f241

Please sign in to comment.