
Commit 560bd1d

Fix shutdown logic in persistent queue to not require consumers to be closed first (#8899)

Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu authored Nov 14, 2023
1 parent 228509c commit 560bd1d
Showing 7 changed files with 96 additions and 87 deletions.
13 changes: 13 additions & 0 deletions .chloggen/fixshutdown.yaml
@@ -0,0 +1,13 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'bug_fix'
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Fix shutdown logic in persistent queue to not require consumers to be closed first"
+
+# One or more tracking issues or pull requests related to the change
+issues: [8899]
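
The core of the change, visible across the files below, is replacing the old "close storage only after all consumers exit" ordering with a reference count on the storage client. The following is a minimal, self-contained sketch of that idea — simplified names and types, not the collector's actual API:

package main

import (
    "context"
    "fmt"
    "sync"
)

// fakeClient stands in for a storage client; Close reports when it runs.
type fakeClient struct{}

func (fakeClient) Close(context.Context) error {
    fmt.Println("storage client closed")
    return nil
}

// refCountedClient closes the underlying client only when the last
// reference is released, so shutdown and in-flight consumers can finish
// in either order.
type refCountedClient struct {
    mu     sync.Mutex
    refs   int64
    client fakeClient
}

// ref takes an extra reference, e.g. when a request is handed to a consumer.
func (r *refCountedClient) ref() {
    r.mu.Lock()
    r.refs++
    r.mu.Unlock()
}

// unref drops one reference and closes the client when none remain.
func (r *refCountedClient) unref(ctx context.Context) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.refs--
    if r.refs == 0 {
        return r.client.Close(ctx)
    }
    return nil
}

func main() {
    ctx := context.Background()
    rc := &refCountedClient{refs: 1} // the queue itself holds one reference
    rc.ref()                         // an in-flight request holds another

    _ = rc.unref(ctx) // shutdown: queue's reference released, client stays open
    fmt.Println("shutdown returned")
    _ = rc.unref(ctx) // request finalized: last reference released, client closes
}

Running this prints "shutdown returned" before "storage client closed": shutdown can return while a request is still in flight, and the client is closed by whichever side drops the last reference.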
37 changes: 18 additions & 19 deletions exporter/exporterhelper/internal/mock_storage.go
@@ -7,6 +7,7 @@ import (
     "context"
     "errors"
     "sync"
+    "sync/atomic"
 
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/extension/experimental/storage"
@@ -23,43 +24,41 @@ func (m *mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _
     if m.getClientError != nil {
         return nil, m.getClientError
     }
-    return &mockStorageClient{st: &m.st}, nil
+    return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}}, nil
 }
 
 func NewMockStorageExtension(getClientError error) storage.Extension {
     return &mockStorageExtension{getClientError: getClientError}
 }
 
 type mockStorageClient struct {
-    st           *sync.Map
-    closeCounter uint64
+    st     *sync.Map
+    closed *atomic.Bool
 }
 
-func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
-    val, found := m.st.Load(s)
-    if !found {
-        return nil, nil
-    }
-
-    return val.([]byte), nil
+func (m *mockStorageClient) Get(ctx context.Context, s string) ([]byte, error) {
+    getOp := storage.GetOperation(s)
+    err := m.Batch(ctx, getOp)
+    return getOp.Value, err
 }
 
-func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
-    m.st.Store(s, bytes)
-    return nil
+func (m *mockStorageClient) Set(ctx context.Context, s string, bytes []byte) error {
+    return m.Batch(ctx, storage.SetOperation(s, bytes))
 }
 
-func (m *mockStorageClient) Delete(_ context.Context, s string) error {
-    m.st.Delete(s)
-    return nil
+func (m *mockStorageClient) Delete(ctx context.Context, s string) error {
+    return m.Batch(ctx, storage.DeleteOperation(s))
 }
 
 func (m *mockStorageClient) Close(_ context.Context) error {
-    m.closeCounter++
+    m.closed.Store(true)
     return nil
 }
 
 func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
+    if m.isClosed() {
+        panic("client already closed")
+    }
     for _, op := range ops {
         switch op.Type {
         case storage.Get:
@@ -80,6 +79,6 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
     return nil
 }
 
-func (m *mockStorageClient) getCloseCount() uint64 {
-    return m.closeCounter
+func (m *mockStorageClient) isClosed() bool {
+    return m.closed.Load()
 }
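
The reworked mock funnels Get, Set, and Delete through Batch and tracks closure with an atomic.Bool shared by the extension and its clients, so any operation on a closed client fails loudly. A self-contained sketch of that pattern, with the collector's storage.Operation types simplified away:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// mockClient is a stripped-down analogue of mockStorageClient: every
// operation funnels through batch, which detects use-after-close.
type mockClient struct {
    st     sync.Map
    closed *atomic.Bool
}

func (m *mockClient) Set(k string, v []byte) { m.batch(func() { m.st.Store(k, v) }) }

func (m *mockClient) Get(k string) []byte {
    var out []byte
    m.batch(func() {
        if v, ok := m.st.Load(k); ok {
            out = v.([]byte)
        }
    })
    return out
}

// Close only flips the shared flag; it does not tear down the map, so a
// use-after-close surfaces as a programming error rather than silent data loss.
func (m *mockClient) Close() { m.closed.Store(true) }

// batch is the single chokepoint where use-after-close is detected.
func (m *mockClient) batch(op func()) {
    if m.closed.Load() {
        panic("client already closed")
    }
    op()
}

func main() {
    c := &mockClient{closed: &atomic.Bool{}}
    c.Set("k", []byte("v"))
    fmt.Printf("%s\n", c.Get("k")) // v
    c.Close()
    // c.Get("k") would now panic: "client already closed"
}

The shared flag supports both the isClosed assertions in the tests below and the panic that catches operations issued after close.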
11 changes: 2 additions & 9 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -14,13 +14,6 @@ import (
 )
 
 var (
-    // Monkey patching for unit test
-    stopStorage = func(client storage.Client, ctx context.Context) error {
-        if client == nil {
-            return nil
-        }
-        return client.Close(ctx)
-    }
     errNoStorageClient    = errors.New("no storage client extension found")
     errWrongExtensionType = errors.New("requested extension is not a storage extension")
 )
@@ -70,9 +63,9 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
 
 // Shutdown stops accepting items, shuts down the queue and closes the persistent queue
 func (pq *persistentQueue) Shutdown(ctx context.Context) error {
-    close(pq.persistentContiguousStorage.stopChan)
+    err := pq.persistentContiguousStorage.Shutdown(ctx)
     pq.stopWG.Wait()
-    return stopStorage(pq.persistentContiguousStorage.client, ctx)
+    return err
 }
 
 func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
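Shutdown now signals the storage layer first (which closes the stop channel and drops the queue's own client reference) and only then waits for the consumer goroutines. A hedged sketch of that close-then-wait idiom, stripped of the surrounding detail:

package main

import (
    "fmt"
    "sync"
)

func main() {
    stopChan := make(chan struct{})
    var stopWG sync.WaitGroup

    for i := 0; i < 2; i++ { // consumer goroutines, as started by the queue
        stopWG.Add(1)
        go func(id int) {
            defer stopWG.Done()
            <-stopChan // consumers unblock once shutdown is signaled
            fmt.Println("consumer", id, "exiting")
        }(i)
    }

    close(stopChan) // step 1: stop accepting/dispatching work
    stopWG.Wait()   // step 2: wait for every consumer goroutine to exit
    fmt.Println("all consumers drained; Shutdown can return")
}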
56 changes: 15 additions & 41 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -39,28 +39,26 @@ func createTestQueue(t *testing.T, capacity, numConsumers int, set QueueSettings
     host := &mockHost{ext: map[component.ID]component.Component{
         {}: NewMockStorageExtension(nil),
     }}
-    err := pq.Start(context.Background(), host, set)
-    require.NoError(t, err)
+    require.NoError(t, pq.Start(context.Background(), host, set))
     return pq
 }
 
-func TestPersistentQueue_Capacity(t *testing.T) {
-    pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(),
-        newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
-    host := &mockHost{ext: map[component.ID]component.Component{
-        {}: NewMockStorageExtension(nil),
-    }}
-    err := pq.Start(context.Background(), host, newNopQueueSettings())
-    require.NoError(t, err)
-
-    // Stop consumer to imitate queue overflow
-    close(pq.(*persistentQueue).persistentContiguousStorage.stopChan)
-    pq.(*persistentQueue).stopWG.Wait()
-
+func TestPersistentQueue_FullCapacity(t *testing.T) {
+    start := make(chan struct{})
+    done := make(chan struct{})
+    pq := createTestQueue(t, 5, 1, newQueueSettings(func(item QueueRequest) {
+        start <- struct{}{}
+        <-done
+        item.OnProcessingFinished()
+    }))
+    assert.Equal(t, 0, pq.Size())
 
     req := newFakeTracesRequest(newTraces(1, 10))
 
+    // First request is picked by the consumer. Wait until the consumer is blocked on done.
+    assert.NoError(t, pq.Offer(context.Background(), req))
+    <-start
+
     for i := 0; i < 10; i++ {
         result := pq.Offer(context.Background(), req)
         if i < 5 {
@@ -70,7 +68,7 @@ func TestPersistentQueue_Capacity(t *testing.T) {
         }
     }
     assert.Equal(t, 5, pq.Size())
-    assert.NoError(t, stopStorage(pq.(*persistentQueue).persistentContiguousStorage.client, context.Background()))
+    close(done)
+    assert.NoError(t, pq.Shutdown(context.Background()))
 }
 
 func TestPersistentQueueShutdown(t *testing.T) {
@@ -83,31 +82,6 @@ func TestPersistentQueueShutdown(t *testing.T) {
     assert.NoError(t, pq.Shutdown(context.Background()))
 }
 
-// Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage
-func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
-    pq := createTestQueue(t, 1001, 1, newNopQueueSettings())
-
-    lastRequestProcessedTime := time.Now()
-    req := newFakeTracesRequest(newTraces(1, 10))
-    req.processingFinishedCallback = func() {
-        lastRequestProcessedTime = time.Now()
-    }
-
-    fnBefore := stopStorage
-    stopStorageTime := time.Now()
-    stopStorage = func(storage storage.Client, ctx context.Context) error {
-        stopStorageTime = time.Now()
-        return storage.Close(ctx)
-    }
-
-    for i := 0; i < 1000; i++ {
-        assert.NoError(t, pq.Offer(context.Background(), req))
-    }
-    assert.NoError(t, pq.Shutdown(context.Background()))
-    assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
-    stopStorage = fnBefore
-}
-
 func TestPersistentQueue_ConsumersProducers(t *testing.T) {
     cases := []struct {
         numMessagesProduced int
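The rewritten TestPersistentQueue_FullCapacity parks its single consumer inside the callback with a pair of unbuffered channels, so queue overflow can be asserted deterministically. A standalone sketch of the pattern, with a buffered channel standing in for the persistent queue:

package main

import "fmt"

func main() {
    start := make(chan struct{})
    done := make(chan struct{})
    items := make(chan int, 5) // capacity-5 stand-in for the persistent queue

    go func() { // the single consumer
        <-items
        start <- struct{}{} // signal: first item picked up
        <-done              // stay parked until the test releases us
    }()

    items <- 0 // first item is taken by the consumer...
    <-start    // ...wait until the consumer is provably parked

    accepted := 0
    for i := 0; i < 10; i++ { // further offers fill the 5-slot queue, then fail
        select {
        case items <- i:
            accepted++
        default:
        }
    }
    fmt.Println("accepted:", accepted) // accepted: 5
    close(done)
}

This prints "accepted: 5": the first item is held by the parked consumer, the next five fill the queue, and the rest are rejected — exactly the shape the test asserts.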
34 changes: 29 additions & 5 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -53,6 +53,7 @@ type persistentContiguousStorage struct {
     readIndex                itemIndex
     writeIndex               itemIndex
     currentlyDispatchedItems []itemIndex
+    refClient                int64
 }
 
 type itemIndex uint64
@@ -88,6 +89,7 @@ func newPersistentContiguousStorage(
 
 func (pcs *persistentContiguousStorage) start(ctx context.Context, client storage.Client) {
     pcs.client = client
+    pcs.refClient = 1
     pcs.initPersistentContiguousStorage(ctx)
     // Make sure the leftover requests are handled
     pcs.retrieveAndEnqueueNotDispatchedReqs(ctx)
@@ -154,9 +156,22 @@ func (pcs *persistentContiguousStorage) Capacity() int {
     return int(pcs.capacity)
 }
 
-func (pcs *persistentContiguousStorage) stop(ctx context.Context) error {
-    pcs.logger.Debug("Stopping persistentContiguousStorage")
-    return pcs.client.Close(ctx)
+func (pcs *persistentContiguousStorage) Shutdown(ctx context.Context) error {
+    close(pcs.stopChan)
+    // Hold the lock only for `refClient`.
+    pcs.mu.Lock()
+    defer pcs.mu.Unlock()
+    return pcs.unrefClient(ctx)
+}
+
+// unrefClient unrefs the client, and closes if no more references. Callers MUST hold the mutex.
+// This is needed because consumers of the queue may still process the requests while the queue is shutting down or immediately after.
+func (pcs *persistentContiguousStorage) unrefClient(ctx context.Context) error {
+    pcs.refClient--
+    if pcs.refClient == 0 {
+        return pcs.client.Close(ctx)
+    }
+    return nil
 }
 
 // Offer inserts the specified element into this queue if it is possible to do so immediately
Expand Down Expand Up @@ -202,6 +217,11 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
pcs.mu.Lock()
defer pcs.mu.Unlock()

// If called in the same time with Shutdown, make sure client is not closed.
if pcs.refClient <= 0 {
return QueueRequest{}
}

if pcs.readIndex == pcs.writeIndex {
return QueueRequest{}
}
@@ -224,20 +244,24 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
     if err != nil || req.Request == nil {
         pcs.logger.Debug("Failed to dispatch item", zap.Error(err))
         // We need to make sure that currently dispatched items list is cleaned
-        if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
+        if err = pcs.itemDispatchingFinish(ctx, index); err != nil {
             pcs.logger.Error("Error deleting item from queue", zap.Error(err))
         }
 
         return QueueRequest{}
     }
 
     // If all went well so far, cleanup will be handled by callback
+    pcs.refClient++
     req.onProcessingFinishedFunc = func() {
         pcs.mu.Lock()
         defer pcs.mu.Unlock()
-        if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
+        if err = pcs.itemDispatchingFinish(ctx, index); err != nil {
             pcs.logger.Error("Error deleting item from queue", zap.Error(err))
         }
+        if err = pcs.unrefClient(ctx); err != nil {
+            pcs.logger.Error("Error closing the storage client", zap.Error(err))
+        }
     }
     return req
 }
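Putting the pieces together: getNextItem takes a reference while holding the mutex before handing a request out, and the request's completion callback releases it, so a concurrent Shutdown can never close the client under an active consumer. A compact sketch of that hand-off (simplified types; it mirrors the TestPersistentStorage_ShutdownWhileConsuming test below):

package main

import (
    "context"
    "fmt"
    "sync"
)

// store models just the refClient bookkeeping of persistentContiguousStorage.
type store struct {
    mu   sync.Mutex
    refs int64
}

// unrefLocked drops one reference; callers must hold mu (as the real
// unrefClient requires). The print stands in for client.Close.
func (s *store) unrefLocked(ctx context.Context) {
    s.refs--
    if s.refs == 0 {
        fmt.Println("client closed")
    }
}

type request struct{ onFinished func() }

func (s *store) getNextItem(ctx context.Context) (request, bool) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.refs <= 0 { // raced with Shutdown: the client may already be closed
        return request{}, false
    }
    s.refs++ // the request keeps the client alive until it is finalized
    return request{onFinished: func() {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.unrefLocked(ctx)
    }}, true
}

func (s *store) Shutdown(ctx context.Context) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.unrefLocked(ctx)
}

func main() {
    ctx := context.Background()
    s := &store{refs: 1} // start() sets the initial reference
    req, ok := s.getNextItem(ctx)
    fmt.Println("got item:", ok)
    s.Shutdown(ctx)  // client stays open: a request is still in flight
    req.onFinished() // last reference released; the client closes here
}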
28 changes: 17 additions & 11 deletions exporter/exporterhelper/internal/persistent_storage_test.go
@@ -145,7 +145,7 @@ func TestPersistentStorage_CorruptedData(t *testing.T) {
     assert.Equal(t, 3, ps.Size())
     _, _ = ps.get()
     assert.Equal(t, 2, ps.Size())
-    assert.NoError(t, ps.stop(context.Background()))
+    assert.NoError(t, ps.Shutdown(context.Background()))
 
     // ... so now we can corrupt data (in several ways)
     if c.corruptAllData || c.corruptSomeData {
@@ -253,7 +253,7 @@ func TestPersistentStorage_StartWithNonDispatched(t *testing.T) {
     require.NoError(t, ps.Offer(context.Background(), req))
 
     require.Equal(t, 5, ps.Size())
-    assert.NoError(t, ps.stop(context.Background()))
+    assert.NoError(t, ps.Shutdown(context.Background()))
 
     // Reload
     newPs := createTestPersistentStorageWithCapacity(client, 5)
@@ -272,7 +272,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
     assert.Equal(t, 2, ps.Size())
     // TODO: Remove this, after the initialization writes the readIndex.
     _, _ = ps.get()
-    assert.NoError(t, ps.stop(context.Background()))
+    assert.NoError(t, ps.Shutdown(context.Background()))
 
     newPs := createTestPersistentStorage(createTestClient(t, ext))
     require.Equal(t, 2, newPs.Size())
@@ -286,7 +286,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
     require.True(t, found)
     require.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td)
     require.Equal(t, 0, newPs.Size())
-    assert.NoError(t, ps.stop(context.Background()))
+    assert.NoError(t, newPs.Shutdown(context.Background()))
 }
 
 func TestPersistentStorage_EmptyRequest(t *testing.T) {
@@ -399,16 +399,22 @@ func TestItemIndexArrayMarshaling(t *testing.T) {
     }
 }
 
-func TestPersistentStorage_StopShouldCloseClient(t *testing.T) {
-    ext := NewMockStorageExtension(nil)
-    client := createTestClient(t, ext)
+func TestPersistentStorage_ShutdownWhileConsuming(t *testing.T) {
+    client := createTestClient(t, NewMockStorageExtension(nil))
     ps := createTestPersistentStorage(client)
 
-    assert.NoError(t, ps.stop(context.Background()))
+    assert.Equal(t, 0, ps.Size())
+    assert.False(t, client.(*mockStorageClient).isClosed())
+
+    assert.NoError(t, ps.Offer(context.Background(), newFakeTracesRequest(newTraces(5, 10))))
 
-    castedClient, ok := client.(*mockStorageClient)
-    require.True(t, ok, "expected client to be mockStorageClient")
-    require.Equal(t, uint64(1), castedClient.getCloseCount())
+    req, ok := ps.get()
+    require.True(t, ok)
+    assert.False(t, client.(*mockStorageClient).isClosed())
+    assert.NoError(t, ps.Shutdown(context.Background()))
+    assert.False(t, client.(*mockStorageClient).isClosed())
+    req.OnProcessingFinished()
+    assert.True(t, client.(*mockStorageClient).isClosed())
 }
 
 func TestPersistentStorage_StorageFull(t *testing.T) {
4 changes: 2 additions & 2 deletions exporter/exporterhelper/queue_sender_test.go
@@ -357,15 +357,15 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
     be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
     require.NoError(t, err)
 
-    require.NoError(t, be.Start(context.Background(), &mockHost{}))
-
     // wraps original queue so we can count operations
     be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{
         Queue:          be.queueSender.(*queueSender).queue,
         produceCounter: produceCounter,
     }
     be.queueSender.(*queueSender).requeuingEnabled = true
 
+    require.NoError(t, be.Start(context.Background(), &mockHost{}))
+
     // Invoke queuedRetrySender so the producer will put the item for consumer to poll
     require.NoError(t, be.send(context.Background(), newErrorRequest()))
 
