diff --git a/.chloggen/fix-remotetap-limit.yaml b/.chloggen/fix-remotetap-limit.yaml new file mode 100644 index 000000000000..23b154e3b6ca --- /dev/null +++ b/.chloggen/fix-remotetap-limit.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: remotetapprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make the `limit` configuration work properly. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32385] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The `limit` configuration was ignored previously, but now it works according to the configuration and documentation. + Nothing is required of users. + See the remotetapprocessor's `README.md` for details. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/remotetapprocessor/processor.go b/processor/remotetapprocessor/processor.go index 23de9beeebd0..ea7d89c8ab40 100644 --- a/processor/remotetapprocessor/processor.go +++ b/processor/remotetapprocessor/processor.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/processor" "go.uber.org/zap" "golang.org/x/net/websocket" + "golang.org/x/time/rate" ) type wsprocessor struct { @@ -27,6 +28,7 @@ type wsprocessor struct { server *http.Server shutdownWG sync.WaitGroup cs *channelSet + limiter *rate.Limiter } var logMarshaler = &plog.JSONMarshaler{} @@ -38,6 +40,7 @@ func newProcessor(settings processor.CreateSettings, config *Config) *wsprocesso config: config, telemetrySettings: settings.TelemetrySettings, cs: newChannelSet(), + limiter: rate.NewLimiter(config.Limit, int(config.Limit)), } } @@ -98,31 +101,40 @@ func (w *wsprocessor) Shutdown(ctx context.Context) error { } func (w *wsprocessor) ConsumeMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { - b, err := metricMarshaler.MarshalMetrics(md) - if err != nil { - w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err)) - } else { - w.cs.writeBytes(b) + if w.limiter.Allow() { + b, err := metricMarshaler.MarshalMetrics(md) + if err != nil { + w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err)) + } else { + w.cs.writeBytes(b) + } } + return md, nil } func (w *wsprocessor) ConsumeLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) { - b, err := logMarshaler.MarshalLogs(ld) - if err != nil { - w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err)) - } else { - w.cs.writeBytes(b) + if w.limiter.Allow() { + b, err := logMarshaler.MarshalLogs(ld) + if err != nil { + w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err)) + } else { + w.cs.writeBytes(b) + } } + return ld, nil } func (w *wsprocessor) ConsumeTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { - b, err := traceMarshaler.MarshalTraces(td) - if err != nil { - w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err)) - } else { - w.cs.writeBytes(b) + if w.limiter.Allow() { + b, err := traceMarshaler.MarshalTraces(td) + if err != nil { + w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err)) + } else { + w.cs.writeBytes(b) + } } + return td, nil } diff --git a/processor/remotetapprocessor/processor_test.go b/processor/remotetapprocessor/processor_test.go new file mode 100644 index 000000000000..c0222a99acf2 --- /dev/null +++ b/processor/remotetapprocessor/processor_test.go @@ -0,0 +1,163 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package remotetapprocessor + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" + "golang.org/x/time/rate" +) + +func TestConsumeMetrics(t *testing.T) { + metric := pmetric.NewMetrics() + metric.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("foo") + + cases := []struct { + name string + limit int + }{ + {name: "limit_0", limit: 0}, + {name: "limit_1", limit: 1}, + {name: "limit_10", limit: 10}, + {name: "limit_50", limit: 50}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + conf := &Config{ + Limit: rate.Limit(c.limit), + } + + processor := newProcessor(processortest.NewNopCreateSettings(), conf) + + ch := make(chan []byte) + idx := processor.cs.add(ch) + receiveNum := 0 + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for range ch { + receiveNum++ + } + }() + + for i := 0; i < c.limit*2; i++ { + // send metric to chan c.limit*2 per sec. + metric2, err := processor.ConsumeMetrics(context.Background(), metric) + assert.Nil(t, err) + assert.Equal(t, metric, metric2) + } + + processor.cs.closeAndRemove(idx) + wg.Wait() + assert.Equal(t, receiveNum, c.limit) + + }) + } +} + +func TestConsumeLogs(t *testing.T) { + log := plog.NewLogs() + log.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("foo") + + cases := []struct { + name string + limit int + }{ + {name: "limit_0", limit: 0}, + {name: "limit_1", limit: 1}, + {name: "limit_10", limit: 10}, + {name: "limit_50", limit: 50}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + conf := &Config{ + Limit: rate.Limit(c.limit), + } + + processor := newProcessor(processortest.NewNopCreateSettings(), conf) + + ch := make(chan []byte) + idx := processor.cs.add(ch) + receiveNum := 0 + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for range ch { + receiveNum++ + } + }() + + // send log to chan c.limit*2 per sec. + for i := 0; i < c.limit*2; i++ { + log2, err := processor.ConsumeLogs(context.Background(), log) + assert.Nil(t, err) + assert.Equal(t, log, log2) + } + + processor.cs.closeAndRemove(idx) + wg.Wait() + t.Log(receiveNum) + assert.Equal(t, receiveNum, c.limit) + }) + } +} + +func TestConsumeTraces(t *testing.T) { + trace := ptrace.NewTraces() + trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetName("foo") + + cases := []struct { + name string + limit int + }{ + {name: "limit_0", limit: 0}, + {name: "limit_1", limit: 1}, + {name: "limit_10", limit: 10}, + {name: "limit_50", limit: 50}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + conf := &Config{ + Limit: rate.Limit(c.limit), + } + + processor := newProcessor(processortest.NewNopCreateSettings(), conf) + + ch := make(chan []byte) + idx := processor.cs.add(ch) + receiveNum := 0 + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for range ch { + receiveNum++ + } + }() + + for i := 0; i < c.limit*2; i++ { + // send trace to chan c.limit*2 per sec. + trace2, err := processor.ConsumeTraces(context.Background(), trace) + assert.Nil(t, err) + assert.Equal(t, trace, trace2) + } + + processor.cs.closeAndRemove(idx) + wg.Wait() + assert.Equal(t, receiveNum, c.limit) + }) + } +} diff --git a/processor/remotetapprocessor/server_test.go b/processor/remotetapprocessor/server_test.go index 66e7e1f75672..779a2353e56b 100644 --- a/processor/remotetapprocessor/server_test.go +++ b/processor/remotetapprocessor/server_test.go @@ -25,6 +25,7 @@ func TestSocketConnectionLogs(t *testing.T) { ServerConfig: confighttp.ServerConfig{ Endpoint: "localhost:12001", }, + Limit: 1, } logSink := &consumertest.LogsSink{} processor, err := NewFactory().CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, @@ -62,6 +63,7 @@ func TestSocketConnectionMetrics(t *testing.T) { ServerConfig: confighttp.ServerConfig{ Endpoint: "localhost:12002", }, + Limit: 1, } metricsSink := &consumertest.MetricsSink{} processor, err := NewFactory().CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, @@ -97,6 +99,7 @@ func TestSocketConnectionTraces(t *testing.T) { ServerConfig: confighttp.ServerConfig{ Endpoint: "localhost:12003", }, + Limit: 1, } tracesSink := &consumertest.TracesSink{} processor, err := NewFactory().CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg,