Skip to content

Commit

Permalink
[chore] [exporterhelper] Increase test coverage for persistent queue
Browse files Browse the repository at this point in the history
Add tests covering e2e data delivery through persistent storage
  • Loading branch information
dmitryax committed Aug 21, 2023
1 parent 3dad181 commit e11eb7b
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 80 deletions.
89 changes: 89 additions & 0 deletions exporter/exporterhelper/internal/mock_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"errors"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

type mockStorageExtension struct {
component.StartFunc
component.ShutdownFunc
}

func (m mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) {
return &mockStorageClient{st: map[string][]byte{}}, nil
}

func NewMockStorageExtension() storage.Extension {
return &mockStorageExtension{}
}

type mockStorageClient struct {
st map[string][]byte
mux sync.Mutex
closeCounter uint64
}

func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
m.mux.Lock()
defer m.mux.Unlock()

val, found := m.st[s]
if !found {
return nil, nil
}

return val, nil
}

func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
m.mux.Lock()
defer m.mux.Unlock()

m.st[s] = bytes
return nil
}

func (m *mockStorageClient) Delete(_ context.Context, s string) error {
m.mux.Lock()
defer m.mux.Unlock()

delete(m.st, s)
return nil
}

func (m *mockStorageClient) Close(_ context.Context) error {
m.closeCounter++
return nil
}

func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value = m.st[op.Key]
case storage.Set:
m.st[op.Key] = op.Value
case storage.Delete:
delete(m.st, op.Key)
default:
return errors.New("wrong operation type")
}
}

return nil
}

func (m *mockStorageClient) getCloseCount() uint64 {
return m.closeCounter
}
78 changes: 1 addition & 77 deletions exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func createStorageExtension(_ string) storage.Extension {
// After having storage moved to core, we could leverage storagetest.NewTestExtension(nil, path)
return newMockStorageExtension()
return NewMockStorageExtension()
}

func createTestClient(extension storage.Extension) storage.Client {
Expand Down Expand Up @@ -580,82 +580,6 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguou
}, 5*time.Second, 10*time.Millisecond)
}

type mockStorageExtension struct {
component.StartFunc
component.ShutdownFunc
}

func (m mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) {
return &mockStorageClient{st: map[string][]byte{}}, nil
}

func newMockStorageExtension() storage.Extension {
return &mockStorageExtension{}
}

type mockStorageClient struct {
st map[string][]byte
mux sync.Mutex
closeCounter uint64
}

func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
m.mux.Lock()
defer m.mux.Unlock()

val, found := m.st[s]
if !found {
return nil, nil
}

return val, nil
}

func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
m.mux.Lock()
defer m.mux.Unlock()

m.st[s] = bytes
return nil
}

func (m *mockStorageClient) Delete(_ context.Context, s string) error {
m.mux.Lock()
defer m.mux.Unlock()

delete(m.st, s)
return nil
}

func (m *mockStorageClient) Close(_ context.Context) error {
m.closeCounter++
return nil
}

func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value = m.st[op.Key]
case storage.Set:
m.st[op.Key] = op.Value
case storage.Delete:
delete(m.st, op.Key)
default:
return errors.New("wrong operation type")
}
}

return nil
}

func (m *mockStorageClient) getCloseCount() uint64 {
return m.closeCounter
}

func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
return &fakeBoundedStorageClient{
st: map[string][]byte{},
Expand Down
26 changes: 26 additions & 0 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,7 +21,9 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -148,6 +151,29 @@ func TestLogsRequestExporter_Default_ExportError(t *testing.T) {
require.Equal(t, want, le.ConsumeLogs(context.Background(), ld))
}

func TestLogsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := NewDefaultRetrySettings()
ts := consumertest.LogsSink{}
te, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig,
ts.ConsumeLogs, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

traces := testdata.GenerateLogs(2)
require.NoError(t, te.ConsumeLogs(context.Background(), traces))
require.Eventually(t, func() bool {
return len(ts.AllLogs()) == 1 && ts.LogRecordCount() == 2
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestLogsExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
Expand Down
26 changes: 26 additions & 0 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,7 +21,9 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -149,6 +152,29 @@ func TestMetricsRequestExporter_Default_ExportError(t *testing.T) {
require.Equal(t, want, me.ConsumeMetrics(context.Background(), md))
}

func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := NewDefaultRetrySettings()
ms := consumertest.MetricsSink{}
te, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig,
ms.ConsumeMetrics, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

metrics := testdata.GenerateMetrics(2)
require.NoError(t, te.ConsumeMetrics(context.Background(), metrics))
require.Eventually(t, func() bool {
return len(ms.AllMetrics()) == 1 && ms.DataPointCount() == 4
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/queued_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,15 +357,15 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

checkValueForGlobalManager(t, defaultExporterTags, int64(defaultQueueSize), "exporter/queue_capacity")
for i := 0; i < 7; i++ {
require.NoError(t, be.sender.send(newErrorRequest(context.Background())))
}
checkValueForGlobalManager(t, defaultExporterTags, int64(7), "exporter/queue_size")

assert.NoError(t, be.Shutdown(context.Background()))
checkValueForGlobalManager(t, defaultExporterTags, int64(0), "exporter/queue_size")
}

func TestNoCancellationContext(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,7 +21,9 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -146,6 +149,29 @@ func TestTracesRequestExporter_Default_ExportError(t *testing.T) {
require.Equal(t, want, te.ConsumeTraces(context.Background(), td))
}

func TestTracesExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := NewDefaultRetrySettings()
ts := consumertest.TracesSink{}
te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig,
ts.ConsumeTraces, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

traces := testdata.GenerateTraces(2)
require.NoError(t, te.ConsumeTraces(context.Background(), traces))
require.Eventually(t, func() bool {
return len(ts.AllTraces()) == 1 && ts.SpanCount() == 2
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestTracesExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
Expand Down

0 comments on commit e11eb7b

Please sign in to comment.