diff --git a/src/Stream.ts b/src/Stream.ts index 613d00d..8b19be7 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -8,7 +8,7 @@ import {DebugOperator} from './operator/DebugOperator'; import {FoldOperator} from './operator/FoldOperator'; import {LastOperator} from './operator/LastOperator'; import {StartWithOperator} from './operator/StartWithOperator'; -import {FlattenOperator} from './operator/FlattenOperator'; +import {FlattenConcurrentlyOperator} from './operator/FlattenConcurrentlyOperator'; import { CombineProducer, CombineInstanceSignature, @@ -166,8 +166,8 @@ export class Stream implements Listener { return new Stream(new StartWithOperator(this, x)); } - flatten>(): T { - return new Stream(new FlattenOperator(>> ( this))); + flattenConcurrently>(): T { + return new Stream(new FlattenConcurrentlyOperator(>> ( this))); } merge(other: Stream): Stream { diff --git a/src/operator/FlattenOperator.ts b/src/operator/FlattenConcurrentlyOperator.ts similarity index 86% rename from src/operator/FlattenOperator.ts rename to src/operator/FlattenConcurrentlyOperator.ts index 966d147..51f0512 100644 --- a/src/operator/FlattenOperator.ts +++ b/src/operator/FlattenConcurrentlyOperator.ts @@ -6,7 +6,7 @@ import {MapOperator} from './MapOperator'; export class Inner implements Listener { constructor(public out: Stream, - public op: FlattenOperator) { + public op: FlattenConcurrentlyOperator) { } next(t: T) { @@ -24,7 +24,7 @@ export class Inner implements Listener { export class Outer implements Listener> { constructor(public out: Stream, - public op: FlattenOperator) { + public op: FlattenConcurrentlyOperator) { } next(s: Stream) { @@ -43,7 +43,7 @@ export class Outer implements Listener> { export class MapOuter implements Listener { constructor(public out: Stream, public pr: (t: T) => Stream, - public op: FlattenOperator) { // pr = project + public op: FlattenConcurrentlyOperator) { // pr = project } next(v: T) { @@ -60,7 +60,7 @@ export class MapOuter implements Listener { } } -export class FlattenOperator implements Operator, T> { +export class FlattenConcurrentlyOperator implements Operator, T> { public proxy: Listener> = emptyListener; public mapOp: MapOperator>; public active: number = 1; // number of outers and inners that have not yet ended diff --git a/tests/operator/flatten.ts b/tests/operator/flattenConcurrently.ts similarity index 89% rename from tests/operator/flatten.ts rename to tests/operator/flattenConcurrently.ts index 8f634e3..463461b 100644 --- a/tests/operator/flatten.ts +++ b/tests/operator/flattenConcurrently.ts @@ -1,12 +1,12 @@ import xs from '../../src/index'; import * as assert from 'assert'; -describe('Stream.prototype.flatten', () => { +describe('Stream.prototype.flattenConcurrently', () => { describe('with map', () => { it('should expand each interval event with 3 sync events', (done) => { const stream = xs.interval(100).take(3) .map(i => xs.from([1 + i, 2 + i, 3 + i])) - .flatten(); + .flattenConcurrently(); const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5]; const listener = { next: (x: number) => { @@ -25,8 +25,8 @@ describe('Stream.prototype.flatten', () => { it('should expand 3 sync events as an interval each', (done) => { const stream = xs.from([0, 1, 2]) .map(i => xs.interval(100 * i).take(2).map(x => `${i}${x}`)) - .flatten(); - // ---|---|---|---|---|---| + .flattenConcurrently(); + // ---x---x---x---x---x---x // ---00--01 // -------10------11 // -----------20----------21