diff --git a/docs/stream.md b/docs/stream.md index be78d0a80..d49b1fd2f 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -8,7 +8,7 @@ - [Stream.scan](#streamscan) - [Stream.scanMerge](#streamscanmerge) - [Stream.lift](#streamlift) - - [Stream.HALT](#streamhalt) + - [Stream.SKIP](#streamskip) - [Stream["fantasy-land/of"]](#streamfantasy-landof) - [Instance members](#instance-members) - [stream.map](#streammap) @@ -121,13 +121,13 @@ Argument | Type | Required | Description Creates a new stream with the results of calling the function on every value in the stream with an accumulator and the incoming value. -Note that you can prevent dependent streams from being updated by returning the special value `stream.HALT` inside the accumulator function. +Note that you can prevent dependent streams from being updated by returning the special value `stream.SKIP` inside the accumulator function. `stream = Stream.scan(fn, accumulator, stream)` Argument | Type | Required | Description ------------- | -------------------------------- | -------- | --- -`fn` | `(accumulator, value) -> result \| HALT` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value +`fn` | `(accumulator, value) -> result \| SKIP` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value of the same type `accumulator` | `any` | Yes | The starting value for the accumulator `stream` | `Stream` | Yes | Stream containing the values **returns** | `Stream` | | Returns a new stream containing the result @@ -183,9 +183,9 @@ Argument | Type | Required | Description --- -##### Stream.HALT +##### Stream.SKIP -A special value that can be returned to stream callbacks to halt execution of downstreams +A special value that can be returned to stream callbacks to skip execution of downstreams --- @@ -375,14 +375,14 @@ console.log(doubled()) // logs 2 Dependent streams are *reactive*: their values are updated any time the value of their parent stream is updated. This happens regardless of whether the dependent stream was created before or after the value of the parent stream was set. -You can prevent dependent streams from being updated by returning the special value `stream.HALT` +You can prevent dependent streams from being updated by returning the special value `stream.SKIP` ```javascript -var halted = stream(1).map(function(value) { - return stream.HALT +var skipped = stream(1).map(function(value) { + return stream.SKIP }) -halted.map(function() { +skipped.map(function() { // never runs }) ``` @@ -432,14 +432,14 @@ console.log(added()) // logs 12 A stream can depend on any number of streams and it's guaranteed to update atomically. For example, if a stream A has two dependent streams B and C, and a fourth stream D is dependent on both B and C, the stream D will only update once if the value of A changes. This guarantees that the callback for stream D is never called with unstable values such as when B has a new value but C has the old value. Atomicity also brings the performance benefits of not recomputing downstreams unnecessarily. -You can prevent dependent streams from being updated by returning the special value `stream.HALT` +You can prevent dependent streams from being updated by returning the special value `stream.SKIP` ```javascript -var halted = stream.combine(function(stream) { - return stream.HALT +var skipped = stream.combine(function(stream) { + return stream.SKIP }, [stream(1)]) -halted.map(function() { +skipped.map(function() { // never runs }) ``` diff --git a/stream/change-log.md b/stream/change-log.md index 45ccd0da8..a7c5bf7b9 100644 --- a/stream/change-log.md +++ b/stream/change-log.md @@ -1,6 +1,8 @@ # Change log for stream ## 2.0.0 +- renamed HALT to SKIP [#2207](https://github.com/MithrilJS/mithril.js/pull/2207) +- rewrote implementation [#2207](https://github.com/MithrilJS/mithril.js/pull/2207) - stream: Removed `valueOf` & `toString` methods ([#2150](https://github.com/MithrilJS/mithril.js/pull/2150) ## 1.1.0 diff --git a/stream/stream.js b/stream/stream.js index d51520cb6..4a703ce98 100644 --- a/stream/stream.js +++ b/stream/stream.js @@ -2,150 +2,140 @@ ;(function() { "use strict" /* eslint-enable */ - -var guid = 0, HALT = {} -function createStream() { - function stream() { - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0]) - return stream._state.value +Stream.SKIP = {} +Stream.lift = lift +Stream.scan = scan +Stream.merge = merge +Stream.combine = combine +Stream.scanMerge = scanMerge +Stream["fantasy-land/of"] = Stream + +let warnedHalt = false +Object.defineProperty(Stream, "HALT", { + get: function() { + warnedHalt && console.log("HALT is deprecated and has been renamed to SKIP"); + warnedHalt = true + return Stream.SKIP } - initStream(stream) +}) - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0]) +function Stream(value) { + var dependentStreams = [] + var dependentFns = [] - return stream -} -function initStream(stream) { - stream.constructor = createStream - stream._state = {id: guid++, value: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], endStream: undefined, unregister: undefined} - stream.map = stream["fantasy-land/map"] = map, stream["fantasy-land/ap"] = ap, stream["fantasy-land/of"] = createStream - stream.toJSON = toJSON - - Object.defineProperties(stream, { - end: {get: function() { - if (!stream._state.endStream) { - var endStream = createStream() - endStream.map(function(value) { - if (value === true) { - unregisterStream(stream) - endStream._state.unregister = function(){unregisterStream(endStream)} - } - return value - }) - stream._state.endStream = endStream - } - return stream._state.endStream - }} - }) -} -function updateStream(stream, value) { - updateState(stream, value) - for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) - if (stream._state.unregister != null) stream._state.unregister() - finalize(stream) -} -function updateState(stream, value) { - stream._state.value = value - stream._state.changed = true - if (stream._state.state !== 2) stream._state.state = 1 -} -function updateDependency(stream, mustSync) { - var state = stream._state, parents = state.parents - if (parents.length > 0 && parents.every(active) && (mustSync || parents.some(changed))) { - var value = stream._state.derive() - if (value === HALT) return unregisterStream(stream) - updateState(stream, value) + function stream(v) { + if (arguments.length && v !== Stream.SKIP && open(stream)) { + value = v + stream.changing() + stream.state = "active" + dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) + } + + return value } -} -function finalize(stream) { - stream._state.changed = false - for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false -} -function combine(fn, streams) { - if (!streams.every(valid)) throw new Error("Ensure that each item passed to stream.combine/merge/lift is a stream") - return initDependency(createStream(), streams, function() { - return fn.apply(this, streams.concat([streams.filter(changed)])) - }) -} + stream.constructor = Stream + stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending" -function initDependency(dep, streams, derive) { - var state = dep._state - state.derive = derive - state.parents = streams.filter(notEnded) + stream.changing = function() { + open(stream) && (stream.state = "changing") + dependentStreams.forEach(function(s) { + s.dependent && s.dependent.changing() + s.changing() + }) + } - registerDependency(dep, state.parents) - updateDependency(dep, true) + stream.map = function(fn, ignoreInitial) { + var target = stream.state === "active" && ignoreInitial !== Stream.SKIP + ? Stream(fn(value)) + : Stream() - return dep -} -function registerDependency(stream, parents) { - for (var i = 0; i < parents.length; i++) { - parents[i]._state.deps[stream._state.id] = stream - registerDependency(stream, parents[i]._state.parents) + dependentStreams.push(target) + dependentFns.push(fn) + return target } -} -function unregisterStream(stream) { - for (var i = 0; i < stream._state.parents.length; i++) { - var parent = stream._state.parents[i] - delete parent._state.deps[stream._state.id] - } - for (var id in stream._state.deps) { - var dependent = stream._state.deps[id] - var index = dependent._state.parents.indexOf(stream) - if (index > -1) dependent._state.parents.splice(index, 1) + + let end + function createEnd() { + end = Stream() + end.map(function(value) { + if (value === true) { + stream.state = "ended" + dependentStreams.length = dependentFns.length = 0 + } + return value + }) + return end } - stream._state.state = 2 //ended - stream._state.deps = {} -} -function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} -function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [stream, this])} -function toJSON() {return this._state.value != null && typeof this._state.value.toJSON === "function" ? this._state.value.toJSON() : this._state.value} + stream.toJSON = function() { return value != null && typeof value.toJSON === "function" ? value.toJSON() : value } -function valid(stream) {return stream._state } -function active(stream) {return stream._state.state === 1} -function changed(stream) {return stream._state.changed} -function notEnded(stream) {return stream._state.state !== 2} + stream["fantasy-land/map"] = stream.map + stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) } -function merge(streams) { - return combine(function() { - return streams.map(function(s) {return s()}) - }, streams) + Object.defineProperty(stream, "end", { + get: function() { return end || createEnd() } + }) + + return stream } -function scan(reducer, seed, stream) { - var newStream = combine(function (s) { - var next = reducer(seed, s._state.value) - if (next !== HALT) return seed = next - return HALT - }, [stream]) +function combine(fn, streams) { + var ready = streams.every(function(s) { + if (s.constructor !== Stream) + throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream") + return s.state === "active" + }) + var stream = ready + ? Stream(fn.apply(null, streams.concat([streams]))) + : Stream() + + let changed = [] + + streams.forEach(function(s) { + s.map(function(value) { + changed.push(s) + if (ready || streams.every(function(s) { return s.state !== "pending" })) { + ready = true + stream(fn.apply(null, streams.concat([changed]))) + changed = [] + } + return value + }, Stream.SKIP).parent = stream + }) - if (newStream._state.state === 0) newStream(seed) + return stream +} - return newStream +function merge(streams) { + return combine(function() { return streams.map(function(s) { return s() }) }, streams) } -function scanMerge(tuples, seed) { - var streams = tuples.map(function(tuple) { - var stream = tuple[0] - if (stream._state.state === 0) stream(undefined) - return stream +function scan(fn, acc, origin) { + var stream = origin.map(function(v) { + acc = fn(acc, v) + return acc }) + stream(acc) + return stream +} - var newStream = combine(function() { - var changed = arguments[arguments.length - 1] +function scanMerge(tuples, seed) { + var streams = tuples.map(function(tuple) { return tuple[0] }) - streams.forEach(function(stream, idx) { - if (changed.indexOf(stream) > -1) { - seed = tuples[idx][1](seed, stream._state.value) - } + var stream = combine(function() { + var changed = arguments[arguments.length - 1] + streams.forEach(function(stream, i) { + if (changed.indexOf(stream) > -1) + seed = tuples[i][1](seed, stream()) }) return seed }, streams) - return newStream + stream(seed) + + return stream } function lift() { @@ -156,16 +146,12 @@ function lift() { }) } -createStream["fantasy-land/of"] = createStream -createStream.merge = merge -createStream.combine = combine -createStream.scan = scan -createStream.scanMerge = scanMerge -createStream.lift = lift -createStream.HALT = HALT - -if (typeof module !== "undefined") module["exports"] = createStream -else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = createStream -else window.m = {stream : createStream} +function open(s) { + return s.state === "pending" || s.state === "active" || s.state === "changing" +} + +if (typeof module !== "undefined") module["exports"] = Stream +else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = Stream +else window.m = {stream : Stream} }()); diff --git a/stream/tests/test-scan.js b/stream/tests/test-scan.js index 2f2c2bb2a..04423729c 100644 --- a/stream/tests/test-scan.js +++ b/stream/tests/test-scan.js @@ -31,7 +31,7 @@ o.spec("scan", function() { o(result[3]).deepEquals({a: 1}) }) - o("reducer can return HALT to prevent child updates", function() { + o("reducer can return SKIP to prevent child updates", function() { var count = 0 var action = stream() var store = stream.scan(function (arr, value) { @@ -39,7 +39,7 @@ o.spec("scan", function() { case "number": return arr.concat(value) default: - return stream.HALT + return stream.SKIP } }, [], action) var child = store.map(function (p) { diff --git a/stream/tests/test-stream.js b/stream/tests/test-stream.js index 2ef0ef94e..99e59b5cf 100644 --- a/stream/tests/test-stream.js +++ b/stream/tests/test-stream.js @@ -30,6 +30,43 @@ o.spec("stream", function() { o(stream()()).equals(1) }) + o("can SKIP", function() { + var a = Stream(2) + var b = a.map(function(value) { + return value === 5 + ? Stream.SKIP + : value + }) + + a(5) + + o(b()).equals(2) + }) + o("can HALT", function() { + var a = Stream(2) + var b = a.map(function(value) { + return value === 5 + ? Stream.HALT + : value + }) + + a(5) + + o(b()).equals(2) + }) + o("warns HALT deprecated", function() { + var log = console.log + var warning = "" + console.log = function(a) { + warning = a + } + + Stream.HALT + + console.log = log + + o(warning).equals("HALT is deprecated and has been renamed to SKIP") + }) }) o.spec("combine", function() { o("transforms value", function() { @@ -87,6 +124,7 @@ o.spec("stream", function() { o(d()).equals(15) o(count).equals(1) }) + o("combines default value atomically", function() { var count = 0 var a = Stream(3) @@ -100,6 +138,21 @@ o.spec("stream", function() { o(d()).equals(15) o(count).equals(1) }) + o("combines and maps nested streams atomically", function() { + var count = 0 + var a = Stream(3) + var b = Stream.combine(function(a) {return a() * 2}, [a]) + var c = Stream.combine(function(a) {return a() * a()}, [a]) + var d = c.map(function(x){return x}) + var e = Stream.combine(function(x) {return x()}, [d]) + var f = Stream.combine(function(b, e) { + count++ + return b() + e() + }, [b, e]) + + o(f()).equals(15) + o(count).equals(1) + }) o("combine lists only changed upstreams in last arg", function() { var streams = [] var a = Stream() @@ -111,8 +164,22 @@ o.spec("stream", function() { a(3) b(5) - o(streams.length).equals(1) - o(streams[0]).equals(b) + o(streams.length).equals(2) + o(streams[0]).equals(a) + o(streams[1]).equals(b) + }) + o("combine continues with ended streams", function() { + var a = Stream() + var b = Stream() + var combined = Stream.combine(function(a, b) { + return a() + b() + }, [a, b]) + + a(3) + a.end(true) + b(5) + + o(combined()).equals(8) }) o("combine lists only changed upstreams in last arg with default value", function() { var streams = [] @@ -151,11 +218,11 @@ o.spec("stream", function() { o(b()()).equals(undefined) }) - o("combine can halt", function() { + o("combine can skip", function() { var count = 0 var a = Stream(1) var b = Stream.combine(function() { - return Stream.HALT + return Stream.SKIP }, [a])["fantasy-land/map"](function() { count++ return 1 @@ -164,13 +231,13 @@ o.spec("stream", function() { o(b()).equals(undefined) o(count).equals(0) }) - o("combine can conditionaly halt", function() { + o("combine can conditionaly skip", function() { var count = 0 - var halt = false + var skip = false var a = Stream(1) var b = Stream.combine(function(a) { - if (halt) { - return Stream.HALT + if (skip) { + return Stream.SKIP } return a() }, [a])["fantasy-land/map"](function(a) { @@ -179,7 +246,7 @@ o.spec("stream", function() { }) o(b()).equals(1) o(count).equals(1) - halt = true + skip = true count = 0 a(2) o(b()).equals(1) @@ -554,7 +621,7 @@ o.spec("stream", function() { }) o.spec("applicative", function() { o("identity", function() { - var a = Stream()["fantasy-land/of"](function(value) {return value}) + var a = Stream["fantasy-land/of"](function(value) {return value}) var v = Stream(5) o(v["fantasy-land/ap"](a)()).equals(5) @@ -565,16 +632,16 @@ o.spec("stream", function() { var f = function(value) {return value * 2} var x = 3 - o(a["fantasy-land/of"](x)["fantasy-land/ap"](a["fantasy-land/of"](f))()).equals(6) - o(a["fantasy-land/of"](x)["fantasy-land/ap"](a["fantasy-land/of"](f))()).equals(a["fantasy-land/of"](f(x))()) + o(a.constructor["fantasy-land/of"](x)["fantasy-land/ap"](a.constructor["fantasy-land/of"](f))()).equals(6) + o(a.constructor["fantasy-land/of"](x)["fantasy-land/ap"](a.constructor["fantasy-land/of"](f))()).equals(a.constructor["fantasy-land/of"](f(x))()) }) o("interchange", function() { var u = Stream(function(value) {return value * 2}) var a = Stream() var y = 3 - o(a["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(6) - o(a["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(u["fantasy-land/ap"](a["fantasy-land/of"](function(f) {return f(y)}))()) + o(a.constructor["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(6) + o(a.constructor["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(u["fantasy-land/ap"](a.constructor["fantasy-land/of"](function(f) {return f(y)}))()) }) }) })