Skip to content

Commit

Permalink
[remotetapprocessor] use 'time/rate' to limit traffic (#32481)
Browse files Browse the repository at this point in the history
bug: The remotetapprocessor `limit` configure doesn't work.
how to fix: use `time/rate` to limit traffic. 

Resolves
#32385

---------

Co-authored-by: Andrzej Stencel <[email protected]>
  • Loading branch information
li-zeyuan and andrzej-stencel authored May 8, 2024
1 parent f4a3147 commit 497fed7
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 15 deletions.
30 changes: 30 additions & 0 deletions .chloggen/fix-remotetap-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: remotetapprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make the `limit` configuration work properly.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32385]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The `limit` configuration was ignored previously, but now it works according to the configuration and documentation.
Nothing is required of users.
See the remotetapprocessor's `README.md` for details.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
42 changes: 27 additions & 15 deletions processor/remotetapprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"
"golang.org/x/net/websocket"
"golang.org/x/time/rate"
)

type wsprocessor struct {
Expand All @@ -27,6 +28,7 @@ type wsprocessor struct {
server *http.Server
shutdownWG sync.WaitGroup
cs *channelSet
limiter *rate.Limiter
}

var logMarshaler = &plog.JSONMarshaler{}
Expand All @@ -38,6 +40,7 @@ func newProcessor(settings processor.CreateSettings, config *Config) *wsprocesso
config: config,
telemetrySettings: settings.TelemetrySettings,
cs: newChannelSet(),
limiter: rate.NewLimiter(config.Limit, int(config.Limit)),
}
}

Expand Down Expand Up @@ -98,31 +101,40 @@ func (w *wsprocessor) Shutdown(ctx context.Context) error {
}

func (w *wsprocessor) ConsumeMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
b, err := metricMarshaler.MarshalMetrics(md)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
if w.limiter.Allow() {
b, err := metricMarshaler.MarshalMetrics(md)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
}

return md, nil
}

func (w *wsprocessor) ConsumeLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) {
b, err := logMarshaler.MarshalLogs(ld)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
if w.limiter.Allow() {
b, err := logMarshaler.MarshalLogs(ld)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
}

return ld, nil
}

func (w *wsprocessor) ConsumeTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
b, err := traceMarshaler.MarshalTraces(td)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
if w.limiter.Allow() {
b, err := traceMarshaler.MarshalTraces(td)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
}

return td, nil
}
163 changes: 163 additions & 0 deletions processor/remotetapprocessor/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package remotetapprocessor

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"
"golang.org/x/time/rate"
)

func TestConsumeMetrics(t *testing.T) {
metric := pmetric.NewMetrics()
metric.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("foo")

cases := []struct {
name string
limit int
}{
{name: "limit_0", limit: 0},
{name: "limit_1", limit: 1},
{name: "limit_10", limit: 10},
{name: "limit_50", limit: 50},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
conf := &Config{
Limit: rate.Limit(c.limit),
}

processor := newProcessor(processortest.NewNopCreateSettings(), conf)

ch := make(chan []byte)
idx := processor.cs.add(ch)
receiveNum := 0
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for range ch {
receiveNum++
}
}()

for i := 0; i < c.limit*2; i++ {
// send metric to chan c.limit*2 per sec.
metric2, err := processor.ConsumeMetrics(context.Background(), metric)
assert.Nil(t, err)
assert.Equal(t, metric, metric2)
}

processor.cs.closeAndRemove(idx)
wg.Wait()
assert.Equal(t, receiveNum, c.limit)

})
}
}

func TestConsumeLogs(t *testing.T) {
log := plog.NewLogs()
log.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("foo")

cases := []struct {
name string
limit int
}{
{name: "limit_0", limit: 0},
{name: "limit_1", limit: 1},
{name: "limit_10", limit: 10},
{name: "limit_50", limit: 50},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
conf := &Config{
Limit: rate.Limit(c.limit),
}

processor := newProcessor(processortest.NewNopCreateSettings(), conf)

ch := make(chan []byte)
idx := processor.cs.add(ch)
receiveNum := 0
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for range ch {
receiveNum++
}
}()

// send log to chan c.limit*2 per sec.
for i := 0; i < c.limit*2; i++ {
log2, err := processor.ConsumeLogs(context.Background(), log)
assert.Nil(t, err)
assert.Equal(t, log, log2)
}

processor.cs.closeAndRemove(idx)
wg.Wait()
t.Log(receiveNum)
assert.Equal(t, receiveNum, c.limit)
})
}
}

func TestConsumeTraces(t *testing.T) {
trace := ptrace.NewTraces()
trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetName("foo")

cases := []struct {
name string
limit int
}{
{name: "limit_0", limit: 0},
{name: "limit_1", limit: 1},
{name: "limit_10", limit: 10},
{name: "limit_50", limit: 50},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
conf := &Config{
Limit: rate.Limit(c.limit),
}

processor := newProcessor(processortest.NewNopCreateSettings(), conf)

ch := make(chan []byte)
idx := processor.cs.add(ch)
receiveNum := 0
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for range ch {
receiveNum++
}
}()

for i := 0; i < c.limit*2; i++ {
// send trace to chan c.limit*2 per sec.
trace2, err := processor.ConsumeTraces(context.Background(), trace)
assert.Nil(t, err)
assert.Equal(t, trace, trace2)
}

processor.cs.closeAndRemove(idx)
wg.Wait()
assert.Equal(t, receiveNum, c.limit)
})
}
}
3 changes: 3 additions & 0 deletions processor/remotetapprocessor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestSocketConnectionLogs(t *testing.T) {
ServerConfig: confighttp.ServerConfig{
Endpoint: "localhost:12001",
},
Limit: 1,
}
logSink := &consumertest.LogsSink{}
processor, err := NewFactory().CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg,
Expand Down Expand Up @@ -62,6 +63,7 @@ func TestSocketConnectionMetrics(t *testing.T) {
ServerConfig: confighttp.ServerConfig{
Endpoint: "localhost:12002",
},
Limit: 1,
}
metricsSink := &consumertest.MetricsSink{}
processor, err := NewFactory().CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg,
Expand Down Expand Up @@ -97,6 +99,7 @@ func TestSocketConnectionTraces(t *testing.T) {
ServerConfig: confighttp.ServerConfig{
Endpoint: "localhost:12003",
},
Limit: 1,
}
tracesSink := &consumertest.TracesSink{}
processor, err := NewFactory().CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg,
Expand Down

0 comments on commit 497fed7

Please sign in to comment.