From 560bd1d3d6ff19ba0839f205d8a5197bcc38e5ed Mon Sep 17 00:00:00 2001
From: Bogdan Drutu
Date: Tue, 14 Nov 2023 13:54:59 -0800
Subject: [PATCH] Fix shutdown logic in persistent queue to not require
 consumers to be closed first (#8899)

Signed-off-by: Bogdan Drutu
---
 .chloggen/fixshutdown.yaml                    | 13 +++++
 .../exporterhelper/internal/mock_storage.go   | 37 ++++++------
 .../internal/persistent_queue.go              | 11 +---
 .../internal/persistent_queue_test.go         | 56 +++++--------------
 .../internal/persistent_storage.go            | 34 +++++++++--
 .../internal/persistent_storage_test.go       | 28 ++++++----
 exporter/exporterhelper/queue_sender_test.go  |  4 +-
 7 files changed, 96 insertions(+), 87 deletions(-)
 create mode 100755 .chloggen/fixshutdown.yaml

diff --git a/.chloggen/fixshutdown.yaml b/.chloggen/fixshutdown.yaml
new file mode 100755
index 00000000000..8b016577997
--- /dev/null
+++ b/.chloggen/fixshutdown.yaml
@@ -0,0 +1,13 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Fix shutdown logic in persistent queue to not require consumers to be closed first"
+
+# One or more tracking issues or pull requests related to the change
+issues: [8899]
diff --git a/exporter/exporterhelper/internal/mock_storage.go b/exporter/exporterhelper/internal/mock_storage.go
index ebde592f788..8639c183af4 100644
--- a/exporter/exporterhelper/internal/mock_storage.go
+++ b/exporter/exporterhelper/internal/mock_storage.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"sync"
+	"sync/atomic"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/extension/experimental/storage"
@@ -23,7 +24,7 @@ func (m *mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _
 	if m.getClientError != nil {
 		return nil, m.getClientError
 	}
-	return &mockStorageClient{st: &m.st}, nil
+	return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}}, nil
 }
 
 func NewMockStorageExtension(getClientError error) storage.Extension {
@@ -31,35 +32,33 @@ func NewMockStorageExtension(getClientError error) storage.Extension {
 }
 
 type mockStorageClient struct {
-	st           *sync.Map
-	closeCounter uint64
+	st     *sync.Map
+	closed *atomic.Bool
 }
 
-func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
-	val, found := m.st.Load(s)
-	if !found {
-		return nil, nil
-	}
-
-	return val.([]byte), nil
+func (m *mockStorageClient) Get(ctx context.Context, s string) ([]byte, error) {
+	getOp := storage.GetOperation(s)
+	err := m.Batch(ctx, getOp)
+	return getOp.Value, err
 }
 
-func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
-	m.st.Store(s, bytes)
-	return nil
+func (m *mockStorageClient) Set(ctx context.Context, s string, bytes []byte) error {
+	return m.Batch(ctx, storage.SetOperation(s, bytes))
 }
 
-func (m *mockStorageClient) Delete(_ context.Context, s string) error {
-	m.st.Delete(s)
-	return nil
+func (m *mockStorageClient) Delete(ctx context.Context, s string) error {
+	return m.Batch(ctx, storage.DeleteOperation(s))
 }
 
 func (m *mockStorageClient) Close(_ context.Context) error {
-	m.closeCounter++
+	m.closed.Store(true)
 	return nil
 }
 
 func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
+	if m.isClosed() {
+		panic("client already closed")
+	}
 	for _, op := range ops {
 		switch op.Type {
 		case storage.Get:
@@ -80,6 +79,6 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
 	return nil
 }
 
-func (m *mockStorageClient) getCloseCount() uint64 {
-	return m.closeCounter
+func (m *mockStorageClient) isClosed() bool {
+	return m.closed.Load()
 }
diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go
index c9a1b122028..0dd9e5a2850 100644
--- a/exporter/exporterhelper/internal/persistent_queue.go
+++ b/exporter/exporterhelper/internal/persistent_queue.go
@@ -14,13 +14,6 @@ import (
 )
 
 var (
-	// Monkey patching for unit test
-	stopStorage = func(client storage.Client, ctx context.Context) error {
-		if client == nil {
-			return nil
-		}
-		return client.Close(ctx)
-	}
 	errNoStorageClient    = errors.New("no storage client extension found")
 	errWrongExtensionType = errors.New("requested extension is not a storage extension")
 )
@@ -70,9 +63,9 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
 
 // Shutdown stops accepting items, shuts down the queue and closes the persistent queue
 func (pq *persistentQueue) Shutdown(ctx context.Context) error {
-	close(pq.persistentContiguousStorage.stopChan)
+	err := pq.persistentContiguousStorage.Shutdown(ctx)
 	pq.stopWG.Wait()
-	return stopStorage(pq.persistentContiguousStorage.client, ctx)
+	return err
 }
 
 func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go
index 022a46125ac..09bb29a0c0d 100644
--- a/exporter/exporterhelper/internal/persistent_queue_test.go
+++ b/exporter/exporterhelper/internal/persistent_queue_test.go
@@ -39,28 +39,26 @@ func createTestQueue(t *testing.T, capacity, numConsumers int, set QueueSettings
 	host := &mockHost{ext: map[component.ID]component.Component{
 		{}: NewMockStorageExtension(nil),
 	}}
-	err := pq.Start(context.Background(), host, set)
-	require.NoError(t, err)
+	require.NoError(t, pq.Start(context.Background(), host, set))
 	return pq
 }
 
-func TestPersistentQueue_Capacity(t *testing.T) {
-	pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(),
-		newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
-	host := &mockHost{ext: map[component.ID]component.Component{
-		{}: NewMockStorageExtension(nil),
-	}}
-	err := pq.Start(context.Background(), host, newNopQueueSettings())
-	require.NoError(t, err)
-
-	// Stop consumer to imitate queue overflow
-	close(pq.(*persistentQueue).persistentContiguousStorage.stopChan)
-	pq.(*persistentQueue).stopWG.Wait()
-
+func TestPersistentQueue_FullCapacity(t *testing.T) {
+	start := make(chan struct{})
+	done := make(chan struct{})
+	pq := createTestQueue(t, 5, 1, newQueueSettings(func(item QueueRequest) {
+		start <- struct{}{}
+		<-done
+		item.OnProcessingFinished()
+	}))
 	assert.Equal(t, 0, pq.Size())
 
 	req := newFakeTracesRequest(newTraces(1, 10))
 
+	// First request is picked by the consumer. Wait until the consumer is blocked on done.
+	assert.NoError(t, pq.Offer(context.Background(), req))
+	<-start
+
 	for i := 0; i < 10; i++ {
 		result := pq.Offer(context.Background(), req)
 		if i < 5 {
@@ -70,7 +68,8 @@
 		}
 	}
 	assert.Equal(t, 5, pq.Size())
-	assert.NoError(t, stopStorage(pq.(*persistentQueue).persistentContiguousStorage.client, context.Background()))
+	close(done)
+	assert.NoError(t, pq.Shutdown(context.Background()))
 }
 
 func TestPersistentQueueShutdown(t *testing.T) {
@@ -83,31 +82,6 @@ func TestPersistentQueueShutdown(t *testing.T) {
 	assert.NoError(t, pq.Shutdown(context.Background()))
 }
 
-// Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage
-func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
-	pq := createTestQueue(t, 1001, 1, newNopQueueSettings())
-
-	lastRequestProcessedTime := time.Now()
-	req := newFakeTracesRequest(newTraces(1, 10))
-	req.processingFinishedCallback = func() {
-		lastRequestProcessedTime = time.Now()
-	}
-
-	fnBefore := stopStorage
-	stopStorageTime := time.Now()
-	stopStorage = func(storage storage.Client, ctx context.Context) error {
-		stopStorageTime = time.Now()
-		return storage.Close(ctx)
-	}
-
-	for i := 0; i < 1000; i++ {
-		assert.NoError(t, pq.Offer(context.Background(), req))
-	}
-	assert.NoError(t, pq.Shutdown(context.Background()))
-	assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
-	stopStorage = fnBefore
-}
-
 func TestPersistentQueue_ConsumersProducers(t *testing.T) {
 	cases := []struct {
 		numMessagesProduced int
diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go
index 1639a2a1cb8..94220a4c0ed 100644
--- a/exporter/exporterhelper/internal/persistent_storage.go
+++ b/exporter/exporterhelper/internal/persistent_storage.go
@@ -53,6 +53,7 @@ type persistentContiguousStorage struct {
 	readIndex                itemIndex
 	writeIndex               itemIndex
 	currentlyDispatchedItems []itemIndex
+	refClient                int64
 }
 
 type itemIndex uint64
@@ -88,6 +89,7 @@ func newPersistentContiguousStorage(
 
 func (pcs *persistentContiguousStorage) start(ctx context.Context, client storage.Client) {
 	pcs.client = client
+	pcs.refClient = 1
 	pcs.initPersistentContiguousStorage(ctx)
 	// Make sure the leftover requests are handled
 	pcs.retrieveAndEnqueueNotDispatchedReqs(ctx)
@@ -154,9 +156,22 @@ func (pcs *persistentContiguousStorage) Capacity() int {
 	return int(pcs.capacity)
 }
 
-func (pcs *persistentContiguousStorage) stop(ctx context.Context) error {
-	pcs.logger.Debug("Stopping persistentContiguousStorage")
-	return pcs.client.Close(ctx)
+func (pcs *persistentContiguousStorage) Shutdown(ctx context.Context) error {
+	close(pcs.stopChan)
+	// Hold the lock only for `refClient`.
+	pcs.mu.Lock()
+	defer pcs.mu.Unlock()
+	return pcs.unrefClient(ctx)
+}
+
+// unrefClient unrefs the client, and closes it if there are no more references. Callers MUST hold the mutex.
+// This is needed because consumers of the queue may still process the requests while the queue is shutting down or immediately after.
+func (pcs *persistentContiguousStorage) unrefClient(ctx context.Context) error {
+	pcs.refClient--
+	if pcs.refClient == 0 {
+		return pcs.client.Close(ctx)
+	}
+	return nil
 }
 
 // Offer inserts the specified element into this queue if it is possible to do so immediately
@@ -202,6 +217,11 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
 	pcs.mu.Lock()
 	defer pcs.mu.Unlock()
 
+	// If called at the same time as Shutdown, make sure the client is not closed.
+	if pcs.refClient <= 0 {
+		return QueueRequest{}
+	}
+
 	if pcs.readIndex == pcs.writeIndex {
 		return QueueRequest{}
 	}
@@ -224,7 +244,7 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
 	if err != nil || req.Request == nil {
 		pcs.logger.Debug("Failed to dispatch item", zap.Error(err))
 		// We need to make sure that currently dispatched items list is cleaned
-		if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
+		if err = pcs.itemDispatchingFinish(ctx, index); err != nil {
 			pcs.logger.Error("Error deleting item from queue", zap.Error(err))
 		}
 
@@ -232,12 +252,16 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
 	}
 
 	// If all went well so far, cleanup will be handled by callback
+	pcs.refClient++
 	req.onProcessingFinishedFunc = func() {
 		pcs.mu.Lock()
 		defer pcs.mu.Unlock()
-		if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
+		if err = pcs.itemDispatchingFinish(ctx, index); err != nil {
 			pcs.logger.Error("Error deleting item from queue", zap.Error(err))
 		}
+		if err = pcs.unrefClient(ctx); err != nil {
+			pcs.logger.Error("Error closing the storage client", zap.Error(err))
+		}
 	}
 	return req
 }
diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go
index 56e0a2998f4..31964487777 100644
--- a/exporter/exporterhelper/internal/persistent_storage_test.go
+++ b/exporter/exporterhelper/internal/persistent_storage_test.go
@@ -145,7 +145,7 @@ func TestPersistentStorage_CorruptedData(t *testing.T) {
 			assert.Equal(t, 3, ps.Size())
 			_, _ = ps.get()
 			assert.Equal(t, 2, ps.Size())
-			assert.NoError(t, ps.stop(context.Background()))
+			assert.NoError(t, ps.Shutdown(context.Background()))
 
 			// ... so now we can corrupt data (in several ways)
 			if c.corruptAllData || c.corruptSomeData {
@@ -253,7 +253,7 @@ func TestPersistentStorage_StartWithNonDispatched(t *testing.T) {
 	require.NoError(t, ps.Offer(context.Background(), req))
 	require.Equal(t, 5, ps.Size())
 
-	assert.NoError(t, ps.stop(context.Background()))
+	assert.NoError(t, ps.Shutdown(context.Background()))
 
 	// Reload
 	newPs := createTestPersistentStorageWithCapacity(client, 5)
@@ -272,7 +272,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
 	assert.Equal(t, 2, ps.Size())
 	// TODO: Remove this, after the initialization writes the readIndex.
 	_, _ = ps.get()
-	assert.NoError(t, ps.stop(context.Background()))
+	assert.NoError(t, ps.Shutdown(context.Background()))
 
 	newPs := createTestPersistentStorage(createTestClient(t, ext))
 	require.Equal(t, 2, newPs.Size())
@@ -286,7 +286,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
 	require.True(t, found)
 	require.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td)
 	require.Equal(t, 0, newPs.Size())
-	assert.NoError(t, ps.stop(context.Background()))
+	assert.NoError(t, newPs.Shutdown(context.Background()))
 }
 
 func TestPersistentStorage_EmptyRequest(t *testing.T) {
@@ -399,16 +399,22 @@ func TestItemIndexArrayMarshaling(t *testing.T) {
 	}
 }
 
-func TestPersistentStorage_StopShouldCloseClient(t *testing.T) {
-	ext := NewMockStorageExtension(nil)
-	client := createTestClient(t, ext)
+func TestPersistentStorage_ShutdownWhileConsuming(t *testing.T) {
+	client := createTestClient(t, NewMockStorageExtension(nil))
 	ps := createTestPersistentStorage(client)
 
-	assert.NoError(t, ps.stop(context.Background()))
+	assert.Equal(t, 0, ps.Size())
+	assert.False(t, client.(*mockStorageClient).isClosed())
+
+	assert.NoError(t, ps.Offer(context.Background(), newFakeTracesRequest(newTraces(5, 10))))
 
-	castedClient, ok := client.(*mockStorageClient)
-	require.True(t, ok, "expected client to be mockStorageClient")
-	require.Equal(t, uint64(1), castedClient.getCloseCount())
+	req, ok := ps.get()
+	require.True(t, ok)
+	assert.False(t, client.(*mockStorageClient).isClosed())
+	assert.NoError(t, ps.Shutdown(context.Background()))
+	assert.False(t, client.(*mockStorageClient).isClosed())
+	req.OnProcessingFinished()
+	assert.True(t, client.(*mockStorageClient).isClosed())
 }
 
 func TestPersistentStorage_StorageFull(t *testing.T) {
diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go
index 6c829c22b24..f880e49cc7f 100644
--- a/exporter/exporterhelper/queue_sender_test.go
+++ b/exporter/exporterhelper/queue_sender_test.go
@@ -357,8 +357,6 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 	be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
 	require.NoError(t, err)
 
-	require.NoError(t, be.Start(context.Background(), &mockHost{}))
-
 	// wraps original queue so we can count operations
 	be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{
 		Queue:          be.queueSender.(*queueSender).queue,
@@ -366,6 +364,8 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
 	}
 	be.queueSender.(*queueSender).requeuingEnabled = true
 
+	require.NoError(t, be.Start(context.Background(), &mockHost{}))
+
 	// Invoke queuedRetrySender so the producer will put the item for consumer to poll
 	require.NoError(t, be.send(context.Background(), newErrorRequest()))
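
For readers skimming the patch, the three core changes distill into small standalone patterns. First, the reworked mockStorageClient: a shared atomic.Bool records Close, and every operation funnels through one choke point that panics on use after close, so shutdown-ordering bugs fail loudly in tests. The sketch below is illustrative only; closableMock, set, and close are invented names, not part of the patch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// closableMock captures the essence of the updated mockStorageClient: one
// atomic.Bool shared by all handles records Close, and every operation goes
// through a single choke point that panics on use after close.
type closableMock struct {
	st     sync.Map
	closed atomic.Bool
}

func (m *closableMock) set(key string, value []byte) {
	if m.closed.Load() {
		panic("client already closed")
	}
	m.st.Store(key, value)
}

func (m *closableMock) close() { m.closed.Store(true) }

func main() {
	m := &closableMock{}
	m.set("k", []byte("v")) // fine before close
	m.close()

	defer func() {
		// A test that closes storage before its consumers finish now fails
		// with a clear panic instead of silently losing updates.
		fmt.Println("caught:", recover())
	}()
	m.set("k", []byte("v2")) // panics: use after close
}
```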
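Second, the reference counting that makes Shutdown order-independent: the queue holds one reference to the storage client from start, each dispatched request takes another until its completion callback runs, and whoever releases the last reference closes the client. A minimal, self-contained sketch under the same locking discipline (refCountedClient and its methods are invented names standing in for the refClient field and unrefClient):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// refCountedClient mirrors the refClient field added to
// persistentContiguousStorage: the queue holds one reference from start,
// every dispatched request holds one more until its completion callback
// runs, and the storage client is closed only when the count hits zero.
type refCountedClient struct {
	mu   sync.Mutex
	refs int64
}

func newRefCountedClient() *refCountedClient {
	return &refCountedClient{refs: 1} // reference held by the queue itself
}

// ref is taken under the mutex while dispatching a request to a consumer.
func (c *refCountedClient) ref() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.refs++
}

// unref is called by Shutdown and by every finished request; whichever
// caller drops the count to zero closes the underlying client.
func (c *refCountedClient) unref(_ context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.refs--
	if c.refs == 0 {
		fmt.Println("closing storage client") // stands in for client.Close(ctx)
	}
	return nil
}

func main() {
	c := newRefCountedClient()
	c.ref()                           // a consumer picks up a request
	_ = c.unref(context.Background()) // Shutdown: 2 -> 1, client stays open
	_ = c.unref(context.Background()) // request finishes: 1 -> 0, client closes
}
```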
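Third, the synchronization idiom behind the reworked TestPersistentQueue_FullCapacity: instead of closing the consumer's stop channel, the test parks the consume callback on a channel so the queue can be overfilled deterministically. A reduced sketch with a plain goroutine standing in for the queue consumer:

```go
package main

import "fmt"

func main() {
	start := make(chan struct{})
	done := make(chan struct{})
	finished := make(chan struct{})

	// Stand-in for the consume callback: announce the first item, then
	// park until the test has finished overfilling the queue.
	go func() {
		start <- struct{}{}
		<-done
		fmt.Println("consumer finished")
		close(finished)
	}()

	<-start     // the consumer now holds one item; queue size is deterministic
	// ... Offer more items here; beyond capacity they would be rejected ...
	close(done) // unpark the consumer so shutdown can complete
	<-finished
}
```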