diff --git a/src/operator/skipUntil.ts b/src/operator/skipUntil.ts index 993499c371..ea6c2dad6f 100644 --- a/src/operator/skipUntil.ts +++ b/src/operator/skipUntil.ts @@ -1,10 +1,5 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { TeardownLogic } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { skipUntil as higherOrder } from '../operators/skipUntil'; /** * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. @@ -19,58 +14,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function skipUntil(this: Observable, notifier: Observable): Observable { - return this.lift(new SkipUntilOperator(notifier)); -} - -class SkipUntilOperator implements Operator { - constructor(private notifier: Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class SkipUntilSubscriber extends OuterSubscriber { - - private hasValue: boolean = false; - private isInnerStopped: boolean = false; - - constructor(destination: Subscriber, - notifier: Observable) { - super(destination); - this.add(subscribeToResult(this, notifier)); - } - - protected _next(value: T) { - if (this.hasValue) { - super._next(value); - } - } - - protected _complete() { - if (this.isInnerStopped) { - super._complete(); - } else { - this.unsubscribe(); - } - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.hasValue = true; - } - - notifyComplete(): void { - this.isInnerStopped = true; - if (this.isStopped) { - super._complete(); - } - } + return higherOrder(notifier)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index d7e348bbba..96ac9e6faa 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -62,6 +62,7 @@ export { refCount } from './refCount'; export { scan } from './scan'; export { skip } from './skip'; export { skipLast } from './skipLast'; +export { skipUntil } from './skipUntil'; export { subscribeOn } from './subscribeOn'; export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; diff --git a/src/operators/skipUntil.ts b/src/operators/skipUntil.ts new file mode 100644 index 0000000000..2bde030901 --- /dev/null +++ b/src/operators/skipUntil.ts @@ -0,0 +1,77 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. + * + * + * + * @param {Observable} notifier - The second Observable that has to emit an item before the source Observable's elements begin to + * be mirrored by the resulting Observable. + * @return {Observable} An Observable that skips items from the source Observable until the second Observable emits + * an item, then emits the remaining items. + * @method skipUntil + * @owner Observable + */ +export function skipUntil(notifier: Observable): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new SkipUntilOperator(notifier)); +} + +class SkipUntilOperator implements Operator { + constructor(private notifier: Observable) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SkipUntilSubscriber extends OuterSubscriber { + + private hasValue: boolean = false; + private isInnerStopped: boolean = false; + + constructor(destination: Subscriber, + notifier: Observable) { + super(destination); + this.add(subscribeToResult(this, notifier)); + } + + protected _next(value: T) { + if (this.hasValue) { + super._next(value); + } + } + + protected _complete() { + if (this.isInnerStopped) { + super._complete(); + } else { + this.unsubscribe(); + } + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.hasValue = true; + } + + notifyComplete(): void { + this.isInnerStopped = true; + if (this.isStopped) { + super._complete(); + } + } +}