Skip to content

Commit

Permalink
Create a extra sampled logger as part of the telemetry settings
Browse files Browse the repository at this point in the history
  • Loading branch information
antonjim-te committed Sep 6, 2023
1 parent 310b747 commit 9540c55
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 34 deletions.
25 changes: 25 additions & 0 deletions .chloggen/SampledLoggerTelemetry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/telemetry exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "New sampled logger in the telemetry configuration used to avoid flooding the logs with messages that are repeated frequently."

# One or more tracking issues or pull requests related to the change
issues: [8134]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The sampled logger is configured by `LogsSamplingConfig` (`service.telemetry.logs.sampling`).

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions component/componenttest/nop_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
func NewNopTelemetrySettings() component.TelemetrySettings {
return component.TelemetrySettings{
Logger: zap.NewNop(),
SampledLogger: zap.NewNop(),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Expand Down
4 changes: 4 additions & 0 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type TelemetrySettings struct {
// component to be used later as well.
Logger *zap.Logger

// SampledLogger passed to the created component.
// It will be used to avoid flooding the logs with messages that are repeated frequently.
SampledLogger *zap.Logger

// TracerProvider that the factory can pass to other instrumented third-party libraries.
TracerProvider trace.TracerProvider

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.SampledLogger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand Down
23 changes: 1 addition & 22 deletions exporter/exporterhelper/queued_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
Expand Down Expand Up @@ -78,9 +77,8 @@ type queuedRetrySender struct {
}

func newQueuedRetrySender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue,
rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
rCfg RetrySettings, nextSender requestSender, sampledLogger *zap.Logger) *queuedRetrySender {
retryStopCh := make(chan struct{})
sampledLogger := createSampledLogger(logger)
traceAttr := attribute.String(obsmetrics.ExporterKey, id.String())

qrs := &queuedRetrySender{
Expand Down Expand Up @@ -217,25 +215,6 @@ func NewDefaultRetrySettings() RetrySettings {
}
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
if logger.Core().Enabled(zapcore.DebugLevel) {
// Debugging is enabled. Don't do any sampling.
return logger
}

// Create a logger that samples all messages to 1 per 10 seconds initially,
// and 1/100 of messages after that.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(
core,
10*time.Second,
1,
100,
)
})
return logger.WithOptions(opts)
}

// send implements the requestSender interface
func (qrs *queuedRetrySender) send(req internal.Request) error {
if qrs.queue == nil {
Expand Down
1 change: 1 addition & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {

srv.telemetrySettings = component.TelemetrySettings{
Logger: srv.telemetry.Logger(),
SampledLogger: srv.telemetry.SampledLogger(),
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MetricsLevel: cfg.Telemetry.Metrics.Level,
Expand Down
13 changes: 13 additions & 0 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,19 @@ func TestNilCollectorEffectiveConfig(t *testing.T) {
require.NoError(t, srv.Shutdown(context.Background()))
}

func TestServiceTelemetryLoggers(t *testing.T) {
srv, err := New(context.Background(), newNopSettings(), newNopConfig())
require.NoError(t, err)

assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})
assert.NotNil(t, srv.telemetrySettings.Logger)
assert.NotNil(t, srv.telemetrySettings.SampledLogger)
assert.NotEqual(t, srv.telemetrySettings.Logger, srv.telemetrySettings.SampledLogger)
}

func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) {
for key, labelValue := range expectedLabels {
lookupKey, ok := prometheusToOtelConv[key]
Expand Down
14 changes: 10 additions & 4 deletions service/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"fmt"
"time"

"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -54,7 +55,11 @@ type LogsConfig struct {
// (default = false)
DisableStacktrace bool `mapstructure:"disable_stacktrace"`

// Sampling sets a sampling policy. A nil SamplingConfig disables sampling.
// Sampling sets a sampling policy for the extra sampled logger.
// Default:
// initial: 1
// thereafter: 100
// tick: 10s
Sampling *LogsSamplingConfig `mapstructure:"sampling"`

// OutputPaths is a list of URLs or file paths to write logging output to.
Expand Down Expand Up @@ -87,12 +92,13 @@ type LogsConfig struct {
InitialFields map[string]any `mapstructure:"initial_fields"`
}

// LogsSamplingConfig sets a sampling strategy for the logger. Sampling caps the
// LogsSamplingConfig sets a sampling strategy for the extra sampled logger. Sampling caps the
// global CPU and I/O load that logging puts on your process while attempting
// to preserve a representative subset of your logs.
type LogsSamplingConfig struct {
Initial int `mapstructure:"initial"`
Thereafter int `mapstructure:"thereafter"`
Initial int `mapstructure:"initial"`
Thereafter int `mapstructure:"thereafter"`
Tick time.Duration `mapstructure:"tick"`
}

// MetricsConfig exposes the common Telemetry configuration for one component.
Expand Down
39 changes: 32 additions & 7 deletions service/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"context"
"time"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand All @@ -15,6 +16,7 @@ import (

type Telemetry struct {
logger *zap.Logger
sampledLogger *zap.Logger
tracerProvider *sdktrace.TracerProvider
}

Expand All @@ -26,6 +28,10 @@ func (t *Telemetry) Logger() *zap.Logger {
return t.logger
}

func (t *Telemetry) SampledLogger() *zap.Logger {
return t.sampledLogger
}

func (t *Telemetry) Shutdown(ctx context.Context) error {
// TODO: Sync logger.
return multierr.Combine(
Expand All @@ -44,12 +50,15 @@ func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) {
if err != nil {
return nil, err
}
sampledLogger := newSampledLogger(cfg.Logs.Sampling, logger)

tp := sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(alwaysRecord()),
)
return &Telemetry{
logger: logger,
sampledLogger: sampledLogger,
tracerProvider: tp,
}, nil
}
Expand All @@ -59,7 +68,6 @@ func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Development: cfg.Development,
Sampling: toSamplingConfig(cfg.Sampling),
Encoding: cfg.Encoding,
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: cfg.OutputPaths,
Expand All @@ -82,12 +90,29 @@ func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
return logger, nil
}

func toSamplingConfig(sc *LogsSamplingConfig) *zap.SamplingConfig {
if sc == nil {
return nil
func newSampledLogger(cfg *LogsSamplingConfig, logger *zap.Logger) *zap.Logger {
if cfg == nil {
cfg = newDefaultLogsSamplingConfig()
}
return &zap.SamplingConfig{
Initial: sc.Initial,
Thereafter: sc.Thereafter,

// Create a logger that samples all messages to "initial" per "tick" initially,
// and cfg.Initial/cfg.Thereafter of messages after that.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(
core,
cfg.Tick,
cfg.Initial,
cfg.Thereafter,
)
})
return logger.WithOptions(opts)
}

// newDefaultLogsSamplingConfig returns a default LogsSamplingConfig.
func newDefaultLogsSamplingConfig() *LogsSamplingConfig {
return &LogsSamplingConfig{
Initial: 1,
Thereafter: 100,
Tick: 10 * time.Second,
}
}

0 comments on commit 9540c55

Please sign in to comment.