Skip to content

Commit

Permalink
Create a new decoder for each TCP/UDP connection to prevent concurren…
Browse files Browse the repository at this point in the history
…t write to buffers.
  • Loading branch information
jefchien committed Aug 8, 2023
1 parent eb26ae6 commit d242293
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 66 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-concurrent-decode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Create a new decoder for each TCP/UDP connection to prevent concurrent write to buffer.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24980]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
8 changes: 4 additions & 4 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact

var hCfg *header.Config
if c.Header != nil {
enc, err := c.Splitter.EncodingConfig.Build()
enc, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to create encoding: %w", err)
}

hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding)
hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc)
if err != nil {
return nil, fmt.Errorf("failed to build header config: %w", err)
}
Expand Down Expand Up @@ -212,13 +212,13 @@ func (c Config) validate() error {
return errors.New("`max_batches` must not be negative")
}

enc, err := c.Splitter.EncodingConfig.Build()
enc, err := helper.LookupEncoding(c.Splitter.EncodingConfig.Encoding)
if err != nil {
return err
}

if c.Header != nil {
if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc.Encoding); err != nil {
if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc); err != nil {
return fmt.Errorf("invalid config for `header`: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type reader struct {
*readerConfig
lineSplitFunc bufio.SplitFunc
splitFunc bufio.SplitFunc
encoding helper.Encoding
decoder *helper.Decoder
processFunc emit.Callback

Fingerprint *fingerprint.Fingerprint
Expand Down Expand Up @@ -87,7 +87,7 @@ func (r *reader) ReadToEnd(ctx context.Context) {
break
}

token, err := r.encoding.Decode(s.Bytes())
token, err := r.decoder.Decode(s.Bytes())
if err != nil {
r.Errorw("decode: %w", zap.Error(err))
} else if err := r.processFunc(ctx, token, r.FileAttributes); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/reader_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ func (b *readerBuilder) build() (r *reader, err error) {
}
}

r.encoding, err = b.encodingConfig.Build()
encoding, err := helper.LookupEncoding(b.encodingConfig.Encoding)
if err != nil {
return nil, err
}
r.decoder = helper.NewDecoder(encoding)

if b.headerConfig == nil || b.headerFinalized {
r.splitFunc = r.lineSplitFunc
Expand Down
7 changes: 4 additions & 3 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ func TestHeaderFingerprintIncluded(t *testing.T) {
regexConf := regex.NewConfig()
regexConf.Regex = "^#(?P<header>.*)"

enc, err := helper.EncodingConfig{
encodingConf := helper.EncodingConfig{
Encoding: "utf-8",
}.Build()
}
enc, err := helper.LookupEncoding(encodingConf.Encoding)
require.NoError(t, err)

h, err := header.NewConfig("^#", []operator.Config{{Builder: regexConf}}, enc.Encoding)
h, err := header.NewConfig("^#", []operator.Config{{Builder: regexConf}}, enc)
require.NoError(t, err)
f.headerConfig = h

Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/splitter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ func newMultilineSplitterFactory(splitter helper.SplitterConfig) *multilineSplit

// Build builds Multiline Splitter struct
func (factory *multilineSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc, error) {
enc, err := factory.EncodingConfig.Build()
enc, err := helper.LookupEncoding(factory.EncodingConfig.Encoding)
if err != nil {
return nil, err
}
flusher := factory.Flusher.Build()
splitter, err := factory.Multiline.Build(enc.Encoding, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
splitter, err := factory.Multiline.Build(enc, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
if err != nil {
return nil, err
}
Expand Down
49 changes: 23 additions & 26 deletions pkg/stanza/operator/helper/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,44 @@ import (
"golang.org/x/text/transform"
)

// NewBasicConfig creates a new Encoding config
// NewEncodingConfig creates a new Encoding config
func NewEncodingConfig() EncodingConfig {
return EncodingConfig{
Encoding: "utf-8",
}
}

// EncodingConfig is the configuration of a Encoding helper
// EncodingConfig is the configuration of an Encoding helper
type EncodingConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
}

// Build will build an Encoding operator.
func (c EncodingConfig) Build() (Encoding, error) {
enc, err := lookupEncoding(c.Encoding)
if err != nil {
return Encoding{}, err
}

return Encoding{
Encoding: enc,
decodeBuffer: make([]byte, 1<<12),
decoder: enc.NewDecoder(),
}, nil
}

type Encoding struct {
Encoding encoding.Encoding
type Decoder struct {
encoding encoding.Encoding
decoder *encoding.Decoder
decodeBuffer []byte
}

// Decode converts the bytes in msgBuf to utf-8 from the configured encoding
func (e *Encoding) Decode(msgBuf []byte) ([]byte, error) {
// NewDecoder wraps a character set encoding and creates a reusable buffer to reduce allocation.
// Decoder is not thread-safe and must not be used in multiple goroutines.
func NewDecoder(enc encoding.Encoding) *Decoder {
return &Decoder{
encoding: enc,
decoder: enc.NewDecoder(),
decodeBuffer: make([]byte, 1<<12),
}
}

// Decode converts the bytes in msgBuf to UTF-8 from the configured encoding.
func (d *Decoder) Decode(msgBuf []byte) ([]byte, error) {
for {
e.decoder.Reset()
nDst, _, err := e.decoder.Transform(e.decodeBuffer, msgBuf, true)
d.decoder.Reset()
nDst, _, err := d.decoder.Transform(d.decodeBuffer, msgBuf, true)
if err == nil {
return e.decodeBuffer[:nDst], nil
return d.decodeBuffer[:nDst], nil
}
if errors.Is(err, transform.ErrShortDst) {
e.decodeBuffer = make([]byte, len(e.decodeBuffer)*2)
d.decodeBuffer = make([]byte, len(d.decodeBuffer)*2)
continue
}
return nil, fmt.Errorf("transform encoding: %w", err)
Expand All @@ -73,7 +69,8 @@ var encodingOverrides = map[string]encoding.Encoding{
"": unicode.UTF8,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
// LookupEncoding attempts to match the string name provided with a character set encoding.
func LookupEncoding(enc string) (encoding.Encoding, error) {
if e, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return e, nil
}
Expand All @@ -88,7 +85,7 @@ func lookupEncoding(enc string) (encoding.Encoding, error) {
}

func IsNop(enc string) bool {
e, err := lookupEncoding(enc)
e, err := LookupEncoding(enc)
if err != nil {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Multiline struct {
Force *Flusher
}

// NewBasicConfig creates a new Multiline config
// NewMultilineConfig creates a new Multiline config
func NewMultilineConfig() MultilineConfig {
return MultilineConfig{
LineStartPattern: "",
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/operator/helper/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ func NewSplitterConfig() SplitterConfig {

// Build builds Splitter struct
func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) {
enc, err := c.EncodingConfig.Build()
enc, err := LookupEncoding(c.EncodingConfig.Encoding)
if err != nil {
return nil, err
}

flusher := c.Flusher.Build()
splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize)
splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize)
if err != nil {
return nil, err
}

return &Splitter{
Encoding: enc,
Decoder: NewDecoder(enc),
Flusher: flusher,
SplitFunc: splitFunc,
}, nil
}

// Splitter consolidates Flusher and dependent splitFunc
type Splitter struct {
Encoding Encoding
Decoder *Decoder
SplitFunc bufio.SplitFunc
Flusher *Flusher
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/operator/input/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"

"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -143,7 +144,7 @@ func (t *Input) SetOutputs(operators []operator.Operator) error {
return t.parser.SetOutputs(operators)
}

func OctetMultiLineBuilder(_ helper.Encoding) (bufio.SplitFunc, error) {
func OctetMultiLineBuilder(_ encoding.Encoding) (bufio.SplitFunc, error) {
return newOctetFrameSplitFunc(true), nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/stanza/operator/input/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/internal"
Expand Down Expand Up @@ -250,7 +249,7 @@ func TestOctetFramingSplitFunc(t *testing.T) {
},
}
for _, tc := range testCases {
splitFunc, err := OctetMultiLineBuilder(helper.Encoding{})
splitFunc, err := OctetMultiLineBuilder(nil)
require.NoError(t, err)
t.Run(tc.Name, tc.RunFunc(splitFunc))
}
Expand Down
27 changes: 15 additions & 12 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/jpillora/backoff"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -40,7 +41,7 @@ func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfigWithID creates a new TCP input config with default values
// NewConfig creates a new TCP input config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}
Expand Down Expand Up @@ -77,10 +78,10 @@ type BaseConfig struct {
MultiLineBuilder MultiLineBuilderFunc
}

type MultiLineBuilderFunc func(encoding helper.Encoding) (bufio.SplitFunc, error)
type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultMultilineBuilder(encoding helper.Encoding) (bufio.SplitFunc, error) {
splitFunc, err := c.Multiline.Build(encoding.Encoding, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize))
func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,7 +113,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
return nil, fmt.Errorf("failed to resolve listen_address: %w", err)
}

encoding, err := c.Encoding.Build()
enc, err := helper.LookupEncoding(c.Encoding.Encoding)
if err != nil {
return nil, err
}
Expand All @@ -122,7 +123,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

// Build multiline
splitFunc, err := c.MultiLineBuilder(encoding)
splitFunc, err := c.MultiLineBuilder(enc)
if err != nil {
return nil, err
}
Expand All @@ -138,7 +139,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
OneLogPerPacket: c.OneLogPerPacket,
encoding: encoding,
encoding: enc,
splitFunc: splitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
Expand Down Expand Up @@ -170,7 +171,7 @@ type Input struct {
tls *tls.Config
backoff backoff.Backoff

encoding helper.Encoding
encoding encoding.Encoding
splitFunc bufio.SplitFunc
resolver *helper.IPResolver
}
Expand Down Expand Up @@ -260,14 +261,16 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
defer t.wg.Done()
defer cancel()

decoder := helper.NewDecoder(t.encoding)

if t.OneLogPerPacket {
var buf bytes.Buffer
_, err := io.Copy(&buf, conn)
if err != nil {
t.Errorw("IO copy net connection buffer error", zap.Error(err))
}
log := truncateMaxLog(buf.Bytes(), t.MaxLogSize)
t.handleMessage(ctx, conn, log)
t.handleMessage(ctx, conn, decoder, log)
return
}

Expand All @@ -279,7 +282,7 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
scanner.Split(t.splitFunc)

for scanner.Scan() {
t.handleMessage(ctx, conn, scanner.Bytes())
t.handleMessage(ctx, conn, decoder, scanner.Bytes())
}

if err := scanner.Err(); err != nil {
Expand All @@ -288,8 +291,8 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
}()
}

func (t *Input) handleMessage(ctx context.Context, conn net.Conn, log []byte) {
decoded, err := t.encoding.Decode(log)
func (t *Input) handleMessage(ctx context.Context, conn net.Conn, decoder *helper.Decoder, log []byte) {
decoded, err := decoder.Decode(log)
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
return
Expand Down
Loading

0 comments on commit d242293

Please sign in to comment.