Skip to content

Commit

Permalink
fix(imitate): fix cyclic execution leaks
Browse files Browse the repository at this point in the history
Rewrite imitate() with a new approach that redirects every listener of a mimic stream directly to
the target stream, and then does graph traversal to check if an isolated cycle exists in the stream
graph, whenever we remove a listener and the stream is about to have just one listener left.

Fixes #51.
  • Loading branch information
staltz committed Jun 14, 2016
1 parent be07a01 commit 8658aa0
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 80 deletions.
128 changes: 75 additions & 53 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export interface InternalListener<T> {
_c: () => void;
}

export const emptyListener: InternalListener<any> = {
export const emptyIL: InternalListener<any> = {
_n: noop,
_e: noop,
_c: noop,
Expand All @@ -29,14 +29,20 @@ export interface InternalProducer<T> {
_stop: () => void;
}

export interface Operator<T, R> extends InternalProducer<R>, InternalListener<T> {
export interface OutSender<T> {
out: Stream<T>;
}

export interface Operator<T, R> extends InternalProducer<R>, InternalListener<T>, OutSender<R> {
type: string;
ins: Stream<T>;
_start: (out: Stream<R>) => void;
_stop: () => void;
_n: (v: T) => void;
_e: (err: any) => void;
_c: () => void;
}

export interface Aggregator<T, U> extends InternalProducer<U>, OutSender<U> {
type: string;
insArr: Array<Stream<T>>;
_start: (out: Stream<U>) => void;
}

export interface Producer<T> {
Expand Down Expand Up @@ -74,26 +80,26 @@ function and<T>(f1: (t: T) => boolean, f2: (t: T) => boolean): (t: T) => boolean
};
}

export class MergeProducer<T> implements InternalProducer<T>, InternalListener<T> {
export class MergeProducer<T> implements Aggregator<T, T>, InternalListener<T> {
public type = 'merge';
private out: InternalListener<T> = emptyListener;
public out: Stream<T> = null;
private ac: number; // ac is activeCount, starts initialized

constructor(public streams: Array<Stream<T>>) {
this.ac = streams.length;
constructor(public insArr: Array<Stream<T>>) {
this.ac = insArr.length;
}

_start(out: InternalListener<T>): void {
_start(out: Stream<T>): void {
this.out = out;
const s = this.streams;
const s = this.insArr;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._add(this);
}
}

_stop(): void {
const s = this.streams;
const s = this.insArr;
const L = s.length;
for (let i = 0; i < L; i++) {
s[i]._remove(this);
Expand Down Expand Up @@ -154,22 +160,23 @@ export interface CombineSignature {
(...stream: Array<Stream<any>>): Stream<Array<any>>;
}

export class CombineListener<T> implements InternalListener<T> {
export class CombineListener<T> implements InternalListener<T>, OutSender<Array<T>> {
constructor(private i: number,
public out: Stream<Array<T>>,
private p: CombineProducer<T>) {
p.ils.push(this);
}

_n(t: T): void {
const p = this.p, out = p.out;
const p = this.p, out = this.out;
if (!out) return;
if (p.up(t, this.i)) {
out._n(p.vals);
}
}

_e(err: any): void {
const out = this.p.out;
const out = this.out;
if (!out) return;
out._e(err);
}
Expand All @@ -183,16 +190,16 @@ export class CombineListener<T> implements InternalListener<T> {
}
}

export class CombineProducer<R> implements InternalProducer<Array<R>> {
export class CombineProducer<R> implements Aggregator<any, Array<R>> {
public type = 'combine';
public out: InternalListener<Array<R>> = null;
public out: Stream<Array<R>> = null;
public ils: Array<CombineListener<any>> = [];
public Nc: number; // *N*umber of streams still to send *c*omplete
public Nn: number; // *N*umber of streams still to send *n*ext
public vals: Array<R>;

constructor(public s: Array<Stream<any>>) {
const n = this.Nc = this.Nn = s.length;
constructor(public insArr: Array<Stream<any>>) {
const n = this.Nc = this.Nn = insArr.length;
const vals = this.vals = new Array(n);
for (let i = 0; i < n; i++) {
vals[i] = empty;
Expand All @@ -206,22 +213,22 @@ export class CombineProducer<R> implements InternalProducer<Array<R>> {
return Nn === 0;
}

_start(out: InternalListener<Array<R>>): void {
_start(out: Stream<Array<R>>): void {
this.out = out;
const s = this.s;
const s = this.insArr;
const n = s.length;
if (n === 0) {
out._n([]);
out._c();
} else {
for (let i = 0; i < n; i++) {
s[i]._add(new CombineListener(i, this));
s[i]._add(new CombineListener(i, out, this));
}
}
}

_stop(): void {
const s = this.s;
const s = this.insArr;
const n = this.Nc = this.Nn = s.length;
const vals = this.vals = new Array(n);
for (let i = 0; i < n; i++) {
Expand Down Expand Up @@ -303,7 +310,7 @@ export class PeriodicProducer implements InternalProducer<number> {

export class DebugOperator<T> implements Operator<T, T> {
public type = 'debug';
private out: Stream<T> = null;
public out: Stream<T> = null;
private s: (t: T) => any = null; // spy
private l: string = null; // label

Expand Down Expand Up @@ -359,7 +366,7 @@ export class DebugOperator<T> implements Operator<T, T> {

export class DropOperator<T> implements Operator<T, T> {
public type = 'drop';
private out: Stream<T> = null;
public out: Stream<T> = null;
private dropped: number = 0;

constructor(public max: number,
Expand Down Expand Up @@ -416,8 +423,8 @@ class OtherIL<T> implements InternalListener<any> {

export class EndWhenOperator<T> implements Operator<T, T> {
public type = 'endWhen';
private out: Stream<T> = null;
private oil: InternalListener<any> = emptyListener; // oil = other InternalListener
public out: Stream<T> = null;
private oil: InternalListener<any> = emptyIL; // oil = other InternalListener

constructor(public o: Stream<any>, // o = other
public ins: Stream<T>) {
Expand Down Expand Up @@ -461,7 +468,7 @@ export class EndWhenOperator<T> implements Operator<T, T> {

export class FilterOperator<T> implements Operator<T, T> {
public type = 'filter';
private out: Stream<T> = null;
public out: Stream<T> = null;

constructor(public passes: (t: T) => boolean,
public ins: Stream<T>) {
Expand Down Expand Up @@ -524,7 +531,7 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {
public inner: Stream<T> = null; // Current inner Stream
private il: InternalListener<T> = null; // Current inner InternalListener
private open: boolean = true;
private out: Stream<T> = null;
public out: Stream<T> = null;

constructor(public ins: Stream<Stream<T>>) {
}
Expand Down Expand Up @@ -570,7 +577,7 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {

export class FoldOperator<T, R> implements Operator<T, R> {
public type = 'fold';
private out: Stream<R> = null;
public out: Stream<R> = null;
private acc: R; // initialized as seed

constructor(public f: (acc: R, t: T) => R,
Expand Down Expand Up @@ -616,7 +623,7 @@ export class FoldOperator<T, R> implements Operator<T, R> {

export class LastOperator<T> implements Operator<T, T> {
public type = 'last';
private out: Stream<T> = null;
public out: Stream<T> = null;
private has: boolean = false;
private val: T = <T> empty;

Expand Down Expand Up @@ -683,7 +690,7 @@ export class MapFlattenOperator<T, R> implements Operator<T, R> {
public inner: Stream<R> = null; // Current inner Stream
private il: InternalListener<R> = null; // Current inner InternalListener
private open: boolean = true;
private out: Stream<R> = null;
public out: Stream<R> = null;

constructor(public mapOp: MapOperator<T, Stream<R>>) {
this.type = `${mapOp.type}+flatten`;
Expand Down Expand Up @@ -739,7 +746,7 @@ export class MapFlattenOperator<T, R> implements Operator<T, R> {

export class MapOperator<T, R> implements Operator<T, R> {
public type = 'map';
protected out: Stream<R> = null;
public out: Stream<R> = null;

constructor(public project: (t: T) => R,
public ins: Stream<T>) {
Expand Down Expand Up @@ -795,7 +802,7 @@ export class FilterMapOperator<T, R> extends MapOperator<T, R> {

export class ReplaceErrorOperator<T> implements Operator<T, T> {
public type = 'replaceError';
private out: Stream<T> = <Stream<T>> empty;
public out: Stream<T> = <Stream<T>> empty;

constructor(public fn: (err: any) => Stream<T>,
public ins: Stream<T>) {
Expand Down Expand Up @@ -837,7 +844,7 @@ export class ReplaceErrorOperator<T> implements Operator<T, T> {

export class StartWithOperator<T> implements InternalProducer<T> {
public type = 'startWith';
private out: InternalListener<T> = emptyListener;
private out: InternalListener<T> = emptyIL;

constructor(public ins: Stream<T>,
public value: T) {
Expand All @@ -857,7 +864,7 @@ export class StartWithOperator<T> implements InternalProducer<T> {

export class TakeOperator<T> implements Operator<T, T> {
public type = 'take';
private out: Stream<T> = null;
public out: Stream<T> = null;
private taken: number = 0;

constructor(public max: number,
Expand Down Expand Up @@ -901,15 +908,13 @@ export class TakeOperator<T> implements Operator<T, T> {

export class Stream<T> implements InternalListener<T> {
protected _ils: Array<InternalListener<T>>; // 'ils' = Internal listeners
protected _hil: InternalListener<T>; // 'hil' = Hidden Internal Listener
protected _stopID: any = empty;
protected _prod: InternalProducer<T>;
protected _target: Stream<T>; // imitation target if this Stream will imitate

constructor(producer?: InternalProducer<T>) {
this._prod = producer;
this._ils = [];
this._hil = null;
this._target = null;
}

Expand All @@ -920,8 +925,6 @@ export class Stream<T> implements InternalListener<T> {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._n(t);
}
const h = this._hil;
if (h) h._n(t);
}

_e(err: any): void {
Expand All @@ -931,8 +934,6 @@ export class Stream<T> implements InternalListener<T> {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._e(err);
}
const h = this._hil;
if (h) h._e(err);
this._x();
}

Expand All @@ -943,8 +944,6 @@ export class Stream<T> implements InternalListener<T> {
const b = copy(a);
for (let i = 0; i < L; i++) b[i]._c();
}
const h = this._hil;
if (h) h._c();
this._x();
}

Expand Down Expand Up @@ -983,9 +982,7 @@ export class Stream<T> implements InternalListener<T> {

_add(il: InternalListener<T>): void {
const ta = this._target;
if (ta && ta._ils.length === 0) {
return ta._add(il);
}
if (ta) return ta._add(il);
const a = this._ils;
a.push(il);
if (a.length === 1) {
Expand All @@ -999,21 +996,47 @@ export class Stream<T> implements InternalListener<T> {
}

_remove(il: InternalListener<T>): void {
const ta = this._target;
if (ta) return ta._remove(il);
const a = this._ils;
const i = a.indexOf(il);
if (i > -1) {
a.splice(i, 1);
const p = this._prod;
if (p && a.length <= 0) {
this._stopID = setTimeout(() => p._stop());
} else if (a.length === 1) {
this._pruneCycles();
}
} else if (this._target) {
this._target._remove(il);
}
}

_setHIL(il: InternalListener<T>): void {
this._hil = il;
// If all paths stemming from `this` stream eventually end at `this`
// stream, then we remove the single listener of `this` stream, to
// force it to end its execution and dispose resources. This method
// assumes as a precondition that this._ils has just one listener.
_pruneCycles() {
if (this._onlyReachesThis(this)) {
this._remove(this._ils[0]);
}
}

// Checks whether *all* paths starting from `x` will eventually end at
// `this` stream, on the stream graph, following edges A->B where B is
// a listener of A.
_onlyReachesThis(x: InternalListener<any>): boolean {
if ((<OutSender<any>><any>x).out === this) {
return true;
} else if ((<OutSender<any>><any>x).out) {
return this._onlyReachesThis((<OutSender<any>><any>x).out);
} else if ((<Stream<any>>x)._ils) {
for (let i = 0; i < (<Stream<any>>x)._ils.length; i++) {
if (!this._onlyReachesThis((<Stream<any>>x)._ils[i])) return false;
}
return true;
} else {
return false;
}
}

private ctor(): typeof Stream {
Expand Down Expand Up @@ -1684,7 +1707,6 @@ export class Stream<T> implements InternalListener<T> {
'https://github.com/staltz/xstream#faq');
}
this._target = target;
target._setHIL(this);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/extra/concat.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import {Stream, InternalProducer, InternalListener} from '../core';
import {Stream, InternalProducer, InternalListener, OutSender} from '../core';

class ConcatProducer<T> implements InternalProducer<T>, InternalListener<T> {
class ConcatProducer<T> implements InternalProducer<T>, InternalListener<T>, OutSender<T> {
public type = 'concat';
private out: InternalListener<T> = null;
public out: Stream<T> = null;
private i: number = 0;

constructor(public streams: Array<Stream<T>>) {
}

_start(out: InternalListener<T>): void {
_start(out: Stream<T>): void {
this.out = out;
this.streams[this.i]._add(this);
}
Expand Down
2 changes: 1 addition & 1 deletion src/extra/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Operator, Stream} from '../core';

class DebounceOperator<T> implements Operator<T, T> {
public type = 'debounce';
private out: Stream<T> = null;
public out: Stream<T> = null;
private value: T = null;
private id: any = null;

Expand Down
Loading

0 comments on commit 8658aa0

Please sign in to comment.