Skip to content

Commit

Permalink
feat(flatten): implement flatten operator, a.k.a. switch()
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 26, 2016
1 parent b3a87ee commit 6255e53
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {DebugOperator} from './operator/DebugOperator';
import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
import {StartWithOperator} from './operator/StartWithOperator';
import {FlattenOperator} from './operator/FlattenOperator';
import {FlattenConcurrentlyOperator} from './operator/FlattenConcurrentlyOperator';
import {
CombineProducer,
Expand Down Expand Up @@ -166,6 +167,10 @@ export class Stream<T> implements Listener<T> {
return new Stream<T>(new StartWithOperator(this, x));
}

flatten<R, T extends Stream<R>>(): T {
return <T> new Stream<R>(new FlattenOperator(<Stream<Stream<R>>> (<any> this)));
}

flattenConcurrently<R, T extends Stream<R>>(): T {
return <T> new Stream<R>(new FlattenConcurrentlyOperator(<Stream<Stream<R>>> (<any> this)));
}
Expand Down
107 changes: 107 additions & 0 deletions src/operator/FlattenOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import {Observer} from '../Observer';
import {Operator} from '../Operator';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';
import {MapOperator} from './MapOperator';

export class Inner<T> implements Observer<T> {
constructor(public out: Stream<T>,
public op: FlattenOperator<T>) {
}

next(t: T) {
this.out.next(t);
}

error(err: any) {
this.out.error(err);
}

end() {
this.op.curr = null;
this.op.less();
}
}

export class Outer<T> implements Observer<Stream<T>> {
constructor(public out: Stream<T>,
public op: FlattenOperator<T>) {
}

next(s: Stream<T>) {
this.op.cut();
(this.op.curr = s).subscribe(this.op.inner = new Inner(this.out, this.op));
}

error(err: any) {
this.out.error(err);
}

end() {
this.op.open = false;
this.op.less();
}
}

export class MapOuter<T> implements Observer<T> {
constructor(public out: Stream<T>,
public pr: (t: T) => Stream<T>, // pr = project
public op: FlattenOperator<T>) {
}

next(v: T) {
this.op.cut();
(this.op.curr = this.pr(v)).subscribe(this.op.inner = new Inner(this.out, this.op));
}

error(err: any) {
this.out.error(err);
}

end() {
this.op.open = false;
this.op.less();
}
}

export class FlattenOperator<T> implements Operator<Stream<T>, T> {
public proxy: Observer<T | Stream<T>> = emptyObserver;
public mapOp: MapOperator<T, Stream<T>>;
public curr: Stream<T>; // Current inner Stream
public inner: Observer<T>; // Current inner Observer
public open: boolean = true;
public out: Stream<T>;

constructor(public ins: Stream<Stream<T>>) {
if (ins._prod instanceof MapOperator) {
this.mapOp = <MapOperator<T, Stream<T>>> ins._prod;
}
}

start(out: Stream<T>): void {
this.out = out;
const mapOp = this.mapOp;
if (mapOp) {
mapOp.ins.subscribe(this.proxy = new MapOuter(out, mapOp.project, this));
} else {
this.ins.subscribe(this.proxy = new Outer(out, this));
}
}

stop(): void {
this.ins.unsubscribe(this.proxy);
}

cut(): void {
const {curr, inner} = this;
if (curr && inner) {
curr.unsubscribe(inner);
}
}

less(): void {
if (!this.open && !this.curr) {
this.out.end();
}
}
}
73 changes: 73 additions & 0 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import xs from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.flatten', () => {
describe('with map', () => {
it('should expand each interval event with 3 sync events', (done) => {
const stream = xs.interval(100).take(3)
.map(i => xs.of(1 + i, 2 + i, 3 + i))
.flatten();
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];
const observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
end: done.fail,
};
stream.subscribe(observer);
});

it('should expand 3 sync events as an interval, only last one passes', (done) => {
const stream = xs.from([0, 1, 2])
.map(i => xs.interval(100 * i).take(2).map(x => `${i}${x}`))
.flatten();
// ---x---x---x---x---x---x
// ---00--01
// -------10------11
// -----------20----------21
const expected = ['20', '21'];
const observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: (err: any) => done(err),
end: () => done(new Error('No end() should be called')),
};
stream.subscribe(observer);
});

it('should expand 3 async events as an interval each', (done) => {
const stream = xs.interval(140).take(3)
.map(i =>
xs.interval(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`)
)
.flatten();
// ---x---x---x---x---x---x---x---x---x---x---x---x
// ---00--01--02
// ----10--11--12
// ------------20-----------21----------22
const expected = ['00', '10', '20', '21', '22'];
const observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: (err: any) => done(err),
end: () => done(new Error('No end() should be called')),
};
stream.subscribe(observer);
});
});
});
25 changes: 25 additions & 0 deletions tests/operator/flattenConcurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,30 @@ describe('Stream.prototype.flattenConcurrently', () => {
};
stream.addListener(listener);
});

it('should expand 3 async events as an interval each', (done) => {
const stream = xs.interval(140).take(3)
.map(i =>
xs.interval(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`)
)
.flattenConcurrently();
// ---x---x---x---x---x---x---x---x---x---x---x---x
// ---00--01--02
// ----10--11--12
// ------------20-----------21----------22
const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22'];
const observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: (err: any) => done(err),
end: () => done(new Error('No end() should be called')),
};
stream.subscribe(observer);
});
});
});

0 comments on commit 6255e53

Please sign in to comment.