Skip to content

Commit

Permalink
feat(flattenConcurrently): rename flatten to flattenConcurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 26, 2016
1 parent 0788da8 commit b3a87ee
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
6 changes: 3 additions & 3 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {DebugOperator} from './operator/DebugOperator';
import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
import {StartWithOperator} from './operator/StartWithOperator';
import {FlattenOperator} from './operator/FlattenOperator';
import {FlattenConcurrentlyOperator} from './operator/FlattenConcurrentlyOperator';
import {
CombineProducer,
CombineInstanceSignature,
Expand Down Expand Up @@ -166,8 +166,8 @@ export class Stream<T> implements Listener<T> {
return new Stream<T>(new StartWithOperator(this, x));
}

flatten<R, T extends Stream<R>>(): T {
return <T> new Stream<R>(new FlattenOperator(<Stream<Stream<R>>> (<any> this)));
flattenConcurrently<R, T extends Stream<R>>(): T {
return <T> new Stream<R>(new FlattenConcurrentlyOperator(<Stream<Stream<R>>> (<any> this)));
}

merge(other: Stream<T>): Stream<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {MapOperator} from './MapOperator';

export class Inner<T> implements Listener<T> {
constructor(public out: Stream<T>,
public op: FlattenOperator<T>) {
public op: FlattenConcurrentlyOperator<T>) {
}

next(t: T) {
Expand All @@ -24,7 +24,7 @@ export class Inner<T> implements Listener<T> {

export class Outer<T> implements Listener<Stream<T>> {
constructor(public out: Stream<T>,
public op: FlattenOperator<T>) {
public op: FlattenConcurrentlyOperator<T>) {
}

next(s: Stream<T>) {
Expand All @@ -43,7 +43,7 @@ export class Outer<T> implements Listener<Stream<T>> {
export class MapOuter<T> implements Listener<T> {
constructor(public out: Stream<T>,
public pr: (t: T) => Stream<T>,
public op: FlattenOperator<T>) { // pr = project
public op: FlattenConcurrentlyOperator<T>) { // pr = project
}

next(v: T) {
Expand All @@ -60,7 +60,7 @@ export class MapOuter<T> implements Listener<T> {
}
}

export class FlattenOperator<T> implements Operator<Stream<T>, T> {
export class FlattenConcurrentlyOperator<T> implements Operator<Stream<T>, T> {
public proxy: Listener<T | Stream<T>> = emptyListener;
public mapOp: MapOperator<T, Stream<T>>;
public active: number = 1; // number of outers and inners that have not yet ended
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import xs from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.flatten', () => {
describe('Stream.prototype.flattenConcurrently', () => {
describe('with map', () => {
it('should expand each interval event with 3 sync events', (done) => {
const stream = xs.interval(100).take(3)
.map(i => xs.from([1 + i, 2 + i, 3 + i]))
.flatten();
.flattenConcurrently();
const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5];
const listener = {
next: (x: number) => {
Expand All @@ -25,8 +25,8 @@ describe('Stream.prototype.flatten', () => {
it('should expand 3 sync events as an interval each', (done) => {
const stream = xs.from([0, 1, 2])
.map(i => xs.interval(100 * i).take(2).map(x => `${i}${x}`))
.flatten();
// ---|---|---|---|---|---|
.flattenConcurrently();
// ---x---x---x---x---x---x
// ---00--01
// -------10------11
// -----------20----------21
Expand Down

0 comments on commit b3a87ee

Please sign in to comment.