Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): improve signature of operators regarding types #43

Merged
merged 1 commit into from
Jun 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions dist/xstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,9 @@ var Stream = (function () {
}
}
};
Stream.prototype.ctor = function () {
return this instanceof MemoryStream ? MemoryStream : Stream;
};

Stream.create = function (producer) {
if (producer) {
Expand Down Expand Up @@ -1008,19 +1011,23 @@ var Stream = (function () {
}
return new Stream(new MergeProducer(streams));
};

Stream.prototype.map = function (project) {
Stream.prototype._map = function (project) {
var p = this._prod;
var ctor = this.ctor();
if (p instanceof FilterOperator) {
return new Stream(new FilterMapOperator(p.passes, project, p.ins));
return new ctor(new FilterMapOperator(p.passes, project, p.ins));
}
if (p instanceof FilterMapOperator) {
return new Stream(new FilterMapOperator(p.passes, compose2(project, p.project), p.ins));
return new ctor(new FilterMapOperator(p.passes, compose2(project, p.project), p.ins));
}
if (p instanceof MapOperator) {
return new Stream(new MapOperator(compose2(project, p.project), p.ins));
return new ctor(new MapOperator(compose2(project, p.project), p.ins));
}
return new Stream(new MapOperator(project, this));
return new ctor(new MapOperator(project, this));
};

Stream.prototype.map = function (project) {
return this._map(project);
};

Stream.prototype.mapTo = function (projectedValue) {
Expand All @@ -1039,7 +1046,7 @@ var Stream = (function () {
};

Stream.prototype.take = function (amount) {
return new Stream(new TakeOperator(amount, this));
return new (this.ctor())(new TakeOperator(amount, this));
};

Stream.prototype.drop = function (amount) {
Expand All @@ -1051,19 +1058,19 @@ var Stream = (function () {
};

Stream.prototype.startWith = function (initial) {
return new Stream(new StartWithOperator(this, initial));
return new MemoryStream(new StartWithOperator(this, initial));
};

Stream.prototype.endWhen = function (other) {
return new Stream(new EndWhenOperator(other, this));
return new (this.ctor())(new EndWhenOperator(other, this));
};

Stream.prototype.fold = function (accumulate, seed) {
return new Stream(new FoldOperator(accumulate, seed, this));
return new MemoryStream(new FoldOperator(accumulate, seed, this));
};

Stream.prototype.replaceError = function (replace) {
return new Stream(new ReplaceErrorOperator(replace, this));
return new (this.ctor())(new ReplaceErrorOperator(replace, this));
};

Stream.prototype.flatten = function () {
Expand All @@ -1090,7 +1097,7 @@ var Stream = (function () {
};

Stream.prototype.debug = function (labelOrSpy) {
return new Stream(new DebugOperator(labelOrSpy, this));
return new (this.ctor())(new DebugOperator(labelOrSpy, this));
};

Stream.prototype.shamefullySendNext = function (value) {
Expand Down Expand Up @@ -1134,6 +1141,9 @@ var MimicStream = (function (_super) {
};

MimicStream.prototype.imitate = function (other) {
if (other instanceof MemoryStream) {
throw new Error('bad');
}
this._target = other;
};
return MimicStream;
Expand All @@ -1160,6 +1170,24 @@ var MemoryStream = (function (_super) {
this._has = false;
_super.prototype._x.call(this);
};
MemoryStream.prototype.map = function (project) {
return this._map(project);
};
MemoryStream.prototype.mapTo = function (projectedValue) {
return _super.prototype.mapTo.call(this, projectedValue);
};
MemoryStream.prototype.take = function (amount) {
return _super.prototype.take.call(this, amount);
};
MemoryStream.prototype.endWhen = function (other) {
return _super.prototype.endWhen.call(this, other);
};
MemoryStream.prototype.replaceError = function (replace) {
return _super.prototype.replaceError.call(this, replace);
};
MemoryStream.prototype.debug = function (labelOrSpy) {
return _super.prototype.debug.call(this, labelOrSpy);
};
return MemoryStream;
}(Stream));
exports.MemoryStream = MemoryStream;
Expand Down
2 changes: 1 addition & 1 deletion dist/xstream.min.js

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions markdown/footer.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ The operators and factories listed above are the core functions. `xstream` has p

# FAQ

**Q: Why does `imitate()` support a Stream but not a MemoryStream?**

A: MemoryStreams are meant for representing "values over time" (your age), while Streams represent simply events (your birthdays). MemoryStreams are usually initialized with a value, and `imitate()` is meant for creating circular dependencies of streams. If we would attempt to imitate a MemoryStream in a circular dependency, we would either get a race condition (where the symptom would be "nothing happens") or an infinite cyclic emission of values.

If you find yourself wanting to use `imitate()` with a MemoryStream, you should rework your code around `imitate()` to use a Stream instead. Look for the stream in the circular dependency that represents an event stream, and that would be a candidate for creating a MimicStream which then imitates the real event stream.

**Q: What's the difference between xstream and RxJS?**

A: Read this [blog post](http://staltz.com/why-we-built-xstream.html) on the topic.
Expand Down
101 changes: 70 additions & 31 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,10 @@ export class Stream<T> implements InternalListener<T> {
}
}

private ctor(): typeof Stream {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very clever! 👍

return this instanceof MemoryStream ? MemoryStream : Stream;
}

/**
* Creates a new Stream given a Producer.
*
Expand Down Expand Up @@ -1295,46 +1299,51 @@ export class Stream<T> implements InternalListener<T> {
return new Stream<R>(new CombineProducer<R>(project, streams));
};

/**
* Transforms each event from the input Stream through a `project` function,
* to get a Stream that emits those transformed events.
*
* Marble diagram:
*
* ```text
* --1---3--5-----7------
* map(i => i * 10)
* --10--30-50----70-----
* ```
*
* @param {Function} project A function of type `(t: T) => U` that takes event
* `t` of type `T` from the input Stream and produces an event of type `U`, to
* be emitted on the output Stream.
* @return {Stream}
*/
map<U>(project: (t: T) => U): Stream<U> {
protected _map<U>(project: (t: T) => U): Stream<U> | MemoryStream<U> {
const p = this._prod;
const ctor = this.ctor();
if (p instanceof FilterOperator) {
return new Stream<U>(new FilterMapOperator(
return new ctor<U>(new FilterMapOperator(
(<FilterOperator<T>> p).passes,
project,
(<FilterOperator<T>> p).ins
));
}
if (p instanceof FilterMapOperator) {
return new Stream<U>(new FilterMapOperator(
return new ctor<U>(new FilterMapOperator(
(<FilterMapOperator<T, T>> p).passes,
compose2(project, (<FilterMapOperator<T, T>> p).project),
(<FilterMapOperator<T, T>> p).ins
));
}
if (p instanceof MapOperator) {
return new Stream<U>(new MapOperator(
return new ctor<U>(new MapOperator(
compose2(project, (<MapOperator<T, T>> p).project),
(<MapOperator<T, T>> p).ins
));
}
return new Stream<U>(new MapOperator(project, this));
return new ctor<U>(new MapOperator(project, this));
}

/**
* Transforms each event from the input Stream through a `project` function,
* to get a Stream that emits those transformed events.
*
* Marble diagram:
*
* ```text
* --1---3--5-----7------
* map(i => i * 10)
* --10--30-50----70-----
* ```
*
* @param {Function} project A function of type `(t: T) => U` that takes event
* `t` of type `T` from the input Stream and produces an event of type `U`, to
* be emitted on the output Stream.
* @return {Stream}
*/
map<U>(project: (t: T) => U): Stream<U> {
return this._map(project);
}

/**
Expand Down Expand Up @@ -1408,7 +1417,7 @@ export class Stream<T> implements InternalListener<T> {
* @return {Stream}
*/
take(amount: number): Stream<T> {
return new Stream<T>(new TakeOperator(amount, this));
return new (this.ctor())<T>(new TakeOperator(amount, this));
}

/**
Expand Down Expand Up @@ -1465,8 +1474,8 @@ export class Stream<T> implements InternalListener<T> {
* @param initial The value or event to prepend.
* @return {Stream}
*/
startWith(initial: T): Stream<T> {
return new Stream<T>(new StartWithOperator(this, initial));
startWith(initial: T): MemoryStream<T> {
return new MemoryStream<T>(new StartWithOperator(this, initial));
}

/**
Expand All @@ -1489,7 +1498,7 @@ export class Stream<T> implements InternalListener<T> {
* @return {Stream}
*/
endWhen(other: Stream<any>): Stream<T> {
return new Stream<T>(new EndWhenOperator(other, this));
return new (this.ctor())<T>(new EndWhenOperator(other, this));
}

/**
Expand Down Expand Up @@ -1520,8 +1529,8 @@ export class Stream<T> implements InternalListener<T> {
* @param seed The initial accumulated value, of type `R`.
* @return {Stream}
*/
fold<R>(accumulate: (acc: R, t: T) => R, seed: R): Stream<R> {
return new Stream<R>(new FoldOperator(accumulate, seed, this));
fold<R>(accumulate: (acc: R, t: T) => R, seed: R): MemoryStream<R> {
return new MemoryStream<R>(new FoldOperator(accumulate, seed, this));
}

/**
Expand All @@ -1548,7 +1557,7 @@ export class Stream<T> implements InternalListener<T> {
* @return {Stream}
*/
replaceError(replace: (err: any) => Stream<T>): Stream<T> {
return new Stream<T>(new ReplaceErrorOperator(replace, this));
return new (this.ctor())<T>(new ReplaceErrorOperator(replace, this));
}

/**
Expand Down Expand Up @@ -1700,7 +1709,7 @@ export class Stream<T> implements InternalListener<T> {
* @return {Stream}
*/
debug(labelOrSpy?: string | ((t: T) => void)): Stream<T> {
return new Stream<T>(new DebugOperator(labelOrSpy, this));
return new (this.ctor())<T>(new DebugOperator(labelOrSpy, this));
}

/**
Expand Down Expand Up @@ -1770,9 +1779,15 @@ export class MimicStream<T> extends Stream<T> {
* the current stream, making it re-emit whatever events are emitted by the
* given `other` stream.
*
* @param {Stream} other The stream to imitate on the current one.
* @param {Stream} other The stream to imitate on the current one. Must not be
* a MemoryStream.
*/
imitate(other: Stream<T>): void {
if (other instanceof MemoryStream) {
throw new Error('A MemoryStream was given to imitate(), but it only ' +
'supports a Stream. Read more about this restriction here: ' +
'https://github.com/staltz/xstream#faq');
}
this._target = other;
}
}
Expand All @@ -1799,6 +1814,30 @@ export class MemoryStream<T> extends Stream<T> {
this._has = false;
super._x();
}

map<U>(project: (t: T) => U): MemoryStream<U> {
return <MemoryStream<U>> this._map(project);
}

mapTo<U>(projectedValue: U): MemoryStream<U> {
return <MemoryStream<U>> super.mapTo(projectedValue);
}

take(amount: number): MemoryStream<T> {
return <MemoryStream<T>> super.take(amount);
}

endWhen(other: Stream<any>): MemoryStream<T> {
return <MemoryStream<T>> super.endWhen(other);
}

replaceError(replace: (err: any) => Stream<T>): MemoryStream<T> {
return <MemoryStream<T>> super.replaceError(replace);
}

debug(labelOrSpy?: string | ((t: T) => void)): MemoryStream<T> {
return <MemoryStream<T>> super.debug(labelOrSpy);
}
}

export default Stream;
2 changes: 2 additions & 0 deletions tests/extra/concat.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';
import concat from '../../src/extra/concat';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/debounce.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Listener, Producer} from '../../src/index';
import debounce from '../../src/extra/debounce';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/delay.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';
import delay from '../../src/extra/delay';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/dropRepeats.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Stream} from '../../src/index';
import dropRepeats from '../../src/extra/dropRepeats';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/dropUntil.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';
import dropUntil from '../../src/extra/dropUntil';
import delay from '../../src/extra/delay';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/flattenConcurrently.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Stream, Listener} from '../../src/index';
import flattenConcurrently from '../../src/extra/flattenConcurrently';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';
import flattenSequentially from '../../src/extra/flattenSequentially';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/fromDiagram.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';
import fromDiagram from '../../src/extra/fromDiagram';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/fromEvent.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';
import fromEvent from '../../src/extra/fromEvent';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/pairwise.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';
import pairwise from '../../src/extra/pairwise';
import * as assert from 'assert';
Expand Down
2 changes: 2 additions & 0 deletions tests/extra/split.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Stream} from '../../src/index';
import split from '../../src/extra/split';
import concat from '../../src/extra/concat';
Expand Down
2 changes: 2 additions & 0 deletions tests/factory/combine.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs, {Stream} from '../../src/index';
import * as assert from 'assert';

Expand Down
2 changes: 2 additions & 0 deletions tests/factory/empty.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// <reference path="../../typings/globals/mocha/index.d.ts" />
/// <reference path="../../typings/globals/node/index.d.ts" />
import xs from '../../src/index';

describe('xs.empty()', function() {
Expand Down
Loading