Skip to content

Commit

Permalink
[chore] [exporterhelper] Increase test coverage for persistent queue (#…
Browse files Browse the repository at this point in the history
…8250)

Add tests covering e2e data delivery using the persistent queue
  • Loading branch information
dmitryax authored Aug 28, 2023
1 parent 0a63747 commit 0af1c11
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 77 deletions.
89 changes: 89 additions & 0 deletions exporter/exporterhelper/internal/mock_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"errors"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

type mockStorageExtension struct {
component.StartFunc
component.ShutdownFunc
}

func (m mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) {
return &mockStorageClient{st: map[string][]byte{}}, nil
}

func NewMockStorageExtension() storage.Extension {
return &mockStorageExtension{}
}

type mockStorageClient struct {
st map[string][]byte
mux sync.Mutex
closeCounter uint64
}

func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
m.mux.Lock()
defer m.mux.Unlock()

val, found := m.st[s]
if !found {
return nil, nil
}

return val, nil
}

func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
m.mux.Lock()
defer m.mux.Unlock()

m.st[s] = bytes
return nil
}

func (m *mockStorageClient) Delete(_ context.Context, s string) error {
m.mux.Lock()
defer m.mux.Unlock()

delete(m.st, s)
return nil
}

func (m *mockStorageClient) Close(_ context.Context) error {
m.closeCounter++
return nil
}

func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value = m.st[op.Key]
case storage.Set:
m.st[op.Key] = op.Value
case storage.Delete:
delete(m.st, op.Key)
default:
return errors.New("wrong operation type")
}
}

return nil
}

func (m *mockStorageClient) getCloseCount() uint64 {
return m.closeCounter
}
78 changes: 1 addition & 77 deletions exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func createStorageExtension(_ string) storage.Extension {
// After having storage moved to core, we could leverage storagetest.NewTestExtension(nil, path)
return newMockStorageExtension()
return NewMockStorageExtension()
}

func createTestClient(extension storage.Extension) storage.Client {
Expand Down Expand Up @@ -575,82 +575,6 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguou
}, 5*time.Second, 10*time.Millisecond)
}

type mockStorageExtension struct {
component.StartFunc
component.ShutdownFunc
}

func (m mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ component.ID, _ string) (storage.Client, error) {
return &mockStorageClient{st: map[string][]byte{}}, nil
}

func newMockStorageExtension() storage.Extension {
return &mockStorageExtension{}
}

type mockStorageClient struct {
st map[string][]byte
mux sync.Mutex
closeCounter uint64
}

func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
m.mux.Lock()
defer m.mux.Unlock()

val, found := m.st[s]
if !found {
return nil, nil
}

return val, nil
}

func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
m.mux.Lock()
defer m.mux.Unlock()

m.st[s] = bytes
return nil
}

func (m *mockStorageClient) Delete(_ context.Context, s string) error {
m.mux.Lock()
defer m.mux.Unlock()

delete(m.st, s)
return nil
}

func (m *mockStorageClient) Close(_ context.Context) error {
m.closeCounter++
return nil
}

func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
m.mux.Lock()
defer m.mux.Unlock()

for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value = m.st[op.Key]
case storage.Set:
m.st[op.Key] = op.Value
case storage.Delete:
delete(m.st, op.Key)
default:
return errors.New("wrong operation type")
}
}

return nil
}

func (m *mockStorageClient) getCloseCount() uint64 {
return m.closeCounter
}

func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
return &fakeBoundedStorageClient{
st: map[string][]byte{},
Expand Down
27 changes: 27 additions & 0 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,7 +21,9 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -148,6 +151,30 @@ func TestLogsRequestExporter_Default_ExportError(t *testing.T) {
require.Equal(t, want, le.ConsumeLogs(context.Background(), ld))
}

func TestLogsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := NewDefaultRetrySettings()
ts := consumertest.LogsSink{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_logs", "with_persistent_queue")
te, err := NewLogsExporter(context.Background(), set, &fakeLogsExporterConfig, ts.ConsumeLogs, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

traces := testdata.GenerateLogs(2)
require.NoError(t, te.ConsumeLogs(context.Background(), traces))
require.Eventually(t, func() bool {
return len(ts.AllLogs()) == 1 && ts.LogRecordCount() == 2
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestLogsExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
Expand Down
27 changes: 27 additions & 0 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,7 +21,9 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -149,6 +152,30 @@ func TestMetricsRequestExporter_Default_ExportError(t *testing.T) {
require.Equal(t, want, me.ConsumeMetrics(context.Background(), md))
}

func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := NewDefaultRetrySettings()
ms := consumertest.MetricsSink{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_metrics", "with_persistent_queue")
te, err := NewMetricsExporter(context.Background(), set, &fakeTracesExporterConfig, ms.ConsumeMetrics, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

metrics := testdata.GenerateMetrics(2)
require.NoError(t, te.ConsumeMetrics(context.Background(), metrics))
require.Eventually(t, func() bool {
return len(ms.AllMetrics()) == 1 && ms.DataPointCount() == 4
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
Expand Down
27 changes: 27 additions & 0 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,7 +21,9 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -146,6 +149,30 @@ func TestTracesRequestExporter_Default_ExportError(t *testing.T) {
require.Equal(t, want, te.ConsumeTraces(context.Background(), td))
}

func TestTracesExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := NewDefaultRetrySettings()
ts := consumertest.TracesSink{}
set := exportertest.NewNopCreateSettings()
set.ID = component.NewIDWithName("test_traces", "with_persistent_queue")
te, err := NewTracesExporter(context.Background(), set, &fakeTracesExporterConfig, ts.ConsumeTraces, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)

host := &mockHost{ext: map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })

traces := testdata.GenerateTraces(2)
require.NoError(t, te.ConsumeTraces(context.Background(), traces))
require.Eventually(t, func() bool {
return len(ts.AllTraces()) == 1 && ts.SpanCount() == 2
}, 500*time.Millisecond, 10*time.Millisecond)
}

func TestTracesExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
Expand Down

0 comments on commit 0af1c11

Please sign in to comment.