-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathlogs.go
113 lines (99 loc) · 3.18 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package consumerretry // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
import (
"context"
"errors"
"fmt"
"time"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// logsConsumer decorates a consumer.Logs so that calls to ConsumeLogs can be
// retried according to cfg. All other consumer.Logs methods are served by the
// embedded consumer.
type logsConsumer struct {
	// Logs is the wrapped (next) consumer that actually receives the data.
	consumer.Logs
	// logger receives messages about failed, retried and dropped deliveries.
	logger *zap.Logger
	// cfg carries the retry settings read by ConsumeLogs (Enabled,
	// InitialInterval, MaxInterval and MaxElapsedTime).
	cfg Config
}
// NewLogs returns a consumer.Logs that wraps next. The returned consumer
// forwards ConsumeLogs calls to next, using config to decide whether and how
// failed calls are retried, and reports failures through logger.
func NewLogs(config Config, logger *zap.Logger, next consumer.Logs) consumer.Logs {
	wrapped := new(logsConsumer)
	wrapped.Logs = next
	wrapped.cfg = config
	wrapped.logger = logger
	return wrapped
}
// ConsumeLogs forwards logs to the wrapped consumer, retrying with exponential
// back-off when retries are enabled in the configuration.
//
// With retries disabled, the wrapped consumer is called exactly once; a
// non-nil error is logged and returned unchanged.
//
// With retries enabled, the call is repeated until one of the following:
//   - the wrapped consumer succeeds (nil is returned);
//   - the wrapped consumer returns an error for which
//     consumererror.IsPermanent is true (the error is logged and returned);
//   - the back-off yields backoff.Stop (the last error is logged and returned);
//   - ctx is done while waiting between attempts (the last error is returned,
//     wrapped with a cancellation message).
//
// When a failed attempt returns an error matching consumererror.Logs, only the
// data carried by that error is sent on the following attempt.
func (lc *logsConsumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
	if !lc.cfg.Enabled {
		err := lc.Logs.ConsumeLogs(ctx, logs)
		if err != nil {
			lc.logger.Error("ConsumeLogs() failed. "+
				"Enable retry_on_failure to slow down reading logs and avoid dropping.", zap.Error(err))
		}
		return err
	}

	// Do not use NewExponentialBackOff since it calls Reset and the code here must
	// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
	expBackoff := backoff.ExponentialBackOff{
		MaxElapsedTime:      lc.cfg.MaxElapsedTime,
		InitialInterval:     lc.cfg.InitialInterval,
		MaxInterval:         lc.cfg.MaxInterval,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}
	expBackoff.Reset()

	span := trace.SpanFromContext(ctx)
	retryNum := int64(0)
	retryableErr := consumererror.Logs{}
	for {
		span.AddEvent(
			"Sending logs.",
			trace.WithAttributes(attribute.Int64("retry_num", retryNum)))

		err := lc.Logs.ConsumeLogs(ctx, logs)
		if err == nil {
			return nil
		}

		if consumererror.IsPermanent(err) {
			lc.logger.Error(
				"ConsumeLogs() failed. The error is not retryable. Dropping data.",
				zap.Error(err),
				zap.Int("dropped_items", logs.LogRecordCount()),
			)
			return err
		}

		// Narrow the payload to what the error carries, so the next attempt
		// resends only that data.
		if errors.As(err, &retryableErr) {
			logs = retryableErr.Data()
		}

		// TODO: take delay from the error once it is available in the consumererror package.
		backoffDelay := expBackoff.NextBackOff()
		if backoffDelay == backoff.Stop {
			lc.logger.Error("Max elapsed time expired. Dropping data.", zap.Error(err), zap.Int("dropped_items",
				logs.LogRecordCount()))
			return err
		}

		backoffDelayStr := backoffDelay.String()
		span.AddEvent(
			"ConsumeLogs() failed. Will retry the request after interval.",
			trace.WithAttributes(
				attribute.String("interval", backoffDelayStr),
				attribute.String("error", err.Error())))
		lc.logger.Debug(
			"ConsumeLogs() failed. Will retry the request after interval.",
			zap.Error(err),
			zap.String("interval", backoffDelayStr),
			zap.Int("logs_count", logs.LogRecordCount()),
		)
		retryNum++

		// Back off, but get interrupted when shutting down or when the request is
		// cancelled or timed out. An explicit timer is used instead of time.After
		// so that it can be stopped as soon as ctx is done, rather than staying
		// live until the full back-off delay has elapsed.
		timer := time.NewTimer(backoffDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("context is cancelled or timed out %w", err)
		case <-timer.C:
		}
	}
}