From 805aa530f70c9b0ba3dfde9afb49c2db40e6dbb2 Mon Sep 17 00:00:00 2001 From: Eric L <100242256+splunkericl@users.noreply.github.com> Date: Fri, 6 Sep 2024 13:03:00 -0700 Subject: [PATCH] return queue is full error if sized_channel is full (#11063) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### Description This change fixes a potential deadlock bug for persistent queue. There is a race condition in persistent queue that caused `used` in `sizedChannel` to become out of sync with `ch` len. This causes `Offer` to be deadlocked in specific race condition. For example: 1. Multiple consumers are calling Consume 2. Multiple producers are calling Offer to insert into the queue a. All elements are taken from consumers. ch is empty 3. One consumer completes consume, calls onProcessingFinished a. Inside sizedChannel, syncSize is invoked, used is reset to 0 when other consumers are still waiting for lock to consume 4. More Offer is called inserting elements -> used and ch len should equal 5. As step 3a consumers completes, used is decreased -> used is lower than ch len a. More Offer is called inserting since used is below capacity. however, ch is full. b. goroutine calling offer is holding the mutex but can’t release it as ch is full. c. no consumer can acquire mutex to complete previous onProcessingFinished This change returns an error if channel is full instead of waiting for it to unblock. #### Link to tracking issue Fixes # https://github.com/open-telemetry/opentelemetry-collector/issues/11015 #### Testing - Added concurrent test in persistent queue that can reproduce the problem(note: need to re-run it 100 times as the race condition is not consistent). 
- Added unit test for sizedChannel #### Documentation Added comment in the block explaining it --------- Co-authored-by: Dmitrii Anoshin --- .chloggen/fix-persistent-queue-deadlock.yaml | 25 +++++++ exporter/internal/queue/mock_storage.go | 21 ++++-- .../internal/queue/persistent_queue_test.go | 65 +++++++++++++++++++ exporter/internal/queue/sized_channel.go | 12 +++- exporter/internal/queue/sized_channel_test.go | 10 +++ 5 files changed, 127 insertions(+), 6 deletions(-) create mode 100644 .chloggen/fix-persistent-queue-deadlock.yaml diff --git a/.chloggen/fix-persistent-queue-deadlock.yaml b/.chloggen/fix-persistent-queue-deadlock.yaml new file mode 100644 index 00000000000..141a8ab9dbf --- /dev/null +++ b/.chloggen/fix-persistent-queue-deadlock.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'bug_fix' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: 'exporterqueue' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix a bug in persistent queue that Offer can become deadlocked when queue is almost full + +# One or more tracking issues or pull requests related to the change +issues: [11015] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/exporter/internal/queue/mock_storage.go b/exporter/internal/queue/mock_storage.go index 6ef9810529b..26e2ae994e0 100644 --- a/exporter/internal/queue/mock_storage.go +++ b/exporter/internal/queue/mock_storage.go @@ -10,6 +10,7 @@ import ( "sync" "sync/atomic" "syscall" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/experimental/storage" @@ -20,22 +21,31 @@ type mockStorageExtension struct { component.ShutdownFunc st sync.Map getClientError error + executionDelay time.Duration } func (m *mockStorageExtension) GetClient(context.Context, component.Kind, component.ID, string) (storage.Client, error) { if m.getClientError != nil { return nil, m.getClientError } - return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}}, nil + return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}, executionDelay: m.executionDelay}, nil } func NewMockStorageExtension(getClientError error) storage.Extension { - return &mockStorageExtension{getClientError: getClientError} + return NewMockStorageExtensionWithDelay(getClientError, 0) +} + +func NewMockStorageExtensionWithDelay(getClientError error, executionDelay time.Duration) storage.Extension { + return &mockStorageExtension{ + getClientError: getClientError, + executionDelay: executionDelay, + } } type mockStorageClient struct { - st *sync.Map - closed *atomic.Bool + st *sync.Map + closed *atomic.Bool + executionDelay time.Duration // simulate real storage client delay } func (m *mockStorageClient) Get(ctx context.Context, s string) ([]byte, error) { @@ -61,6 +71,9 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e if m.isClosed() { panic("client already closed") } + if m.executionDelay != 0 { + time.Sleep(m.executionDelay) + } for _, op := range ops { switch op.Type { case storage.Get: diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go 
index f226e35c430..a6a7b8b974e 100644 --- a/exporter/internal/queue/persistent_queue_test.go +++ b/exporter/internal/queue/persistent_queue_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "strconv" + "sync" "sync/atomic" "syscall" "testing" @@ -531,6 +532,70 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.Equal(t, 6, newPs.Size()) } +func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { + req := newTracesRequest(1, 1) + + ext := NewMockStorageExtensionWithDelay(nil, 20*time.Nanosecond) + pq := createTestPersistentQueueWithItemsCapacity(t, ext, 25) + + proWg := sync.WaitGroup{} + // Sending small amount of data as windows test can't handle the test fast enough + for j := 0; j < 5; j++ { + proWg.Add(1) + go func() { + defer proWg.Done() + // Put in items up to capacity + for i := 0; i < 10; i++ { + for { + // retry infinitely so the exact amount of items are added to the queue eventually + if err := pq.Offer(context.Background(), req); err == nil { + break + } + time.Sleep(50 * time.Nanosecond) + } + } + }() + } + + conWg := sync.WaitGroup{} + for j := 0; j < 5; j++ { + conWg.Add(1) + go func() { + defer conWg.Done() + for i := 0; i < 10; i++ { + require.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil })) + } + }() + } + + conDone := make(chan struct{}) + go func() { + defer close(conDone) + conWg.Wait() + }() + + proDone := make(chan struct{}) + go func() { + defer close(proDone) + proWg.Wait() + }() + + doneCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + select { + case <-conDone: + case <-doneCtx.Done(): + assert.Fail(t, "timed out waiting for consumers to complete") + } + + select { + case <-proDone: + case <-doneCtx.Done(): + assert.Fail(t, "timed out waiting for producers to complete") + } + assert.Zero(t, pq.sizedChannel.Size()) +} + func TestPersistentQueue_PutCloseReadClose(t *testing.T) { req := newTracesRequest(5, 10) ext := 
NewMockStorageExtension(nil) diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go index 1702a38ac2f..f322e58c01c 100644 --- a/exporter/internal/queue/sized_channel.go +++ b/exporter/internal/queue/sized_channel.go @@ -55,8 +55,16 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error return err } } - vcq.ch <- el - return nil + + select { + // for persistent queue implementation, channel len can be out of sync with used size. Attempt to put it + // into the channel. If it is full, simply returns ErrQueueIsFull error. This prevents potential deadlock issues. + case vcq.ch <- el: + return nil + default: + vcq.used.Add(-size) + return ErrQueueIsFull + } } // pop removes the element from the queue and returns it. diff --git a/exporter/internal/queue/sized_channel_test.go b/exporter/internal/queue/sized_channel_test.go index 02cd4bf8e68..8d25510ff63 100644 --- a/exporter/internal/queue/sized_channel_test.go +++ b/exporter/internal/queue/sized_channel_test.go @@ -42,3 +42,13 @@ func TestSizedCapacityChannel(t *testing.T) { assert.False(t, ok) assert.Equal(t, 0, el) } + +func TestSizedCapacityChannel_Offer_sizedNotFullButChannelFull(t *testing.T) { + q := newSizedChannel[int](1, nil, 0) + assert.NoError(t, q.push(1, 1, nil)) + + q.used.Store(0) + err := q.push(1, 1, nil) + assert.Error(t, err) + assert.Equal(t, ErrQueueIsFull, err) +}