Skip to content

Commit

Permalink
fix(take): fix take() behavior when stopping
Browse files Browse the repository at this point in the history
Bug was: take synchronously next'ed then complete'd and then would remove the TakeOperator from the
list of listeners of its input stream, which would be a concurrent change to the array, while it was
being iterated over in Stream._n, causing bad state in that iteration.
  • Loading branch information
staltz committed Apr 28, 2016
1 parent 670a086 commit 438fc0f
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 28 deletions.
11 changes: 8 additions & 3 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -952,13 +952,14 @@ export class TakeOperator<T> implements Operator<T, T> {
}

_stop(): void {
this.ins._remove(this);
setTimeout(() => { this.ins._remove(this); }, 0);
this.out = null;
this.taken = 0;
}

_n(t: T) {
const out = this.out;
if (!out) return;
if (this.taken++ < this.max - 1) {
out._n(t);
} else {
Expand All @@ -969,11 +970,15 @@ export class TakeOperator<T> implements Operator<T, T> {
}

_e(err: any) {
this.out._e(err);
const out = this.out;
if (!out) return;
out._e(err);
}

_c() {
this.out._c();
const out = this.out;
if (!out) return;
out._c();
}
}

Expand Down
10 changes: 5 additions & 5 deletions tests/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ describe('flattenSequentially (extra)', () => {
});

it('should expand 3 sync events as a periodic each', (done) => {
const stream = xs.of(0, 1, 2)
const stream = xs.of(1, 2, 3)
.map(i => xs.periodic(100 * i).take(2).map(x => `${i}${x}`))
.compose(flattenSequentially());
// ---x---x---x---x---x---x
// ---00--01
// -------10------11
// -----------20----------21
const expected = ['00', '01', '10', '11', '20', '21'];
// ---10--11
// -------20------21
// -----------30----------31
const expected = ['10', '11', '20', '21', '30', '31'];
const listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
Expand Down
14 changes: 6 additions & 8 deletions tests/extra/fromEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,16 @@ describe('fromEvent (extra)', () => {

let expected = [1, 2, 3];

const listener = {
next(x: any) {
stream.addListener({
next: (x: any) => {
assert.strictEqual(x, expected.shift());
},
error: done,
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(expected.length, 0);
done();
}
};

stream.addListener(listener);
});

target.emit(1);
target.emit(2);
Expand All @@ -88,8 +86,8 @@ describe('fromEvent (extra)', () => {
const stream = fromEvent(target, 'test', true);

stream.take(1).addListener({
next(x) {},
error: done,
next: (x) => {},
error: (err: any) => done(err),
complete() {
setTimeout(() => {
assert.strictEqual('test', target.removedEvent);
Expand Down
8 changes: 4 additions & 4 deletions tests/operator/endWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ describe('Stream.prototype.endWhen', () => {

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
assert.strictEqual(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
assert.strictEqual(expected.length, 0);
done();
},
});
Expand All @@ -29,11 +29,11 @@ describe('Stream.prototype.endWhen', () => {

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
assert.strictEqual(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
assert.strictEqual(expected.length, 0);
done();
},
});
Expand Down
12 changes: 6 additions & 6 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import xs from '../../src/index';
import xs, {Stream} from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.flatten', () => {
Expand All @@ -22,14 +22,14 @@ describe('Stream.prototype.flatten', () => {
});

it('should expand 3 sync events as a periodic, only last one passes', (done) => {
const stream = xs.fromArray([0, 1, 2])
const stream = xs.fromArray([1, 2, 3])
.map(i => xs.periodic(100 * i).take(2).map(x => `${i}${x}`))
.flatten();
// ---x---x---x---x---x---x
// ---00--01
// -------10------11
// -----------20----------21
const expected = ['20', '21'];
// ---10--11
// -------20------21
// -----------30----------31
const expected = ['30', '31'];
const listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
Expand Down
33 changes: 31 additions & 2 deletions tests/operator/take.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import * as assert from 'assert';

describe('Stream.prototype.take', () => {
it('should allow specifying max amount to take from input stream', (done) => {
const stream = xs.periodic(50).take(4)
const stream = xs.periodic(50).take(4);
const expected = [0, 1, 2, 3];
let listener = {
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: done,
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
stream.removeListener(listener);
Expand All @@ -18,4 +18,33 @@ describe('Stream.prototype.take', () => {
};
stream.addListener(listener);
});

it('should not break sibling listeners when TakeOperator tears down', (done) => {
const source = xs.periodic(50);
const streamA = source.take(3);
const streamB = source.take(6);
const expectedA = [0, 1, 2];
const expectedB = [0, 1, 2, 3, 4, 5];

streamA.addListener({
next: (x: number) => {
assert.equal(x, expectedA.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expectedA.length, 0);
},
});

streamB.addListener({
next: (x: number) => {
assert.equal(x, expectedB.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expectedB.length, 0);
done();
},
});
});
});

0 comments on commit 438fc0f

Please sign in to comment.