diff --git a/spec/observables/bindNodeCallback-spec.ts b/spec/observables/bindNodeCallback-spec.ts index ca4f506987..d689f6f9b1 100644 --- a/spec/observables/bindNodeCallback-spec.ts +++ b/spec/observables/bindNodeCallback-spec.ts @@ -279,17 +279,16 @@ describe('bindNodeCallback', () => { expect(results2).to.deep.equal([42, 'done']); }); - it('should not swallow post-callback errors', () => { + it('should emit post callback errors', () => { function badFunction(callback: (error: Error, answer: number) => void): void { callback(null as any, 42); - throw new Error('kaboom'); - } - const consoleStub = sinon.stub(console, 'warn'); - try { - bindNodeCallback(badFunction)().subscribe(); - expect(consoleStub).to.have.property('called', true); - } finally { - consoleStub.restore(); + throw 'kaboom'; } + let receivedError: any; + bindNodeCallback(badFunction)().subscribe({ + error: err => receivedError = err + }); + + expect(receivedError).to.equal('kaboom'); }); }); diff --git a/src/internal/observable/bindNodeCallback.ts b/src/internal/observable/bindNodeCallback.ts index 87467ef7f2..8c4a9f9ebb 100644 --- a/src/internal/observable/bindNodeCallback.ts +++ b/src/internal/observable/bindNodeCallback.ts @@ -1,51 +1,155 @@ +/** @prettier */ import { Observable } from '../Observable'; import { AsyncSubject } from '../AsyncSubject'; import { Subscriber } from '../Subscriber'; import { SchedulerAction, SchedulerLike } from '../types'; import { map } from '../operators/map'; -import { canReportError } from '../util/canReportError'; import { isScheduler } from '../util/isScheduler'; import { isArray } from '../util/isArray'; /* tslint:disable:max-line-length */ /** @deprecated resultSelector is deprecated, pipe to map instead */ -export function bindNodeCallback(callbackFunc: Function, resultSelector: Function, scheduler?: SchedulerLike): (...args: any[]) => Observable; +export function bindNodeCallback( + callbackFunc: Function, + resultSelector: Function, + scheduler?: SchedulerLike +): (...args: any[]) => Observable; -export function bindNodeCallback(callbackFunc: (callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; -export function bindNodeCallback(callbackFunc: (callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, scheduler?: SchedulerLike): () => Observable<[R1, R2, R3]>; -export function bindNodeCallback(callbackFunc: (callback: (err: any, res1: R1, res2: R2) => any) => any, scheduler?: SchedulerLike): () => Observable<[R1, R2]>; -export function bindNodeCallback(callbackFunc: (callback: (err: any, res1: R1) => any) => any, scheduler?: SchedulerLike): () => Observable; +export function bindNodeCallback( + callbackFunc: (callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, + scheduler?: SchedulerLike +): (...args: any[]) => Observable; +export function bindNodeCallback( + callbackFunc: (callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, + scheduler?: SchedulerLike +): () => Observable<[R1, R2, R3]>; +export function bindNodeCallback( + callbackFunc: (callback: (err: any, res1: R1, res2: R2) => any) => any, + scheduler?: SchedulerLike +): () => Observable<[R1, R2]>; +export function bindNodeCallback( + callbackFunc: (callback: (err: any, res1: R1) => any) => any, + scheduler?: SchedulerLike +): () => Observable; export function bindNodeCallback(callbackFunc: (callback: (err: any) => any) => any, scheduler?: SchedulerLike): () => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, scheduler?: SchedulerLike): (arg1: A1) => Observable<[R1, R2, R3]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, callback: (err: any, res1: R1, res2: R2) => any) => any, scheduler?: SchedulerLike): (arg1: A1) => Observable<[R1, R2]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, callback: (err: any, res1: R1) => any) => any, scheduler?: SchedulerLike): (arg1: A1) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, callback: (err: any) => any) => any, scheduler?: SchedulerLike): (arg1: A1) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, + scheduler?: SchedulerLike +): (...args: any[]) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1) => Observable<[R1, R2, R3]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, callback: (err: any, res1: R1, res2: R2) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1) => Observable<[R1, R2]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, callback: (err: any, res1: R1) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, callback: (err: any) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2) => Observable<[R1, R2, R3]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1, res2: R2) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2) => Observable<[R1, R2]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, callback: (err: any) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, + scheduler?: SchedulerLike +): (...args: any[]) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2) => Observable<[R1, R2, R3]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1, res2: R2) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2) => Observable<[R1, R2]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, callback: (err: any, res1: R1) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, callback: (err: any) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3) => Observable<[R1, R2, R3]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1, res2: R2) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3) => Observable<[R1, R2]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, + scheduler?: SchedulerLike +): (...args: any[]) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3) => Observable<[R1, R2, R3]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1, res2: R2) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3) => Observable<[R1, R2]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any, res1: R1) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, callback: (err: any) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable<[R1, R2, R3]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any, res1: R1, res2: R2) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable<[R1, R2]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any, res1: R1) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable; +export function bindNodeCallback( + callbackFunc: ( + arg1: A1, + arg2: A2, + arg3: A3, + arg4: A4, + callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any + ) => any, + scheduler?: SchedulerLike +): (...args: any[]) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable<[R1, R2, R3]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any, res1: R1, res2: R2) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable<[R1, R2]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any, res1: R1) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, callback: (err: any) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any) => any, scheduler?: SchedulerLike): (...args: any[]) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable<[R1, R2, R3]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any, res1: R1, res2: R2) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable<[R1, R2]>; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any, res1: R1) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable; -export function bindNodeCallback(callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any) => any) => any, scheduler?: SchedulerLike): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable; /* tslint:enable:max-line-length */ +export function bindNodeCallback( + callbackFunc: ( + arg1: A1, + arg2: A2, + arg3: A3, + arg4: A4, + arg5: A5, + callback: (err: any, res1: R1, res2: R2, res3: R3, res4: R4, ...args: any[]) => any + ) => any, + scheduler?: SchedulerLike +): (...args: any[]) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any, res1: R1, res2: R2, res3: R3) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable<[R1, R2, R3]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any, res1: R1, res2: R2) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable<[R1, R2]>; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any, res1: R1) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable; +export function bindNodeCallback( + callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any) => any) => any, + scheduler?: SchedulerLike +): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable; /* tslint:enable:max-line-length */ export function bindNodeCallback(callbackFunc: Function, scheduler?: SchedulerLike): (...args: any[]) => Observable; /** @@ -155,124 +259,120 @@ export function bindNodeCallback(callbackFunc: Function, scheduler?: SchedulerLi */ export function bindNodeCallback( callbackFunc: Function, - resultSelector?: Function|SchedulerLike, + resultSelector?: Function | SchedulerLike, scheduler?: SchedulerLike ): (...args: any[]) => Observable { - if (resultSelector) { if (isScheduler(resultSelector)) { scheduler = resultSelector; } else { // DEPRECATED PATH - return (...args: any[]) => bindNodeCallback(callbackFunc, scheduler)(...args).pipe( - map(args => isArray(args) ? resultSelector(...args) : resultSelector(args)) - ); + return (...args: any[]) => + bindNodeCallback( + callbackFunc, + scheduler + )(...args).pipe(map((args) => (isArray(args) ? resultSelector(...args) : resultSelector(args)))); } } - return function(this: any, ...args: any[]): Observable { - const params: ParamsState = { - subject: undefined!, - args, - callbackFunc, - scheduler: scheduler!, - context: this, - }; - return new Observable(subscriber => { - const { context } = params; - let { subject } = params; + return function (this: any, ...args: any[]): Observable { + let results: any; + let hasResults = false; + let hasError = false; + let error: any; + return new Observable((subscriber) => { if (!scheduler) { - if (!subject) { - subject = params.subject = new AsyncSubject(); + let isCurrentlyAsync = false; + let hasCompletedSynchronously = false; + if (hasResults) { + subscriber.next(results); + subscriber.complete(); + } else if (hasError) { + subscriber.error(error); + } else { const handler = (...innerArgs: any[]) => { const err = innerArgs.shift(); - - if (err) { - subject.error(err); - return; + if (err != null) { + hasError = true; + error = err; + subscriber.error(err); + } else { + hasResults = true; + results = innerArgs.length <= 1 ? innerArgs[0] : innerArgs; + subscriber.next(results); + if (isCurrentlyAsync) { + subscriber.complete(); + } else { + hasCompletedSynchronously = true; + } } - - subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs); - subject.complete(); }; try { - callbackFunc.apply(context, [...args, handler]); + callbackFunc.apply(this, [...args, handler]); } catch (err) { - if (canReportError(subject)) { - subject.error(err); - } else { - console.warn(err); - } + hasError = true; + error = err; + subscriber.error(err); + } + isCurrentlyAsync = true; + + if (hasCompletedSynchronously && !hasError) { + subscriber.complete(); } } - return subject.subscribe(subscriber); + return; } else { - return scheduler.schedule>(dispatch as any, 0, { params, subscriber, context }); - } - }); - }; -} - -interface DispatchState { - subscriber: Subscriber; - context: any; - params: ParamsState; -} - -interface ParamsState { - callbackFunc: Function; - args: any[]; - scheduler: SchedulerLike; - subject: AsyncSubject; - context: any; -} - -function dispatch(this: SchedulerAction>, state: DispatchState) { - const { params, subscriber, context } = state; - const { callbackFunc, args, scheduler } = params; - let subject = params.subject; + const scheduleNext = (value: any[]) => { + hasResults = true; + results = value.length <= 1 ? value[0] : value; + subscriber.add( + scheduler!.schedule(() => { + subscriber.next(results); + subscriber.add( + scheduler!.schedule(() => { + subscriber.complete(); + }) + ); + }) + ); + }; - if (!subject) { - subject = params.subject = new AsyncSubject(); + const scheduleError = (err: any) => { + hasError = true; + error = err; + subscriber.add( + scheduler!.schedule(() => { + subscriber.error(error); + }) + ); + }; - const handler = (...innerArgs: any[]) => { - const err = innerArgs.shift(); - if (err) { - this.add(scheduler.schedule>(dispatchError as any, 0, { err, subject })); - } else { - const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs; - this.add(scheduler.schedule>(dispatchNext as any, 0, { value, subject })); + return scheduler.schedule(() => { + if (hasResults) { + scheduleNext(results); + } else if (hasError) { + scheduleError(error); + } else { + try { + callbackFunc.apply(this, [ + ...args, + (...innerArgs: any[]) => { + const err = innerArgs.shift(); + if (err != null) { + scheduleError(err); + } else { + scheduleNext(innerArgs); + } + }, + ]); + } catch (err) { + scheduleError(err); + return; + } + } + }); } - }; - - try { - callbackFunc.apply(context, [...args, handler]); - } catch (err) { - this.add(scheduler.schedule>(dispatchError as any, 0, { err, subject })); - } - } - - this.add(subject.subscribe(subscriber)); -} - -interface DispatchNextArg { - subject: AsyncSubject; - value: T; -} - -function dispatchNext(arg: DispatchNextArg) { - const { value, subject } = arg; - subject.next(value); - subject.complete(); -} - -interface DispatchErrorArg { - subject: AsyncSubject; - err: any; -} - -function dispatchError(arg: DispatchErrorArg) { - const { err, subject } = arg; - subject.error(err); + }); + }; }