-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathencoder.go
142 lines (118 loc) · 3.1 KB
/
encoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package encoding
import (
"encoding/json"
"fmt"
"math"
"time"
"unsafe"
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
xtime "github.com/m3db/m3/src/x/time"
)
var (
// TODO(rartoul): Eliminate the need for this.
opts = encoding.NewOptions()
)
type Encoder interface {
Encode(timestamp time.Time, value float64) error
LastEncoded() (time.Time, float64, bool)
State() []byte
Restore(b []byte) error
Bytes() []byte
}
type marshalState struct {
TSEncoder m3tsz.TimestampEncoder
FloatEncoder m3tsz.FloatEncoderAndIterator
LastByte byte
BitPos int
HasWrittenFirst bool
}
type encoder struct {
tsEncoder m3tsz.TimestampEncoder
floatEncoder m3tsz.FloatEncoderAndIterator
stream OStream
hasWrittenFirst bool
}
// NewEncoder creates a new encoder.
func NewEncoder() Encoder {
return &encoder{}
}
func (e *encoder) Encode(timestamp time.Time, value float64) error {
if e.stream == nil {
// Lazy init.
e.stream = NewOStream()
e.tsEncoder = m3tsz.NewTimestampEncoder(timestamp, xtime.Nanosecond, opts)
}
e.stream.WriteBit(hasMoreBit)
var (
// Unsafe insanity to temporarily avoid having to fork upstream.
encodingStream = *(*encoding.OStream)(unsafe.Pointer(&e.stream))
err error
)
if !e.hasWrittenFirst {
err = e.tsEncoder.WriteFirstTime(encodingStream, timestamp, nil, xtime.Nanosecond)
} else {
err = e.tsEncoder.WriteNextTime(encodingStream, timestamp, nil, xtime.Nanosecond)
}
if err != nil {
return err
}
e.floatEncoder.WriteFloat(encodingStream, value)
e.hasWrittenFirst = true
return nil
}
func (e *encoder) LastEncoded() (time.Time, float64, bool) {
return e.tsEncoder.PrevTime, math.Float64frombits(e.floatEncoder.PrevFloatBits), e.hasWrittenFirst
}
func (e *encoder) State() []byte {
var (
raw, bitPos = e.stream.Rawbytes()
lastByte byte
)
if len(raw) > 0 {
lastByte = raw[len(raw)-1]
}
marshalState := marshalState{
TSEncoder: e.tsEncoder,
FloatEncoder: e.floatEncoder,
HasWrittenFirst: e.hasWrittenFirst,
LastByte: lastByte,
BitPos: bitPos,
}
// Prevent JSON marshaling error.
marshalState.TSEncoder.Options = nil
// TODO(rartoul): Replace this with something efficient / performant.
marshaled, err := json.Marshal(&marshalState)
if err != nil {
// TODO(rartoul): Remove this once there is a better encoding scheme.
panic(err)
}
return marshaled
}
func (e *encoder) Restore(b []byte) error {
if b == nil {
return fmt.Errorf("cannot restore from nil state")
}
marshalState := marshalState{}
if err := json.Unmarshal(b, &marshalState); err != nil {
return err
}
e.tsEncoder = marshalState.TSEncoder
e.tsEncoder.Options = opts
e.floatEncoder = marshalState.FloatEncoder
e.hasWrittenFirst = marshalState.HasWrittenFirst
if e.stream == nil {
e.stream = NewOStream()
}
// TODO(rartoul): Fix this non-sense.
e.stream.(*ostream).buf = []byte{marshalState.LastByte}
e.stream.(*ostream).pos = marshalState.BitPos
return nil
}
func (e *encoder) Bytes() []byte {
if e.stream == nil {
return nil
}
b, _ := e.stream.Rawbytes()
return b
}