Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite stream #2207

Merged
merged 16 commits into from
Nov 27, 2018
26 changes: 13 additions & 13 deletions docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
- [Stream.scan](#streamscan)
- [Stream.scanMerge](#streamscanmerge)
- [Stream.lift](#streamlift)
- [Stream.HALT](#streamhalt)
- [Stream.SKIP](#streamskip)
- [Stream["fantasy-land/of"]](#streamfantasy-landof)
- [Instance members](#instance-members)
- [stream.map](#streammap)
Expand Down Expand Up @@ -121,13 +121,13 @@ Argument | Type | Required | Description

Creates a new stream with the results of calling the function on every value in the stream with an accumulator and the incoming value.

Note that you can prevent dependent streams from being updated by returning the special value `stream.HALT` inside the accumulator function.
Note that you can prevent dependent streams from being updated by returning the special value `stream.SKIP` inside the accumulator function.

`stream = Stream.scan(fn, accumulator, stream)`

Argument | Type | Required | Description
------------- | -------------------------------- | -------- | ---
`fn` | `(accumulator, value) -> result \| HALT` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value
`fn` | `(accumulator, value) -> result \| SKIP` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value of the same type
`accumulator` | `any` | Yes | The starting value for the accumulator
`stream` | `Stream` | Yes | Stream containing the values
**returns** | `Stream` | | Returns a new stream containing the result
Expand Down Expand Up @@ -183,9 +183,9 @@ Argument | Type | Required | Description

---

##### Stream.HALT
##### Stream.SKIP

A special value that can be returned to stream callbacks to halt execution of downstreams
A special value that can be returned to stream callbacks to skip execution of downstreams

---

Expand Down Expand Up @@ -375,14 +375,14 @@ console.log(doubled()) // logs 2

Dependent streams are *reactive*: their values are updated any time the value of their parent stream is updated. This happens regardless of whether the dependent stream was created before or after the value of the parent stream was set.

You can prevent dependent streams from being updated by returning the special value `stream.HALT`
You can prevent dependent streams from being updated by returning the special value `stream.SKIP`

```javascript
var halted = stream(1).map(function(value) {
return stream.HALT
var skipped = stream(1).map(function(value) {
return stream.SKIP
})

halted.map(function() {
skipped.map(function() {
// never runs
})
```
Expand Down Expand Up @@ -432,14 +432,14 @@ console.log(added()) // logs 12

A stream can depend on any number of streams and it's guaranteed to update atomically. For example, if a stream A has two dependent streams B and C, and a fourth stream D is dependent on both B and C, the stream D will only update once if the value of A changes. This guarantees that the callback for stream D is never called with unstable values such as when B has a new value but C has the old value. Atomicity also brings the performance benefits of not recomputing downstreams unnecessarily.

You can prevent dependent streams from being updated by returning the special value `stream.HALT`
You can prevent dependent streams from being updated by returning the special value `stream.SKIP`

```javascript
var halted = stream.combine(function(stream) {
return stream.HALT
var skipped = stream.combine(function(stream) {
return stream.SKIP
}, [stream(1)])

halted.map(function() {
skipped.map(function() {
// never runs
})
```
Expand Down
2 changes: 2 additions & 0 deletions stream/change-log.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Change log for stream

## 2.0.0
- renamed HALT to SKIP [#2207](https://github.com/MithrilJS/mithril.js/pull/2207)
- rewrote implementation [#2207](https://github.com/MithrilJS/mithril.js/pull/2207)
- stream: Removed `valueOf` & `toString` methods ([#2150](https://github.com/MithrilJS/mithril.js/pull/2150)

## 1.1.0
Expand Down
242 changes: 114 additions & 128 deletions stream/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,150 +2,140 @@
;(function() {
"use strict"
/* eslint-enable */

var guid = 0, HALT = {}
function createStream() {
function stream() {
if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0])
return stream._state.value
Stream.SKIP = {}
Stream.lift = lift
Stream.scan = scan
Stream.merge = merge
Stream.combine = combine
Stream.scanMerge = scanMerge
Stream["fantasy-land/of"] = Stream

let warnedHalt = false
Object.defineProperty(Stream, "HALT", {
get: function() {
warnedHalt && console.log("HALT is deprecated and has been renamed to SKIP");
warnedHalt = true
return Stream.SKIP
}
initStream(stream)
})

if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0])
function Stream(value) {
var dependentStreams = []
var dependentFns = []

return stream
}
function initStream(stream) {
stream.constructor = createStream
stream._state = {id: guid++, value: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], endStream: undefined, unregister: undefined}
stream.map = stream["fantasy-land/map"] = map, stream["fantasy-land/ap"] = ap, stream["fantasy-land/of"] = createStream
stream.toJSON = toJSON

Object.defineProperties(stream, {
end: {get: function() {
if (!stream._state.endStream) {
var endStream = createStream()
endStream.map(function(value) {
if (value === true) {
unregisterStream(stream)
endStream._state.unregister = function(){unregisterStream(endStream)}
}
return value
})
stream._state.endStream = endStream
}
return stream._state.endStream
}}
})
}
function updateStream(stream, value) {
updateState(stream, value)
for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false)
if (stream._state.unregister != null) stream._state.unregister()
finalize(stream)
}
function updateState(stream, value) {
stream._state.value = value
stream._state.changed = true
if (stream._state.state !== 2) stream._state.state = 1
}
function updateDependency(stream, mustSync) {
var state = stream._state, parents = state.parents
if (parents.length > 0 && parents.every(active) && (mustSync || parents.some(changed))) {
var value = stream._state.derive()
if (value === HALT) return unregisterStream(stream)
updateState(stream, value)
function stream(v) {
if (arguments.length && v !== Stream.SKIP && open(stream)) {
value = v
stream.changing()
stream.state = "active"
dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) })
}

return value
}
}
function finalize(stream) {
stream._state.changed = false
for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false
}

function combine(fn, streams) {
if (!streams.every(valid)) throw new Error("Ensure that each item passed to stream.combine/merge/lift is a stream")
return initDependency(createStream(), streams, function() {
return fn.apply(this, streams.concat([streams.filter(changed)]))
})
}
stream.constructor = Stream
stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending"

function initDependency(dep, streams, derive) {
var state = dep._state
state.derive = derive
state.parents = streams.filter(notEnded)
stream.changing = function() {
open(stream) && (stream.state = "changing")
dependentStreams.forEach(function(s) {
s.dependent && s.dependent.changing()
s.changing()
})
}

registerDependency(dep, state.parents)
updateDependency(dep, true)
stream.map = function(fn, ignoreInitial) {
var target = stream.state === "active" && ignoreInitial !== Stream.SKIP
? Stream(fn(value))
: Stream()

return dep
}
function registerDependency(stream, parents) {
for (var i = 0; i < parents.length; i++) {
parents[i]._state.deps[stream._state.id] = stream
registerDependency(stream, parents[i]._state.parents)
dependentStreams.push(target)
dependentFns.push(fn)
return target
}
}
function unregisterStream(stream) {
for (var i = 0; i < stream._state.parents.length; i++) {
var parent = stream._state.parents[i]
delete parent._state.deps[stream._state.id]
}
for (var id in stream._state.deps) {
var dependent = stream._state.deps[id]
var index = dependent._state.parents.indexOf(stream)
if (index > -1) dependent._state.parents.splice(index, 1)

let end
function createEnd() {
end = Stream()
end.map(function(value) {
if (value === true) {
stream.state = "ended"
dependentStreams.length = dependentFns.length = 0
}
return value
})
return end
}
stream._state.state = 2 //ended
stream._state.deps = {}
}

function map(fn) {return combine(function(stream) {return fn(stream())}, [this])}
function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [stream, this])}
function toJSON() {return this._state.value != null && typeof this._state.value.toJSON === "function" ? this._state.value.toJSON() : this._state.value}
stream.toJSON = function() { return value != null && typeof value.toJSON === "function" ? value.toJSON() : value }
nordfjord marked this conversation as resolved.
Show resolved Hide resolved

function valid(stream) {return stream._state }
function active(stream) {return stream._state.state === 1}
function changed(stream) {return stream._state.changed}
function notEnded(stream) {return stream._state.state !== 2}
stream["fantasy-land/map"] = stream.map
stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) }

function merge(streams) {
return combine(function() {
return streams.map(function(s) {return s()})
}, streams)
Object.defineProperty(stream, "end", {
get: function() { return end || createEnd() }
nordfjord marked this conversation as resolved.
Show resolved Hide resolved
})

return stream
}

function scan(reducer, seed, stream) {
var newStream = combine(function (s) {
var next = reducer(seed, s._state.value)
if (next !== HALT) return seed = next
return HALT
}, [stream])
function combine(fn, streams) {
var ready = streams.every(function(s) {
if (s.constructor !== Stream)
throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream")
return s.state === "active"
})
var stream = ready
? Stream(fn.apply(null, streams.concat([streams])))
: Stream()

let changed = []

streams.forEach(function(s) {
s.map(function(value) {
changed.push(s)
if (ready || streams.every(function(s) { return s.state !== "pending" })) {
ready = true
stream(fn.apply(null, streams.concat([changed])))
changed = []
}
return value
}, Stream.SKIP).parent = stream
})

if (newStream._state.state === 0) newStream(seed)
return stream
}

return newStream
function merge(streams) {
return combine(function() { return streams.map(function(s) { return s() }) }, streams)
}

function scanMerge(tuples, seed) {
var streams = tuples.map(function(tuple) {
var stream = tuple[0]
if (stream._state.state === 0) stream(undefined)
return stream
function scan(fn, acc, origin) {
var stream = origin.map(function(v) {
acc = fn(acc, v)
return acc
})
stream(acc)
return stream
}

var newStream = combine(function() {
var changed = arguments[arguments.length - 1]
function scanMerge(tuples, seed) {
var streams = tuples.map(function(tuple) { return tuple[0] })

streams.forEach(function(stream, idx) {
if (changed.indexOf(stream) > -1) {
seed = tuples[idx][1](seed, stream._state.value)
}
var stream = combine(function() {
var changed = arguments[arguments.length - 1]
streams.forEach(function(stream, i) {
if (changed.indexOf(stream) > -1)
seed = tuples[i][1](seed, stream())
})

return seed
}, streams)

return newStream
stream(seed)

return stream
}

function lift() {
Expand All @@ -156,16 +146,12 @@ function lift() {
})
}

createStream["fantasy-land/of"] = createStream
createStream.merge = merge
createStream.combine = combine
createStream.scan = scan
createStream.scanMerge = scanMerge
createStream.lift = lift
createStream.HALT = HALT

if (typeof module !== "undefined") module["exports"] = createStream
else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = createStream
else window.m = {stream : createStream}
function open(s) {
return s.state === "pending" || s.state === "active" || s.state === "changing"
}

if (typeof module !== "undefined") module["exports"] = Stream
else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = Stream
else window.m = {stream : Stream}

}());
4 changes: 2 additions & 2 deletions stream/tests/test-scan.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ o.spec("scan", function() {
o(result[3]).deepEquals({a: 1})
})

o("reducer can return HALT to prevent child updates", function() {
o("reducer can return SKIP to prevent child updates", function() {
var count = 0
var action = stream()
var store = stream.scan(function (arr, value) {
switch (typeof value) {
case "number":
return arr.concat(value)
default:
return stream.HALT
return stream.SKIP
}
}, [], action)
var child = store.map(function (p) {
Expand Down
Loading