diff --git a/src/extra/concat.ts b/src/extra/concat.ts index b046d94..d9f9186 100644 --- a/src/extra/concat.ts +++ b/src/extra/concat.ts @@ -20,6 +20,7 @@ class ConcatProducer implements InternalProducer, InternalListener { streams[this.i]._remove(this); } this.i = 0; + this.out = null; } _n(t: T) { diff --git a/src/extra/debounce.ts b/src/extra/debounce.ts index a7a9338..783e9a1 100644 --- a/src/extra/debounce.ts +++ b/src/extra/debounce.ts @@ -17,6 +17,9 @@ class DebounceOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; + this.value = null; + this.id = null; } clearTimer() { diff --git a/src/extra/delay.ts b/src/extra/delay.ts index 0426848..7310543 100644 --- a/src/extra/delay.ts +++ b/src/extra/delay.ts @@ -15,6 +15,7 @@ class DelayOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; } _n(t: T) { diff --git a/src/extra/dropRepeats.ts b/src/extra/dropRepeats.ts index 23365e5..2d10103 100644 --- a/src/extra/dropRepeats.ts +++ b/src/extra/dropRepeats.ts @@ -17,6 +17,8 @@ export class DropRepeatsOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; + this.v = empty; } isEq(x: T, y: T) { diff --git a/src/extra/fromEvent.ts b/src/extra/fromEvent.ts index ff65779..b22785e 100644 --- a/src/extra/fromEvent.ts +++ b/src/extra/fromEvent.ts @@ -19,6 +19,7 @@ export class DOMEventProducer implements InternalProducer { _stop() { const {node, eventType, listener, useCapture} = this; node.removeEventListener(eventType, listener, useCapture); + this.listener = null; } } diff --git a/src/extra/pairwise.ts b/src/extra/pairwise.ts index e750b64..f7c8652 100644 --- a/src/extra/pairwise.ts +++ b/src/extra/pairwise.ts @@ -16,6 +16,9 @@ class PairwiseOperator implements Operator { _stop(): void { this.ins._remove(this); + this.has = false; + this.out = null; + this.val = null; } _n(t: T) { diff --git a/src/factory/CombineProducer.ts b/src/factory/CombineProducer.ts index bcd5e56..ee7f33c 100644 --- a/src/factory/CombineProducer.ts +++ b/src/factory/CombineProducer.ts @@ -118,15 +118,22 @@ export class CombineProducer implements InternalProducer { _start(out: InternalListener): void { this.out = out; - for (let i = this.streams.length - 1; i >= 0; i--) { - this.streams[i]._add(new Proxy(i, this)); + const streams = this.streams; + for (let i = streams.length - 1; i >= 0; i--) { + streams[i]._add(new Proxy(i, this)); } } _stop(): void { - for (let i = this.streams.length - 1; i >= 0; i--) { - this.streams[i]._remove(this.proxies[i]); + const streams = this.streams; + for (let i = streams.length - 1; i >= 0; i--) { + streams[i]._remove(this.proxies[i]); } + this.out = null; + this.ac = streams.length; this.proxies = []; + this.ready = false; + this.vals = new Array(streams.length); + this.hasVal = new Array(streams.length); } } diff --git a/src/factory/MergeProducer.ts b/src/factory/MergeProducer.ts index 12d82d1..c35bdbf 100644 --- a/src/factory/MergeProducer.ts +++ b/src/factory/MergeProducer.ts @@ -13,15 +13,19 @@ export class MergeProducer implements InternalProducer, InternalListener): void { this.out = out; - for (let i = this.streams.length - 1; i >= 0; i--) { - this.streams[i]._add(this); + const streams = this.streams; + for (let i = streams.length - 1; i >= 0; i--) { + streams[i]._add(this); } } _stop(): void { - for (let i = this.streams.length - 1; i >= 0; i--) { - this.streams[i]._remove(this); + const streams = this.streams; + for (let i = streams.length - 1; i >= 0; i--) { + streams[i]._remove(this); } + this.out = null; + this.ac = streams.length; } _n(t: T) { diff --git a/src/factory/PeriodicProducer.ts b/src/factory/PeriodicProducer.ts index 555d31d..6598650 100644 --- a/src/factory/PeriodicProducer.ts +++ b/src/factory/PeriodicProducer.ts @@ -15,7 +15,8 @@ export class PeriodicProducer implements InternalProducer { } _stop(): void { - this.i = 0; if (this.intervalID !== -1) clearInterval(this.intervalID); + this.intervalID = -1; + this.i = 0; } } diff --git a/src/operator/DebugOperator.ts b/src/operator/DebugOperator.ts index c6a0710..0631abb 100644 --- a/src/operator/DebugOperator.ts +++ b/src/operator/DebugOperator.ts @@ -15,6 +15,7 @@ export class DebugOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; } _n(t: T) { diff --git a/src/operator/DropOperator.ts b/src/operator/DropOperator.ts index 8843988..1924721 100644 --- a/src/operator/DropOperator.ts +++ b/src/operator/DropOperator.ts @@ -16,6 +16,8 @@ export class DropOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; + this.dropped = 0; } _n(t: T) { diff --git a/src/operator/EndWhenOperator.ts b/src/operator/EndWhenOperator.ts index e37ec25..f980aac 100644 --- a/src/operator/EndWhenOperator.ts +++ b/src/operator/EndWhenOperator.ts @@ -38,14 +38,12 @@ export class EndWhenOperator implements Operator { _stop(): void { this.ins._remove(this); this.o._remove(this.oli); + this.out = null; + this.oli = null; } end(): void { - this.ins._remove(this); - this.o._remove(this.oli); this.out._c(); - this.out = null; - this.oli = null; } _n(t: T) { diff --git a/src/operator/FilterOperator.ts b/src/operator/FilterOperator.ts index 591b16d..4e52696 100644 --- a/src/operator/FilterOperator.ts +++ b/src/operator/FilterOperator.ts @@ -15,6 +15,7 @@ export class FilterOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; } _n(t: T) { diff --git a/src/operator/FlattenConcOperator.ts b/src/operator/FlattenConcOperator.ts index 5c86406..42186c7 100644 --- a/src/operator/FlattenConcOperator.ts +++ b/src/operator/FlattenConcOperator.ts @@ -34,6 +34,8 @@ export class FlattenConcOperator implements Operator, T> { _stop(): void { this.ins._remove(this); + this.active = 1; + this.out = null; } less(): void { diff --git a/src/operator/FlattenOperator.ts b/src/operator/FlattenOperator.ts index 8cebdb7..f393b05 100644 --- a/src/operator/FlattenOperator.ts +++ b/src/operator/FlattenOperator.ts @@ -37,6 +37,10 @@ export class FlattenOperator implements Operator, T> { _stop(): void { this.ins._remove(this); + this.curr = null; + this.inner = null; + this.open = true; + this.out = null; } cut(): void { diff --git a/src/operator/FoldOperator.ts b/src/operator/FoldOperator.ts index 0d5faff..25a3c64 100644 --- a/src/operator/FoldOperator.ts +++ b/src/operator/FoldOperator.ts @@ -19,6 +19,8 @@ export class FoldOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; + this.acc = this.seed; } _n(t: T) { diff --git a/src/operator/LastOperator.ts b/src/operator/LastOperator.ts index 1a073dd..c216e5f 100644 --- a/src/operator/LastOperator.ts +++ b/src/operator/LastOperator.ts @@ -17,6 +17,9 @@ export class LastOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; + this.has = false; + this.val = empty; } _n(t: T) { diff --git a/src/operator/MapFlattenConcOperator.ts b/src/operator/MapFlattenConcOperator.ts index ac73f7d..08bcb32 100644 --- a/src/operator/MapFlattenConcOperator.ts +++ b/src/operator/MapFlattenConcOperator.ts @@ -35,6 +35,8 @@ export class MapFlattenConcOperator implements InternalProducer, InternalL _stop(): void { this.mapOp.ins._remove(this); + this.active = 1; + this.out = null; } less(): void { diff --git a/src/operator/MapFlattenOperator.ts b/src/operator/MapFlattenOperator.ts index 6418d05..ee7cd7a 100644 --- a/src/operator/MapFlattenOperator.ts +++ b/src/operator/MapFlattenOperator.ts @@ -38,6 +38,10 @@ export class MapFlattenOperator implements InternalProducer, InternalListe _stop(): void { this.mapOp.ins._remove(this); + this.curr = null; + this.inner = null; + this.open = true; + this.out = null; } cut(): void { diff --git a/src/operator/MapOperator.ts b/src/operator/MapOperator.ts index 75589b1..0a4585a 100644 --- a/src/operator/MapOperator.ts +++ b/src/operator/MapOperator.ts @@ -15,6 +15,7 @@ export class MapOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; } _n(t: T) { diff --git a/src/operator/MapToOperator.ts b/src/operator/MapToOperator.ts index cec96c8..07c11df 100644 --- a/src/operator/MapToOperator.ts +++ b/src/operator/MapToOperator.ts @@ -15,6 +15,7 @@ export class MapToOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; } _n(t: T) { diff --git a/src/operator/ReplaceErrorOperator.ts b/src/operator/ReplaceErrorOperator.ts index 8db83c0..3bed11a 100644 --- a/src/operator/ReplaceErrorOperator.ts +++ b/src/operator/ReplaceErrorOperator.ts @@ -16,6 +16,7 @@ export class ReplaceErrorOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; } _n(t: T) { diff --git a/src/operator/StartWithOperator.ts b/src/operator/StartWithOperator.ts index 07c5fdb..fb06ab4 100644 --- a/src/operator/StartWithOperator.ts +++ b/src/operator/StartWithOperator.ts @@ -18,5 +18,6 @@ export class StartWithOperator implements InternalProducer { _stop(): void { this.ins._remove(this.out); + this.out = null; } } diff --git a/src/operator/TakeOperator.ts b/src/operator/TakeOperator.ts index 1644cc8..0d42a39 100644 --- a/src/operator/TakeOperator.ts +++ b/src/operator/TakeOperator.ts @@ -16,6 +16,8 @@ export class TakeOperator implements Operator { _stop(): void { this.ins._remove(this); + this.out = null; + this.taken = 0; } _n(t: T) { diff --git a/tests/operator/filter.ts b/tests/operator/filter.ts index 2dd1b63..f3d5070 100644 --- a/tests/operator/filter.ts +++ b/tests/operator/filter.ts @@ -35,4 +35,25 @@ describe('Stream.prototype.filter', () => { }, }); }); + + it('should clean up Operator producer when complete', (done) => { + const stream = xs.of(1, 2, 3).filter(i => i !== 2); + const expected = [1, 3]; + let completeCalled = false; + + stream.addListener({ + next: (x: number) => { + assert.strictEqual(x, expected.shift()); + assert.strictEqual(stream['_prod']['out'], stream); + }, + error: (err: any) => done(err), + complete: () => { + completeCalled = true; + }, + }); + + assert.strictEqual(completeCalled, true); + assert.strictEqual(stream['_prod']['out'], null); + done(); + }); }); diff --git a/tests/operator/map.ts b/tests/operator/map.ts index 8daab5d..7f7630a 100644 --- a/tests/operator/map.ts +++ b/tests/operator/map.ts @@ -3,20 +3,19 @@ import * as assert from 'assert'; describe('Stream.prototype.map', () => { it('should transform values from input stream to output stream', (done) => { - const stream = xs.periodic(100).map(i => 10 * i); + const stream = xs.periodic(100).map(i => 10 * i).take(3); const expected = [0, 10, 20]; - let listener = { + + stream.addListener({ next: (x: number) => { assert.equal(x, expected.shift()); - if (expected.length === 0) { - stream.removeListener(listener); - done(); - } - }, - error: done.fail, - complete: done.fail, - }; - stream.addListener(listener); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); }); it('should propagate user mistakes in project as errors', (done) => { @@ -36,4 +35,49 @@ describe('Stream.prototype.map', () => { }, }); }); + + it('should clean up Operator producer when complete', (done) => { + const stream = xs.of(1, 2, 3).map(i => i * 10); + const expected = [10, 20, 30]; + let completeCalled = false; + + stream.addListener({ + next: (x: number) => { + assert.strictEqual(x, expected.shift()); + assert.strictEqual(stream['_prod']['out'], stream); + }, + error: (err: any) => done(err), + complete: () => { + completeCalled = true; + }, + }); + + assert.strictEqual(completeCalled, true); + assert.strictEqual(stream['_prod']['out'], null); + done(); + }); + + it('should clean up Operator producer when failed', (done) => { + const stream = xs.of('a', 'b', 3).map(i => i.toUpperCase()); + const expected = ['A', 'B']; + let errorCalled = false; + + stream.addListener({ + next: (x: number) => { + assert.strictEqual(x, expected.shift()); + assert.strictEqual(stream['_prod']['out'], stream); + }, + error: (err: any) => { + errorCalled = true; + }, + complete: () => { + done('complete should not be called'); + }, + }); + + assert.strictEqual(errorCalled, true); + assert.strictEqual(expected.length, 0); + assert.strictEqual(stream['_prod']['out'], null); + done(); + }); });