From d67d631438e2dd43912b602e4872afa0d32116d7 Mon Sep 17 00:00:00 2001 From: Felipe Lopes Date: Wed, 15 Jan 2025 18:11:13 -0300 Subject: [PATCH 1/2] [exporter/awss3] Implement sending queue --- exporter/awss3exporter/config.go | 3 ++ exporter/awss3exporter/config_test.go | 34 +++++++++++++++ exporter/awss3exporter/factory.go | 47 ++++++++++++++++++--- exporter/awss3exporter/testdata/config.yaml | 5 +++ 4 files changed, 83 insertions(+), 6 deletions(-) diff --git a/exporter/awss3exporter/config.go b/exporter/awss3exporter/config.go index 4514d2bb7643..cac26c285746 100644 --- a/exporter/awss3exporter/config.go +++ b/exporter/awss3exporter/config.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/multierr" ) @@ -49,6 +50,8 @@ const ( // Config contains the main configuration options for the s3 exporter type Config struct { + QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"` + S3Uploader S3UploaderConfig `mapstructure:"s3uploader"` MarshalerName MarshalerType `mapstructure:"marshaler"` diff --git a/exporter/awss3exporter/config_test.go b/exporter/awss3exporter/config_test.go index cd08539581c5..ca2f78a651e4 100644 --- a/exporter/awss3exporter/config_test.go +++ b/exporter/awss3exporter/config_test.go @@ -5,6 +5,7 @@ package awss3exporter import ( "errors" + "go.opentelemetry.io/collector/exporter/exporterhelper" "path/filepath" "testing" @@ -32,7 +33,13 @@ func TestLoadConfig(t *testing.T) { e := cfg.Exporters[component.MustNewID("awss3")].(*Config) encoding := component.MustNewIDWithName("foo", "bar") + + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + assert.Equal(t, &Config{ + QueueSettings: queueCfg, + Encoding: &encoding, EncodingFileExtension: "baz", S3Uploader: S3UploaderConfig{ @@ -59,9 +66,17 @@ func TestConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.QueueConfig{ + Enabled: true, + NumConsumers: 23, + QueueSize: 42, + } + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, + S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -88,9 +103,14 @@ func TestConfigForS3CompatibleSystems(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, + S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -200,9 +220,14 @@ func TestMarshallerName(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, + S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -215,6 +240,8 @@ func TestMarshallerName(t *testing.T) { e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, + S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "bar", @@ -239,9 +266,14 @@ func TestCompressionName(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, + S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "foo", @@ -255,6 +287,8 @@ func TestCompressionName(t *testing.T) { e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config) assert.Equal(t, &Config{ + QueueSettings: queueCfg, + S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Bucket: "bar", diff --git a/exporter/awss3exporter/factory.go b/exporter/awss3exporter/factory.go index da343d63ba0a..393f49eec39f 100644 --- a/exporter/awss3exporter/factory.go +++ b/exporter/awss3exporter/factory.go @@ -26,7 +26,12 @@ func NewFactory() exporter.Factory { } func createDefaultConfig() component.Config { + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + return &Config{ + QueueSettings: queueCfg, + S3Uploader: S3UploaderConfig{ Region: "us-east-1", S3Partition: "minute", @@ -39,19 +44,31 @@ func createLogsExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Logs, error) { - s3Exporter := newS3Exporter(config.(*Config), "logs", params) + cfg, err := checkAndCastConfig(config) + if err != nil { + return nil, err + } + + s3Exporter := newS3Exporter(cfg, "logs", params) return exporterhelper.NewLogs(ctx, params, config, s3Exporter.ConsumeLogs, - exporterhelper.WithStart(s3Exporter.start)) + exporterhelper.WithStart(s3Exporter.start), + exporterhelper.WithQueue(cfg.QueueSettings), + ) } func createMetricsExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Metrics, error) { - s3Exporter := newS3Exporter(config.(*Config), "metrics", params) + cfg, err := checkAndCastConfig(config) + if err != nil { + return nil, err + } + + s3Exporter := newS3Exporter(cfg, "metrics", params) if config.(*Config).MarshalerName == SumoIC { return nil, fmt.Errorf("metrics are not supported by sumo_ic output format") @@ -60,14 +77,21 @@ func createMetricsExporter(ctx context.Context, return exporterhelper.NewMetrics(ctx, params, config, s3Exporter.ConsumeMetrics, - exporterhelper.WithStart(s3Exporter.start)) + exporterhelper.WithStart(s3Exporter.start), + exporterhelper.WithQueue(cfg.QueueSettings), + ) } func createTracesExporter(ctx context.Context, params exporter.Settings, config component.Config, ) (exporter.Traces, error) { - s3Exporter := newS3Exporter(config.(*Config), "traces", params) + cfg, err := checkAndCastConfig(config) + if err != nil { + return nil, err + } + + s3Exporter := newS3Exporter(cfg, "traces", params) if config.(*Config).MarshalerName == SumoIC { return nil, fmt.Errorf("traces are not supported by sumo_ic output format") @@ -77,5 +101,16 @@ func createTracesExporter(ctx context.Context, params, config, s3Exporter.ConsumeTraces, - exporterhelper.WithStart(s3Exporter.start)) + exporterhelper.WithStart(s3Exporter.start), + exporterhelper.WithQueue(cfg.QueueSettings), + ) +} + +// checkAndCastConfig checks the configuration type and casts it to the S3 exporter Config struct. +func checkAndCastConfig(c component.Config) (*Config, error) { + cfg, ok := c.(*Config) + if !ok { + return nil, fmt.Errorf("config structure is not of type *awss3exporter.Config") + } + return cfg, nil } diff --git a/exporter/awss3exporter/testdata/config.yaml b/exporter/awss3exporter/testdata/config.yaml index 3f0d5808033e..c373fe878cb9 100644 --- a/exporter/awss3exporter/testdata/config.yaml +++ b/exporter/awss3exporter/testdata/config.yaml @@ -3,6 +3,11 @@ receivers: exporters: awss3: + sending_queue: + enabled: true + num_consumers: 23 + queue_size: 42 + s3uploader: region: 'us-east-1' s3_bucket: 'foo' From 308e07e11ee32ad1ab3462640a2c7a4e54a6b011 Mon Sep 17 00:00:00 2001 From: Felipe Lopes Date: Wed, 15 Jan 2025 18:24:29 -0300 Subject: [PATCH 2/2] lint --- exporter/awss3exporter/config_test.go | 2 +- receiver/datadogreceiver/go.mod | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/exporter/awss3exporter/config_test.go b/exporter/awss3exporter/config_test.go index ca2f78a651e4..bb339f1c3cc9 100644 --- a/exporter/awss3exporter/config_test.go +++ b/exporter/awss3exporter/config_test.go @@ -5,13 +5,13 @@ package awss3exporter import ( "errors" - "go.opentelemetry.io/collector/exporter/exporterhelper" "path/filepath" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/otelcol/otelcoltest" "go.uber.org/multierr" diff --git a/receiver/datadogreceiver/go.mod b/receiver/datadogreceiver/go.mod index ea0cd9e1427d..c4ff23f813df 100644 --- a/receiver/datadogreceiver/go.mod +++ b/receiver/datadogreceiver/go.mod @@ -1,8 +1,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver -go 1.22.7 - -toolchain go1.23.0 +go 1.22.0 require ( github.com/DataDog/agent-payload/v5 v5.0.140