Skip to content

Commit

Permalink
fix(flatten): add ins field as metadata to flatten
Browse files Browse the repository at this point in the history
Make flatten() and flattenConcurrently() implement the Operator interface, and contain the ins
field, so we can traverse the stream graph from bottom up.
  • Loading branch information
staltz committed May 12, 2016
1 parent 84742e8 commit cbc1f8b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 23 deletions.
51 changes: 28 additions & 23 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ export interface InternalProducer<T> {
}

export interface Operator<T, R> extends InternalProducer<R>, InternalListener<T> {
type: string;
ins: Stream<T>;
_start: (out: Stream<R>) => void;
_stop: () => void;
_n: (v: T) => void;
_e: (err: any) => void;
_c: () => void;
type: string;
}

export interface Producer<T> {
Expand Down Expand Up @@ -708,13 +709,13 @@ export class LastOperator<T> implements Operator<T, T> {
}
}

class MFCIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: MapFlattenConcOperator<T>) {
class MFCIL<R> implements InternalListener<R> {
constructor(private out: Stream<R>,
private op: MapFlattenConcOperator<any, R>) {
}

_n(t: T) {
this.out._n(t);
_n(r: R) {
this.out._n(r);
}

_e(err: any) {
Expand All @@ -726,15 +727,17 @@ class MFCIL<T> implements InternalListener<T> {
}
}

export class MapFlattenConcOperator<T> implements InternalProducer<T>, InternalListener<T> {
export class MapFlattenConcOperator<T, R> implements Operator<T, R> {
public type = 'map+flattenConcurrently';
public ins: Stream<T>;
private active: number = 1; // number of outers and inners that have not yet ended
private out: Stream<T> = null;
private out: Stream<R> = null;

constructor(public mapOp: MapOperator<T, Stream<T>>) {
constructor(public mapOp: MapOperator<T, Stream<R>>) {
this.ins = mapOp.ins;
}

_start(out: Stream<T>): void {
_start(out: Stream<R>): void {
this.out = out;
this.mapOp.ins._add(this);
}
Expand Down Expand Up @@ -769,13 +772,13 @@ export class MapFlattenConcOperator<T> implements InternalProducer<T>, InternalL
}
}

class MFIL<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: MapFlattenOperator<T>) {
class MFIL<R> implements InternalListener<R> {
constructor(private out: Stream<R>,
private op: MapFlattenOperator<any, R>) {
}

_n(t: T) {
this.out._n(t);
_n(r: R) {
this.out._n(r);
}

_e(err: any) {
Expand All @@ -788,17 +791,19 @@ class MFIL<T> implements InternalListener<T> {
}
}

export class MapFlattenOperator<T> implements InternalProducer<T>, InternalListener<T> {
export class MapFlattenOperator<T, R> implements Operator<T, R> {
public type = 'map+flatten';
public inner: Stream<T> = null; // Current inner Stream
private il: InternalListener<T> = null; // Current inner InternalListener
public ins: Stream<T>;
public inner: Stream<R> = null; // Current inner Stream
private il: InternalListener<R> = null; // Current inner InternalListener
private open: boolean = true;
private out: Stream<T> = null;
private out: Stream<R> = null;

constructor(public mapOp: MapOperator<T, Stream<T>>) {
constructor(public mapOp: MapOperator<T, Stream<R>>) {
this.ins = mapOp.ins;
}

_start(out: Stream<T>): void {
_start(out: Stream<R>): void {
this.out = out;
this.mapOp.ins._add(this);
}
Expand Down Expand Up @@ -1636,7 +1641,7 @@ export class Stream<T> implements InternalListener<T> {
const p = this._prod;
return <T> <any> new Stream<R>(
p instanceof MapOperator || p instanceof FilterMapOperator ?
new MapFlattenOperator(<MapOperator<R, Stream<R>>> <any> p) :
new MapFlattenOperator(<MapOperator<any, Stream<R>>> <any> p) :
new FlattenOperator(<Stream<Stream<R>>> <any> this)
);
}
Expand Down Expand Up @@ -1670,7 +1675,7 @@ export class Stream<T> implements InternalListener<T> {
const p = this._prod;
return <T> <any> new Stream<R>(
p instanceof MapOperator || p instanceof FilterMapOperator ?
new MapFlattenConcOperator(<MapOperator<R, Stream<R>>> <any> p) :
new MapFlattenConcOperator(<MapOperator<any, Stream<R>>> <any> p) :
new FlattenConcOperator(<Stream<Stream<R>>> <any> this)
);
}
Expand Down
9 changes: 9 additions & 0 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ describe('Stream.prototype.flatten', () => {
});
});

it('should have an ins field as metadata', (done) => {
const source: Stream<number> = xs.periodic(100).take(3)
const stream: Stream<number> = source
.map((i: number) => xs.of(1 + i, 2 + i, 3 + i))
.flatten();
assert.strictEqual(stream['_prod']['ins'], source);
done();
});

it('should return a flat stream with correct TypeScript types', (done) => {
const streamStrings: Stream<string> = Stream.create({
start: (listener: Listener<string>) => {},
Expand Down
9 changes: 9 additions & 0 deletions tests/operator/flattenConcurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ describe('Stream.prototype.flattenConcurrently', () => {
});
});

it('should have an ins field as metadata', (done) => {
const source: Stream<number> = xs.periodic(100).take(3)
const stream: Stream<number> = source
.map((i: number) => xs.of(1 + i, 2 + i, 3 + i))
.flattenConcurrently();
assert.strictEqual(stream['_prod']['ins'], source);
done();
});

it('should return a flat stream with correct TypeScript types', (done) => {
const streamStrings: Stream<string> = Stream.create({
start: (listener: Listener<string>) => {},
Expand Down

0 comments on commit cbc1f8b

Please sign in to comment.