Skip to content

Commit

Permalink
feat(delay): implement extra operator delay() and compose()
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 28, 2016
1 parent 707cd50 commit 48c5abc
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ export class Stream<T> implements InternalListener<T> {
return Stream.merge(this, other);
}

compose(operator: (stream: Stream<T>) => Stream<any>): Stream<any> {
return operator(this);
}

combine: CombineInstanceSignature<T> =
function combine<R>(project: CombineProjectFunction,
...streams: Array<Stream<any>>): Stream<R> {
Expand Down
56 changes: 56 additions & 0 deletions src/extra/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import {Stream} from '../Stream';
import {InternalListener} from '../InternalListener';
import {Operator} from '../Operator';
import {emptyListener} from '../utils/emptyListener';

class Proxy<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: DelayOperator<T>) {
}

_n(t: T) {
const self = this;
const id = setInterval(() => {
self.out._n(t);
clearInterval(id);
}, this.op.period);
}

_e(err: any) {
const self = this;
const id = setInterval(() => {
self.out._e(err);
clearInterval(id);
}, this.op.period);
}

_c() {
const self = this;
const id = setInterval(() => {
self.out._c();
clearInterval(id);
}, this.op.period);
}
}

export class DelayOperator<T> implements Operator<T, T> {
private proxy: InternalListener<T> = emptyListener;

constructor(public period: number,
public ins: Stream<T>) {
}

_start(out: Stream<T>): void {
this.ins._add(this.proxy = new Proxy(out, this));
}

_stop(): void {
this.ins._remove(this.proxy);
}
}

export default function delay<T>(period: number): (ins: Stream<T>) => Stream<T> {
return function delayOperator(ins: Stream<T>) {
return new Stream<T>(new DelayOperator(period, ins));
};
}
49 changes: 49 additions & 0 deletions tests/extra/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import xs from '../../src/index';
import delay from '../../src/extra/delay';
import * as assert from 'assert';

describe('delay (extra)', () => {
it('should delay interval events by a given time period', (done) => {
const stream = xs.interval(100).take(3).compose(delay(200));
const expected = [0, 1, 2];
let completeCalled = false;

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: Error) => done(err),
complete: () => { completeCalled = true; },
});

setTimeout(() => assert.equal(expected.length, 3), 250);
setTimeout(() => assert.equal(expected.length, 2), 350);
setTimeout(() => assert.equal(expected.length, 1), 450);
setTimeout(() => {
assert.equal(expected.length, 0)
assert.equal(completeCalled, true);
done();
}, 550);
});

it('should delay synchronous events by a given time period', (done) => {
const stream = xs.of(10, 20, 30).compose(delay(100));
const expected = [10, 20, 30];
let completeCalled = false;

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: Error) => done(err),
complete: () => { completeCalled = true; },
});

setTimeout(() => assert.equal(expected.length, 3), 50);
setTimeout(() => {
assert.equal(expected.length, 0)
assert.equal(completeCalled, true);
done();
}, 150);
});
});

0 comments on commit 48c5abc

Please sign in to comment.