Skip to content

Commit bfe97d7

Browse files
feat: Add metrics to WAL Manager (#13490)
1 parent 8f1d12f commit bfe97d7

File tree

3 files changed

+134
-9
lines changed

3 files changed

+134
-9
lines changed

pkg/ingester-rf1/ingester.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ func New(cfg Config, clientConfig client.Config,
257257
MaxAge: wal.DefaultMaxAge,
258258
MaxSegments: wal.DefaultMaxSegments,
259259
MaxSegmentSize: wal.DefaultMaxSegmentSize,
260-
})
260+
}, wal.NewMetrics(registerer))
261261
if err != nil {
262262
return nil, err
263263
}

pkg/storage/wal/manager.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"sync"
77
"time"
88

9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
911
"github.com/prometheus/prometheus/model/labels"
1012

1113
"github.com/grafana/loki/v3/pkg/logproto"
@@ -81,6 +83,29 @@ type Config struct {
8183
MaxSegmentSize int64
8284
}
8385

86+
// Metrics groups the Prometheus gauges the WAL Manager updates as segments
// move between its states: available -> pending -> flushing -> available.
type Metrics struct {
87+
// NumAvailable is the number of segments accepting writes. It is
// incremented for each segment created in NewManager and on Put, and
// decremented when a segment is moved to the pending list.
NumAvailable prometheus.Gauge
88+
// NumPending is the number of segments waiting to be flushed. It is
// incremented when a segment is pushed onto the pending list and
// decremented when NextPending hands it out.
NumPending prometheus.Gauge
89+
// NumFlushing is the number of segments handed out by NextPending that
// have not yet been returned via Put.
NumFlushing prometheus.Gauge
90+
}
91+
92+
func NewMetrics(r prometheus.Registerer) *Metrics {
93+
return &Metrics{
94+
NumAvailable: promauto.With(r).NewGauge(prometheus.GaugeOpts{
95+
Name: "wal_segments_available",
96+
Help: "The number of WAL segments accepting writes.",
97+
}),
98+
NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
99+
Name: "wal_segments_pending",
100+
Help: "The number of WAL segments waiting to be flushed.",
101+
}),
102+
NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
103+
Name: "wal_segments_flushing",
104+
Help: "The number of WAL segments being flushed.",
105+
}),
106+
}
107+
}
108+
84109
// Manager buffers segments in memory, and keeps track of which segments are
85110
// available and which are waiting to be flushed. The maximum number of
86111
// segments that can be buffered in memory, and their maximum age and maximum
@@ -97,7 +122,8 @@ type Config struct {
97122
// and returned to the available list. This allows the manager to apply back-pressure
98123
// and avoid congestion collapse due to excessive timeouts and retries.
99124
type Manager struct {
100-
cfg Config
125+
cfg Config
126+
metrics *Metrics
101127

102128
// available is a list of segments that are available and accepting data.
103129
// All segments other than the segment at the front of the list are empty,
@@ -135,13 +161,16 @@ type PendingItem struct {
135161
Writer *SegmentWriter
136162
}
137163

138-
func NewManager(cfg Config) (*Manager, error) {
164+
func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
139165
m := Manager{
140166
cfg: cfg,
167+
metrics: metrics,
141168
available: list.New(),
142169
pending: list.New(),
143170
shutdown: make(chan struct{}),
144171
}
172+
m.metrics.NumPending.Set(0)
173+
m.metrics.NumFlushing.Set(0)
145174
for i := int64(0); i < cfg.MaxSegments; i++ {
146175
w, err := NewWalSegmentWriter()
147176
if err != nil {
@@ -151,6 +180,7 @@ func NewManager(cfg Config) (*Manager, error) {
151180
r: &AppendResult{done: make(chan struct{})},
152181
w: w,
153182
})
183+
m.metrics.NumAvailable.Inc()
154184
}
155185
return &m, nil
156186
}
@@ -171,7 +201,9 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
171201
// the closed list to be flushed.
172202
if time.Since(it.firstAppendedAt) >= m.cfg.MaxAge || it.w.InputSize() >= m.cfg.MaxSegmentSize {
173203
m.pending.PushBack(it)
204+
m.metrics.NumPending.Inc()
174205
m.available.Remove(el)
206+
m.metrics.NumAvailable.Dec()
175207
}
176208
return it.r, nil
177209
}
@@ -189,7 +221,9 @@ func (m *Manager) NextPending() (*PendingItem, error) {
189221
it := el.Value.(*item)
190222
if !it.firstAppendedAt.IsZero() && time.Since(it.firstAppendedAt) >= m.cfg.MaxAge {
191223
m.pending.PushBack(it)
224+
m.metrics.NumPending.Inc()
192225
m.available.Remove(el)
226+
m.metrics.NumAvailable.Dec()
193227
}
194228
}
195229
// If the pending list is still empty return nil.
@@ -200,6 +234,8 @@ func (m *Manager) NextPending() (*PendingItem, error) {
200234
el := m.pending.Front()
201235
it := el.Value.(*item)
202236
m.pending.Remove(el)
237+
m.metrics.NumPending.Dec()
238+
m.metrics.NumFlushing.Inc()
203239
return &PendingItem{Result: it.r, Writer: it.w}, nil
204240
}
205241

@@ -209,6 +245,8 @@ func (m *Manager) Put(it *PendingItem) error {
209245
m.mu.Lock()
210246
defer m.mu.Unlock()
211247
it.Writer.Reset()
248+
m.metrics.NumFlushing.Dec()
249+
m.metrics.NumAvailable.Inc()
212250
m.available.PushBack(&item{
213251
r: &AppendResult{done: make(chan struct{})},
214252
w: it.Writer,

pkg/storage/wal/manager_test.go

+93-6
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,19 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/grafana/loki/v3/pkg/logproto"
10-
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/testutil"
1111
"github.com/prometheus/prometheus/model/labels"
1212
"github.com/stretchr/testify/require"
13+
14+
"github.com/grafana/loki/v3/pkg/logproto"
1315
)
1416

1517
func TestManager_Append(t *testing.T) {
1618
m, err := NewManager(Config{
1719
MaxSegments: 1,
1820
MaxSegmentSize: 1024, // 1KB
19-
})
21+
}, NewMetrics(nil))
2022
require.NoError(t, err)
2123

2224
// Append some data.
@@ -96,7 +98,7 @@ func TestManager_Append_ErrFull(t *testing.T) {
9698
m, err := NewManager(Config{
9799
MaxSegments: 10,
98100
MaxSegmentSize: 1024, // 1KB
99-
})
101+
}, NewMetrics(nil))
100102
require.NoError(t, err)
101103

102104
// Should be able to write to all 10 segments of 1KB each.
@@ -140,7 +142,7 @@ func TestManager_NextPending(t *testing.T) {
140142
MaxAge: DefaultMaxAge,
141143
MaxSegments: 1,
142144
MaxSegmentSize: 1024, // 1KB
143-
})
145+
}, NewMetrics(nil))
144146
require.NoError(t, err)
145147

146148
// There should be no items as no data has been written.
@@ -195,7 +197,7 @@ func TestManager_Put(t *testing.T) {
195197
m, err := NewManager(Config{
196198
MaxSegments: 10,
197199
MaxSegmentSize: 1024, // 1KB
198-
})
200+
}, NewMetrics(nil))
199201
require.NoError(t, err)
200202

201203
// There should be 10 available segments, and 0 pending.
@@ -242,3 +244,88 @@ func TestManager_Put(t *testing.T) {
242244
// The segment should be reset.
243245
require.Equal(t, int64(0), it.Writer.InputSize())
244246
}
247+
248+
// TestManager_Metrics walks a single segment through its full lifecycle
// (available -> pending -> flushing -> available) and, after every step,
// compares the three WAL gauges collected from a dedicated registry against
// the expected exposition text.
func TestManager_Metrics(t *testing.T) {
249+
// A fresh registry keeps the comparison isolated to this manager's gauges.
r := prometheus.NewRegistry()
250+
m, err := NewManager(Config{
251+
MaxSegments: 1,
252+
MaxSegmentSize: 1024, // 1KB
253+
}, NewMetrics(r))
254+
require.NoError(t, err)
255+
256+
// Only these metric names are passed to CollectAndCompare below.
metricNames := []string{
257+
"wal_segments_available",
258+
"wal_segments_flushing",
259+
"wal_segments_pending",
260+
}
261+
// Initial state: the one segment created by NewManager is available.
expected := `
262+
# HELP wal_segments_available The number of WAL segments accepting writes.
263+
# TYPE wal_segments_available gauge
264+
wal_segments_available 1
265+
# HELP wal_segments_flushing The number of WAL segments being flushed.
266+
# TYPE wal_segments_flushing gauge
267+
wal_segments_flushing 0
268+
# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
269+
# TYPE wal_segments_pending gauge
270+
wal_segments_pending 0
271+
`
272+
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
273+
274+
// Appending 1KB of data.
// The line length equals MaxSegmentSize (1024), which is what the test
// relies on to fill the segment in a single Append.
275+
lbs := labels.Labels{{Name: "foo", Value: "bar"}}
276+
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("b", 1024)}}
277+
_, err = m.Append(AppendRequest{
278+
TenantID: "1",
279+
Labels: lbs,
280+
LabelsStr: lbs.String(),
281+
Entries: entries,
282+
})
283+
require.NoError(t, err)
284+
285+
// This should move the segment from the available to the pending list.
286+
expected = `
287+
# HELP wal_segments_available The number of WAL segments accepting writes.
288+
# TYPE wal_segments_available gauge
289+
wal_segments_available 0
290+
# HELP wal_segments_flushing The number of WAL segments being flushed.
291+
# TYPE wal_segments_flushing gauge
292+
wal_segments_flushing 0
293+
# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
294+
# TYPE wal_segments_pending gauge
295+
wal_segments_pending 1
296+
`
297+
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
298+
299+
// Get the segment from the pending list.
// NextPending decrements the pending gauge and increments the flushing gauge.
300+
it, err := m.NextPending()
301+
require.NoError(t, err)
302+
require.NotNil(t, it)
303+
expected = `
304+
# HELP wal_segments_available The number of WAL segments accepting writes.
305+
# TYPE wal_segments_available gauge
306+
wal_segments_available 0
307+
# HELP wal_segments_flushing The number of WAL segments being flushed.
308+
# TYPE wal_segments_flushing gauge
309+
wal_segments_flushing 1
310+
# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
311+
# TYPE wal_segments_pending gauge
312+
wal_segments_pending 0
313+
`
314+
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
315+
316+
// Reset the segment and put it back in the available list.
// Put decrements the flushing gauge and increments the available gauge,
// returning the gauges to their initial values.
317+
require.NoError(t, m.Put(it))
318+
expected = `
319+
# HELP wal_segments_available The number of WAL segments accepting writes.
320+
# TYPE wal_segments_available gauge
321+
wal_segments_available 1
322+
# HELP wal_segments_flushing The number of WAL segments being flushed.
323+
# TYPE wal_segments_flushing gauge
324+
wal_segments_flushing 0
325+
# HELP wal_segments_pending The number of WAL segments waiting to be flushed.
326+
# TYPE wal_segments_pending gauge
327+
wal_segments_pending 0
328+
`
329+
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))
330+
331+
}

0 commit comments

Comments
 (0)