-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathseq.go
473 lines (426 loc) · 14.8 KB
/
seq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
package encoding
import (
"fmt"
"math"
"time"
"github.com/getlantern/goexpr"
"github.com/getlantern/golog"
"github.com/getlantern/zenodb/expr"
)
var (
// log is the package-level logger, obtained from golog under the name
// "zenodb.encoding".
log = golog.LoggerFor("zenodb.encoding")
)
// Sequence represents a time-ordered sequence of accumulator states in
// descending time order (most recent period first). The first 8 bytes are the
// timestamp at which the Sequence ends (its "until", stored by SetUntil as
// nanoseconds since the Unix epoch). After that header, each run of n bytes
// holds the data for the next interval going back in time, where n is
// determined by the type of Expr (its EncodedWidth).
type Sequence []byte
// NewSequence returns a freshly allocated, zero-filled Sequence large enough
// for the leading timestamp plus numPeriods accumulator states that are each
// width bytes wide.
func NewSequence(width int, numPeriods int) Sequence {
	size := Width64bits + width*numPeriods
	return make(Sequence, size)
}
// NewFloatValue makes a single-value sequence from a simple expression, a
// timestamp and a float value. It wraps val with expr.FloatParams and
// delegates to NewValue without any metadata.
func NewFloatValue(e expr.Expr, ts time.Time, val float64) Sequence {
	params := expr.FloatParams(val)
	return NewValue(e, ts, params, nil)
}
// NewValue builds a one-period Sequence for the expression e: the sequence
// ends at ts and its only period (period 0) is populated by applying params
// and metadata through e.
func NewValue(e expr.Expr, ts time.Time, params expr.Params, metadata goexpr.Params) Sequence {
	width := e.EncodedWidth()
	single := NewSequence(width, 1)
	single.SetUntil(ts)
	single.UpdateValueAt(0, e, params, metadata)
	return single
}
// Until returns the most recent date represented by this Sequence, decoded
// from its leading timestamp. An empty Sequence yields zeroTime.
func (seq Sequence) Until() time.Time {
	if len(seq) > 0 {
		return TimeFromBytes(seq)
	}
	return zeroTime
}
// UntilInt returns the most recent date represented by this Sequence as an
// integer, decoded from its leading timestamp. An empty Sequence yields 0.
func (seq Sequence) UntilInt() int64 {
	if len(seq) > 0 {
		return TimeIntFromBytes(seq)
	}
	return 0
}
// AsOf returns the oldest date represented by this Sequence, computed as
// Until minus (number of periods of the given width) * resolution. An empty
// Sequence yields zeroTime.
func (seq Sequence) AsOf(width int, resolution time.Duration) time.Time {
	if len(seq) == 0 {
		return zeroTime
	}
	newest := seq.Until()
	span := time.Duration(seq.NumPeriods(width)) * resolution
	return newest.Add(-span)
}
// SetUntil stores t, expressed as nanoseconds since the Unix epoch, at the
// start of this Sequence as its until time. The Sequence is modified in place.
func (seq Sequence) SetUntil(t time.Time) {
	nanos := uint64(t.UnixNano())
	Binary.PutUint64(seq, nanos)
}
// NumPeriods reports how many periods this Sequence holds when each period
// occupies width bytes. An empty Sequence holds 0 periods.
func (seq Sequence) NumPeriods(width int) int {
	if len(seq) > 0 {
		return seq.DataLength() / width
	}
	return 0
}
// DataLength returns the number of bytes in this Sequence that follow the
// leading timestamp, i.e. the total length minus Width64bits.
func (seq Sequence) DataLength() int {
	total := len(seq)
	return total - Width64bits
}
// ValueAtTime looks up the value stored for time t, extracting it with e and
// treating every period as exactly one resolution wide. found is false when
// the Sequence is empty, when t (after rounding) is later than the Sequence's
// until, or when no value is set for the resulting period. A constant
// expression is evaluated directly without consulting the Sequence.
func (seq Sequence) ValueAtTime(t time.Time, e expr.Expr, resolution time.Duration) (val float64, found bool) {
	switch {
	case e.IsConstant():
		v, ok, _ := e.Get(nil)
		return v, ok
	case len(seq) == 0:
		return 0, false
	}

	newest := seq.Until()
	rounded := RoundTimeUntilUp(t, resolution, newest)
	if rounded.After(newest) {
		return 0, false
	}
	// Periods are counted backwards from the until time.
	periodsBack := int(newest.Sub(rounded) / resolution)
	return seq.ValueAt(periodsBack, e)
}
// ValueAt extracts, using e, the value stored for the given period (0 being
// the most recent). found is false when the Sequence is empty, the period is
// negative, or no value is set there. A constant expression is evaluated
// directly without consulting the Sequence.
func (seq Sequence) ValueAt(period int, e expr.Expr) (val float64, found bool) {
	if e.IsConstant() {
		v, ok, _ := e.Get(nil)
		return v, ok
	}
	if len(seq) == 0 || period < 0 {
		return 0, false
	}
	offset := period * e.EncodedWidth()
	return seq.ValueAtOffset(offset, e)
}
// ValueAtOffset extracts, using e, the value found at the given byte offset
// into the Sequence's data (the offset does not count the leading timestamp).
// found is false when the Sequence is empty, when the offset lies at or beyond
// the end of the Sequence, or when no value is set there. A constant
// expression is evaluated directly without consulting the Sequence.
func (seq Sequence) ValueAtOffset(offset int, e expr.Expr) (val float64, found bool) {
	if e.IsConstant() {
		v, ok, _ := e.Get(nil)
		return v, ok
	}
	if len(seq) == 0 {
		return 0, false
	}
	// Skip over the timestamp to get the absolute position.
	pos := Width64bits + offset
	if pos >= len(seq) {
		return 0, false
	}
	val, found, _ = e.Get(seq[pos:])
	return val, found
}
// UpdateValueAt applies params to the accumulator state stored for the given
// period, using the expression e. metadata carries additional information
// about the operation that is passed through to the Expr (e.g. information
// about the dimensions associated to the value).
func (seq Sequence) UpdateValueAt(period int, e expr.Expr, params expr.Params, metadata goexpr.Params) {
	offset := period * e.EncodedWidth()
	seq.UpdateValueAtOffset(offset, e, params, metadata)
}
// UpdateValueAtOffset applies params to the accumulator state located at the
// given byte offset into the Sequence's data (the offset does not count the
// leading timestamp), using the expression e. metadata carries additional
// information about the operation that is passed through to the Expr (e.g.
// information about the dimensions associated to the value).
func (seq Sequence) UpdateValueAtOffset(offset int, e expr.Expr, params expr.Params, metadata goexpr.Params) {
	target := seq[Width64bits+offset:]
	e.Update(target, params, metadata)
}
// Update splits tsp into its timestamp and params and forwards them, together
// with the remaining arguments, to UpdateValue, returning its result.
func (seq Sequence) Update(tsp TSParams, metadata goexpr.Params, e expr.Expr, resolution time.Duration, truncateBefore time.Time) Sequence {
	when, values := tsp.TimeAndParams()
	return seq.UpdateValue(when, values, metadata, e, resolution, truncateBefore)
}
// UpdateValue updates the value at the given time by applying the given params
// using the given Expr. The resolution indicates how wide we assume each period
// of data to be. metadata represents metadata about the operation that's used
// by the Expr as well (e.g. information about the dimensions associated to the
// value).
//
// ts is first rounded with RoundTimeUp and truncateBefore with
// RoundTimeUntilUp. Then one of the following happens:
//
//   - If ts is not after truncateBefore, the new value is dropped and the
//     existing sequence is merely truncated to truncateBefore.
//   - If the sequence is empty, ends before truncateBefore, or the gap between
//     its until and ts exceeds the allowed number of periods, a brand new
//     single-period sequence is returned.
//   - If ts is after the current until, a new sequence is allocated with the
//     new value at the front; existing periods that would fall before
//     truncateBefore are omitted.
//   - Otherwise the period for ts is updated, growing the sequence if that
//     period lies beyond the current data.
//
// The returned Sequence may reference the same underlying byte array as the
// updated sequence, or it may be a newly allocated byte array (i.e. if the
// sequence grew).
func (seq Sequence) UpdateValue(ts time.Time, params expr.Params, metadata goexpr.Params, e expr.Expr, resolution time.Duration, truncateBefore time.Time) Sequence {
	width := e.EncodedWidth()
	until := seq.Until()
	ts = RoundTimeUp(ts, resolution)
	if until.IsZero() {
		// sequence has no until, use ts
		until = ts
	}
	truncateBefore = RoundTimeUntilUp(truncateBefore, resolution, until)
	if log.IsTraceEnabled() {
		log.Tracef("Updating sequence starting at %v to %v at %v, truncating before %v", seq.Until().In(time.UTC), params, ts.In(time.UTC), truncateBefore.In(time.UTC))
	}

	if !ts.After(truncateBefore) {
		log.Tracef("New value falls outside of truncation range, just truncate existing sequence to %v", truncateBefore)
		return seq.Truncate(width, resolution, truncateBefore, zeroTime)
	}

	sequenceEmpty := len(seq) == 0
	var start time.Time
	var gapPeriods int
	var maxPeriods int
	if !sequenceEmpty {
		start = seq.Until()
		// gapPeriods is how many periods ts lies ahead of the current until
		// (negative when ts is older); maxPeriods is how many periods fit
		// between truncateBefore and ts.
		gapPeriods = int(ts.Sub(start) / resolution)
		maxPeriods = int(ts.Sub(truncateBefore) / resolution)
	}
	if sequenceEmpty || start.Before(truncateBefore) || gapPeriods > maxPeriods {
		log.Trace("Creating new sequence")
		out := make(Sequence, Width64bits+width)
		out.SetUntil(ts)
		out.UpdateValueAt(0, e, params, metadata)
		return out
	}

	if ts.After(start) {
		log.Trace("Prepending to sequence")
		numPeriods := seq.NumPeriods(width) + gapPeriods
		origEnd := len(seq)
		if numPeriods > maxPeriods {
			log.Trace("Truncating existing sequence")
			numPeriods = maxPeriods
			// Only copy as many of the existing periods as still fit.
			origEnd = Width64bits + width*(numPeriods-gapPeriods)
		}
		out := NewSequence(width, numPeriods)
		// Existing data lands after the gap; the gap itself stays zeroed.
		copy(out[Width64bits+gapPeriods*width:], seq[Width64bits:origEnd])
		out.SetUntil(ts)
		out.UpdateValueAt(0, e, params, metadata)
		return out
	}

	log.Trace("Updating existing entry on sequence")
	out := seq
	period := int(start.Sub(ts) / resolution)
	offset := period * width
	// The target period occupies bytes [Width64bits+offset,
	// Width64bits+offset+width), so the timestamp header must be included when
	// deciding whether the sequence is long enough.
	if Width64bits+offset+width > len(seq) {
		// Grow seq
		out = make(Sequence, offset+Width64bits+width)
		copy(out, seq)
	}
	out.UpdateValueAtOffset(offset, e, params, metadata)
	return out
}
// SubMerge merges values from other (encoded with otherEx at otherResolution)
// into seq (encoded with ex at resolution) by calling submerge for periods of
// other, limited to the window bounded by asOf and until.
//
// other is truncated to the window (with its asOf bound adjusted by ex's
// shift). If nothing remains, seq is returned unchanged. Otherwise seq is
// truncated to the window and, where needed, extended with empty periods at
// either end so that it spans the periods of other being merged. Each period
// of other is then mapped onto the period of the result that contains it and
// submerge is invoked with the corresponding byte slices.
//
// When strideSlice is positive, only those periods of other whose position
// within a result period is less than strideSlice/otherResolution are merged.
//
// NOTE(review): the Truncate calls may write a new until timestamp into the
// backing arrays of seq and other - confirm callers tolerate this.
func (seq Sequence) SubMerge(other Sequence, metadata goexpr.Params, resolution time.Duration, otherResolution time.Duration, ex expr.Expr, otherEx expr.Expr, submerge expr.SubMerge, asOf time.Time, until time.Time, strideSlice time.Duration) (result Sequence) {
// shiftBack is the negation of ex's shift.
shiftBack := -1 * ex.Shift()
// Default result, returned as-is if other contributes no periods.
result = seq
otherWidth := otherEx.EncodedWidth()
otherAsOf := other.AsOf(otherEx.EncodedWidth(), otherResolution)
// Never reach back further than the requested asOf.
if otherAsOf.Before(asOf) {
otherAsOf = asOf
}
other = other.Truncate(otherWidth, otherResolution, asOf.Add(-1*shiftBack), until)
otherPeriods := other.NumPeriods(otherWidth)
if otherPeriods == 0 {
return
}
width := ex.EncodedWidth()
result = seq.Truncate(width, resolution, asOf, until)
resultUntil := result.Until()
otherUntil := other.Until()
if shiftBack > 0 {
// Move other's until forward by shiftBack, but not past the requested until.
shiftedOtherUntil := otherUntil.Add(shiftBack)
if shiftedOtherUntil.After(until) {
shiftedOtherUntil = until
}
growByPeriods := int(shiftedOtherUntil.Sub(otherUntil) / otherResolution)
if growByPeriods > 0 {
growBy := growByPeriods * otherWidth
// grow other to give us a chance to pick up the shifted values
grown := make(Sequence, len(other)+growBy)
grown.SetUntil(shiftedOtherUntil)
// Existing data goes after the newly added (zeroed) leading periods.
copy(grown[Width64bits+growBy:], other[Width64bits:])
other = grown
otherUntil = shiftedOtherUntil
otherPeriods = other.NumPeriods(otherWidth)
}
}
newUntil := RoundTimeUntilUp(otherUntil, resolution, until)
if len(result) <= Width64bits {
// result holds no period data: start a fresh single-period sequence ending
// at newUntil.
result = NewSequence(width, 1)
result.SetUntil(newUntil)
resultUntil = newUntil
} else {
// Add empty periods in front of result if other ends later than result.
periodsToPrepend := int(newUntil.Sub(resultUntil) / resolution)
if periodsToPrepend > 0 {
prepended := NewSequence(width, periodsToPrepend)
prepended.SetUntil(newUntil)
// Append existing data
prepended = append(prepended, result[Width64bits:]...)
result = prepended
resultUntil = newUntil
}
}
// Add empty periods at the old end of result if other reaches further back.
oldAsOf := RoundTimeUntilUp(result.AsOf(width, resolution), resolution, resultUntil)
newAsOf := RoundTimeUntilDown(otherAsOf, resolution, resultUntil)
periodsToAppend := int(oldAsOf.Sub(newAsOf) / resolution)
if periodsToAppend > 0 {
appended := NewSequence(width, result.NumPeriods(width)+periodsToAppend)
copy(appended, result)
result = appended
}
// We assume that resolution is a positive integer multiple of otherResolution
// (i.e. caller already checked this)
scale := int(resolution / otherResolution)
// Distance, in periods of other, between result's until and other's until.
untilOffset := int(resultUntil.Sub(otherUntil) / otherResolution)
resultPeriods := result.NumPeriods(width)
strideSlicePeriods := int(strideSlice / otherResolution)
for po := 0; po < otherPeriods; po++ {
// p is the period of result that contains period po of other.
p := int(math.Floor(float64(po+untilOffset) / float64(scale)))
if p >= resultPeriods {
break
}
// With a positive strideSlice, skip periods of other that fall outside the
// leading strideSlicePeriods of each result period.
if strideSlice <= 0 || (po+untilOffset)%scale < strideSlicePeriods {
submerge(result[Width64bits+p*width:], other[Width64bits+po*otherWidth:], otherResolution, metadata)
}
}
return
}
// Merge merges the other Sequence into this Sequence by applying the given
// Expr's merge operator to each period that both Sequences cover. The
// resulting Sequence ends (has its until) at the later of the two Sequences'
// until times and reaches back to the earlier of the two Sequences' oldest
// times. Periods covered by only one Sequence are copied over as-is, and any
// gap between the two Sequences is left zeroed.
//
// If either Sequence is empty, the other one is returned. If the Sequence with
// the earlier until ends before truncateBefore (rounded with
// RoundTimeUntilUp), the Sequence with the later until is returned unchanged.
//
// The returned Sequence may reference the same underlying byte array as one or
// the other Sequence if nothing needed merging, otherwise it will be a newly
// allocated byte array. Merge will NOT update either of the supplied arrays.
func (seq Sequence) Merge(other Sequence, e expr.Expr, resolution time.Duration, truncateBefore time.Time) Sequence {
if len(seq) == 0 {
return other
}
if len(other) == 0 {
return seq
}
// sa/startA always refer to the Sequence with the later until, sb/startB to
// the one with the earlier until. Note that "start" here is the until time,
// since Sequences are stored in descending time order.
sa := seq
sb := other
startA := sa.Until()
startB := sb.Until()
if startB.After(startA) {
// Switch
sa, startA, sb, startB = sb, startB, sa, startA
}
truncateBefore = RoundTimeUntilUp(truncateBefore, resolution, startA)
if startB.Before(truncateBefore) {
return sa
}
encodedWidth := e.EncodedWidth()
aPeriods := sa.NumPeriods(encodedWidth)
bPeriods := sb.NumPeriods(encodedWidth)
// endA/endB are the oldest times covered by each Sequence.
endA := startA.Add(-1 * time.Duration(aPeriods) * resolution)
endB := startB.Add(-1 * time.Duration(bPeriods) * resolution)
// end is the older of the two.
end := endB
if endA.Before(endB) {
end = endA
}
totalPeriods := int(startA.Sub(end) / resolution)
out := make(Sequence, Width64bits+totalPeriods*encodedWidth)
// sout, sa and sb act as cursors that are advanced past data as it is
// consumed; out keeps pointing at the beginning of the result.
sout := out
// Set start
copy(sout, sa[:Width64bits])
sout = sout[Width64bits:]
sa = sa[Width64bits:]
sb = sb[Width64bits:]
// Handle starting window with no overlap
leadEnd := startB
if startB.Before(endA) {
leadEnd = endA
}
leadNoOverlapPeriods := int(startA.Sub(leadEnd) / resolution)
if leadNoOverlapPeriods > 0 {
l := leadNoOverlapPeriods * encodedWidth
copy(sout, sa[:l])
sout = sout[l:]
sa = sa[l:]
}
if startB.After(endA) {
// Handle middle window with overlap
overlapPeriods := 0
if endB.After(endA) {
overlapPeriods = int(startA.Sub(endB) / resolution)
} else {
overlapPeriods = int(startA.Sub(endA) / resolution)
}
overlapPeriods -= leadNoOverlapPeriods
for i := 0; i < overlapPeriods; i++ {
// Each call returns the three slices to use for the next period.
sout, sa, sb = e.Merge(sout, sa, sb)
}
} else if startB.Before(endA) {
// Handle gap
gapPeriods := int(endA.Sub(startB) / resolution)
gap := gapPeriods * encodedWidth
// Skip over the gap, leaving it zeroed in the output.
sout = sout[gap:]
}
// Handle end window with no overlap
if endA.Before(endB) {
copy(sout, sa)
} else if endB.Before(endA) {
copy(sout, sb)
}
return out
}
// Truncate drops all periods of the Sequence that fall outside the window
// bounded by asOf (oldest) and until (newest), both rounded down with
// RoundTimeUntilDown relative to the Sequence's current until. A bound that is
// zero after rounding is not applied. The result is nil if the Sequence is
// empty or nothing remains inside the window.
//
// The result shares its backing array with seq. When periods are removed from
// the front, the new until timestamp is written into that shared array just
// ahead of the first retained period.
func (seq Sequence) Truncate(width int, resolution time.Duration, asOf time.Time, until time.Time) (result Sequence) {
	if len(seq) == 0 {
		return nil
	}

	result = seq
	origUntil := seq.Until()
	asOf = RoundTimeUntilDown(asOf, resolution, origUntil)
	until = RoundTimeUntilDown(until, resolution, origUntil)

	// Trim periods that are newer than until.
	if !until.IsZero() {
		if drop := int(origUntil.Sub(until) / resolution); drop > 0 {
			skip := drop * width
			if skip+Width64bits >= len(seq) {
				return nil
			}
			result = seq[skip:]
			result.SetUntil(until)
		}
	}

	if asOf.IsZero() {
		return result
	}

	// Trim periods that are older than asOf.
	keep := int(result.Until().Sub(asOf) / resolution)
	if keep <= 0 {
		// Entire sequence falls outside of truncation range
		return nil
	}
	if limit := Width64bits + keep*width; limit < len(result) {
		return result[:limit]
	}
	return result
}
// String renders this Sequence as text, assuming it holds data for the given
// Expr at the given resolution. The output names the expression and the
// asOf/until times (in UTC), followed by the space-separated period values
// listed from oldest to newest. An empty Sequence renders as "".
func (seq Sequence) String(e expr.Expr, resolution time.Duration) string {
	if len(seq) == 0 {
		return ""
	}

	values := ""
	numPeriods := seq.NumPeriods(e.EncodedWidth())
	// Walk from the oldest period (highest index) down to the newest (0).
	for p := numPeriods - 1; p >= 0; p-- {
		if p != numPeriods-1 {
			values += " "
		}
		val, _ := seq.ValueAt(p, e)
		values += fmt.Sprint(val)
	}

	oldest := seq.AsOf(e.EncodedWidth(), resolution).In(time.UTC)
	newest := seq.Until().In(time.UTC)
	return fmt.Sprintf("%v from %v -> %v: %v", e, oldest, newest, values)
}