Skip to content

Commit

Permalink
fix(flatten): fix automatic removal of inner listeners
Browse files Browse the repository at this point in the history
The listener of an inner stream in a flatten() was not necessarily being removed when the flatten
operator/producer was stopped, causing a leak. This commit fixes that leak.

Fixes issue #68.
  • Loading branch information
staltz committed Jul 5, 2016
1 parent 7aa3a04 commit 1c6ed5c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {

_stop(): void {
this.ins._remove(this);
this.inner._remove(this.il);
this.inner = null;
this.il = null;
this.open = true;
Expand Down Expand Up @@ -704,6 +705,7 @@ export class MapFlattenOperator<T, R> implements Operator<T, R> {

_stop(): void {
this.mapOp.ins._remove(this);
this.inner._remove(this.il);
this.inner = null;
this.il = null;
this.open = true;
Expand Down
29 changes: 29 additions & 0 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,35 @@ describe('Stream.prototype.flatten', () => {
},
});
});

it('should not leak when used in a withLatestFrom-like case', (done) => {
const a$ = xs.periodic(100);
const b$ = xs.periodic(220);

let innerAEmissions = 0;

// a$.withLatestFrom(b$, (a, b) => a + b)
const c$ = b$.map(b =>
a$.map(a => a + b).debug(a => { innerAEmissions += 1; })
).flatten().take(1);

let cEmissions = 0;
c$.addListener({
next: (c) => {
assert.strictEqual(cEmissions, 0);
assert.strictEqual(c, 0);
cEmissions += 1;
},
error: (err: any) => done(err),
complete: () => { },
});

setTimeout(() => {
assert.strictEqual(innerAEmissions, 1);
assert.strictEqual(cEmissions, 1);
done();
}, 800);
});
});

describe('with filter+map fusion', () => {
Expand Down

0 comments on commit 1c6ed5c

Please sign in to comment.