From 5c190511f022c90f72bc095a95740420f362f8bc Mon Sep 17 00:00:00 2001 From: Gavin Cabbage <gavin.cabbage@servicenow.com> Date: Fri, 6 Sep 2024 15:17:01 -0400 Subject: [PATCH 1/7] [receiver/awsfirehosereceiver] Add support for CloudWatch logs --- .chloggen/awsfirehosereceiver-cwlogs.yaml | 27 ++++ receiver/awsfirehosereceiver/README.md | 23 +++- receiver/awsfirehosereceiver/config_test.go | 62 +++++---- receiver/awsfirehosereceiver/factory.go | 23 +++- receiver/awsfirehosereceiver/factory_test.go | 11 ++ .../generated_component_test.go | 7 + .../generated_package_test.go | 3 +- .../internal/metadata/generated_status.go | 1 + .../cwlog/compression/compression.go | 46 +++++++ .../internal/unmarshaler/cwlog/cwlog.go | 17 +++ .../internal/unmarshaler/cwlog/logsbuilder.go | 50 ++++++++ .../cwlog/testdata/invalid_records | 4 + .../cwlog/testdata/multiple_records | 2 + .../cwlog/testdata/multiple_resources | 6 + .../unmarshaler/cwlog/testdata/single_record | 1 + .../cwlog/testdata/some_invalid_records | 3 + .../internal/unmarshaler/cwlog/unmarshaler.go | 107 ++++++++++++++++ .../unmarshaler/cwlog/unmarshaler_test.go | 83 ++++++++++++ .../internal/unmarshaler/unmarshaler.go | 10 ++ .../unmarshalertest/nop_logs_unmarshaler.go | 47 +++++++ .../nop_logs_unmarshaler_test.go | 41 ++++++ ...arshaler.go => nop_metrics_unmarshaler.go} | 0 ...est.go => nop_metrics_unmarshaler_test.go} | 0 receiver/awsfirehosereceiver/logs_receiver.go | 81 ++++++++++++ .../awsfirehosereceiver/logs_receiver_test.go | 121 ++++++++++++++++++ receiver/awsfirehosereceiver/metadata.yaml | 2 +- .../testdata/cwlogs_config.yaml | 7 + .../{config.yaml => cwmetrics_config.yaml} | 0 .../testdata/invalid_config.yaml | 3 + 29 files changed, 758 insertions(+), 30 deletions(-) create mode 100644 .chloggen/awsfirehosereceiver-cwlogs.yaml create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/invalid_records create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_records create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_resources create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/single_record create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/some_invalid_records create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go create mode 100644 receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go rename receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/{nop_unmarshaler.go => nop_metrics_unmarshaler.go} (100%) rename receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/{nop_unmarshaler_test.go => nop_metrics_unmarshaler_test.go} (100%) create mode 100644 receiver/awsfirehosereceiver/logs_receiver.go create mode 100644 receiver/awsfirehosereceiver/logs_receiver_test.go create mode 100644 receiver/awsfirehosereceiver/testdata/cwlogs_config.yaml rename receiver/awsfirehosereceiver/testdata/{config.yaml => cwmetrics_config.yaml} (100%) create mode 100644 receiver/awsfirehosereceiver/testdata/invalid_config.yaml diff --git a/.chloggen/awsfirehosereceiver-cwlogs.yaml b/.chloggen/awsfirehosereceiver-cwlogs.yaml new file mode 100644 index 000000000000..14345b8a1bc7 --- /dev/null +++ b/.chloggen/awsfirehosereceiver-cwlogs.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsfirehosereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for CloudWatch logs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35077] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/awsfirehosereceiver/README.md b/receiver/awsfirehosereceiver/README.md index eb4041c833d0..a8e6d242775d 100644 --- a/receiver/awsfirehosereceiver/README.md +++ b/receiver/awsfirehosereceiver/README.md @@ -3,7 +3,7 @@ <!-- status autogenerated section --> | Status | | | ------------- |-----------| -| Stability | [alpha]: metrics | +| Stability | [alpha]: metrics, logs | | Distributions | [contrib] | | Issues | [](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fawsfirehose) [](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fawsfirehose) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@Aneurysm9](https://www.github.com/Aneurysm9) | @@ -62,3 +62,24 @@ See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-desti The record type for the CloudWatch metric stream. Expects the format for the records to be JSON. See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details. +### cwlogs +The record type for the CloudWatch log stream. Expects the format for the records to be JSON. +For example: + +```json +{ + "messageType": "DATA_MESSAGE", + "owner": "111122223333", + "logGroup": "my-log-group", + "logStream": "my-log-stream", + "subscriptionFilters": ["my-subscription-filter"], + "logEvents": [ + { + "id": "123", + "timestamp": 1725544035523, + "message": "My log message." + } + ] +} +``` + diff --git a/receiver/awsfirehosereceiver/config_test.go b/receiver/awsfirehosereceiver/config_test.go index d7ec5c71bb04..77f5ef1fb7bb 100644 --- a/receiver/awsfirehosereceiver/config_test.go +++ b/receiver/awsfirehosereceiver/config_test.go @@ -4,6 +4,7 @@ package awsfirehosereceiver import ( + "fmt" "path/filepath" "testing" @@ -18,29 +19,40 @@ import ( ) func TestLoadConfig(t *testing.T) { - cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) - require.NoError(t, err) - - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String()) - require.NoError(t, err) - require.NoError(t, sub.Unmarshal(cfg)) - - assert.NoError(t, component.ValidateConfig(cfg)) - - require.Equal(t, &Config{ - RecordType: "cwmetrics", - AccessKey: "some_access_key", - ServerConfig: confighttp.ServerConfig{ - Endpoint: "0.0.0.0:4433", - TLSSetting: &configtls.ServerConfig{ - Config: configtls.Config{ - CertFile: "server.crt", - KeyFile: "server.key", - }, - }, - }, - }, cfg) + for _, configType := range []string{ + "cwmetrics", "cwlogs", "invalid", + } { + t.Run(configType, func(t *testing.T) { + fileName := fmt.Sprintf("%s_config.yaml", configType) + cm, err := confmaptest.LoadConf(filepath.Join("testdata", fileName)) + require.NoError(t, err) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + + err = component.ValidateConfig(cfg) + if configType == "invalid" { + assert.Error(t, err) + } else { + assert.NoError(t, err) + require.Equal(t, &Config{ + RecordType: configType, + AccessKey: "some_access_key", + ServerConfig: confighttp.ServerConfig{ + Endpoint: "0.0.0.0:4433", + TLSSetting: &configtls.ServerConfig{ + Config: configtls.Config{ + CertFile: "server.crt", + KeyFile: "server.key", + }, + }, + }, + }, cfg) + } + }) + } } diff --git a/receiver/awsfirehosereceiver/factory.go b/receiver/awsfirehosereceiver/factory.go index 4e187231ba59..3b781b6b040c 100644 --- a/receiver/awsfirehosereceiver/factory.go +++ b/receiver/awsfirehosereceiver/factory.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/localhostgate" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" ) @@ -29,6 +30,7 @@ var ( errUnrecognizedRecordType = errors.New("unrecognized record type") availableRecordTypes = map[string]bool{ cwmetricstream.TypeStr: true, + cwlog.TypeStr: true, } ) @@ -38,7 +40,8 @@ func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability), + receiver.WithLogs(createLogsReceiver, metadata.LogsStability)) } // validateRecordType checks the available record types for the @@ -59,6 +62,14 @@ func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.Metri } } +// defaultLogsUnmarshalers creates a map of the available logs unmarshalers. +func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler { + u := cwlog.NewUnmarshaler(logger) + return map[string]unmarshaler.LogsUnmarshaler{ + u.Type(): u, + } +} + // createDefaultConfig creates a default config with the endpoint set // to port 8443 and the record type set to the CloudWatch metric stream. func createDefaultConfig() component.Config { @@ -79,3 +90,13 @@ func createMetricsReceiver( ) (receiver.Metrics, error) { return newMetricsReceiver(cfg.(*Config), set, defaultMetricsUnmarshalers(set.Logger), nextConsumer) } + +// createMetricsReceiver implements the CreateMetricsReceiver function type. +func createLogsReceiver( + _ context.Context, + set receiver.Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (receiver.Logs, error) { + return newLogsReceiver(cfg.(*Config), set, defaultLogsUnmarshalers(set.Logger), nextConsumer) +} diff --git a/receiver/awsfirehosereceiver/factory_test.go b/receiver/awsfirehosereceiver/factory_test.go index e481ab1595ea..68914e75a29a 100644 --- a/receiver/awsfirehosereceiver/factory_test.go +++ b/receiver/awsfirehosereceiver/factory_test.go @@ -29,6 +29,17 @@ func TestCreateMetricsReceiver(t *testing.T) { require.NotNil(t, r) } +func TestCreateLogsReceiver(t *testing.T) { + r, err := createLogsReceiver( + context.Background(), + receivertest.NewNopSettings(), + createDefaultConfig(), + consumertest.NewNop(), + ) + require.NoError(t, err) + require.NotNil(t, r) +} + func TestValidateRecordType(t *testing.T) { require.NoError(t, validateRecordType(defaultRecordType)) require.Error(t, validateRecordType("nop")) diff --git a/receiver/awsfirehosereceiver/generated_component_test.go b/receiver/awsfirehosereceiver/generated_component_test.go index 8833fc62fc99..96dbbd3e427c 100644 --- a/receiver/awsfirehosereceiver/generated_component_test.go +++ b/receiver/awsfirehosereceiver/generated_component_test.go @@ -31,6 +31,13 @@ func TestComponentLifecycle(t *testing.T) { createFn func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) }{ + { + name: "logs", + createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + { name: "metrics", createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { diff --git a/receiver/awsfirehosereceiver/generated_package_test.go b/receiver/awsfirehosereceiver/generated_package_test.go index 4de8a57d127d..e7de7aee2b8b 100644 --- a/receiver/awsfirehosereceiver/generated_package_test.go +++ b/receiver/awsfirehosereceiver/generated_package_test.go @@ -3,9 +3,8 @@ package awsfirehosereceiver import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/receiver/awsfirehosereceiver/internal/metadata/generated_status.go b/receiver/awsfirehosereceiver/internal/metadata/generated_status.go index 97a9e06c34f5..447dc3186c20 100644 --- a/receiver/awsfirehosereceiver/internal/metadata/generated_status.go +++ b/receiver/awsfirehosereceiver/internal/metadata/generated_status.go @@ -13,4 +13,5 @@ var ( const ( MetricsStability = component.StabilityLevelAlpha + LogsStability = component.StabilityLevelAlpha ) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go new file mode 100644 index 000000000000..d426a4753df4 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package compression + +import ( + "bytes" + "compress/gzip" +) + +func Zip(data []byte) ([]byte, error) { + var b bytes.Buffer + w := gzip.NewWriter(&b) + + _, err := w.Write(data) + if err != nil { + return nil, err + } + + if err = w.Flush(); err != nil { + return nil, err + } + + if err = w.Close(); err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func Unzip(data []byte) ([]byte, error) { + b := bytes.NewBuffer(data) + + r, err := gzip.NewReader(b) + if err != nil { + return nil, err + } + + var rv bytes.Buffer + _, err = rv.ReadFrom(r) + if err != nil { + return nil, err + } + + return rv.Bytes(), nil +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go new file mode 100644 index 000000000000..3ccd591e09d7 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package cwlog + +type cwLog struct { + MessageType string `json:"messageType"` + Owner string `json:"owner"` + LogGroup string `json:"logGroup"` + LogStream string `json:"logStream"` + SubscriptionFilters []string `json:"subscriptionFilters"` + LogEvents []struct { + Id string `json:"id"` + Timestamp int64 `json:"timestamp"` + Message string `json:"message"` + } `json:"logEvents"` +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go new file mode 100644 index 000000000000..7f21669a85d4 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package cwlog + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + conventions "go.opentelemetry.io/collector/semconv/v1.6.1" +) + +const ( + attributeCloudwatchLogGroupName = "cloudwatch.log.group.name" + attributeCloudwatchLogStreamName = "cloudwatch.log.stream.name" +) + +// resourceAttributes are the CloudWatch log attributes that define a unique resource. +type resourceAttributes struct { + owner, logGroup, logStream string +} + +// resourceLogsBuilder provides convenient access to the a Resource's LogRecordSlice. +type resourceLogsBuilder struct { + rls plog.LogRecordSlice +} + +// setAttributes applies the resourceAttributes to the provided Resource. +func (ra *resourceAttributes) setAttributes(resource pcommon.Resource) { + attrs := resource.Attributes() + attrs.PutStr(conventions.AttributeCloudAccountID, ra.owner) + attrs.PutStr("cloudwatch.log.group.name", ra.logStream) + attrs.PutStr("cloudwatch.log.stream", ra.logGroup) +} + +// newResourceLogsBuilder to capture logs for the Resource defined by the provided attributes. +func newResourceLogsBuilder(logs plog.Logs, attrs resourceAttributes) *resourceLogsBuilder { + rls := logs.ResourceLogs().AppendEmpty() + attrs.setAttributes(rls.Resource()) + return &resourceLogsBuilder{rls.ScopeLogs().AppendEmpty().LogRecords()} +} + +// AddLog events to the LogRecordSlice. Resource attributes are captured when creating +// the resourceLogsBuilder, so we only need to consider the LogEvents themselves. +func (rlb *resourceLogsBuilder) AddLog(log cwLog) { + for _, event := range log.LogEvents { + logLine := rlb.rls.AppendEmpty() + logLine.SetTimestamp(pcommon.Timestamp(event.Timestamp)) + logLine.Body().SetStr(event.Message) + } +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/invalid_records b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/invalid_records new file mode 100644 index 000000000000..d2672dbd26d3 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/invalid_records @@ -0,0 +1,4 @@ +{"CHANGE":-0.09,"PRICE":4.96,"TICKER_SYMBOL":"KIN","SECTOR":"ENERGY"} +{"CHANGE":-1.47,"PRICE":134.74,"TICKER_SYMBOL":"DFG","SECTOR":"TECHNOLOGY"} +{"CHANGE":1.96,"PRICE":57.53,"TICKER_SYMBOL":"SAC","SECTOR":"ENERGY"} +{"CHANGE":0.04,"PRICE":32.84,"TICKER_SYMBOL":"PJN","SECTOR":"RETAIL"} \ No newline at end of file diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_records b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_records new file mode 100644 index 000000000000..18a1888d99a8 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_records @@ -0,0 +1,2 @@ +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]} +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695233","timestamp":1725554035523,"message":"Hello world, here is our second log message!"}]} \ No newline at end of file diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_resources b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_resources new file mode 100644 index 000000000000..0115614f1b9b --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/multiple_resources @@ -0,0 +1,6 @@ +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]} +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695233","timestamp":1725554035523,"message":"Hello world, here is our second log message!"}]} +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test1","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695234","timestamp":1725564035523,"message":"Hello world, here is our third log message!"}]} +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test2","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695235","timestamp":1725574035523,"message":"Hello world, here is our fourth log message!"}]} +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test1","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695236","timestamp":1725584035523,"message":"Hello world, here is our fifth log message!"}]} +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test2","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695237","timestamp":1725594035523,"message":"Hello world, here is our sixth log message!"}]} \ No newline at end of file diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/single_record b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/single_record new file mode 100644 index 000000000000..b35e166e47da --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/single_record @@ -0,0 +1 @@ +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/some_invalid_records b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/some_invalid_records new file mode 100644 index 000000000000..4026ad877a43 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/testdata/some_invalid_records @@ -0,0 +1,3 @@ +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]} +{"CHANGE":1.96,"PRICE":57.53,"TICKER_SYMBOL":"SAC","SECTOR":"ENERGY"} +{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695233","timestamp":1725554035523,"message":"Hello world, here is our second log message!"}]} \ No newline at end of file diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go new file mode 100644 index 000000000000..f862919384e0 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" + +import ( + "bytes" + "encoding/json" + "errors" + + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" +) + +const ( + TypeStr = "cwlogs" + recordDelimiter = "\n" +) + +var ( + errInvalidRecords = errors.New("record format invalid") +) + +// Unmarshaler for the CloudWatch Log JSON record format. +type Unmarshaler struct { + logger *zap.Logger +} + +var _ unmarshaler.LogsUnmarshaler = (*Unmarshaler)(nil) + +// NewUnmarshaler creates a new instance of the Unmarshaler. +func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { + return &Unmarshaler{logger} +} + +// Unmarshal deserializes the records into cWMetrics and uses the +// resourceMetricsBuilder to group them into a single pmetric.Metrics. +// Skips invalid cWMetrics received in the record and +func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { + md := plog.NewLogs() + builders := make(map[resourceAttributes]*resourceLogsBuilder) + for recordIndex, compressedRecord := range records { + record, err := compression.Unzip(compressedRecord) + if err != nil { + u.logger.Error("Failed to unzip record", + zap.Error(err), + zap.Int("record_index", recordIndex), + ) + continue + } + // Multiple logs in each record separated by newline character + for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) { + if len(datum) > 0 { + var log cwLog + err := json.Unmarshal(datum, &log) + if err != nil { + u.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("datum_index", datumIndex), + zap.Int("record_index", recordIndex), + ) + continue + } + if !u.isValid(log) { + u.logger.Error( + "Invalid log", + zap.Int("datum_index", datumIndex), + zap.Int("record_index", recordIndex), + ) + continue + } + attrs := resourceAttributes{ + owner: log.Owner, + logGroup: log.LogGroup, + logStream: log.LogStream, + } + lb, ok := builders[attrs] + if !ok { + lb = newResourceLogsBuilder(md, attrs) + builders[attrs] = lb + } + lb.AddLog(log) + + } + } + } + + if len(builders) == 0 { + return plog.NewLogs(), errInvalidRecords + } + + return md, nil +} + +// isValid validates that the cWMetric has been unmarshalled correctly. +func (u Unmarshaler) isValid(log cwLog) bool { + return log.Owner != "" && log.LogGroup != "" && log.LogStream != "" +} + +// Type of the serialized messages. +func (u Unmarshaler) Type() string { + return TypeStr +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go new file mode 100644 index 000000000000..fbd683939aa5 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package cwlog + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" +) + +func TestType(t *testing.T) { + unmarshaler := NewUnmarshaler(zap.NewNop()) + require.Equal(t, TypeStr, unmarshaler.Type()) +} + +func TestUnmarshal(t *testing.T) { + unmarshaler := NewUnmarshaler(zap.NewNop()) + testCases := map[string]struct { + filename string + wantResourceCount int + wantLogCount int + wantErr error + }{ + "WithMultipleRecords": { + filename: "multiple_records", + wantResourceCount: 1, + wantLogCount: 2, + }, + "WithSingleRecord": { + filename: "single_record", + wantResourceCount: 1, + wantLogCount: 1, + }, + "WithInvalidRecords": { + filename: "invalid_records", + wantErr: errInvalidRecords, + }, + "WithSomeInvalidRecords": { + filename: "some_invalid_records", + wantResourceCount: 1, + wantLogCount: 2, + }, + "WithMultipleResources": { + filename: "multiple_resources", + wantResourceCount: 3, + wantLogCount: 6, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + record, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename)) + require.NoError(t, err) + + compressedRecord, err := compression.Zip(record) + require.NoError(t, err) + records := [][]byte{compressedRecord} + + got, err := unmarshaler.Unmarshal(records) + if testCase.wantErr != nil { + require.Error(t, err) + require.Equal(t, testCase.wantErr, err) + } else { + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, testCase.wantResourceCount, got.ResourceLogs().Len()) + gotLogCount := 0 + for i := 0; i < got.ResourceLogs().Len(); i++ { + rm := got.ResourceLogs().At(i) + require.Equal(t, 1, rm.ScopeLogs().Len()) + ilm := rm.ScopeLogs().At(0) + gotLogCount += ilm.LogRecords().Len() + } + require.Equal(t, testCase.wantLogCount, gotLogCount) + } + }) + } +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go index 8a61bb1934a5..0ffb4b0a80e8 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go @@ -4,6 +4,7 @@ package unmarshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" import ( + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -15,3 +16,12 @@ type MetricsUnmarshaler interface { // Type of the serialized messages. Type() string } + +// LogsUnmarshaler deserializes the message body +type LogsUnmarshaler interface { + // Unmarshal deserializes the records into logs. + Unmarshal(records [][]byte) (plog.Logs, error) + + // Type of the serialized messages. + Type() string +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go new file mode 100644 index 000000000000..3ba9caf6056a --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package unmarshalertest // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest" + +import ( + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" +) + +// NopLogsUnmarshaler is a LogsUnmarshaler that doesn't do anything +// with the inputs and just returns the logs and error passed in. +type NopLogsUnmarshaler struct { + logs plog.Logs + err error +} + +var _ unmarshaler.LogsUnmarshaler = (*NopLogsUnmarshaler)(nil) + +// NewNopMetrics provides a nop logs unmarshaler with the default +// plog.Logs and no error. +func NewNopLogs() *NopLogsUnmarshaler { + return &NopLogsUnmarshaler{} +} + +// NewWithMetrics provides a nop logs unmarshaler with the passed +// in logs as the result of the Unmarshal and no error. +func NewWithLogs(logs plog.Logs) *NopLogsUnmarshaler { + return &NopLogsUnmarshaler{logs: logs} +} + +// NewErrLogs provides a nop logs unmarshaler with the passed +// in error as the Unmarshal error. +func NewErrLogs(err error) *NopLogsUnmarshaler { + return &NopLogsUnmarshaler{err: err} +} + +// Unmarshal deserializes the records into logs. +func (u *NopLogsUnmarshaler) Unmarshal([][]byte) (plog.Logs, error) { + return u.logs, u.err +} + +// Type of the serialized messages. +func (u *NopLogsUnmarshaler) Type() string { + return typeStr +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go new file mode 100644 index 000000000000..69be0c74f224 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package unmarshalertest + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" +) + +func TestNewNopLogs(t *testing.T) { + unmarshaler := NewNopLogs() + got, err := unmarshaler.Unmarshal(nil) + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, typeStr, unmarshaler.Type()) +} + +func TestNewWithLogs(t *testing.T) { + logs := plog.NewLogs() + logs.ResourceLogs().AppendEmpty() + unmarshaler := NewWithLogs(logs) + got, err := unmarshaler.Unmarshal(nil) + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, logs, got) + require.Equal(t, typeStr, unmarshaler.Type()) +} + +func TestNewErrLogs(t *testing.T) { + wantErr := fmt.Errorf("test error") + unmarshaler := NewErrLogs(wantErr) + got, err := unmarshaler.Unmarshal(nil) + require.Error(t, err) + require.Equal(t, wantErr, err) + require.NotNil(t, got) + require.Equal(t, typeStr, unmarshaler.Type()) +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go similarity index 100% rename from receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_unmarshaler.go rename to receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go similarity index 100% rename from receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_unmarshaler_test.go rename to receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go new file mode 100644 index 000000000000..a05d40907e53 --- /dev/null +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver" + +import ( + "context" + "net/http" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" +) + +// The logsConsumer implements the firehoseConsumer +// to use a metrics consumer and unmarshaler. +type logsConsumer struct { + // consumer passes the translated metrics on to the + // next consumer. + consumer consumer.Logs + // unmarshaler is the configured LogsUnmarshaler + // to use when processing the records. + unmarshaler unmarshaler.LogsUnmarshaler +} + +var _ firehoseConsumer = (*logsConsumer)(nil) + +// newLogsReceiver creates a new instance of the receiver +// with a logsConsumer. +func newLogsReceiver( + config *Config, + set receiver.Settings, + unmarshalers map[string]unmarshaler.LogsUnmarshaler, + nextConsumer consumer.Logs, +) (receiver.Logs, error) { + + configuredUnmarshaler := unmarshalers[config.RecordType] + if configuredUnmarshaler == nil { + return nil, errUnrecognizedRecordType + } + + mc := &logsConsumer{ + consumer: nextConsumer, + unmarshaler: configuredUnmarshaler, + } + + return &firehoseReceiver{ + settings: set, + config: config, + consumer: mc, + }, nil +} + +// Consume uses the configured unmarshaler to deserialize the records into a +// single pmetric.Metrics. If there are common attributes available, then it will +// attach those to each of the pcommon.Resources. It will send the final result +// to the next consumer. +func (mc *logsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { + md, err := mc.unmarshaler.Unmarshal(records) + if err != nil { + return http.StatusBadRequest, err + } + + if commonAttributes != nil { + for i := 0; i < md.ResourceLogs().Len(); i++ { + rm := md.ResourceLogs().At(i) + for k, v := range commonAttributes { + if _, found := rm.Resource().Attributes().Get(k); !found { + rm.Resource().Attributes().PutStr(k, v) + } + } + } + } + + err = mc.consumer.ConsumeLogs(ctx, md) + if err != nil { + return http.StatusInternalServerError, err + } + return http.StatusOK, nil +} diff --git a/receiver/awsfirehosereceiver/logs_receiver_test.go b/receiver/awsfirehosereceiver/logs_receiver_test.go new file mode 100644 index 000000000000..c44180bba844 --- /dev/null +++ b/receiver/awsfirehosereceiver/logs_receiver_test.go @@ -0,0 +1,121 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package awsfirehosereceiver + +import ( + "context" + "errors" + "go.opentelemetry.io/collector/pdata/plog" + "net/http" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest" +) + +type logsRecordConsumer struct { + result plog.Logs +} + +var _ consumer.Logs = (*logsRecordConsumer)(nil) + +func (rc *logsRecordConsumer) ConsumeLogs(_ context.Context, logs plog.Logs) error { + rc.result = logs + return nil +} + +func (rc *logsRecordConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func TestNewLogsReceiver(t *testing.T) { + testCases := map[string]struct { + consumer consumer.Metrics + recordType string + wantErr error + }{ + "WithInvalidRecordType": { + consumer: consumertest.NewNop(), + recordType: "test", + wantErr: errUnrecognizedRecordType, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.RecordType = testCase.recordType + got, err := newMetricsReceiver( + cfg, + receivertest.NewNopSettings(), + defaultMetricsUnmarshalers(zap.NewNop()), + testCase.consumer, + ) + require.Equal(t, testCase.wantErr, err) + if testCase.wantErr == nil { + require.NotNil(t, got) + } else { + require.Nil(t, got) + } + }) + } +} + +func TestLogsConsumer(t *testing.T) { + testErr := errors.New("test error") + testCases := map[string]struct { + unmarshalerErr error + consumerErr error + wantStatus int + wantErr error + }{ + "WithUnmarshalerError": { + unmarshalerErr: testErr, + wantStatus: http.StatusBadRequest, + wantErr: testErr, + }, + "WithConsumerError": { + consumerErr: testErr, + wantStatus: http.StatusInternalServerError, + wantErr: testErr, + }, + "WithNoError": { + wantStatus: http.StatusOK, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + mc := &logsConsumer{ + unmarshaler: unmarshalertest.NewErrLogs(testCase.unmarshalerErr), + consumer: consumertest.NewErr(testCase.consumerErr), + } + gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil) + require.Equal(t, testCase.wantStatus, gotStatus) + require.Equal(t, testCase.wantErr, gotErr) + }) + } + + t.Run("WithCommonAttributes", func(t *testing.T) { + base := plog.NewLogs() + base.ResourceLogs().AppendEmpty() + rc := logsRecordConsumer{} + mc := &logsConsumer{ + unmarshaler: unmarshalertest.NewWithLogs(base), + consumer: &rc, + } + gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{ + "CommonAttributes": "Test", + }) + require.Equal(t, http.StatusOK, gotStatus) + require.NoError(t, gotErr) + gotRms := rc.result.ResourceLogs() + require.Equal(t, 1, gotRms.Len()) + gotRm := gotRms.At(0) + require.Equal(t, 1, gotRm.Resource().Attributes().Len()) + }) +} diff --git a/receiver/awsfirehosereceiver/metadata.yaml b/receiver/awsfirehosereceiver/metadata.yaml index 46944e8af8f4..b712ca7e25fd 100644 --- a/receiver/awsfirehosereceiver/metadata.yaml +++ b/receiver/awsfirehosereceiver/metadata.yaml @@ -3,7 +3,7 @@ type: awsfirehose status: class: receiver stability: - alpha: [metrics] + alpha: [metrics, logs] distributions: [contrib] codeowners: active: [Aneurysm9] diff --git a/receiver/awsfirehosereceiver/testdata/cwlogs_config.yaml b/receiver/awsfirehosereceiver/testdata/cwlogs_config.yaml new file mode 100644 index 000000000000..9ec04ad0a171 --- /dev/null +++ b/receiver/awsfirehosereceiver/testdata/cwlogs_config.yaml @@ -0,0 +1,7 @@ +awsfirehose: + endpoint: 0.0.0.0:4433 + record_type: cwlogs + access_key: "some_access_key" + tls: + cert_file: server.crt + key_file: server.key diff --git a/receiver/awsfirehosereceiver/testdata/config.yaml b/receiver/awsfirehosereceiver/testdata/cwmetrics_config.yaml similarity index 100% rename from receiver/awsfirehosereceiver/testdata/config.yaml rename to receiver/awsfirehosereceiver/testdata/cwmetrics_config.yaml diff --git a/receiver/awsfirehosereceiver/testdata/invalid_config.yaml b/receiver/awsfirehosereceiver/testdata/invalid_config.yaml new file mode 100644 index 000000000000..17ad1316b7cd --- /dev/null +++ b/receiver/awsfirehosereceiver/testdata/invalid_config.yaml @@ -0,0 +1,3 @@ +awsfirehose: + endpoint: 0.0.0.0:4433 + record_type: invalid \ No newline at end of file From b4b04dcab52955fe291f99b56766203278157366 Mon Sep 17 00:00:00 2001 From: Gavin Cabbage <gavin.cabbage@servicenow.com> Date: Tue, 1 Oct 2024 13:21:52 -0400 Subject: [PATCH 2/7] [awsfirehosereceiver/cwlogs] Refactor default record type logic; PR feedback --- receiver/awsfirehosereceiver/config.go | 8 +++++--- receiver/awsfirehosereceiver/factory.go | 6 ++---- receiver/awsfirehosereceiver/factory_test.go | 3 ++- .../awsfirehosereceiver/generated_component_test.go | 12 ++++++------ .../unmarshaler/cwlog/compression/compression.go | 2 +- .../internal/unmarshaler/cwlog/cwlog.go | 6 +++--- .../internal/unmarshaler/cwlog/logsbuilder.go | 4 ++-- .../internal/unmarshaler/cwlog/unmarshaler.go | 8 ++++---- receiver/awsfirehosereceiver/logs_receiver.go | 9 ++++++++- receiver/awsfirehosereceiver/logs_receiver_test.go | 8 ++++---- receiver/awsfirehosereceiver/metrics_receiver.go | 9 ++++++++- receiver/awsfirehosereceiver/receiver_test.go | 8 ++------ 12 files changed, 47 insertions(+), 36 deletions(-) diff --git a/receiver/awsfirehosereceiver/config.go b/receiver/awsfirehosereceiver/config.go index e6de4c2e50ba..784e0af2501b 100644 --- a/receiver/awsfirehosereceiver/config.go +++ b/receiver/awsfirehosereceiver/config.go @@ -30,8 +30,10 @@ func (c *Config) Validate() error { if c.Endpoint == "" { return errors.New("must specify endpoint") } - if c.RecordType == "" { - return errors.New("must specify record type") + // If a record type is specified, it must be valid. + // An empty string is acceptable, however, because it will use a telemetry-type-specific default. + if c.RecordType != "" { + return validateRecordType(c.RecordType) } - return validateRecordType(c.RecordType) + return nil } diff --git a/receiver/awsfirehosereceiver/factory.go b/receiver/awsfirehosereceiver/factory.go index 3b781b6b040c..d2cd934a23c3 100644 --- a/receiver/awsfirehosereceiver/factory.go +++ b/receiver/awsfirehosereceiver/factory.go @@ -21,9 +21,8 @@ import ( ) const ( - defaultRecordType = cwmetricstream.TypeStr - defaultEndpoint = "0.0.0.0:4433" - defaultPort = 4433 + defaultEndpoint = "0.0.0.0:4433" + defaultPort = 4433 ) var ( @@ -74,7 +73,6 @@ func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnma // to port 8443 and the record type set to the CloudWatch metric stream. func createDefaultConfig() component.Config { return &Config{ - RecordType: defaultRecordType, ServerConfig: confighttp.ServerConfig{ Endpoint: localhostgate.EndpointForPort(defaultPort), }, diff --git a/receiver/awsfirehosereceiver/factory_test.go b/receiver/awsfirehosereceiver/factory_test.go index 68914e75a29a..b5e4e4d68bb3 100644 --- a/receiver/awsfirehosereceiver/factory_test.go +++ b/receiver/awsfirehosereceiver/factory_test.go @@ -41,6 +41,7 @@ func TestCreateLogsReceiver(t *testing.T) { } func TestValidateRecordType(t *testing.T) { - require.NoError(t, validateRecordType(defaultRecordType)) + require.NoError(t, validateRecordType(defaultMetricsRecordType)) + require.NoError(t, validateRecordType(defaultLogsRecordType)) require.Error(t, validateRecordType("nop")) } diff --git a/receiver/awsfirehosereceiver/generated_component_test.go b/receiver/awsfirehosereceiver/generated_component_test.go index 96dbbd3e427c..33b5a8add994 100644 --- a/receiver/awsfirehosereceiver/generated_component_test.go +++ b/receiver/awsfirehosereceiver/generated_component_test.go @@ -53,21 +53,21 @@ func TestComponentLifecycle(t *testing.T) { require.NoError(t, err) require.NoError(t, sub.Unmarshal(&cfg)) - for _, tt := range tests { - t.Run(tt.name+"-shutdown", func(t *testing.T) { - c, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + for _, test := range tests { + t.Run(test.name+"-shutdown", func(t *testing.T) { + c, err := test.createFn(context.Background(), receivertest.NewNopSettings(), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) }) - t.Run(tt.name+"-lifecycle", func(t *testing.T) { - firstRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + t.Run(test.name+"-lifecycle", func(t *testing.T) { + firstRcvr, err := test.createFn(context.Background(), receivertest.NewNopSettings(), cfg) require.NoError(t, err) host := componenttest.NewNopHost() require.NoError(t, err) require.NoError(t, firstRcvr.Start(context.Background(), host)) require.NoError(t, firstRcvr.Shutdown(context.Background())) - secondRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + secondRcvr, err := test.createFn(context.Background(), receivertest.NewNopSettings(), cfg) require.NoError(t, err) require.NoError(t, secondRcvr.Start(context.Background(), host)) require.NoError(t, secondRcvr.Shutdown(context.Background())) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go index d426a4753df4..67829bc29c35 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package compression +package compression // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" import ( "bytes" diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go index 3ccd591e09d7..1ab85509873a 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go @@ -1,16 +1,16 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package cwlog +package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" -type cwLog struct { +type cWLog struct { MessageType string `json:"messageType"` Owner string `json:"owner"` LogGroup string `json:"logGroup"` LogStream string `json:"logStream"` SubscriptionFilters []string `json:"subscriptionFilters"` LogEvents []struct { - Id string `json:"id"` + ID string `json:"id"` Timestamp int64 `json:"timestamp"` Message string `json:"message"` } `json:"logEvents"` diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go index 7f21669a85d4..cb7e919ffbd3 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package cwlog +package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" import ( "go.opentelemetry.io/collector/pdata/pcommon" @@ -41,7 +41,7 @@ func newResourceLogsBuilder(logs plog.Logs, attrs resourceAttributes) *resourceL // AddLog events to the LogRecordSlice. Resource attributes are captured when creating // the resourceLogsBuilder, so we only need to consider the LogEvents themselves. -func (rlb *resourceLogsBuilder) AddLog(log cwLog) { +func (rlb *resourceLogsBuilder) AddLog(log cWLog) { for _, event := range log.LogEvents { logLine := rlb.rls.AppendEmpty() logLine.SetTimestamp(pcommon.Timestamp(event.Timestamp)) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index f862919384e0..e837257b906e 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" +package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" import ( "bytes" @@ -37,7 +37,7 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { } // Unmarshal deserializes the records into cWMetrics and uses the -// resourceMetricsBuilder to group them into a single pmetric.Metrics. +// resourceMetricsBuilder to group them into a single plog.Logs. // Skips invalid cWMetrics received in the record and func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { md := plog.NewLogs() @@ -54,7 +54,7 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { // Multiple logs in each record separated by newline character for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) { if len(datum) > 0 { - var log cwLog + var log cWLog err := json.Unmarshal(datum, &log) if err != nil { u.logger.Error( @@ -97,7 +97,7 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { } // isValid validates that the cWMetric has been unmarshalled correctly. -func (u Unmarshaler) isValid(log cwLog) bool { +func (u Unmarshaler) isValid(log cWLog) bool { return log.Owner != "" && log.LogGroup != "" && log.LogStream != "" } diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index a05d40907e53..96e679553569 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -11,8 +11,11 @@ import ( "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" ) +const defaultLogsRecordType = cwlog.TypeStr + // The logsConsumer implements the firehoseConsumer // to use a metrics consumer and unmarshaler. type logsConsumer struct { @@ -35,7 +38,11 @@ func newLogsReceiver( nextConsumer consumer.Logs, ) (receiver.Logs, error) { - configuredUnmarshaler := unmarshalers[config.RecordType] + recordType := config.RecordType + if recordType == "" { + recordType = defaultLogsRecordType + } + configuredUnmarshaler := unmarshalers[recordType] if configuredUnmarshaler == nil { return nil, errUnrecognizedRecordType } diff --git a/receiver/awsfirehosereceiver/logs_receiver_test.go b/receiver/awsfirehosereceiver/logs_receiver_test.go index c44180bba844..da448640ddb4 100644 --- a/receiver/awsfirehosereceiver/logs_receiver_test.go +++ b/receiver/awsfirehosereceiver/logs_receiver_test.go @@ -6,13 +6,13 @@ package awsfirehosereceiver import ( "context" "errors" - "go.opentelemetry.io/collector/pdata/plog" "net/http" "testing" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -36,7 +36,7 @@ func (rc *logsRecordConsumer) Capabilities() consumer.Capabilities { func TestNewLogsReceiver(t *testing.T) { testCases := map[string]struct { - consumer consumer.Metrics + consumer consumer.Logs recordType string wantErr error }{ @@ -50,10 +50,10 @@ func TestNewLogsReceiver(t *testing.T) { t.Run(name, func(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.RecordType = testCase.recordType - got, err := newMetricsReceiver( + got, err := newLogsReceiver( cfg, receivertest.NewNopSettings(), - defaultMetricsUnmarshalers(zap.NewNop()), + defaultLogsUnmarshalers(zap.NewNop()), testCase.consumer, ) require.Equal(t, testCase.wantErr, err) diff --git a/receiver/awsfirehosereceiver/metrics_receiver.go b/receiver/awsfirehosereceiver/metrics_receiver.go index 86626b803dcf..db24595ccc48 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver.go +++ b/receiver/awsfirehosereceiver/metrics_receiver.go @@ -11,8 +11,11 @@ import ( "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" ) +const defaultMetricsRecordType = cwmetricstream.TypeStr + // The metricsConsumer implements the firehoseConsumer // to use a metrics consumer and unmarshaler. type metricsConsumer struct { @@ -35,7 +38,11 @@ func newMetricsReceiver( nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { - configuredUnmarshaler := unmarshalers[config.RecordType] + recordType := config.RecordType + if recordType == "" { + recordType = defaultMetricsRecordType + } + configuredUnmarshaler := unmarshalers[recordType] if configuredUnmarshaler == nil { return nil, errUnrecognizedRecordType } diff --git a/receiver/awsfirehosereceiver/receiver_test.go b/receiver/awsfirehosereceiver/receiver_test.go index 1ef5bdf4d354..80f4244ffbe3 100644 --- a/receiver/awsfirehosereceiver/receiver_test.go +++ b/receiver/awsfirehosereceiver/receiver_test.go @@ -57,9 +57,7 @@ func TestStart(t *testing.T) { } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - cfg := &Config{ - RecordType: defaultRecordType, - } + cfg := &Config{} ctx := context.TODO() r := testFirehoseReceiver(cfg, nil) got := r.Start(ctx, testCase.host) @@ -76,7 +74,6 @@ func TestStart(t *testing.T) { require.NoError(t, listener.Close()) }) cfg := &Config{ - RecordType: defaultRecordType, ServerConfig: confighttp.ServerConfig{ Endpoint: listener.Addr().String(), }, @@ -95,8 +92,7 @@ func TestFirehoseRequest(t *testing.T) { defaultConsumer := newNopFirehoseConsumer(http.StatusOK, nil) firehoseConsumerErr := errors.New("firehose consumer error") cfg := &Config{ - RecordType: defaultRecordType, - AccessKey: testFirehoseAccessKey, + AccessKey: testFirehoseAccessKey, } var noRecords []firehoseRecord testCases := map[string]struct { From dc50db24c9041432f2e92001d56fd7a241e494e1 Mon Sep 17 00:00:00 2001 From: Gavin Cabbage <gavin.cabbage@servicenow.com> Date: Tue, 1 Oct 2024 13:25:47 -0400 Subject: [PATCH 3/7] [awsfirehosereceiver/cwlogs] Correct comments --- .../unmarshaler/unmarshalertest/nop_logs_unmarshaler.go | 6 +++--- receiver/awsfirehosereceiver/logs_receiver.go | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go index 3ba9caf6056a..79f29caecfdb 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go @@ -5,7 +5,7 @@ package unmarshalertest // import "github.com/open-telemetry/opentelemetry-colle import ( "go.opentelemetry.io/collector/pdata/plog" - + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" ) @@ -18,13 +18,13 @@ type NopLogsUnmarshaler struct { var _ unmarshaler.LogsUnmarshaler = (*NopLogsUnmarshaler)(nil) -// NewNopMetrics provides a nop logs unmarshaler with the default +// NewNopLogs provides a nop logs unmarshaler with the default // plog.Logs and no error. func NewNopLogs() *NopLogsUnmarshaler { return &NopLogsUnmarshaler{} } -// NewWithMetrics provides a nop logs unmarshaler with the passed +// NewWithLogs provides a nop logs unmarshaler with the passed // in logs as the result of the Unmarshal and no error. func NewWithLogs(logs plog.Logs) *NopLogsUnmarshaler { return &NopLogsUnmarshaler{logs: logs} diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index 96e679553569..4d4b4adcf7fa 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -17,9 +17,9 @@ import ( const defaultLogsRecordType = cwlog.TypeStr // The logsConsumer implements the firehoseConsumer -// to use a metrics consumer and unmarshaler. +// to use a logs consumer and unmarshaler. type logsConsumer struct { - // consumer passes the translated metrics on to the + // consumer passes the translated logs on to the // next consumer. consumer consumer.Logs // unmarshaler is the configured LogsUnmarshaler @@ -60,8 +60,7 @@ func newLogsReceiver( } // Consume uses the configured unmarshaler to deserialize the records into a -// single pmetric.Metrics. If there are common attributes available, then it will -// attach those to each of the pcommon.Resources. It will send the final result +// single plog.Logs. It will send the final result // to the next consumer. func (mc *logsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { md, err := mc.unmarshaler.Unmarshal(records) From 12a18292b2c787ab4f986b6dd4cf2448b00a5fb2 Mon Sep 17 00:00:00 2001 From: Gavin Cabbage <gavin.cabbage@servicenow.com> Date: Tue, 1 Oct 2024 14:53:24 -0400 Subject: [PATCH 4/7] [awsfirehosereceiver/cwlogs] Remove unused constants --- .../internal/unmarshaler/cwlog/logsbuilder.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go index cb7e919ffbd3..80774aa77413 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go @@ -9,11 +9,6 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.6.1" ) -const ( - attributeCloudwatchLogGroupName = "cloudwatch.log.group.name" - attributeCloudwatchLogStreamName = "cloudwatch.log.stream.name" -) - // resourceAttributes are the CloudWatch log attributes that define a unique resource. type resourceAttributes struct { owner, logGroup, logStream string From de1dca360faa6d740d49479d18474e23bf89a238 Mon Sep 17 00:00:00 2001 From: Gavin Cabbage <gavin.cabbage@servicenow.com> Date: Tue, 1 Oct 2024 15:08:37 -0400 Subject: [PATCH 5/7] [awsfirehosereceiver/cwlogs] Remove unconventional common attributes --- .../internal/unmarshaler/cwlog/logsbuilder.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go index 80774aa77413..e5329c0896e9 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go @@ -23,8 +23,6 @@ type resourceLogsBuilder struct { func (ra *resourceAttributes) setAttributes(resource pcommon.Resource) { attrs := resource.Attributes() attrs.PutStr(conventions.AttributeCloudAccountID, ra.owner) - attrs.PutStr("cloudwatch.log.group.name", ra.logStream) - attrs.PutStr("cloudwatch.log.stream", ra.logGroup) } // newResourceLogsBuilder to capture logs for the Resource defined by the provided attributes. From 61cb91b5b3cec95d464f5642051111c5046dcb05 Mon Sep 17 00:00:00 2001 From: Gavin Cabbage <gavin.cabbage@servicenow.com> Date: Wed, 2 Oct 2024 14:29:54 -0400 Subject: [PATCH 6/7] [awsfirehosereceiver/cwlogs] Run make generate --- .../awsfirehosereceiver/generated_component_test.go | 12 ++++++------ .../awsfirehosereceiver/generated_package_test.go | 3 ++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/receiver/awsfirehosereceiver/generated_component_test.go b/receiver/awsfirehosereceiver/generated_component_test.go index 33b5a8add994..96dbbd3e427c 100644 --- a/receiver/awsfirehosereceiver/generated_component_test.go +++ b/receiver/awsfirehosereceiver/generated_component_test.go @@ -53,21 +53,21 @@ func TestComponentLifecycle(t *testing.T) { require.NoError(t, err) require.NoError(t, sub.Unmarshal(&cfg)) - for _, test := range tests { - t.Run(test.name+"-shutdown", func(t *testing.T) { - c, err := test.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + for _, tt := range tests { + t.Run(tt.name+"-shutdown", func(t *testing.T) { + c, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) }) - t.Run(test.name+"-lifecycle", func(t *testing.T) { - firstRcvr, err := test.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + t.Run(tt.name+"-lifecycle", func(t *testing.T) { + firstRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) require.NoError(t, err) host := componenttest.NewNopHost() require.NoError(t, err) require.NoError(t, firstRcvr.Start(context.Background(), host)) require.NoError(t, firstRcvr.Shutdown(context.Background())) - secondRcvr, err := test.createFn(context.Background(), receivertest.NewNopSettings(), cfg) + secondRcvr, err := tt.createFn(context.Background(), receivertest.NewNopSettings(), cfg) require.NoError(t, err) require.NoError(t, secondRcvr.Start(context.Background(), host)) require.NoError(t, secondRcvr.Shutdown(context.Background())) diff --git a/receiver/awsfirehosereceiver/generated_package_test.go b/receiver/awsfirehosereceiver/generated_package_test.go index e7de7aee2b8b..4de8a57d127d 100644 --- a/receiver/awsfirehosereceiver/generated_package_test.go +++ b/receiver/awsfirehosereceiver/generated_package_test.go @@ -3,8 +3,9 @@ package awsfirehosereceiver import ( - "go.uber.org/goleak" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { From f94f35b2682773a56d3a4c520b6833d3a27bb968 Mon Sep 17 00:00:00 2001 From: Gavin Cabbage <gavin.cabbage@servicenow.com> Date: Wed, 2 Oct 2024 14:48:40 -0400 Subject: [PATCH 7/7] [awsfirehosereceiver/cwlogs] Doc fixes; add additional common attributes --- .../internal/unmarshaler/cwlog/compression/compression.go | 2 ++ .../internal/unmarshaler/cwlog/logsbuilder.go | 7 +++++++ .../internal/unmarshaler/cwlog/unmarshaler.go | 8 ++++---- receiver/awsfirehosereceiver/logs_receiver.go | 2 +- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go index 67829bc29c35..2ebca77861dd 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go @@ -8,6 +8,7 @@ import ( "compress/gzip" ) +// Zip returns a gzip-compressed representation of the input bytes. func Zip(data []byte) ([]byte, error) { var b bytes.Buffer w := gzip.NewWriter(&b) @@ -28,6 +29,7 @@ func Zip(data []byte) ([]byte, error) { return b.Bytes(), nil } +// Unzip expects gzip-compressed input bytes and returns their uncompressed form. func Unzip(data []byte) ([]byte, error) { b := bytes.NewBuffer(data) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go index e5329c0896e9..eae5902b5ea1 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go @@ -9,6 +9,11 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.6.1" ) +const ( + attributeAWSCloudWatchLogGroupName = "aws.cloudwatch.log_group_name" + attributeAWSCloudWatchLogStreamName = "aws.cloudwatch.log_stream_name" +) + // resourceAttributes are the CloudWatch log attributes that define a unique resource. type resourceAttributes struct { owner, logGroup, logStream string @@ -23,6 +28,8 @@ type resourceLogsBuilder struct { func (ra *resourceAttributes) setAttributes(resource pcommon.Resource) { attrs := resource.Attributes() attrs.PutStr(conventions.AttributeCloudAccountID, ra.owner) + attrs.PutStr(attributeAWSCloudWatchLogGroupName, ra.logGroup) + attrs.PutStr(attributeAWSCloudWatchLogStreamName, ra.logStream) } // newResourceLogsBuilder to capture logs for the Resource defined by the provided attributes. diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index e837257b906e..43b5568e7854 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -36,9 +36,9 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into cWMetrics and uses the -// resourceMetricsBuilder to group them into a single plog.Logs. -// Skips invalid cWMetrics received in the record and +// Unmarshal deserializes the records into cWLogs and uses the +// resourceLogsBuilder to group them into a single plog.Logs. +// Skips invalid cWLogs received in the record and func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { md := plog.NewLogs() builders := make(map[resourceAttributes]*resourceLogsBuilder) @@ -96,7 +96,7 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { return md, nil } -// isValid validates that the cWMetric has been unmarshalled correctly. +// isValid validates that the cWLog has been unmarshalled correctly. func (u Unmarshaler) isValid(log cWLog) bool { return log.Owner != "" && log.LogGroup != "" && log.LogStream != "" } diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index 4d4b4adcf7fa..a7b8c0628525 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -16,7 +16,7 @@ import ( const defaultLogsRecordType = cwlog.TypeStr -// The logsConsumer implements the firehoseConsumer +// logsConsumer implements the firehoseConsumer // to use a logs consumer and unmarshaler. type logsConsumer struct { // consumer passes the translated logs on to the