[exporter/prometheusremotewrite] Fix WAL deadlock (#37630)
I took a look at #20875, hoping to finish it.
Fixes #19363
Fixes #24399
Fixes #15277 

---

As mentioned in
#24399 (comment),
I used a library to help me understand how the deadlock was happening
(first commit). It showed that `persistToWAL` was trying to acquire the
lock while `readPrompbFromWAL` held it forever.
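
To make the failure mode concrete, here is a minimal, self-contained sketch of that pattern (hypothetical names; not the exporter's actual code). The reader blocks on a write notification while still holding the mutex, so the writer can never acquire the mutex to produce that notification; running it trips Go's runtime deadlock detector:

```go
package main

import "sync"

// Either interleaving deadlocks: the reader ends up waiting for a write
// notification while holding the mutex, and the writer ends up waiting
// for the mutex (or for a receiver) that will never come.
func main() {
	var mu sync.Mutex
	writes := make(chan struct{}) // stands in for the fsnotify event stream

	go func() { // reader, in the spirit of readPrompbFromWAL
		mu.Lock()
		defer mu.Unlock()
		<-writes // waits for a write, still holding the lock
	}()

	// Writer, in the spirit of persistToWAL: needs the same lock to write.
	mu.Lock()
	writes <- struct{}{}
	mu.Unlock()
}
```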

I changed the strategy here: instead of using fsnotify and all the
complicated logic around it, we now use a simple pub/sub strategy between
the writer and reader goroutines.

Once the reader goroutine finds an empty WAL, it now releases the lock
immediately and waits for a notification from the writer, whereas
previously it would hold the lock while waiting for a write that could
never happen.
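
Here is a minimal sketch of the new shape (again with hypothetical names; the real exporter also handles context cancellation, shutdown, and bounded retries). The writer does a non-blocking send on a notification channel, and the reader unlocks *before* blocking on that channel; the sketch uses a one-slot buffered channel so a signal sent between the reader's unlock and its receive isn't lost:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type walSketch struct {
	mu      sync.Mutex
	entries [][]byte
	rNotify chan struct{} // one-slot buffer so a signal sent while no reader waits isn't lost
}

func (w *walSketch) write(entry []byte) {
	w.mu.Lock()
	w.entries = append(w.entries, entry)
	w.mu.Unlock()

	// Non-blocking notify: if the slot is already full, just move on.
	select {
	case w.rNotify <- struct{}{}:
	default:
	}
}

func (w *walSketch) read() []byte {
	for {
		w.mu.Lock()
		if len(w.entries) > 0 {
			entry := w.entries[0]
			w.entries = w.entries[1:]
			w.mu.Unlock()
			return entry
		}
		w.mu.Unlock() // release first, so the writer can make progress
		<-w.rNotify   // then wait for the writer's signal
	}
}

func main() {
	w := &walSketch{rNotify: make(chan struct{}, 1)}
	go func() {
		time.Sleep(10 * time.Millisecond)
		w.write([]byte("sample"))
	}()
	fmt.Printf("%s\n", w.read()) // prints "sample" instead of deadlocking
}
```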

---------

Signed-off-by: Arthur Silva Sens <[email protected]>
ArthurSens authored Feb 4, 2025
1 parent efddc6f commit 7f581ca
Showing 4 changed files with 108 additions and 67 deletions.
19 changes: 19 additions & 0 deletions .chloggen/prw-WAL-deadlock.yaml
@@ -0,0 +1,19 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: |
  Resolves a deadlock in the WAL by temporarily releasing the lock while waiting for new writes.
# One or more tracking issues related to the change
issues: [19363, 24399, 15277]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:


change_logs: [user]
3 changes: 2 additions & 1 deletion exporter/prometheusremotewriteexporter/go.mod
@@ -4,7 +4,6 @@ go 1.22.0

require (
	github.com/cenkalti/backoff/v4 v4.3.0
	github.com/fsnotify/fsnotify v1.8.0
	github.com/gogo/protobuf v1.3.2
	github.com/golang/snappy v0.0.4
	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.119.0
@@ -37,6 +36,8 @@ require (
	go.uber.org/zap v1.27.0
)

require github.com/fsnotify/fsnotify v1.8.0 // indirect

require (
	github.com/cespare/xxhash/v2 v2.3.0 // indirect
	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
87 changes: 21 additions & 66 deletions exporter/prometheusremotewriteexporter/wal.go
@@ -12,7 +12,6 @@ import (
	"sync/atomic"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/gogo/protobuf/proto"
	"github.com/prometheus/prometheus/prompb"
	"github.com/tidwall/wal"
@@ -30,6 +29,7 @@ type prweWAL struct {

	stopOnce  sync.Once
	stopChan  chan struct{}
	rNotify   chan struct{}
	rWALIndex *atomic.Uint64
	wWALIndex *atomic.Uint64
}
@@ -70,6 +70,7 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri
		exportSink: exportSink,
		walConfig:  walConfig,
		stopChan:   make(chan struct{}),
		rNotify:    make(chan struct{}),
		rWALIndex:  &atomic.Uint64{},
		wWALIndex:  &atomic.Uint64{},
	}
@@ -315,13 +316,15 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error {
		batch.Write(wIndex, protoBlob)
	}

	// Notify the reader goroutine that may be waiting for writes.
	select {
	case prwe.rNotify <- struct{}{}:
	default:
	}
	return prwe.wal.WriteBatch(batch)
}

func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) {
	prwe.mu.Lock()
	defer prwe.mu.Unlock()

	var protoBlob []byte
	for i := 0; i < 12; i++ {
		// Firstly check if we've been terminated, then exit if so.
@@ -337,10 +340,10 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq
			index = 1
		}

		prwe.mu.Lock()
		if prwe.wal == nil {
			return nil, fmt.Errorf("attempt to read from closed WAL")
		}

		protoBlob, err = prwe.wal.Read(index)
		if err == nil { // The read succeeded.
			req := new(prompb.WriteRequest)
@@ -351,74 +354,26 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq
			// Now increment the WAL's read index.
			prwe.rWALIndex.Add(1)

			prwe.mu.Unlock()
			return req, nil
		}
		prwe.mu.Unlock()

		if !errors.Is(err, wal.ErrNotFound) {
			return nil, err
		}

		if index <= 1 {
			// This could be the very first attempted read, so try again, after a small sleep.
			time.Sleep(time.Duration(1<<i) * time.Millisecond)
			continue
		}

		// Otherwise, we couldn't find the record, let's try watching
		// the WAL file until perhaps there is a write to it.
		walWatcher, werr := fsnotify.NewWatcher()
		if werr != nil {
			return nil, werr
		}
		if werr = walWatcher.Add(prwe.walPath); werr != nil {
			return nil, werr
		}

		// Watch until perhaps there is a write to the WAL file.
		watchCh := make(chan error)
		wErr := err
		go func() {
			defer func() {
				watchCh <- wErr
				close(watchCh)
				// Close the file watcher.
				walWatcher.Close()
			}()

		// If the WAL was empty, let's wait for a notification from
		// the writer goroutine.
		if errors.Is(err, wal.ErrNotFound) {
			select {
			case <-ctx.Done(): // If the context was cancelled, bail out ASAP.
				wErr = ctx.Err()
				return

			case event, ok := <-walWatcher.Events:
				if !ok {
					return
				}
				switch event.Op {
				case fsnotify.Remove:
					// The file got deleted.
					// TODO: Add capabilities to search for the updated file.
				case fsnotify.Rename:
					// Renamed, we don't have information about the renamed file's new name.
				case fsnotify.Write:
					// Finally a write, let's try reading again, but after some watch.
					wErr = nil
				}

			case eerr, ok := <-walWatcher.Errors:
				if ok {
					wErr = eerr
				}
			case <-prwe.rNotify:
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-prwe.stopChan:
				return nil, fmt.Errorf("attempt to read from WAL after stopped")
			}
		}()

		if gerr := <-watchCh; gerr != nil {
			return nil, gerr
		}

		// Otherwise a write might have occurred,
		// and we can sleep for a little bit and then try again.
		time.Sleep(time.Duration(1<<i) * time.Millisecond)
		if !errors.Is(err, wal.ErrNotFound) {
			return nil, err
		}
	}
	return nil, err
}
66 changes: 66 additions & 0 deletions exporter/prometheusremotewriteexporter/wal_test.go
@@ -5,13 +5,22 @@ package prometheusremotewriteexporter

import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"sort"
	"testing"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/config/confighttp"
	"go.opentelemetry.io/collector/exporter/exportertest"
)

func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {
@@ -149,3 +158,60 @@ func TestWAL_persist(t *testing.T) {
	require.Equal(t, reqLFromWAL[0], reqL[0])
	require.Equal(t, reqLFromWAL[1], reqL[1])
}

func TestExportWithWALEnabled(t *testing.T) {
	cfg := &Config{
		WAL: &WALConfig{
			Directory: t.TempDir(),
		},
		TargetInfo:    &TargetInfo{},    // Declared just to avoid nil pointer dereference.
		CreatedMetric: &CreatedMetric{}, // Declared just to avoid nil pointer dereference.
	}
	buildInfo := component.BuildInfo{
		Description: "OpenTelemetry Collector",
		Version:     "1.0",
	}
	set := exportertest.NewNopSettings()
	set.BuildInfo = buildInfo

	server := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		assert.NoError(t, err)
		assert.NotNil(t, body)
		// Receive the HTTP request, then decompress, unmarshal, and extract the TimeSeries.
		writeReq := &prompb.WriteRequest{}
		var unzipped []byte

		dest, err := snappy.Decode(unzipped, body)
		assert.NoError(t, err)

		ok := proto.Unmarshal(dest, writeReq)
		assert.NoError(t, ok)

		assert.Len(t, writeReq.Timeseries, 1)
	}))
	clientConfig := confighttp.NewDefaultClientConfig()
	clientConfig.Endpoint = server.URL
	cfg.ClientConfig = clientConfig

	prwe, err := newPRWExporter(cfg, set)
	assert.NoError(t, err)
	assert.NotNil(t, prwe)
	err = prwe.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)
	assert.NotNil(t, prwe.client)

	metrics := map[string]*prompb.TimeSeries{
		"test_metric": {
			Labels:  []prompb.Label{{Name: "__name__", Value: "test_metric"}},
			Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
		},
	}
	err = prwe.handleExport(context.Background(), metrics, nil)
	assert.NoError(t, err)

	// On Unix systems, t.TempDir() cleanup can remove the WAL files even while
	// they are still open; on Windows it can't. Shut the exporter down explicitly
	// to avoid flaky tests.
	err = prwe.Shutdown(context.Background())
	assert.NoError(t, err)
}
