Skip to content

Commit

Permalink
fix(sampleCombine): do not sample until all streams have emitted
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed Sep 25, 2016
1 parent e21f76f commit 9882e89
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 43 deletions.
88 changes: 55 additions & 33 deletions src/extra/sampleCombine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,78 +48,100 @@ export interface SampleCombineSignature {
(...streams: Array<Stream<any>>): (s: Stream<any>) => Stream<Array<any>>;
}

const NO = {};

export class SampleCombineListener<T> implements InternalListener<T> {
constructor(private i: number, private p: SampleCombineOperator<any>) {
p.ils[i] = this;
}

_n(t: T): void {
if (!this.p.out) return;
this.p.vals[this.i] = t;
const p = this.p;
if (p.out === NO) return;
p.up(t, this.i);
}

_e(err: any): void {
this.p._e(err);
}

_c(): void {
this.p.d(this.i, this);
this.p.down(this.i, this);
}
}

export class SampleCombineOperator<T> implements Operator<T, Array<any>> {
public type = 'sampleCombine';
public ins: Stream<T>;
public others: Array<Stream<any>>;
public out: Stream<Array<any>>;
public vals: Array<any> = [];
public Sc: number = 0;
public ils: Array<SampleCombineListener<any>> = [];
public ils: Array<SampleCombineListener<any>>;
public Nn: number; // *N*umber of streams still to send *n*ext
public vals: Array<any>;

constructor(public ins: Stream<T>,
public streams: Array<Stream<any>>) { }
constructor(ins: Stream<T>, streams: Array<Stream<any>>) {
this.ins = ins;
this.others = streams;
this.out = NO as Stream<Array<any>>;
this.ils = [];
this.Nn = 0;
this.vals = [];
}

_start(out: Stream<Array<any>>): void {
if (!this.ins || !this.streams) {
out._n([]);
out._c();
} else {
this.Sc = this.streams.length;
this.ins._add(this);
this.out = out;
if (this.Sc) {
for (let i = 0; i < this.Sc; i++) {
this.vals[i] = undefined;
this.streams[i]._add(new SampleCombineListener<any>(i, this));
}
}
this.out = out;
const s = this.others;
const n = this.Nn = s.length;
const vals = this.vals = new Array(n);
for (let i = 0; i < n; i++) {
vals[i] = NO;
s[i]._add(new SampleCombineListener<any>(i, this));
}
this.ins._add(this);
}

_stop(): void {
if (!this.ins || this.Sc) return;
const s = this.others;
const n = s.length;
const ils = this.ils;
this.ins._remove(this);
this.out = this.vals = null;
for (let i = 0; i < this.Sc; i++) {
this.streams[i]._remove(this.ils[i]);
for (let i = 0; i < n; i++) {
s[i]._remove(ils[i]);
}
this.out = NO as Stream<Array<any>>;
this.vals = [];
this.ils = [];
}

_n(t: T): void {
if (!this.out) return;
this.out._n([t, ...this.vals]);
const out = this.out;
if (out === NO) return;
if (this.Nn > 0) return;
out._n([t, ...this.vals]);
}

_e(err: any): void {
if (!this.out) return;
this.out._e(err);
const out = this.out;
if (out === NO) return;
out._e(err);
}

_c(): void {
if (!this.out) return;
this.out._c();
const out = this.out;
if (out === NO) return;
out._c();
}

up(t: any, i: number): void {
const v = this.vals[i];
if (this.Nn > 0 && v === NO) {
this.Nn--;
}
this.vals[i] = t;
}

d(i: number, l: SampleCombineListener<any>): void {
this.streams[i]._remove(l);
down(i: number, l: SampleCombineListener<any>): void {
this.others[i]._remove(l);
}
}

Expand Down
37 changes: 27 additions & 10 deletions tests/extra/sampleCombine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ describe('sampleCombine (extra)', () => {
});
});

it('should not pick values from sampled streams before they have emitted', (done) => {
const stream1 = xs.periodic(100).take(4);
const stream2 = xs.periodic(150).take(1);
const stream = stream1.compose(sampleCombine(stream2)).map(arr => arr.join(''));
let expected = ['10', '20', '30'];
stream.addListener({
next: (x) => {
assert.equal(x, expected.shift());
},
error: done,
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should just wrap the value if combining one stream', (done) => {
const source = xs.periodic(100).take(3);
const stream = source.compose(sampleCombine());
Expand All @@ -76,13 +93,13 @@ describe('sampleCombine (extra)', () => {
});

it('should not break future listeners when SampleCombineProducer tears down', (done) => {
// --0---1--2---| innerA
// ----0----1---| innerB
// ----0-----1--2---| outer
// ------00--11-12--| stream
// --0---1----2-| innerA
// ----0-----1--| innerB
// ----a-----b--c---| outer
// ------a0-----c21-| stream
const innerA = xs.create<number>();
const innerB = xs.create<number>();
const outer = xs.create<number>();
const outer = xs.create<string>();
const arrayInners: Array<Stream<number>> = [];
const stream = outer
.map(x => {
Expand All @@ -92,11 +109,11 @@ describe('sampleCombine (extra)', () => {
.map(combination => `${x}${combination.join('')}`);
})
.flatten();
const expected = ['00', '11', '12'];
const expected = ['a0', 'c21'];

setTimeout(() => {
arrayInners.push(innerA);
outer.shamefullySendNext(0);
outer.shamefullySendNext('a');
}, 100);
setTimeout(() => {
innerA.shamefullySendNext(0);
Expand All @@ -106,13 +123,13 @@ describe('sampleCombine (extra)', () => {
}, 175);
setTimeout(() => {
arrayInners.push(innerB);
outer.shamefullySendNext(1);
outer.shamefullySendNext('b');
innerA.shamefullySendNext(1);
}, 200);
setTimeout(() => {
innerA.shamefullySendNext(2);
outer.shamefullySendNext(2);
outer.shamefullySendNext('c');
innerB.shamefullySendNext(1);
innerA.shamefullySendNext(2);
}, 250);
setTimeout(() => {
innerA.shamefullySendComplete();
Expand Down

0 comments on commit 9882e89

Please sign in to comment.