server.ls
Bacon = require \baconjs
Yotsuba = require \./yotsuba
_ = require \prelude-ls
require! {
  \./Limiter
  \./request
  fs
  colors
  #req: \request
  lynx
}
board = process.env.BOARD || 'a'
const SAVE_FILE = "/tmp/org.hakase.fountain.#board.json"
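# report metrics to a local statsd instance (via lynx) under the "fountain" scope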
stats = new lynx \localhost 8125 scope: \fountain
# inc_gc/full_gc/compactions are all counting from 0
# which is not as useful as a rate, so keep track of last seen
last-inc-gc = 0
last-full-gc = 0
last-heap-compactions = 0
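# strip markup from a post body: convert <br> to newlines, drop remaining tags,
# and decode the HTML entities the API escapes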
text-content = ->
  return '' if not it?
  it.replace /<br>/g '\n' .replace /<[^>]+?>/g ''
    .replace /&gt;/g '>' .replace /&quot;/g '"'
    .replace /&#039;/g \' .replace /&lt;/g '<'
    .replace /&amp;/g '&'
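# build the board watcher, resuming from the last saved snapshot if one exists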
export y = new Yotsuba do
  board
  if fs.exists-sync SAVE_FILE
    try JSON.parse fs.read-file-sync SAVE_FILE
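# rate-limit fetches through the Limiter (1000ms, request.get), flagging responses with status >= 500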
export l = new Limiter 1000ms request.get, (.status-code >= 500)
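# forward events between the Limiter and Yotsuba by hand (deferred with set-timeout)
# instead of plugging the buses into each other directly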
#y.responses.plug l.responses
l.responses.on-value (res) !->
  set-timeout (!-> y.responses.push res), 0
l.responses.on-error !->
  stats.increment 'responses.error'
#l.requests.plug y.requests
y.requests.on-value (req) !->
  set-timeout (!-> l.requests.push req), 0
#y.ready.plug l.ready
l.ready.on-value (ready) !->
  set-timeout (!-> y.ready.push ready), 0
#l.requests.on-value !-> console.log "requesting #{it.path}".green
l.responses.filter (.status-code isnt 200)
  .on-value !-> console.log "response: #{it.status-code}".red.bold
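# log a summary of every board diff and push the corresponding counters,
# timings and gauges to statsd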
y.board.on-value !({diff}: board) ->
  if board.stale.length > 3
    console.log "board too stale (#{board.stale.length}), not logging".yellow
    return
  threads = _.values board.threads
  missing = board.stale.map ->
    t = board.threads[it]
    return 0 unless t?
    t.replies - t.posts.length + 1
  if diff.new-threads.length > 0
    stats.count 'new-threads' diff.new-threads.length
    console.log "#{diff.new-threads.length} new threads \
                 #{diff.new-threads.map (.no)}".blue.bold
  if diff.new-posts.length > 0
    stats.count 'new-posts' diff.new-posts.length
    for size in diff.new-posts.filter (.fsize?) .map (.fsize)
      stats.increment 'new-images'
      stats.timing 'image-size' size
    console.log "#{diff.new-posts.length} new posts".blue.bold
  if diff.deleted-threads.length > 0
    stats.count 'deleted-threads' diff.deleted-threads.length
    console.log "#{diff.deleted-threads.length} deleted threads: \
                 #{diff.deleted-threads}".red.bold
  if diff.changed-threads.length > 0
    stats.increment 'changed-threads'
    console.log "#{diff.changed-threads.length} changed threads".red.bold
  if diff.deleted-posts.length > 0
    stats.count 'deleted-posts' diff.deleted-posts.length
    console.log "#{diff.deleted-posts.length} deleted posts: \
                 #{diff.deleted-posts.map (.no)}".red.bold
  if diff.changed-posts.length > 0
    stats.count 'changed-posts' diff.changed-posts.length
    console.log "#{diff.changed-posts.length} changed posts".red.bold
  stats.gauge 'threads' threads.length
  stats.gauge 'posts' _.sum threads.map (.posts.length)
  stats.gauge 'images' _.sum threads.map (.images)
  stats.gauge do
    'image-size'
    _.sum(threads.map -> _.sum it.posts.map (.fsize || 0)) / 1_000_000
  #console.log "#{threads.length} threads, \
  ##{_.sum threads.map (.posts.length)} posts, \
  ##{_.sum threads.map (.images)} images (
  ##{_.sum(threads.map -> _.sum it.posts.map (.fsize || 0)) / 1_000_000}Mb)
  #".white.bold
  s = board.stale.length
  if s > 0
    stats.gauge 'stale-threads' s
    console.log "#{board.stale.length} stale threads".red.bold
  m = _.sum missing
  if m > 0
    stats.gauge 'missing-posts' m
    console.log "#m missing".red.bold
  for it in diff.new-posts
    latency = Date.now! - it.time * 1000
    stats.timing 'post-latency' latency
    if latency > 60_000
      console.log "================".red
      console.log "latent: #latency".red
      console.log "#{JSON.stringify it , , 3}".red
  for it in diff.changed-posts
    console.log "!!!!!!!!!!!!!!!!".red
    console.log "#{JSON.stringify it , , 3}".red
l.ready.push true
stringify = JSON~stringify
console.log "listening on #{process.env.PORT || 3500}"
require! express
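# HTTP API: a newline-delimited JSON stream at /v1/<board>/json and a
# server-sent-events stream at /v1/<board>/stream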
do express
  # we have to make zlib flush synchronously or it'll buffer
  # our event streams
  ..use express.compress { flush: require \zlib .Z_SYNC_FLUSH }
  ..get "/v1/#board/json" (req, res) !->
    unless req.accepts 'application/json'
      return res.send 406
    req.socket.set-timeout 30_000
    res.set-header 'Content-Type', \application/json+stream
    res.set-header 'Transfer-Encoding', \identity
    res.set-header 'Cache-Control', \no-cache
    res.set-header 'Connection', \keep-alive
    res.write-head 200
    console.log "opened json stream!".white.bold
    close = Bacon.from-callback res, \on \close
      ..on-value !-> console.log "closed json stream!".white.bold
    err = Bacon.from-callback res, \on \error
      ..on-value console.error
    y.board.changes!take-until Bacon.merge-all(close, err) .on-value !->
      if it.diff.new-posts.length > 0
        p = it.diff.new-posts.sort (a, b) -> a.no - b.no
        res.write "#{p.map stringify .join '\n'}\n"
    Bacon.interval 10_000ms .take-until Bacon.merge-all(close, err) .on-value !->
      res.write "\n"
  ..get "/v1/#board/stream" (req, res) !->
    unless req.accepts 'text/event-stream'
      return res.send 406
    console.log "got request to stream"
    req.socket.set-timeout 30_000
    # XXX if we implicitly set the headers as an object in
    # write-head, then the compression middleware won't see
    # our content type and will thus not compress our stream.
    # Kind of annoying, since set-header is more verbose.
    res.set-header 'Content-Type', \text/event-stream
    res.set-header 'Transfer-Encoding', \identity
    res.set-header 'Cache-Control', \no-cache
    res.set-header 'Connection', \keep-alive
    res.set-header 'Access-Control-Allow-Origin', '*'
    res.write-head 200
    # TODO keep track of changes to rewind last-event-id
    res.write ':hi\n\n'
    # send initial state over the wire
    # TODO figure out how to do this nicely without pushing 8MB of state
    # in one event.
    if req.param \init
      y.board.sampled-by Bacon.once! .on-value !->
        res.write "event: init\n
                   data: #{stringify it.threads}\n\n"
    # send reduced initial state, i.e. OP + info for every thread
    if req.param \catalog
      y.board.sampled-by Bacon.once! .on-value !->
        cat = {}
        for tno, thread of it.threads
          cat[tno] = with {...thread}
            ..posts = thread.posts.slice 0 1
        res.write "event: catalog\n
                   data: #{stringify cat}\n\n"
    console.log "opened event-stream!".white.bold
    close = Bacon.from-callback res, \on \close
      ..on-value !-> console.log "closed event-stream!".white.bold
    err = Bacon.from-callback res, \on \error
      ..on-value console.error
    y.board.changes!take-until Bacon.merge-all(close, err) .on-value !->
      if it.diff.new-posts.length > 0
        res.write "event: new-posts\n
                   data: #{stringify it.diff.new-posts}\n\n"
      if it.diff.deleted-posts.length > 0
        res.write "event: deleted-posts\n
                   data: #{stringify it.diff.deleted-posts}\n\n"
      if it.diff.new-threads.length > 0
        res.write "event: new-threads\n
                   data: #{stringify it.diff.new-threads}\n\n"
      if it.diff.deleted-threads.length > 0
        res.write "event: deleted-threads\n
                   data: #{stringify it.diff.deleted-threads}\n\n"
    Bacon.interval 10_000ms .take-until Bacon.merge-all(close, err) .on-value !->
      res.write ":ping\n\n"
  ..listen process.env.PORT || 3500
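# persist the current board state to SAVE_FILE so a restart can pick up where it left off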
!function save-state state, cb
  console.error "saving state to #SAVE_FILE..."
  json = JSON.stringify(state, null, " ")
  fs.write-file SAVE_FILE, json, (err) ->
    if err?
      console.error "error saving state" err
    else
      console.error "state saved!"
    cb?!
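# snapshot every 30 seconds, and once more on SIGINT/SIGPIPE before exiting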
Bacon.interval 30_000ms .map y.board .on-value !->
  save-state it
Bacon.from-event-target process, \SIGINT
  .merge Bacon.from-event-target process, \SIGPIPE
  .map y.board .on-value !->
    console.error "caught SIGINT/SIGPIPE, saving..."
    save-state it, !-> process.exit 0