Skip to content

Commit

Permalink
fix(flatten): do not restart inner stream if equals the previous inner
Browse files Browse the repository at this point in the history
Reverts the change done in commit 819bc94, which is problematic due
to issue #104. This commit here will fix issues like #103 and similar. It reverts xstream's behavior
back to what it was in v5.3.1.

Closes issue #103.
  • Loading branch information
staltz committed Aug 15, 2016
1 parent cadf73a commit 9973eca
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
2 changes: 0 additions & 2 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {
const u = this.out;
if (u === NO) return;
const {inner, il} = this;
if (s === inner && s._prod !== NO) s._stopNow();
if (inner !== NO && il !== NO_IL) inner._remove(il);
(this.inner = s)._add(this.il = new FlattenListener(u, this));
}
Expand Down Expand Up @@ -840,7 +839,6 @@ export class MapFlattenOperator<T, R> implements Operator<T, R> {
u._e(e);
return;
}
if (s === inner && s._prod !== NO) s._stopNow();
if (inner !== NO && il !== NO_IL) inner._remove(il);
(this.inner = s)._add(this.il = new MapFlattenInner(u, this));
}
Expand Down
18 changes: 9 additions & 9 deletions tests/operator/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import * as assert from 'assert';

describe('Stream.prototype.flatten', () => {
describe('with map+debug to break the fusion', () => {
it('should restart inner stream if switching to the same inner stream', (done) => {
it('should not restart inner stream if switching to the same inner stream', (done) => {
const outer = fromDiagram('-A---------B----------C--------|');
const nums = fromDiagram( '-a-b-c-----------------------|', {
values: {a: 1, b: 2, c: 3}
});
const inner = nums.fold((acc, x) => acc + x, 0);
const inner = nums.map(x => 10 * x);

const stream = outer.map(() => inner).debug(() => { }).flatten();

const expected = [0, 1, 3, 6, 0, 1, 3, 6, 0, 1, 3, 6];
const expected = [10, 20, 30];

stream.addListener({
next: (x: number) => {
Expand Down Expand Up @@ -241,16 +241,16 @@ describe('Stream.prototype.flatten', () => {
});
});

it('should restart inner stream if switching to the same inner stream', (done) => {
it('should not restart inner stream if switching to the same inner stream', (done) => {
const outer = fromDiagram('-A---------B----------C--------|');
const nums = fromDiagram( '-a-b-c-----------------------|', {
values: {a: 1, b: 2, c: 3}
});
const inner = nums.fold((acc, x) => acc + x, 0);
const inner = nums.map(x => 10 * x);

const stream = outer.map(() => inner).flatten();

const expected = [0, 1, 3, 6, 0, 1, 3, 6, 0, 1, 3, 6];
const expected = [10, 20, 30];

stream.addListener({
next: (x: number) => {
Expand Down Expand Up @@ -309,16 +309,16 @@ describe('Stream.prototype.flatten', () => {
done();
});

it('should restart inner stream if switching to the same inner stream', (done) => {
it('should not restart inner stream if switching to the same inner stream', (done) => {
const outer = fromDiagram('-A---------B----------C--------|');
const nums = fromDiagram( '-a-b-c-----------------------|', {
values: {a: 1, b: 2, c: 3}
});
const inner = nums.fold((acc, x) => acc + x, 0);
const inner = nums.map(x => 10 * x);

const stream = outer.mapTo(inner).flatten();

const expected = [0, 1, 3, 6, 0, 1, 3, 6, 0, 1, 3, 6];
const expected = [10, 20, 30];

stream.addListener({
next: (x: number) => {
Expand Down

0 comments on commit 9973eca

Please sign in to comment.