Skip to content

Commit

Permalink
Capability to configure sampledLogger in the exporterHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
antonjim-te committed Jul 26, 2023
1 parent 5613523 commit 4394496
Show file tree
Hide file tree
Showing 16 changed files with 103 additions and 9 deletions.
2 changes: 2 additions & 0 deletions exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ The following configuration options can be modified:
[the batch processor](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor)
is used, the metric `batch_send_size` can be used for estimation)
- `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend
- `sampled_logger`: Samples logging messages, which caps the CPU and I/O load of logging while keeping a representative subset of your logs.
- `enabled` (default = true)

The `initial_interval`, `max_interval`, `max_elapsed_time`, and `timeout` options accept
[duration strings](https://pkg.go.dev/time#ParseDuration),
Expand Down
21 changes: 19 additions & 2 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ func NewDefaultTimeoutSettings() TimeoutSettings {
}
}

// SampledLoggerSettings samples logging messages, which caps the CPU and I/O load of logging while keeping a
// representative subset of your logs.
// Its purpose is to balance between the need for comprehensive logging and the potential performance impact of logging too much data.
type SampledLoggerSettings struct {
// Enable the sampledLogger
Enabled bool `mapstructure:"enabled"`
}

// NewDefaultSampledLoggerSettings returns the default settings for SampledLoggerSettings.
func NewDefaultSampledLoggerSettings() SampledLoggerSettings {
return SampledLoggerSettings{
Enabled: true,
}
}

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
send(req internal.Request) error
Expand Down Expand Up @@ -64,6 +79,7 @@ type baseSettings struct {
TimeoutSettings
QueueSettings
RetrySettings
SampledLoggerSettings
}

// fromOptions returns the internal options starting from the default and applying all configured options.
Expand All @@ -74,7 +90,8 @@ func fromOptions(options ...Option) *baseSettings {
// TODO: Enable queuing by default (call DefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
RetrySettings: RetrySettings{Enabled: false},
SampledLoggerSettings: NewDefaultSampledLoggerSettings(),
}

for _, op := range options {
Expand Down Expand Up @@ -154,7 +171,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, bs.SampledLoggerSettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand Down
14 changes: 9 additions & 5 deletions exporter/exporterhelper/queued_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ type queuedRetrySender struct {
requestUnmarshaler internal.RequestUnmarshaler
}

func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, lCfg SampledLoggerSettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
retryStopCh := make(chan struct{})
sampledLogger := createSampledLogger(logger)
newLogger := createSampledLogger(logger, lCfg)
traceAttr := attribute.String(obsmetrics.ExporterKey, id.String())

qrs := &queuedRetrySender{
Expand All @@ -95,7 +95,7 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue
cfg: qCfg,
retryStopCh: retryStopCh,
traceAttribute: traceAttr,
logger: sampledLogger,
logger: newLogger,
requestUnmarshaler: reqUnmarshaler,
}

Expand All @@ -104,7 +104,7 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue
cfg: rCfg,
nextSender: nextSender,
stopCh: retryStopCh,
logger: sampledLogger,
logger: newLogger,
// Following three functions actually depend on queuedRetrySender
onTemporaryFailure: qrs.onTemporaryFailure,
}
Expand Down Expand Up @@ -266,12 +266,16 @@ func NewDefaultRetrySettings() RetrySettings {
}
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
func createSampledLogger(logger *zap.Logger, lCfg SampledLoggerSettings) *zap.Logger {
if logger.Core().Enabled(zapcore.DebugLevel) {
// Debugging is enabled. Don't do any sampling.
return logger
}

if !lCfg.Enabled {
return logger
}

// Create a logger that samples all messages to 1 per 10 seconds initially,
// and 1/100 of messages after that.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
Expand Down
52 changes: 52 additions & 0 deletions exporter/exporterhelper/queued_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"errors"
"fmt"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -617,6 +619,56 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
}, time.Second, 1*time.Millisecond)
}

func TestCreateSampledLogger(t *testing.T) {
testCases := []struct {
desc string
logger *zap.Logger
lCfg SampledLoggerSettings
sampledLoggedCreated bool
}{
{
desc: "default configuration - sampledLogger enabled",
logger: zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel)),
lCfg: NewDefaultSampledLoggerSettings(),
sampledLoggedCreated: true,
},
{
desc: "sampledLogger disable",
logger: zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel)),
lCfg: SampledLoggerSettings{
Enabled: false,
},
sampledLoggedCreated: false,
},
{
desc: "debug logger level and sampledLogged enabled - sampledLogger not created",
logger: zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)),
lCfg: NewDefaultSampledLoggerSettings(),
sampledLoggedCreated: false,
},
{
desc: "debug logger level and sampledLogged disabled - sampledLogger not created",
logger: zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)),
lCfg: SampledLoggerSettings{
Enabled: false,
},
sampledLoggedCreated: false,
},
}

for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
newLogger := createSampledLogger(tC.logger, tC.lCfg)
require.NotNil(t, newLogger)
if tC.sampledLoggedCreated {
require.NotEqual(t, tC.logger, newLogger)
} else {
require.Equal(t, tC.logger, newLogger)
}
})
}
}

type mockErrorRequest struct {
baseRequest
}
Expand Down
1 change: 1 addition & 0 deletions exporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions exporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ github.com/aws/aws-sdk-go-v2/service/sso v1.4.2/go.mod h1:NBvT9R1MEF+Ud6ApJKM0G+
github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21TfrhJ8AEMzVybRNSb/b4g=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Several helper files are leveraged to provide additional capabilities automatica

- [gRPC settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md)
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
- [Queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- [Queuing, retry, sampled logger and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)

[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
Expand Down
3 changes: 2 additions & 1 deletion exporter/otlpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type Config struct {
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.SampledLoggerSettings `mapstructure:"sampled_logger"`
}

var _ component.Config = (*Config)(nil)
Expand Down
3 changes: 3 additions & 0 deletions exporter/otlpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,8 @@ func TestUnmarshalConfig(t *testing.T) {
BalancerName: "round_robin",
Auth: &configauth.Authentication{AuthenticatorID: component.NewID("nop")},
},
SampledLoggerSettings: exporterhelper.SampledLoggerSettings{
Enabled: false,
},
}, cfg)
}
1 change: 1 addition & 0 deletions exporter/otlpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func createDefaultConfig() component.Config {
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
},
SampledLoggerSettings: exporterhelper.NewDefaultSampledLoggerSettings(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/otlpexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ keepalive:
timeout: 30s
permit_without_stream: true
balancer_name: "round_robin"
sampled_logger:
enabled: false
2 changes: 2 additions & 0 deletions exporter/otlphttpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ The following settings can be optionally configured:
- `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client
- `read_buffer_size` (default = 0): ReadBufferSize for HTTP client.
- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client.
- `sampled_logger`: Samples logging messages, which caps the CPU and I/O load of logging while keeping a representative subset of your logs.
- `enabled` (default = true)

Example:

Expand Down
2 changes: 2 additions & 0 deletions exporter/otlphttpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Config struct {

// The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used.
LogsEndpoint string `mapstructure:"logs_endpoint"`

exporterhelper.SampledLoggerSettings `mapstructure:"sampled_logger"`
}

var _ component.Config = (*Config)(nil)
Expand Down
3 changes: 3 additions & 0 deletions exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,8 @@ func TestUnmarshalConfig(t *testing.T) {
Timeout: time.Second * 10,
Compression: "gzip",
},
SampledLoggerSettings: exporterhelper.SampledLoggerSettings{
Enabled: false,
},
}, cfg)
}
1 change: 1 addition & 0 deletions exporter/otlphttpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func createDefaultConfig() component.Config {
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
},
SampledLoggerSettings: exporterhelper.NewDefaultSampledLoggerSettings(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/otlphttpexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ headers:
header1: 234
another: "somevalue"
compression: gzip
sampled_logger:
enabled: false

0 comments on commit 4394496

Please sign in to comment.