Skip to content

Commit a03e3d3

Browse files
chore: Use pool of bytes.Buffer instead of io.Pipe (#13543)
1 parent 8ca03a2 commit a03e3d3

File tree

4 files changed

+16
-52
lines changed

4 files changed

+16
-52
lines changed

pkg/ingester-rf1/flush.go

+14-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ingesterrf1
22

33
import (
4+
"bytes"
45
"crypto/rand"
56
"errors"
67
"fmt"
@@ -12,12 +13,10 @@ import (
1213
"github.com/go-kit/log/level"
1314
"github.com/grafana/dskit/backoff"
1415
"github.com/grafana/dskit/ring"
15-
"github.com/grafana/dskit/runutil"
1616
"github.com/oklog/ulid"
1717
"golang.org/x/net/context"
1818

1919
"github.com/grafana/loki/v3/pkg/storage/wal"
20-
util_log "github.com/grafana/loki/v3/pkg/util/log"
2120
)
2221

2322
const (
@@ -40,6 +39,7 @@ const (
4039
func (i *Ingester) InitFlushWorkers() {
4140
i.flushWorkersDone.Add(i.cfg.ConcurrentFlushes)
4241
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
42+
i.flushBuffers[j] = new(bytes.Buffer)
4343
go i.flushWorker(j)
4444
}
4545
}
@@ -90,7 +90,7 @@ func (i *Ingester) flushWorker(j int) {
9090
n := it.Writer.InputSize()
9191
humanized := humanize.Bytes(uint64(n))
9292

93-
err = i.flushItem(l, it)
93+
err = i.flushItem(l, j, it)
9494
d := time.Since(start)
9595
if err != nil {
9696
level.Error(l).Log("msg", "failed to flush", "size", humanized, "duration", d, "err", err)
@@ -103,13 +103,13 @@ func (i *Ingester) flushWorker(j int) {
103103
}
104104
}
105105

106-
func (i *Ingester) flushItem(l log.Logger, it *wal.PendingItem) error {
106+
func (i *Ingester) flushItem(l log.Logger, j int, it *wal.PendingItem) error {
107107
ctx, cancelFunc := context.WithCancel(context.Background())
108108
defer cancelFunc()
109109

110110
b := backoff.New(ctx, i.cfg.FlushOpBackoff)
111111
for b.Ongoing() {
112-
err := i.flushSegment(ctx, it.Writer)
112+
err := i.flushSegment(ctx, j, it.Writer)
113113
if err == nil {
114114
break
115115
}
@@ -124,19 +124,23 @@ func (i *Ingester) flushItem(l log.Logger, it *wal.PendingItem) error {
124124
// If the flush is successful, metrics for this flush are to be reported.
125125
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
126126
// segments to have another opportunity to be flushed.
127-
func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
127+
func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
128128
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
129-
r := ch.Reader()
130129

131130
start := time.Now()
132131
defer func() {
133-
runutil.CloseWithLogOnErr(util_log.Logger, r, "flushSegment")
134132
i.metrics.flushDuration.Observe(time.Since(start).Seconds())
135-
ch.Observe()
133+
w.Observe()
136134
}()
137135

136+
buf := i.flushBuffers[j]
137+
defer buf.Reset()
138+
if _, err := w.WriteTo(buf); err != nil {
139+
return err
140+
}
141+
138142
i.metrics.flushesTotal.Add(1)
139-
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), r); err != nil {
143+
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), buf); err != nil {
140144
i.metrics.flushFailuresTotal.Inc()
141145
return fmt.Errorf("store put chunk: %w", err)
142146
}

pkg/ingester-rf1/ingester.go

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ingesterrf1
22

33
import (
4+
"bytes"
45
"context"
56
"flag"
67
"fmt"
@@ -205,6 +206,7 @@ type Ingester struct {
205206

206207
// One queue per flush thread. Fingerprint is used to
207208
// pick a queue.
209+
flushBuffers []*bytes.Buffer
208210
flushWorkersDone sync.WaitGroup
209211

210212
wal *wal.Manager

pkg/storage/wal/segment.go

-26
Original file line numberDiff line numberDiff line change
@@ -299,32 +299,6 @@ func (b *SegmentWriter) Reset() {
299299
b.inputSize.Store(0)
300300
}
301301

302-
type EncodedSegmentReader struct {
303-
*io.PipeReader
304-
*io.PipeWriter
305-
}
306-
307-
func (e *EncodedSegmentReader) Close() error {
308-
err := e.PipeWriter.Close()
309-
if err != nil {
310-
return err
311-
}
312-
err = e.PipeReader.Close()
313-
if err != nil {
314-
return err
315-
}
316-
return nil
317-
}
318-
319-
func (b *SegmentWriter) Reader() io.ReadCloser {
320-
pr, pw := io.Pipe()
321-
go func() {
322-
_, err := b.WriteTo(pw)
323-
pw.CloseWithError(err)
324-
}()
325-
return &EncodedSegmentReader{PipeReader: pr, PipeWriter: pw}
326-
}
327-
328302
// InputSize returns the total size of the input data written to the writer.
329303
// It doesn't account for timestamps and labels.
330304
func (b *SegmentWriter) InputSize() int64 {

pkg/storage/wal/segment_test.go

-16
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"io"
87
"sort"
98
"sync"
109
"testing"
@@ -510,19 +509,4 @@ func BenchmarkWrites(b *testing.B) {
510509
require.EqualValues(b, encodedLength, n)
511510
}
512511
})
513-
514-
bytesBuf := make([]byte, encodedLength)
515-
b.Run("Reader", func(b *testing.B) {
516-
b.ResetTimer()
517-
b.ReportAllocs()
518-
for i := 0; i < b.N; i++ {
519-
var err error
520-
reader := writer.Reader()
521-
522-
n, err := io.ReadFull(reader, bytesBuf)
523-
require.NoError(b, err)
524-
require.EqualValues(b, encodedLength, n)
525-
require.NoError(b, reader.Close())
526-
}
527-
})
528512
}

0 commit comments

Comments
 (0)