Skip to content

Commit

Permalink
Add s3 storage class to the AWS S3 exporter (#35574)
Browse files Browse the repository at this point in the history
**Description:**  Add s3 storage class
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:**
#35173

**Testing:**  Unit for the config interface

**Documentation:**  Added storage class for the S3
  • Loading branch information
pollosp authored Jan 30, 2025
1 parent de4d492 commit ff2d5e2
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 36 deletions.
27 changes: 27 additions & 0 deletions .chloggen/S3CLASS.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for S3 Storgeclass"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35173]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/awss3exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The following exporter configuration parameters are supported.
| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | |
| `encoding_file_extension` | file format extension suffix when using the `encoding` configuration option. May be left empty for no suffix to be appended. | |
| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
| `storage_class` | [S3 storageclass](https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-class-intro.html) | STANDARD |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
| `compression` | should the file be compressed | none |
Expand Down
16 changes: 16 additions & 0 deletions exporter/awss3exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type S3UploaderConfig struct {
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
// DisableSLL forces communication to happen via HTTP instead of HTTPS.
DisableSSL bool `mapstructure:"disable_ssl"`

StorageClass string `mapstructure:"storage_class"`
// Compression sets the algorithm used to process the payload
// before uploading to S3.
// Valid values are: `gzip` or no value set.
Expand Down Expand Up @@ -62,12 +64,26 @@ type Config struct {

func (c *Config) Validate() error {
var errs error
validStorageClasses := map[string]bool{
"STANDARD": true,
"STANDARD_IA": true,
"ONEZONE_IA": true,
"INTELLIGENT_TIERING": true,
"GLACIER": true,
"DEEP_ARCHIVE": true,
}

if c.S3Uploader.Region == "" {
errs = multierr.Append(errs, errors.New("region is required"))
}
if c.S3Uploader.S3Bucket == "" && c.S3Uploader.Endpoint == "" {
errs = multierr.Append(errs, errors.New("bucket or endpoint is required"))
}

if !validStorageClasses[c.S3Uploader.StorageClass] {
errs = multierr.Append(errs, errors.New("invalid StorageClass"))
}

compression := c.S3Uploader.Compression
if compression.IsCompressed() {
if compression != configcompression.TypeGzip {
Expand Down
83 changes: 61 additions & 22 deletions exporter/awss3exporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ func TestLoadConfig(t *testing.T) {
Encoding: &encoding,
EncodingFileExtension: "baz",
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
StorageClass: "STANDARD",
},
MarshalerName: "otlp_json",
}, e,
Expand Down Expand Up @@ -72,17 +73,50 @@ func TestConfig(t *testing.T) {
assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Prefix: "bar",
S3Partition: "minute",
Endpoint: "http://endpoint.com",
Region: "us-east-1",
S3Bucket: "foo",
S3Prefix: "bar",
S3Partition: "minute",
Endpoint: "http://endpoint.com",
StorageClass: "STANDARD",
},
MarshalerName: "otlp_json",
}, e,
)
}

func TestConfigS3StorageClaas(t *testing.T) {
factories, err := otelcoltest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Exporters[factory.Type()] = factory
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594
cfg, err := otelcoltest.LoadConfigAndValidate(
filepath.Join("testdata", "config-s3_storage_class.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

assert.Equal(t, &Config{
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Prefix: "bar",
S3Partition: "minute",
Endpoint: "http://endpoint.com",
StorageClass: "STANDARD_IA",
},
QueueSettings: queueCfg,
MarshalerName: "otlp_json",
}, e,
)
}

func TestConfigForS3CompatibleSystems(t *testing.T) {
factories, err := otelcoltest.NopFactories()
assert.NoError(t, err)
Expand Down Expand Up @@ -110,6 +144,7 @@ func TestConfigForS3CompatibleSystems(t *testing.T) {
Endpoint: "alternative-s3-system.example.com",
S3ForcePathStyle: true,
DisableSSL: true,
StorageClass: "STANDARD",
},
MarshalerName: "otlp_json",
}, e,
Expand Down Expand Up @@ -217,9 +252,10 @@ func TestMarshallerName(t *testing.T) {
assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
StorageClass: "STANDARD",
},
MarshalerName: "sumo_ic",
}, e,
Expand All @@ -230,9 +266,10 @@ func TestMarshallerName(t *testing.T) {
assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "bar",
S3Partition: "minute",
Region: "us-east-1",
S3Bucket: "bar",
S3Partition: "minute",
StorageClass: "STANDARD",
},
MarshalerName: "otlp_proto",
}, e,
Expand All @@ -259,10 +296,11 @@ func TestCompressionName(t *testing.T) {
assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
Compression: "gzip",
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
Compression: "gzip",
StorageClass: "STANDARD",
},
MarshalerName: "otlp_json",
}, e,
Expand All @@ -273,10 +311,11 @@ func TestCompressionName(t *testing.T) {
assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "bar",
S3Partition: "minute",
Compression: "none",
Region: "us-east-1",
S3Bucket: "bar",
S3Partition: "minute",
Compression: "none",
StorageClass: "STANDARD",
},
MarshalerName: "otlp_proto",
}, e,
Expand Down
5 changes: 3 additions & 2 deletions exporter/awss3exporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ func createDefaultConfig() component.Config {
return &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Partition: "minute",
Region: "us-east-1",
S3Partition: "minute",
StorageClass: "STANDARD",
},
MarshalerName: "otlp_json",
}
Expand Down
18 changes: 11 additions & 7 deletions exporter/awss3exporter/internal/upload/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/tilinna/clock"
"go.opentelemetry.io/collector/config/configcompression"
)
Expand All @@ -20,18 +21,20 @@ type Manager interface {
}

type s3manager struct {
bucket string
builder *PartitionKeyBuilder
uploader *manager.Uploader
bucket string
builder *PartitionKeyBuilder
uploader *manager.Uploader
storageClass s3types.StorageClass
}

var _ Manager = (*s3manager)(nil)

func NewS3Manager(bucket string, builder *PartitionKeyBuilder, service *s3.Client) Manager {
func NewS3Manager(bucket string, builder *PartitionKeyBuilder, service *s3.Client, storageClass s3types.StorageClass) Manager {
return &s3manager{
bucket: bucket,
builder: builder,
uploader: manager.NewUploader(service),
bucket: bucket,
builder: builder,
uploader: manager.NewUploader(service),
storageClass: storageClass,
}
}

Expand All @@ -57,6 +60,7 @@ func (sw *s3manager) Upload(ctx context.Context, data []byte) error {
Key: aws.String(sw.builder.Build(now)),
Body: content,
ContentEncoding: aws.String(encoding),
StorageClass: sw.storageClass,
})

return err
Expand Down
25 changes: 20 additions & 5 deletions exporter/awss3exporter/internal/upload/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestNewS3Manager(t *testing.T) {
"my-bucket",
&PartitionKeyBuilder{},
s3.New(s3.Options{}),
"STANDARD",
)

assert.NotNil(t, sm, "Must have a valid client returned")
Expand All @@ -35,11 +36,12 @@ func TestS3ManagerUpload(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
handler func(t *testing.T) http.Handler
compression configcompression.Type
data []byte
errVal string
name string
handler func(t *testing.T) http.Handler
compression configcompression.Type
data []byte
errVal string
storageClass string
}{
{
name: "successful upload",
Expand Down Expand Up @@ -115,6 +117,18 @@ func TestS3ManagerUpload(t *testing.T) {
data: []byte("good payload"),
errVal: "operation error S3: PutObject, https response error StatusCode: 401, RequestID: , HostID: , api error Unauthorized: Unauthorized",
},
{
name: "STANDARD_IA storage class",
handler: func(t *testing.T) http.Handler {
return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
// Example of validating that the S3 storage class header is set correctly
assert.Equal(t, "STANDARD_IA", r.Header.Get("x-amz-storage-class"))
})
},
storageClass: "STANDARD_IA",
data: []byte("some data"),
errVal: "",
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
Expand All @@ -139,6 +153,7 @@ func TestS3ManagerUpload(t *testing.T) {
BaseEndpoint: aws.String(s.URL),
Region: "local",
}),
"STANDARD_IA",
)

// Using a mocked virtual clock to fix the timestamp used
Expand Down
2 changes: 2 additions & 0 deletions exporter/awss3exporter/s3_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go-v2/service/sts"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/upload"
Expand Down Expand Up @@ -70,5 +71,6 @@ func newUploadManager(
Compression: conf.S3Uploader.Compression,
},
s3.NewFromConfig(cfg, s3Opts...),
s3types.StorageClass(conf.S3Uploader.StorageClass),
), nil
}
22 changes: 22 additions & 0 deletions exporter/awss3exporter/testdata/config-s3_storage_class.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
receivers:
nop:

exporters:
awss3:
s3uploader:
region: 'us-east-1'
s3_bucket: 'foo'
s3_prefix: 'bar'
s3_partition: 'minute'
endpoint: "http://endpoint.com"
storage_class: "STANDARD_IA"

processors:
nop:

service:
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [awss3]

0 comments on commit ff2d5e2

Please sign in to comment.