Commit debb5f2

feat(wal): Benchmark and improve WAL writes using Reset. (#13272)
1 parent 05176e4 commit debb5f2

5 files changed, +206 -63 lines changed

pkg/storage/wal/index/buffer.go (+5)

@@ -86,6 +86,11 @@ func (fw *BufferWriter) Close() error {
 	return nil
 }
 
+func (fw *BufferWriter) Reset() {
+	fw.pos = 0
+	fw.buf.Reset()
+}
+
 func (fw *BufferWriter) Remove() error {
 	return nil
 }
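
Reset rewinds the writer's position and truncates its buffer while keeping the allocated backing array, which is the property the rest of this commit builds on. A self-contained sketch of that reuse contract, using bytes.Buffer as a stand-in for fw.buf (an assumption; only the Reset semantics matter here):

package main

import (
	"bytes"
	"fmt"
)

func main() {
	var buf bytes.Buffer           // stand-in for fw.buf
	buf.Write(make([]byte, 1<<20)) // first use grows the backing array
	fmt.Println("cap after grow:", buf.Cap())
	buf.Reset()                    // length back to 0, capacity retained
	buf.Write(make([]byte, 1<<20)) // second use reuses the same array, no new allocation
	fmt.Println("cap after reuse:", buf.Cap())
}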

pkg/storage/wal/index/index.go (+31 -19)

@@ -118,8 +118,6 @@ type PostingsEncoder func(*encoding.Encbuf, []uint32) error
 // Writer implements the IndexWriter interface for the standard
 // serialization format.
 type Writer struct {
-	ctx context.Context
-
 	// For the main index file.
 	f *BufferWriter
 
@@ -197,9 +195,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 
 // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
 // It uses the given encoder to encode each postings list.
-func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer, error) {
+func NewWriterWithEncoder(encoder PostingsEncoder) (*Writer, error) {
 	iw := &Writer{
-		ctx: ctx,
 		f:   NewBufferWriter(),
 		fP:  NewBufferWriter(),
 		fPO: NewBufferWriter(),
@@ -222,8 +219,8 @@ func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer
 
 // NewWriter creates a new index writer using the default encoder. See
 // NewWriterWithEncoder.
-func NewWriter(ctx context.Context) (*Writer, error) {
-	return NewWriterWithEncoder(ctx, EncodePostingsRaw)
+func NewWriter() (*Writer, error) {
+	return NewWriterWithEncoder(EncodePostingsRaw)
 }
 
 func (w *Writer) write(bufs ...[]byte) error {
@@ -242,15 +239,36 @@ func (w *Writer) Buffer() ([]byte, io.Closer, error) {
 	return w.f.Buffer()
 }
 
+func (w *Writer) Reset() error {
+	w.f.Reset()
+	w.fP.Reset()
+	w.fPO.Reset()
+	w.buf1.Reset()
+	w.buf2.Reset()
+	w.stage = idxStageNone
+	w.toc = TOC{}
+	w.postingsStart = 0
+	w.numSymbols = 0
+	w.symbols = nil
+	w.symbolFile = nil
+	w.lastSymbol = ""
+	w.symbolCache = make(map[string]symbolCacheEntry, 1<<8)
+	w.labelIndexes = w.labelIndexes[:0]
+	w.labelNames = make(map[string]uint64, 1<<8)
+	w.lastSeries = nil
+	w.lastSeriesRef = 0
+	w.lastChunkRef = 0
+	w.cntPO = 0
+	w.crc32.Reset()
+	if err := w.writeMeta(); err != nil {
+		return err
+	}
+	return nil
+}
+
 // ensureStage handles transitions between write stages and ensures that IndexWriter
 // methods are called in an order valid for the implementation.
 func (w *Writer) ensureStage(s indexWriterStage) error {
-	select {
-	case <-w.ctx.Done():
-		return w.ctx.Err()
-	default:
-	}
-
 	if w.stage == s {
 		return nil
 	}
@@ -691,7 +709,6 @@ func (w *Writer) writePostingsOffsetTable() error {
 	if err := w.fPO.Remove(); err != nil {
 		return err
 	}
-	w.fPO = nil
 
 	err = w.writeLengthAndHash(startPos)
 	if err != nil {
@@ -854,11 +871,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
 			}
 		}
 	}
-		select {
-		case <-w.ctx.Done():
-			return w.ctx.Err()
-		default:
-		}
+
 	}
 	return nil
 }
@@ -936,7 +949,6 @@ func (w *Writer) writePostings() error {
 	if err := w.fP.Remove(); err != nil {
 		return err
 	}
-	w.fP = nil
 	return nil
 }
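
With the context plumbing removed and the tmp-file writers no longer nil-ed out after use, a single Writer can now be driven through many build cycles. Reset puts every buffer and bookkeeping field back to its freshly constructed state and rewrites the header via writeMeta. A hedged sketch of the intended call pattern (buildSegments and the loop are hypothetical; Reset, AddSymbol, AddSeries, Close and Buffer are the methods this patch actually touches):

// Hypothetical reuse loop for *index.Writer.
func buildSegments(w *index.Writer, n int) error {
	for i := 0; i < n; i++ {
		if err := w.Reset(); err != nil { // back to idxStageNone; header rewritten
			return err
		}
		// ... w.AddSymbol(...) for every symbol, in sorted order ...
		// ... w.AddSeries(...) for every series ...
		if err := w.Close(); err != nil {
			return err
		}
		buf, closer, err := w.Buffer() // serialized index for this cycle
		if err != nil {
			return err
		}
		_ = buf // hand the bytes off in real code
		if err := closer.Close(); err != nil {
			return err
		}
	}
	return nil
}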

pkg/storage/wal/index/index_test.go (+10 -10)

@@ -135,7 +135,7 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder,
 
 func TestIndexRW_Create_Open(t *testing.T) {
 	// An empty index must still result in a readable file.
-	iw, err := NewWriter(context.Background())
+	iw, err := NewWriter()
 	require.NoError(t, err)
 	require.NoError(t, iw.Close())
 
@@ -160,7 +160,7 @@ func TestIndexRW_Postings(t *testing.T) {
 			labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)),
 		})
 	}
-	ir, buf, _ := createReader(ctx, t, input)
+	ir, buf, _ := createReader(t, input)
 
 	p, err := ir.Postings(ctx, "a", "1")
 	require.NoError(t, err)
@@ -271,7 +271,7 @@ func TestPostingsMany(t *testing.T) {
 			labels: labels.FromStrings("i", v, "foo", "bar"),
 		})
 	}
-	ir, _, symbols := createReader(ctx, t, input)
+	ir, _, symbols := createReader(t, input)
 
 	cases := []struct {
 		in []string
@@ -353,7 +353,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 		})
 	}
 
-	ir, _, _ := createReader(ctx, t, input)
+	ir, _, _ := createReader(t, input)
 
 	// Population procedure as done by compaction.
 	var (
@@ -435,7 +435,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 
 func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
-	w, err := NewWriter(context.Background())
+	w, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, w.AddSymbol("__name__"))
@@ -523,7 +523,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
 			labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)),
 		})
 	}
-	ir, _, _ := createReader(ctx, b, input)
+	ir, _, _ := createReader(b, input)
 	b.ResetTimer()
 
 	for n := 0; n < b.N; n++ {
@@ -540,7 +540,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
 }
 
 func TestChunksRefOrdering(t *testing.T) {
-	idx, err := NewWriter(context.Background())
+	idx, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -558,7 +558,7 @@ func TestChunksRefOrdering(t *testing.T) {
 }
 
 func TestChunksTimeOrdering(t *testing.T) {
-	idx, err := NewWriter(context.Background())
+	idx, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -585,10 +585,10 @@ func TestChunksTimeOrdering(t *testing.T) {
 
 // createFileReader creates a temporary index file. It writes the provided input to this file.
 // It returns a Reader for this file, the file's name, and the symbol map.
-func createReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
+func createReader(tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
 	tb.Helper()
 
-	iw, err := NewWriter(ctx)
+	iw, err := NewWriter()
 	require.NoError(tb, err)
 
 	symbols := map[string]struct{}{}

pkg/storage/wal/segment.go (+49 -30)

@@ -8,8 +8,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
-
-	"github.com/dolthub/swiss"
+	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -24,8 +23,15 @@ import (
 
 // LOKW is the magic number for the Loki WAL format.
 var (
-	magicNumber = uint32(0x4C4F4B57)
-	magicBuf    [4]byte
+	magicNumber       = uint32(0x4C4F4B57)
+	magicBuf          [4]byte
+	streamSegmentPool = sync.Pool{
+		New: func() interface{} {
+			return &streamSegment{
+				entries: make([]*logproto.Entry, 0, 4096),
+			}
+		},
+	}
 )
 
 func init() {
@@ -37,9 +43,10 @@ type streamID struct {
 }
 
 type SegmentWriter struct {
-	streams   *swiss.Map[streamID, *streamSegment]
+	streams   map[streamID]*streamSegment
 	buf1      encoding.Encbuf
 	inputSize int64
+	idxWriter *index.Writer
 }
 
 type streamSegment struct {
@@ -49,12 +56,21 @@ type streamSegment struct {
 	maxt int64
 }
 
+func (s *streamSegment) Reset() {
+	s.entries = s.entries[:0]
+}
+
 // NewWalSegmentWriter creates a new WalSegmentWriter.
-func NewWalSegmentWriter() *SegmentWriter {
-	return &SegmentWriter{
-		streams: swiss.NewMap[streamID, *streamSegment](64),
-		buf1:    encoding.EncWith(make([]byte, 0, 4)),
+func NewWalSegmentWriter() (*SegmentWriter, error) {
+	idxWriter, err := index.NewWriter()
+	if err != nil {
+		return nil, err
 	}
+	return &SegmentWriter{
+		streams:   make(map[streamID]*streamSegment, 64),
+		buf1:      encoding.EncWith(make([]byte, 0, 4)),
+		idxWriter: idxWriter,
+	}, nil
 }
 
 // Labels are passed a string `{foo="bar",baz="qux"}` `{foo="foo",baz="foo"}`. labels.Labels => Symbols foo, baz , qux
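
streamSegment values now come out of a sync.Pool pre-sized for 4096 entries, replacing the per-stream allocation (and its "should be pooled" TODO) in Append below; SegmentWriter.Reset in the last hunk returns them. A self-contained sketch of that Get/Reset/Put cycle, with item as a simplified stand-in for streamSegment:

package main

import (
	"fmt"
	"sync"
)

type item struct{ entries []string } // stand-in for streamSegment

var pool = sync.Pool{
	New: func() interface{} { return &item{entries: make([]string, 0, 4096)} },
}

func main() {
	it := pool.Get().(*item)
	it.entries = it.entries[:0] // reset on Get: a pooled value may hold stale data
	it.entries = append(it.entries, "line")
	fmt.Println(len(it.entries), cap(it.entries))
	pool.Put(it) // return it once the segment has been flushed
}

Truncating rather than freeing entries means a recycled segment keeps its capacity, so steady-state appends allocate nothing.
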
@@ -66,22 +82,18 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 		b.inputSize += int64(len(e.Line))
 	}
 	id := streamID{labels: labelsString, tenant: tenantID}
-	s, ok := b.streams.Get(id)
+	s, ok := b.streams[id]
 	if !ok {
 		if lbls.Get(tsdb.TenantLabel) == "" {
 			lbls = labels.NewBuilder(lbls).Set(tsdb.TenantLabel, tenantID).Labels()
 		}
-		s = &streamSegment{
-			// todo: should be pooled.
-			// prometheus bucketed pool
-			// https://pkg.go.dev/github.com/prometheus/prometheus/util/pool
-			entries:  make([]*logproto.Entry, 0, 64),
-			lbls:     lbls,
-			tenantID: tenantID,
-		}
+		s = streamSegmentPool.Get().(*streamSegment)
+		s.Reset()
+		s.lbls = lbls
+		s.tenantID = tenantID
 		s.maxt = entries[len(entries)-1].Timestamp.UnixNano()
 		s.entries = append(s.entries, entries...)
-		b.streams.Put(id, s)
+		b.streams[id] = s
 		return
 	}
 
@@ -105,22 +117,25 @@
 func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 	var (
 		total   int64
-		streams = make([]*streamSegment, 0, b.streams.Count())
+		streams = make([]*streamSegment, 0, len(b.streams))
 	)
 
 	// Collect all streams and sort them by tenantID and labels.
-	b.streams.Iter(func(k streamID, v *streamSegment) bool {
-		streams = append(streams, v)
-		return false
-	})
+	for _, s := range b.streams {
+		if len(s.entries) == 0 {
+			continue
+		}
+		streams = append(streams, s)
+	}
+
 	sort.Slice(streams, func(i, j int) bool {
 		if streams[i].tenantID != streams[j].tenantID {
 			return streams[i].tenantID < streams[j].tenantID
 		}
 		return labels.Compare(streams[i].lbls, streams[j].lbls) < 0
 	})
 
-	idxw, err := index.NewWriter(context.TODO())
+	err := b.idxWriter.Reset()
 	if err != nil {
 		return total, err
 	}
@@ -143,7 +158,7 @@
 
 	// Add symbols
 	for _, symbol := range symbols {
-		if err := idxw.AddSymbol(symbol); err != nil {
+		if err := b.idxWriter.AddSymbol(symbol); err != nil {
 			return total, err
 		}
 	}
@@ -163,7 +178,7 @@
 		if err != nil {
 			return total, err
 		}
-		err = idxw.AddSeries(storage.SeriesRef(i), s.lbls, chunks.Meta{
+		err = b.idxWriter.AddSeries(storage.SeriesRef(i), s.lbls, chunks.Meta{
 			MinTime: s.entries[0].Timestamp.UnixNano(),
 			MaxTime: s.entries[len(s.entries)-1].Timestamp.UnixNano(),
 			Ref:     chunks.NewChunkRef(uint64(total), uint64(n)),
@@ -175,11 +190,11 @@
 
 	}
 
-	if err := idxw.Close(); err != nil {
+	if err := b.idxWriter.Close(); err != nil {
 		return total, err
 	}
 
-	buf, closer, err := idxw.Buffer()
+	buf, closer, err := b.idxWriter.Buffer()
 	if err != nil {
 		return total, err
 	}
@@ -226,7 +241,11 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
 // Reset clears the writer.
 // After calling Reset, the writer can be reused.
 func (b *SegmentWriter) Reset() {
-	b.streams.Clear()
+	for _, s := range b.streams {
+		s := s
+		streamSegmentPool.Put(s)
+	}
+	b.streams = make(map[streamID]*streamSegment, 64)
 	b.buf1.Reset()
 	b.inputSize = 0
 }
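
Taken together, a SegmentWriter is now fully recyclable: Reset hands the streams back to the pool, and the embedded index writer is reset in place on the next WriteTo instead of being reallocated. A hedged, benchmark-style sketch of the reuse cycle the commit title refers to (import paths, label values and entry contents are illustrative; the benchmark added by this PR may look different):

package wal_test

import (
	"io"
	"testing"
	"time"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/storage/wal"
	"github.com/prometheus/prometheus/model/labels"
)

func BenchmarkSegmentWriterReuse(b *testing.B) {
	w, err := wal.NewWalSegmentWriter() // the constructor now returns an error as well
	if err != nil {
		b.Fatal(err)
	}
	lbls := labels.FromStrings("app", "foo")
	entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "hello"}}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		w.Append("tenant", lbls.String(), lbls, entries)
		if _, err := w.WriteTo(io.Discard); err != nil {
			b.Fatal(err)
		}
		w.Reset() // streams go back to the pool; the writer is ready for the next segment
	}
}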
