Skip to content

Commit

Permalink
fix(MemoryStream): fix how MemoryStream handles late sync emissions
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 27, 2016
1 parent c88d6c2 commit 00de09d
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,19 @@ export class Stream<T> implements InternalListener<T> {

export class MemoryStream<T> extends Stream<T> {
public _val: any;
public _has: boolean = false;
constructor(producer: InternalProducer<T>) {
super(producer);
}

_n(x: T) {
this._val = x;
this._has = true;
super._n(x);
}

_add(listener: InternalListener<T>): void {
if (this._has) { listener._n(this._val); }
super._add(listener);
if (this._val) { listener._n(this._val); }
}
}
76 changes: 74 additions & 2 deletions tests/memoryStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import xs from '../src/index';
import xs, {Producer, Listener, Stream} from '../src/index';
import * as assert from 'assert';

describe('MemoryStream', () => {
Expand All @@ -11,10 +11,82 @@ describe('MemoryStream', () => {
next(x: any) {
assert.strictEqual(x, 1);
},
error: done.fail,
error: (err: any) => done(err),
complete: done,
});

stream.shamefullySendComplete();
});

it('should be createable giving a custom producer object', (done) => {
const expected = [10, 20, 30];
let listenerGotEnd: boolean = false;

const stream = xs.createWithMemory({
start(listener: Listener<number>) {
listener.next(10);
listener.next(20);
listener.next(30);
listener.complete();
},

stop() {
done();
assert.equal(expected.length, 0);
assert.equal(listenerGotEnd, true);
},
});

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
listenerGotEnd = true;
},
});
});

it('should broadcast the producer to multiple listeners', (done) => {
const stream = xs.createWithMemory({
start(listener: Listener<number>) {
setTimeout(() => listener.next(0), 100);
setTimeout(() => listener.next(1), 200);
setTimeout(() => listener.next(2), 300);
},

stop() {},
});
const expected1 = [0, 1, 2];
const expected2 = [0, 1, 2];

let listener1 = {
next: (x: number) => {
assert.equal(x, expected1.shift());
},
error: (err: any) => done(err),
complete: () => done('should not call complete'),
};
stream.addListener(listener1);

let listener2 = {
next: (x: number) => {
assert.equal(x, expected2.shift());
},
error: (err: any) => done(err),
complete: () => done('should not call complete'),
};
setTimeout(() => {
stream.addListener(listener2);
}, 150);

setTimeout(() => {
stream.removeListener(listener1);
stream.removeListener(listener2);
assert.equal(expected1.length, 0);
assert.equal(expected2.length, 0);
done();
}, 400);
});
});

0 comments on commit 00de09d

Please sign in to comment.