-
-
Notifications
You must be signed in to change notification settings - Fork 927
/
Copy pathstream.js
171 lines (148 loc) · 5.07 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/* eslint-disable */
;(function() {
"use strict"
/* eslint-enable */
// Counter handed out as `_state.id` when a stream is initialised; incremented on every use.
var guid = 0
// Sentinel object compared by identity: passing it as a value is ignored by the stream
// setter, and returning it from a derive function unregisters the derived stream.
var HALT = {}
/**
 * Stream factory.
 *
 * Returns a getter/setter function: calling it with no argument (or with HALT)
 * returns the current value; calling it with any other argument stores that
 * value via updateStream() and then returns the stored value.
 *
 * An optional first argument to the factory seeds the stream in the same way,
 * unless that argument is HALT.
 */
function createStream() {
	var seeded = arguments.length !== 0 && arguments[0] !== HALT
	var seed = arguments[0]
	function stream() {
		var incoming = arguments
		if (incoming.length === 0 || incoming[0] === HALT) return stream._state.value
		updateStream(stream, incoming[0])
		return stream._state.value
	}
	initStream(stream)
	if (seeded) updateStream(stream, seed)
	return stream
}
/**
 * Decorates a freshly created stream function with its bookkeeping record
 * (`_state`), its methods, and a lazily evaluated `end` accessor.
 *
 * @param stream the stream function to initialise (mutated in place)
 */
function initStream(stream) {
	stream.constructor = createStream
	stream._state = {
		id: guid++,
		value: undefined,
		state: 0,
		derive: undefined,
		recover: undefined,
		deps: {},
		parents: [],
		endStream: undefined,
		unregister: undefined
	}
	// The Fantasy Land aliases point at the same implementations as the plain methods.
	stream["fantasy-land/map"] = map
	stream.map = map
	stream["fantasy-land/ap"] = ap
	stream["fantasy-land/of"] = createStream
	stream.toJSON = toJSON
	Object.defineProperty(stream, "end", {
		get: function() {
			// The end stream is built on first access and cached on the state record.
			if (stream._state.endStream) return stream._state.endStream
			var terminator = createStream()
			terminator.map(function(flag) {
				if (flag === true) {
					// Writing `true` unregisters the owning stream right away and schedules
					// the end stream itself to be unregistered through its `unregister` hook.
					unregisterStream(stream)
					terminator._state.unregister = function(){unregisterStream(terminator)}
				}
				return flag
			})
			stream._state.endStream = terminator
			return stream._state.endStream
		}
	})
}
/**
 * Writes a new value into a stream and pushes the change through its dependents.
 *
 * Order of operations: store the value, re-evaluate every registered dependent,
 * run the pending `unregister` hook if one is set, then clear the `changed` flags.
 *
 * @param stream the stream being written to
 * @param value  the value to store
 */
function updateStream(stream, value) {
	var state = stream._state
	updateState(stream, value)
	// `state.deps` is deliberately looked up again on every iteration: a dependent's
	// derive function may replace the deps object while this loop is running.
	for (var key in state.deps) {
		updateDependency(state.deps[key], false)
	}
	if (state.unregister != null) state.unregister()
	finalize(stream)
}
/**
 * Stores a value on the stream's state record and flags the stream as changed.
 * The lifecycle state becomes 1 unless it is already 2 (ended), which is kept.
 *
 * @param stream the stream whose state is written
 * @param value  the value to store
 */
function updateState(stream, value) {
	var state = stream._state
	state.value = value
	state.changed = true
	if (state.state === 2) return
	state.state = 1
}
/**
 * Re-evaluates a derived stream from its parents.
 *
 * The derive function only runs when the stream has at least one parent, every
 * parent is active, and either `mustSync` is truthy or some parent is flagged
 * as changed. A HALT result unregisters the stream; any other result is stored.
 *
 * @param stream   the derived stream to refresh
 * @param mustSync when truthy, derive even if no parent is flagged as changed
 */
function updateDependency(stream, mustSync) {
	var state = stream._state
	var parents = state.parents
	if (!(parents.length > 0)) return
	if (!parents.every(active)) return
	if (!mustSync && !parents.some(changed)) return
	var derived = state.derive()
	if (derived === HALT) return unregisterStream(stream)
	updateState(stream, derived)
}
/**
 * Clears the `changed` flag on a stream and on every stream registered in its `deps`.
 *
 * @param stream the stream whose update cycle has finished
 */
function finalize(stream) {
	stream._state.changed = false
	for (var key in stream._state.deps) {
		var dependent = stream._state.deps[key]
		dependent._state.changed = false
	}
}
/**
 * Creates a stream derived from several source streams.
 *
 * Whenever the result is re-derived, `fn` receives each source stream as a
 * positional argument followed by one extra argument: an array of the sources
 * currently flagged as changed.
 *
 * @param fn      derive callback
 * @param streams array of source streams
 * @returns the new dependent stream
 * @throws Error when an item of `streams` is not a stream
 */
function combine(fn, streams) {
	var allValid = streams.every(valid)
	if (!allValid) throw new Error("Ensure that each item passed to stream.combine/merge/lift is a stream")
	var dependent = createStream()
	return initDependency(dependent, streams, function() {
		var updated = streams.filter(changed)
		var args = streams.concat([updated])
		return fn.apply(this, args)
	})
}
/**
 * Wires a dependent stream to its sources and computes its first value.
 *
 * Sources that have already ended are left out of the parent list. The
 * dependent is registered with the remaining parents and then synchronised once.
 *
 * @param dep     the dependent stream being set up
 * @param streams the candidate source streams
 * @param derive  function that computes the dependent's value
 * @returns `dep`
 */
function initDependency(dep, streams, derive) {
	var liveParents = streams.filter(notEnded)
	dep._state.derive = derive
	dep._state.parents = liveParents
	registerDependency(dep, liveParents)
	updateDependency(dep, true)
	return dep
}
/**
 * Files `stream` under its own id in the `deps` table of each given parent and,
 * recursively, of each of those parents' own parents.
 *
 * @param stream  the dependent stream to register
 * @param parents the streams to register it with
 */
function registerDependency(stream, parents) {
	parents.forEach(function(parent) {
		parent._state.deps[stream._state.id] = stream
		registerDependency(stream, parent._state.parents)
	})
}
/**
 * Ends a stream: detaches it from the dependency graph and marks it as ended.
 *
 * - The stream is removed from the `deps` table of every ancestor.
 * - The stream is removed from the `parents` list of each of its dependents.
 * - Its lifecycle state becomes 2 (ended) and its own `deps` table is reset.
 *
 * @param stream the stream to end
 */
function unregisterStream(stream) {
	var state = stream._state
	// registerDependency() files a stream under all of its ancestors, not only its
	// direct parents. Removing it from the direct parents alone would leave it in the
	// grandparents' tables, and an ended stream would keep being re-derived whenever
	// a grandparent updates. Walk the whole ancestor chain instead.
	var pending = state.parents.slice()
	while (pending.length > 0) {
		var ancestor = pending.pop()
		delete ancestor._state.deps[state.id]
		for (var i = 0; i < ancestor._state.parents.length; i++) pending.push(ancestor._state.parents[i])
	}
	for (var id in state.deps) {
		var dependent = state.deps[id]
		var index = dependent._state.parents.indexOf(stream)
		if (index > -1) dependent._state.parents.splice(index, 1)
	}
	state.state = 2 //ended
	state.deps = {}
}
/**
 * Stream method: builds a dependent stream whose value is `fn` applied to the
 * current value of the stream it is called on.
 *
 * @param fn mapping function applied to the source value
 * @returns the stream produced by combine()
 */
function map(fn) {
	var source = this
	return combine(function(current) {
		return fn(current())
	}, [source])
}
/**
 * Stream method: applies the function held by `stream` to the value held by
 * the stream this method is called on.
 *
 * @param stream a stream whose value is a function
 * @returns the stream produced by combine()
 */
function ap(stream) {
	var valueStream = this
	return combine(function(fnSource, argSource) {
		var apply = fnSource()
		return apply(argSource())
	}, [stream, valueStream])
}
/**
 * Stream method: returns the serialisable form of the stream's current value.
 * If the value has its own `toJSON` function, that function's result is
 * returned; otherwise the value itself is returned.
 */
function toJSON() {
	var value = this._state.value
	if (value != null && typeof value.toJSON === "function") return value.toJSON()
	return value
}
/**
 * Predicate used to validate items passed to combine(): truthy when `stream`
 * carries a `_state` record.
 *
 * `null` and `undefined` are rejected explicitly; dereferencing them would
 * throw a TypeError before the caller could raise its own descriptive error.
 *
 * @param stream the candidate value
 * @returns the `_state` record when present, otherwise a falsy value
 */
function valid(stream) {
	return stream != null && stream._state
}
/** Predicate: true when the stream's lifecycle state is exactly 1. */
function active(stream) {
	var lifecycle = stream._state.state
	return lifecycle === 1
}
/** Predicate: returns the stream's `changed` flag exactly as stored on its state. */
function changed(stream) {
	var state = stream._state
	return state.changed
}
/** Predicate: true unless the stream's lifecycle state is 2 (ended). */
function notEnded(stream) {
	var lifecycle = stream._state.state
	return lifecycle !== 2
}
/**
 * Combines several streams into one whose value is an array holding the
 * current value of each source, in the same order.
 *
 * @param streams array of source streams
 * @returns the stream produced by combine()
 */
function merge(streams) {
	function snapshot() {
		return streams.map(function(source) {
			return source()
		})
	}
	return combine(snapshot, streams)
}
/**
 * Builds a stream that accumulates the values of `stream` with `reducer`.
 *
 * On each derivation the reducer receives the running accumulator and the
 * source's current value. A HALT result is passed through without touching the
 * accumulator; any other result becomes both the new accumulator and the
 * stream's value. If the derived stream is still in state 0 after creation,
 * it is seeded with the accumulator.
 *
 * @param reducer function (accumulator, value) returning the next accumulator or HALT
 * @param seed    initial accumulator
 * @param stream  source stream
 * @returns the accumulating stream
 */
function scan(reducer, seed, stream) {
	var accumulator = seed
	var scanned = combine(function(source) {
		var result = reducer(accumulator, source._state.value)
		if (result === HALT) return HALT
		accumulator = result
		return result
	}, [stream])
	if (scanned._state.state === 0) scanned(accumulator)
	return scanned
}
/**
 * Folds several streams into a single accumulated value.
 *
 * Each tuple is `[stream, reducer]`. Source streams still in state 0 are first
 * written with `undefined`. On each derivation, every source listed as changed
 * has its reducer applied to the accumulator and the source's current value,
 * in tuple order; the accumulator is the derived value.
 *
 * @param tuples array of `[stream, reducer]` pairs
 * @param seed   initial accumulator
 * @returns the stream produced by combine()
 */
function scanMerge(tuples, seed) {
	var sources = tuples.map(function(pair) {
		var source = pair[0]
		if (source._state.state === 0) source(undefined)
		return source
	})
	return combine(function() {
		// combine() passes the list of changed sources as the last argument.
		var updated = arguments[arguments.length - 1]
		sources.forEach(function(source, position) {
			if (updated.indexOf(source) === -1) return
			seed = tuples[position][1](seed, source._state.value)
		})
		return seed
	}, sources)
}
/**
 * Lifts a plain function over streams: `lift(fn, s1, s2, ...)` merges the
 * given streams and maps the merged value array onto `fn` as positional
 * arguments (with `this` set to undefined).
 *
 * @returns the stream returned by the merged stream's `map`
 */
function lift() {
	var combiner = arguments[0]
	var sources = []
	for (var i = 1; i < arguments.length; i++) sources.push(arguments[i])
	var merged = merge(sources)
	return merged.map(function(values) {
		return combiner.apply(undefined, values)
	})
}
// Static API exposed on the stream factory.
createStream["fantasy-land/of"] = createStream
createStream.merge = merge
createStream.combine = combine
createStream.scan = scan
createStream.scanMerge = scanMerge
createStream.lift = lift
createStream.HALT = HALT
// Publishing: CommonJS environments receive the factory as the module export.
// Otherwise it is attached to the global object as `m.stream`: added to an
// existing `m` function that has no `stream` yet, or else `m` is (re)created
// as a plain object holding it.
if (typeof module !== "undefined") module["exports"] = createStream
else {
	// `window` is not declared in workers or DOM-less ES-module runtimes. Reading
	// `window.m` there would throw a ReferenceError, so each candidate global is
	// probed with `typeof` first.
	var root
	if (typeof window !== "undefined") root = window
	else if (typeof self !== "undefined") root = self
	else if (typeof globalThis !== "undefined") root = globalThis
	if (root === undefined) throw new Error("Unable to locate a global object to attach m.stream to")
	if (typeof root.m === "function" && !("stream" in root.m)) root.m.stream = createStream
	else root.m = {stream : createStream}
}
}());