Skip to content

Commit

Permalink
feat(operator): implement flatten() operator
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 20, 2016
1 parent cb58f7e commit 4800873
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 0 deletions.
99 changes: 99 additions & 0 deletions perf/flatMap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
var Benchmark = require('benchmark');
var xs = require('../lib/index').default;
var most = require('most');
var rx = require('rx');
var rxjs = require('@reactivex/rxjs')
var kefir = require('kefir');
var bacon = require('baconjs');
var lodash = require('lodash');
var highland = require('highland');

var runners = require('./runners');
var kefirFromArray = runners.kefirFromArray;

// flatMapping n streams, each containing m items.
// Results in a single stream that merges in n x m items
// In Array parlance: Take an Array containing n Arrays, each of length m,
// and flatten it to an Array of length n x m.
var mn = runners.getIntArg2(1000, 1000);
var a = build(mn[0], mn[1]);

function build(m, n) {
var a = new Array(n);
for(var i = 0; i< a.length; ++i) {
a[i] = buildArray(i*1000, m);
}
return a;
}

function buildArray(base, n) {
var a = new Array(n);
for(var i = 0; i< a.length; ++i) {
a[i] = base + i;
}
return a;
}

var suite = Benchmark.Suite('flatMap ' + mn[0] + ' x ' + mn[1] + ' streams');
var options = {
defer: true,
onError: function(e) {
e.currentTarget.failure = e.error;
}
};

suite
.add('xstream', function(deferred) {
runners.runXStream(deferred,
xs.from(a).map(xs.from).flatten().fold(sum, 0).last());
}, options)
.add('most', function(deferred) {
runners.runMost(deferred, most.from(a).flatMap(most.from).reduce(sum, 0));
}, options)
.add('rx 5', function(deferred) {
runners.runRx5(deferred,
rxjs.Observable.fromArray(a).flatMap(
function(x) {return rxjs.Observable.fromArray(x)}).reduce(sum, 0))
}, options)
.add('rx 4', function(deferred) {
runners.runRx(deferred, rx.Observable.fromArray(a).flatMap(rx.Observable.fromArray).reduce(sum, 0));
}, options)
.add('kefir', function(deferred) {
runners.runKefir(deferred, kefirFromArray(a).flatMap(kefirFromArray).scan(sum, 0).last());
}, options)
.add('bacon', function(deferred) {
runners.runBacon(deferred, bacon.fromArray(a).flatMap(bacon.fromArray).reduce(0, sum));
}, options)
.add('highland', function(deferred) {
runners.runHighland(deferred, highland(a).flatMap(highland).reduce(0, sum));
}, options)
.add('lodash', function() {
return lodashFlatMap(identity, a).reduce(sum, 0);
})
.add('Array', function() {
return arrayFlatMap(identity, a).reduce(sum, 0);
});

runners.runSuite(suite);

function arrayFlatMap(f, a) {
return a.reduce(function(a, x) {
return a.concat(f(x));
}, []);
}

function lodashFlatMap(f, a) {
return lodash(a).map(f).flatten();
}

function sum(x, y) {
return x + y;
}

function even(x) {
return x % 2 === 0;
}

function identity(x) {
return x;
}
5 changes: 5 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {DebugOperator} from './operator/DebugOperator';
import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
import {StartWithOperator} from './operator/StartWithOperator';
import {FlattenOperator} from './operator/FlattenOperator';
import {
CombineProducer,
CombineInstanceSignature,
Expand Down Expand Up @@ -161,6 +162,10 @@ export class Stream<T> implements Observer<T> {
return new Stream<T>(new StartWithOperator(this, x));
}

flatten<R, T extends Stream<R>>(): T {
return <T> new Stream<R>(new FlattenOperator(<Stream<Stream<R>>> (<any> this)));
}

merge(other: Stream<T>): Stream<T> {
return Stream.merge(this, other);
}
Expand Down
95 changes: 95 additions & 0 deletions src/operator/FlattenOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import {Observer} from '../Observer';
import {Operator} from '../Operator';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';
import {MapOperator} from './MapOperator';

export class Inner<T> implements Observer<T> {
constructor(public out: Stream<T>,
public op: FlattenOperator<T>) {
}

next(t: T) {
this.out.next(t);
}

error(err: any) {
this.out.error(err);
}

end() {
this.op.less();
}
}

export class Outer<T> implements Observer<Stream<T>> {
constructor(public out: Stream<T>,
public op: FlattenOperator<T>) {
}

next(s: Stream<T>) {
s.subscribe(new Inner(this.out, this.op));
}

error(err: any) {
this.out.error(err);
}

end() {
this.op.less();
}
}

export class MapOuter<T> implements Observer<T> {
constructor(public out: Stream<T>,
public pr: (t: T) => Stream<T>,
public op: FlattenOperator<T>) { // pr = project
}

next(v: T) {
this.op.active++;
this.pr(v).subscribe(new Inner(this.out, this.op));
}

error(err: any) {
this.out.error(err);
}

end() {
this.op.less();
}
}

export class FlattenOperator<T> implements Operator<Stream<T>, T> {
public proxy: Observer<T | Stream<T>> = emptyObserver;
public mapOp: MapOperator<T, Stream<T>>;
public active: number = 1; // number of outers and inners that have not yet ended
public out: Stream<T>;

constructor(public ins: Stream<Stream<T>>) {
if (ins._prod instanceof MapOperator) {
this.mapOp = <MapOperator<T, Stream<T>>> ins._prod;
}
}

start(out: Stream<T>): void {
this.out = out;
const mapOp = this.mapOp;
if (mapOp) {
mapOp.ins.subscribe(this.proxy = new MapOuter(out, mapOp.project, this));
} else {
this.ins.subscribe(this.proxy = new Outer(out, this));
}
}

stop(): void {
this.ins.unsubscribe(this.proxy);
}

less(): void {
this.active--;
if (this.active === 0) {
this.out.end();
}
}
}
48 changes: 48 additions & 0 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import xs from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.flatten', () => {
describe('with map', () => {
it('should expand each interval event with 3 sync events', (done) => {
const stream = xs.interval(100).take(3)
.map(i => xs.from([1 + i, 2 + i, 3 + i]))
.flatten();
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];
const observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
end: done.fail,
};
stream.subscribe(observer);
});

it('should expand 3 sync events as an interval each', (done) => {
const stream = xs.from([0, 1, 2])
.map(i => xs.interval(100 * i).take(2).map(x => `${i}${x}`))
.flatten();
// ---|---|---|---|---|---|
// ---00--01
// -------10------11
// -----------20----------21
const expected = ['00', '01', '10', '20', '11', '21'];
const observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: (err: any) => done(err),
end: () => done(new Error('No end() should be called')),
};
stream.subscribe(observer);
});
});
});

0 comments on commit 4800873

Please sign in to comment.