Skip to content

Commit

Permalink
fix(operator): add more tear down logic in _stop() in operators
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Apr 10, 2016
1 parent 7652011 commit 2483107
Show file tree
Hide file tree
Showing 26 changed files with 137 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/extra/concat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ConcatProducer<T> implements InternalProducer<T>, InternalListener<T> {
streams[this.i]._remove(this);
}
this.i = 0;
this.out = null;
}

_n(t: T) {
Expand Down
3 changes: 3 additions & 0 deletions src/extra/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class DebounceOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
this.value = null;
this.id = null;
}

clearTimer() {
Expand Down
1 change: 1 addition & 0 deletions src/extra/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class DelayOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
}

_n(t: T) {
Expand Down
2 changes: 2 additions & 0 deletions src/extra/dropRepeats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export class DropRepeatsOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
this.v = <any> empty;
}

isEq(x: T, y: T) {
Expand Down
1 change: 1 addition & 0 deletions src/extra/fromEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class DOMEventProducer implements InternalProducer<Event> {
_stop() {
const {node, eventType, listener, useCapture} = this;
node.removeEventListener(eventType, listener, useCapture);
this.listener = null;
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/extra/pairwise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class PairwiseOperator<T> implements Operator<T, [T, T]> {

_stop(): void {
this.ins._remove(this);
this.has = false;
this.out = null;
this.val = null;
}

_n(t: T) {
Expand Down
15 changes: 11 additions & 4 deletions src/factory/CombineProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,22 @@ export class CombineProducer<R> implements InternalProducer<R> {

_start(out: InternalListener<R>): void {
this.out = out;
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i]._add(new Proxy(i, this));
const streams = this.streams;
for (let i = streams.length - 1; i >= 0; i--) {
streams[i]._add(new Proxy(i, this));
}
}

_stop(): void {
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i]._remove(this.proxies[i]);
const streams = this.streams;
for (let i = streams.length - 1; i >= 0; i--) {
streams[i]._remove(this.proxies[i]);
}
this.out = null;
this.ac = streams.length;
this.proxies = [];
this.ready = false;
this.vals = new Array(streams.length);
this.hasVal = new Array(streams.length);
}
}
12 changes: 8 additions & 4 deletions src/factory/MergeProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ export class MergeProducer<T> implements InternalProducer<T>, InternalListener<T

_start(out: InternalListener<T>): void {
this.out = out;
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i]._add(this);
const streams = this.streams;
for (let i = streams.length - 1; i >= 0; i--) {
streams[i]._add(this);
}
}

_stop(): void {
for (let i = this.streams.length - 1; i >= 0; i--) {
this.streams[i]._remove(this);
const streams = this.streams;
for (let i = streams.length - 1; i >= 0; i--) {
streams[i]._remove(this);
}
this.out = null;
this.ac = streams.length;
}

_n(t: T) {
Expand Down
3 changes: 2 additions & 1 deletion src/factory/PeriodicProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export class PeriodicProducer implements InternalProducer<number> {
}

_stop(): void {
this.i = 0;
if (this.intervalID !== -1) clearInterval(this.intervalID);
this.intervalID = -1;
this.i = 0;
}
}
1 change: 1 addition & 0 deletions src/operator/DebugOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class DebugOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
}

_n(t: T) {
Expand Down
2 changes: 2 additions & 0 deletions src/operator/DropOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export class DropOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
this.dropped = 0;
}

_n(t: T) {
Expand Down
6 changes: 2 additions & 4 deletions src/operator/EndWhenOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ export class EndWhenOperator<T> implements Operator<T, T> {
_stop(): void {
this.ins._remove(this);
this.o._remove(this.oli);
this.out = null;
this.oli = null;
}

end(): void {
this.ins._remove(this);
this.o._remove(this.oli);
this.out._c();
this.out = null;
this.oli = null;
}

_n(t: T) {
Expand Down
1 change: 1 addition & 0 deletions src/operator/FilterOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class FilterOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
}

_n(t: T) {
Expand Down
2 changes: 2 additions & 0 deletions src/operator/FlattenConcOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {

_stop(): void {
this.ins._remove(this);
this.active = 1;
this.out = null;
}

less(): void {
Expand Down
4 changes: 4 additions & 0 deletions src/operator/FlattenOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {

_stop(): void {
this.ins._remove(this);
this.curr = null;
this.inner = null;
this.open = true;
this.out = null;
}

cut(): void {
Expand Down
2 changes: 2 additions & 0 deletions src/operator/FoldOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export class FoldOperator<T, R> implements Operator<T, R> {

_stop(): void {
this.ins._remove(this);
this.out = null;
this.acc = this.seed;
}

_n(t: T) {
Expand Down
3 changes: 3 additions & 0 deletions src/operator/LastOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ export class LastOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
this.has = false;
this.val = <T> empty;
}

_n(t: T) {
Expand Down
2 changes: 2 additions & 0 deletions src/operator/MapFlattenConcOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export class MapFlattenConcOperator<T> implements InternalProducer<T>, InternalL

_stop(): void {
this.mapOp.ins._remove(this);
this.active = 1;
this.out = null;
}

less(): void {
Expand Down
4 changes: 4 additions & 0 deletions src/operator/MapFlattenOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export class MapFlattenOperator<T> implements InternalProducer<T>, InternalListe

_stop(): void {
this.mapOp.ins._remove(this);
this.curr = null;
this.inner = null;
this.open = true;
this.out = null;
}

cut(): void {
Expand Down
1 change: 1 addition & 0 deletions src/operator/MapOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class MapOperator<T, R> implements Operator<T, R> {

_stop(): void {
this.ins._remove(this);
this.out = null;
}

_n(t: T) {
Expand Down
1 change: 1 addition & 0 deletions src/operator/MapToOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class MapToOperator<T, R> implements Operator<T, R> {

_stop(): void {
this.ins._remove(this);
this.out = null;
}

_n(t: T) {
Expand Down
1 change: 1 addition & 0 deletions src/operator/ReplaceErrorOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class ReplaceErrorOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
}

_n(t: T) {
Expand Down
1 change: 1 addition & 0 deletions src/operator/StartWithOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ export class StartWithOperator<T> implements InternalProducer<T> {

_stop(): void {
this.ins._remove(this.out);
this.out = null;
}
}
2 changes: 2 additions & 0 deletions src/operator/TakeOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export class TakeOperator<T> implements Operator<T, T> {

_stop(): void {
this.ins._remove(this);
this.out = null;
this.taken = 0;
}

_n(t: T) {
Expand Down
21 changes: 21 additions & 0 deletions tests/operator/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,25 @@ describe('Stream.prototype.filter', () => {
},
});
});

it('should clean up Operator producer when complete', (done) => {
const stream = xs.of(1, 2, 3).filter(i => i !== 2);
const expected = [1, 3];
let completeCalled = false;

stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
assert.strictEqual(stream['_prod']['out'], stream);
},
error: (err: any) => done(err),
complete: () => {
completeCalled = true;
},
});

assert.strictEqual(completeCalled, true);
assert.strictEqual(stream['_prod']['out'], null);
done();
});
});
66 changes: 55 additions & 11 deletions tests/operator/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@ import * as assert from 'assert';

describe('Stream.prototype.map', () => {
it('should transform values from input stream to output stream', (done) => {
const stream = xs.periodic(100).map(i => 10 * i);
const stream = xs.periodic(100).map(i => 10 * i).take(3);
const expected = [0, 10, 20];
let listener = {

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.removeListener(listener);
done();
}
},
error: done.fail,
complete: done.fail,
};
stream.addListener(listener);
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should propagate user mistakes in project as errors', (done) => {
Expand All @@ -36,4 +35,49 @@ describe('Stream.prototype.map', () => {
},
});
});

it('should clean up Operator producer when complete', (done) => {
const stream = xs.of(1, 2, 3).map(i => i * 10);
const expected = [10, 20, 30];
let completeCalled = false;

stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
assert.strictEqual(stream['_prod']['out'], stream);
},
error: (err: any) => done(err),
complete: () => {
completeCalled = true;
},
});

assert.strictEqual(completeCalled, true);
assert.strictEqual(stream['_prod']['out'], null);
done();
});

it('should clean up Operator producer when failed', (done) => {
const stream = xs.of<any>('a', 'b', 3).map(i => i.toUpperCase());
const expected = ['A', 'B'];
let errorCalled = false;

stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
assert.strictEqual(stream['_prod']['out'], stream);
},
error: (err: any) => {
errorCalled = true;
},
complete: () => {
done('complete should not be called');
},
});

assert.strictEqual(errorCalled, true);
assert.strictEqual(expected.length, 0);
assert.strictEqual(stream['_prod']['out'], null);
done();
});
});

0 comments on commit 2483107

Please sign in to comment.