Skip to content

Commit

Permalink
feat(takeUntil): implement and test takeUntil()
Browse files Browse the repository at this point in the history
Add implementation and tests for .takeUntil()
  • Loading branch information
TylorS committed Mar 19, 2016
1 parent 3489ce3 commit 304bed1
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
import {RememberOperator} from './operator/RememberOperator';
import {StartWithOperator} from './operator/StartWithOperator';
import {TakeUntilOperator} from './operator/TakeUntilOperator';
import {
CombineProducer,
CombineInstanceSignature,
Expand Down Expand Up @@ -151,6 +152,10 @@ export class Stream<T> implements Observer<T> {
return new Stream<T>(new StartWithOperator(this, x));
}

takeUntil<T>(s: Stream<any>): Stream<T> {
return new Stream<T>(new TakeUntilOperator(this, s));
}

merge(other: Stream<T>): Stream<T> {
return Stream.merge(this, other);
}
Expand Down
23 changes: 23 additions & 0 deletions src/operator/TakeUntilOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {Stream} from '../Stream';
import {Observer} from '../Observer';
import {Operator} from '../Operator';
import {emptyObserver} from '../utils/emptyObserver';
import {noop} from '../utils/noop';

export class TakeUntilOperator<T> implements Operator<T, T> {
public out: Observer<T> = emptyObserver;
public endObserver: Observer<any> = emptyObserver;
constructor(public ins: Stream<T>, public endStream: Stream<T>) {
}

start(out: Observer<T>): void {
this.out = out;
function next() { out.end(); }
this.endStream.take(1).subscribe({next, error: noop, end: noop});
this.ins.subscribe(this.out);
}

stop(): void {
this.ins.unsubscribe(this.out);
}
}
20 changes: 20 additions & 0 deletions tests/operator/takeUntil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import xs from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.takeUntil', () => {
it('should end a stream when another emits', (done) => {
const stream = xs.interval(50).takeUntil(xs.interval(210));
const expected = [0, 1, 2, 3];
stream.subscribe({
next(x: number) {
assert.equal(x, expected.shift());
},
error: done.fail,
end() {
assert.equal(expected.length, 0);
done();
},
}
);
});
});

0 comments on commit 304bed1

Please sign in to comment.