// Package kafka provides encoding and decoding functionality for Loki's Kafka integration.
package kafka

import (
	"errors"
	"fmt"
	math_bits "math/bits"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"

	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
)

var encoderPool = sync.Pool{
	New: func() any {
		return &logproto.Stream{}
	},
}

// Encode converts a logproto.Stream into one or more Kafka records.
// It handles splitting large streams into multiple records if necessary.
//
// The encoding process works as follows:
// 1. If the stream size is smaller than maxSize, it's encoded into a single record.
// 2. For larger streams, it splits the entries into multiple batches, each under maxSize.
// 3. The data is wrapped in a Kafka record with the tenant ID as the key.
//
// The format of each record is:
// - Key: Tenant ID (used for routing, not for partitioning)
// - Value: Protobuf serialized logproto.Stream
// - Partition: As specified in the partitionID parameter
//
// Parameters:
// - partitionID: The Kafka partition ID for the record
// - tenantID: The tenant ID for the stream
// - stream: The logproto.Stream to be encoded
// - maxSize: The maximum size of each Kafka record
func Encode(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) {
	reqSize := stream.Size()

	// Fast path for small requests
	if reqSize <= maxSize {
		rec, err := marshalWriteRequestToRecord(partitionID, tenantID, stream)
		if err != nil {
			return nil, err
		}
		return []*kgo.Record{rec}, nil
	}

	var records []*kgo.Record
	batch := encoderPool.Get().(*logproto.Stream)
	defer encoderPool.Put(batch)
	batch.Labels = stream.Labels
	batch.Hash = stream.Hash
	if batch.Entries == nil {
		batch.Entries = make([]logproto.Entry, 0, 1024)
	}
	batch.Entries = batch.Entries[:0]

	labelsSize := batch.Size()
	currentSize := labelsSize

	for i, entry := range stream.Entries {
		l := entry.Size()
		// Wire size of the entry within the stream: field tag + payload + length varint.
		entrySize := 1 + l + sovPush(uint64(l))

		// Reject an entry that cannot fit into a record on its own (or, for the
		// first entry, together with the stream's labels).
		if entrySize > maxSize || (i == 0 && currentSize+entrySize > maxSize) {
			return nil, fmt.Errorf("single entry size (%d) exceeds maximum allowed size (%d)", entrySize, maxSize)
		}

		if currentSize+entrySize > maxSize {
			// The current batch is full: create a record and start a new batch.
			if len(batch.Entries) > 0 {
				rec, err := marshalWriteRequestToRecord(partitionID, tenantID, *batch)
				if err != nil {
					return nil, err
				}
				records = append(records, rec)
			}
			// Reset the batch for the next record.
			batch.Entries = batch.Entries[:0]
			currentSize = labelsSize
		}
		batch.Entries = append(batch.Entries, entry)
		currentSize += entrySize
	}

	// Handle any remaining entries
	if len(batch.Entries) > 0 {
		rec, err := marshalWriteRequestToRecord(partitionID, tenantID, *batch)
		if err != nil {
			return nil, err
		}
		records = append(records, rec)
	}

	if len(records) == 0 {
		return nil, errors.New("no valid records created")
	}
	return records, nil
}
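
// encodeForTopic is a usage sketch, not part of Loki's API: it shows how a caller
// might split a stream with Encode and stamp each resulting record with a
// destination topic before handing it to a franz-go producer. The function and
// its topic parameter are hypothetical; Encode leaves Record.Topic empty, so the
// caller (or kgo's DefaultProduceTopic client option) must supply one.
func encodeForTopic(topic string, partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) {
	records, err := Encode(partitionID, tenantID, stream, maxSize)
	if err != nil {
		return nil, err
	}
	for _, rec := range records {
		rec.Topic = topic
	}
	return records, nil
}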

// marshalWriteRequestToRecord serializes the stream into a single Kafka record,
// using the tenant ID as the record key and the given partition ID.
func marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream) (*kgo.Record, error) {
	data, err := stream.Marshal()
	if err != nil {
		return nil, fmt.Errorf("failed to marshal stream: %w", err)
	}

	return &kgo.Record{
		Key:       []byte(tenantID),
		Value:     data,
		Partition: partitionID,
	}, nil
}

// Decoder is responsible for decoding Kafka record data back into logproto.Stream format.
// It caches parsed labels for efficiency.
type Decoder struct {
	stream *logproto.Stream
	cache  *lru.Cache[string, labels.Labels]
}

// NewDecoder creates a Decoder with an LRU cache for parsed label strings.
func NewDecoder() (*Decoder, error) {
	cache, err := lru.New[string, labels.Labels](5000)
	if err != nil {
		return nil, fmt.Errorf("failed to create LRU cache: %w", err)
	}
	return &Decoder{
		stream: &logproto.Stream{},
		cache:  cache,
	}, nil
}

// Decode converts a Kafka record's byte data back into a logproto.Stream and labels.Labels.
// The decoding process works as follows:
// 1. Unmarshal the data into a logproto.Stream.
// 2. Parse and cache the labels for efficiency in future decodes.
//
// Returns the decoded logproto.Stream, parsed labels, and any error encountered.
func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) {
	d.stream.Entries = d.stream.Entries[:0]
	if err := d.stream.Unmarshal(data); err != nil {
		return logproto.Stream{}, nil, fmt.Errorf("failed to unmarshal stream: %w", err)
	}

	var ls labels.Labels
	if cachedLabels, ok := d.cache.Get(d.stream.Labels); ok {
		ls = cachedLabels
	} else {
		var err error
		ls, err = syntax.ParseLabels(d.stream.Labels)
		if err != nil {
			return logproto.Stream{}, nil, fmt.Errorf("failed to parse labels: %w", err)
		}
		d.cache.Add(d.stream.Labels, ls)
	}
	return *d.stream, ls, nil
}
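
// decodeFetchedRecord is a usage sketch, not part of Loki's API: it shows how a
// consumer might turn one fetched Kafka record back into the tenant ID, the
// stream, and its parsed labels. The function name is hypothetical. A single
// Decoder should be reused across records so the label cache pays off; note that
// the returned stream shares the Decoder's entry slice, so it is only safe to use
// until the next Decode call.
func decodeFetchedRecord(d *Decoder, rec *kgo.Record) (string, logproto.Stream, labels.Labels, error) {
	stream, lbls, err := d.Decode(rec.Value)
	if err != nil {
		return "", logproto.Stream{}, nil, err
	}
	// The record key carries the tenant ID attached by Encode on the write path.
	return string(rec.Key), stream, lbls, nil
}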

// DecodeWithoutLabels converts a Kafka record's byte data back into a logproto.Stream without parsing labels.
func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) {
	d.stream.Entries = d.stream.Entries[:0]
	if err := d.stream.Unmarshal(data); err != nil {
		return logproto.Stream{}, fmt.Errorf("failed to unmarshal stream: %w", err)
	}
	return *d.stream, nil
}

// sovPush calculates the size of a varint-encoded uint64.
// It is used to determine the number of bytes needed to encode a uint64 value
// in Protocol Buffers' variable-length integer format.
func sovPush(x uint64) (n int) {
	return (math_bits.Len64(x|1) + 6) / 7
}
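
// Worked example (illustrative): a value that fits in 7 bits takes one varint byte,
// 8-14 bits take two, and so on. For an entry whose marshaled size is l = 300,
// math_bits.Len64(300|1) = 9, so sovPush(300) = (9+6)/7 = 2, and Encode budgets
// 1 (field tag) + 300 (payload) + 2 (length prefix) = 303 bytes for that entry.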