-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathwriter.go
181 lines (148 loc) · 6.23 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright (c) 2018 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0
package spanstore
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"time"
"github.com/dgraph-io/badger/v4"
"github.com/gogo/protobuf/proto"
"github.com/jaegertracing/jaeger/model"
)
/*
This store should be easy to adapt to any sorted KV store that supports set/get/iterators;
that includes RocksDB (this key structure should work as-is with RocksDB).

Keys are written in BigEndian order to allow lexicographic sorting of keys.
*/
// Key-prefix and encoding constants.
//
// Every span-related key has its most significant bit set (spanKeyPrefix),
// which distinguishes span data from any other keys in the store. Each
// secondary index uses a distinct value within the low 4 bits
// (indexKeyRange) of the leading byte.
const (
	spanKeyPrefix         byte = 0x80 // All span keys should have first bit set to 1
	indexKeyRange         byte = 0x0F // Secondary indexes use last 4 bits
	serviceNameIndexKey   byte = 0x81 // index: service name -> trace
	operationNameIndexKey byte = 0x82 // index: service name + operation name -> trace
	tagIndexKey           byte = 0x83 // index: service name + tag key + tag value -> trace
	durationIndexKey      byte = 0x84 // index: span duration -> trace
	jsonEncoding          byte = 0x01 // Last 4 bits of the meta byte are for encoding type
	protoEncoding         byte = 0x02 // Last 4 bits of the meta byte are for encoding type
	defaultEncoding       byte = protoEncoding
)
// SpanWriter for writing spans to badger. It persists each encoded span under
// a primary key and maintains secondary index entries (service name,
// operation name, tags, duration), all written with the same TTL.
type SpanWriter struct {
	store        *badger.DB    // underlying badger instance
	ttl          time.Duration // time-to-live applied to every entry written
	cache        *CacheStore   // service/operation name cache, refreshed on writes
	encodingType byte          // jsonEncoding or protoEncoding; fixed at construction
}
// NewSpanWriter returns a SpanWriter that persists spans to db with the given
// TTL and refreshes the provided cache on writes.
func NewSpanWriter(db *badger.DB, c *CacheStore, ttl time.Duration) *SpanWriter {
	writer := &SpanWriter{
		store:        db,
		cache:        c,
		ttl:          ttl,
		encodingType: defaultEncoding, // TODO Make configurable
	}
	return writer
}
// WriteSpan writes the encoded span as well as creates indexes with defined TTL.
//
// One primary entry (the serialized span) plus secondary index entries for the
// service name, operation name, duration, and every span/process/log tag are
// written in a single badger transaction. The service/operation name cache is
// refreshed only after the transaction commits successfully, so the cache
// never advertises names that were not persisted.
func (w *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
	//nolint: gosec // G115
	expireTime := uint64(time.Now().Add(w.ttl).Unix())
	startTime := model.TimeAsEpochMicroseconds(span.StartTime)

	// Avoid doing as much as possible inside the transaction boundary, create entries here.
	// Capacity hint: 4 fixed entries (trace, service, operation, duration) plus one per
	// span tag and process tag, and an estimated 4 fields per log.
	entriesToStore := make([]*badger.Entry, 0, len(span.Tags)+4+len(span.Process.Tags)+len(span.Logs)*4)

	trace, err := w.createTraceEntry(span, startTime, expireTime)
	if err != nil {
		return err
	}
	entriesToStore = append(entriesToStore, trace)
	entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(serviceNameIndexKey, []byte(span.Process.ServiceName), startTime, span.TraceID), nil, expireTime))
	entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(operationNameIndexKey, []byte(span.Process.ServiceName+span.OperationName), startTime, span.TraceID), nil, expireTime))

	// It doesn't matter if we overwrite Duration index keys, everything is read at Trace level in any case
	durationValue := make([]byte, 8)
	binary.BigEndian.PutUint64(durationValue, uint64(model.DurationAsMicroseconds(span.Duration)))
	entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(durationIndexKey, durationValue, startTime, span.TraceID), nil, expireTime))

	for _, kv := range span.Tags {
		// Convert everything to string since queries are done that way also
		// KEY: it<serviceName><tagsKey><traceId> VALUE: <tagsValue>
		entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(tagIndexKey, []byte(span.Process.ServiceName+kv.Key+kv.AsString()), startTime, span.TraceID), nil, expireTime))
	}
	for _, kv := range span.Process.Tags {
		entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(tagIndexKey, []byte(span.Process.ServiceName+kv.Key+kv.AsString()), startTime, span.TraceID), nil, expireTime))
	}
	for _, log := range span.Logs {
		for _, kv := range log.Fields {
			entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(tagIndexKey, []byte(span.Process.ServiceName+kv.Key+kv.AsString()), startTime, span.TraceID), nil, expireTime))
		}
	}

	err = w.store.Update(func(txn *badger.Txn) error {
		// Write the entries
		for i := range entriesToStore {
			if err := txn.SetEntry(entriesToStore[i]); err != nil {
				// Most likely primary key conflict, but let the caller check this
				return err
			}
		}
		// TODO Alternative option is to use simpler keys with the merge value interface.
		// Requires at least this to be solved: https://github.com/dgraph-io/badger/issues/373
		return nil
	})
	if err != nil {
		// Skip the cache refresh: the entries were not persisted.
		return err
	}

	// Do cache refresh here to release the transaction earlier
	w.cache.Update(span.Process.ServiceName, span.OperationName, expireTime)
	return nil
}
// createIndexKey builds a secondary-index key of the form
// indexKey<indexValue><startTime><traceId>, where the traceId occupies the
// final 16 bytes. BigEndian encoding keeps keys lexicographically sorted by
// time within an index.
func createIndexKey(indexPrefixKey byte, value []byte, startTime uint64, traceID model.TraceID) []byte {
	buf := make([]byte, 1+len(value)+8+sizeOfTraceID)
	// Fold the index id into the low 4 bits while keeping the span prefix bit.
	buf[0] = spanKeyPrefix | (indexPrefixKey & indexKeyRange)
	offset := 1 + copy(buf[1:], value)
	binary.BigEndian.PutUint64(buf[offset:], startTime)
	offset += 8
	binary.BigEndian.PutUint64(buf[offset:], traceID.High)
	binary.BigEndian.PutUint64(buf[offset+8:], traceID.Low)
	return buf
}
// createBadgerEntry wraps a key/value pair into a badger.Entry carrying the
// given absolute expiration timestamp (unix seconds).
func (*SpanWriter) createBadgerEntry(key []byte, value []byte, expireTime uint64) *badger.Entry {
	entry := badger.Entry{
		Key:       key,
		Value:     value,
		ExpiresAt: expireTime,
	}
	return &entry
}
// createTraceEntry serializes the span and wraps it into a badger.Entry whose
// UserMeta byte records the encoding used, so readers can pick the right decoder.
func (w *SpanWriter) createTraceEntry(span *model.Span, startTime, expireTime uint64) (*badger.Entry, error) {
	key, value, err := createTraceKV(span, w.encodingType, startTime)
	if err != nil {
		return nil, err
	}
	entry := w.createBadgerEntry(key, value, expireTime)
	entry.UserMeta = w.encodingType
	return entry, nil
}
// createTraceKV builds the primary span entry.
// KEY: ti<trace-id><startTime><span-id> VALUE: the serialized span METADATA: Encoding
// Note, KEY must include startTime for proper sorting order for span-ids.
func createTraceKV(span *model.Span, encodingType byte, startTime uint64) ([]byte, []byte, error) {
	// TODO Add Hash for Zipkin compatibility?
	key := make([]byte, 1+sizeOfTraceID+8+8)
	key[0] = spanKeyPrefix
	offset := 1
	for _, word := range [4]uint64{span.TraceID.High, span.TraceID.Low, startTime, uint64(span.SpanID)} {
		binary.BigEndian.PutUint64(key[offset:], word)
		offset += 8
	}

	switch encodingType {
	case protoEncoding:
		value, err := proto.Marshal(span)
		return key, value, err
	case jsonEncoding:
		value, err := json.Marshal(span)
		return key, value, err
	default:
		return nil, nil, fmt.Errorf("unknown encoding type: %#02x", encodingType)
	}
}