Skip to content

Commit

Permalink
fix: check for interop subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Nov 27, 2019
1 parent f9ebfc1 commit a5f7abd
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 21 deletions.
5 changes: 4 additions & 1 deletion src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> {
this._unsubscribeAndRecycle();
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult(this, result, undefined, undefined, innerSubscriber);
const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
if (innerSubscription !== innerSubscriber) {
this.add(innerSubscription);
}
}
}
}
7 changes: 5 additions & 2 deletions src/internal/operators/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,13 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, result, undefined, undefined, innerSubscriber);
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}

protected _complete(): void {
Expand Down
7 changes: 5 additions & 2 deletions src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,13 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}

protected _complete(): void {
Expand Down
7 changes: 5 additions & 2 deletions src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,13 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: any, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
}

protected _complete(): void {
Expand Down
5 changes: 4 additions & 1 deletion src/internal/operators/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
subscribeToResult(this, next, undefined, undefined, innerSubscriber);
const innerSubscription = subscribeToResult(this, next, undefined, undefined, innerSubscriber);
if (innerSubscription !== innerSubscriber) {
destination.add(innerSubscription);
}
} else {
this.destination.complete();
}
Expand Down
6 changes: 5 additions & 1 deletion src/internal/operators/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
this.innerSubscription = innerSubscriber;
subscribeToResult(this, notifier, undefined, undefined, innerSubscriber);
const innerSubscription = subscribeToResult(this, notifier, undefined, undefined, innerSubscriber);
if (innerSubscription !== innerSubscriber) {
this.add(innerSubscription);
this.innerSubscription = innerSubscription;
}
}

protected _next(value: T) {
Expand Down
7 changes: 5 additions & 2 deletions src/internal/operators/switchMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,13 @@ class SwitchMapSubscriber<T, R> extends OuterSubscriber<T, R> {
if (innerSubscription) {
innerSubscription.unsubscribe();
}
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
this.innerSubscription = subscribeToResult(this, result, value, index, innerSubscriber);
this.innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
if (this.innerSubscription !== innerSubscriber) {
destination.add(this.innerSubscription);
}
}

protected _complete(): void {
Expand Down
13 changes: 3 additions & 10 deletions src/internal/util/subscribeToResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,13 @@ export function subscribeToResult<T, R>(
result: any,
outerValue?: T,
outerIndex?: number,
destination?: Subscriber<any>
): Subscription;
export function subscribeToResult<T, R>(
outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue?: T,
outerIndex?: number,
destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription | void {
destination: Subscriber<R> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription | undefined {
if (destination.closed) {
return undefined;
}
if (result instanceof Observable) {
return result.subscribe(destination);
}
return subscribeTo(result)(destination);
return subscribeTo(result)(destination) as Subscription;
}

0 comments on commit a5f7abd

Please sign in to comment.