[chore][pkg/stanza] Extract a trim package for managing trim functions #26341

Merged · 1 commit · Sep 6, 2023
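
In short, this PR moves the whitespace-trimming helpers out of pkg/stanza/tokenize into a new pkg/stanza/trim package, and threads an explicit trim.Func through the splitter, flusher, and multiline builders in place of the preserveLeadingWhitespaces/preserveTrailingWhitespaces boolean pairs that previously rode down each call chain. A minimal sketch of the extracted API; the main wrapper and sample input are illustrative, not part of the PR:

package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	// trim.Whitespace selects one of four trim behaviors from the two preserve flags.
	trimAll := trim.Whitespace(false, false) // trim both ends (the default)
	noop := trim.Whitespace(true, true)      // preserve both ends

	fmt.Printf("%q\n", trimAll([]byte(" \t log line \r\n"))) // "log line"
	fmt.Printf("%q\n", noop([]byte(" \t log line \r\n")))    // " \t log line \r\n"
}
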
8 changes: 6 additions & 2 deletions pkg/stanza/fileconsumer/internal/splitter/custom.go
@@ -7,6 +7,7 @@ import (
"bufio"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

type customFactory struct {
@@ -17,10 +18,13 @@ type customFactory struct {
var _ Factory = (*customFactory)(nil)

func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFunc) Factory {
- return &customFactory{flusherCfg: flusherCfg, splitFunc: splitFunc}
+ return &customFactory{
+ flusherCfg: flusherCfg,
+ splitFunc: splitFunc,
+ }
}

// Build builds Multiline Splitter struct
func (f *customFactory) Build() (bufio.SplitFunc, error) {
- return f.flusherCfg.Wrap(f.splitFunc), nil
+ return f.flusherCfg.Wrap(f.splitFunc, trim.Whitespace(true, true)), nil
}
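
Note that trim.Whitespace(true, true) resolves to the no-op trim function, so tokens force-flushed through a custom splitter are now returned byte-for-byte, where the old flusher applied its whitespace trim unconditionally. A quick sketch of that identity behavior (input value is illustrative):

package main

import (
	"bytes"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	noop := trim.Whitespace(true, true)
	in := []byte("  partial line awaiting flush  ")
	fmt.Println(bytes.Equal(noop(in), in)) // true: the token comes back unchanged
}
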
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/tcp/tcp.go
@@ -25,6 +25,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
@@ -83,7 +84,8 @@ type BaseConfig struct {
type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
- splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, int(c.MaxLogSize))
+ trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces)
+ splitFunc, err := c.Multiline.Build(enc, true, int(c.MaxLogSize), trimFunc)
if err != nil {
return nil, err
}
4 changes: 3 additions & 1 deletion pkg/stanza/operator/input/udp/udp.go
@@ -19,6 +19,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
@@ -91,7 +92,8 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

// Build multiline
- splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, MaxUDPSize)
+ trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces)
+ splitFunc, err := c.Multiline.Build(enc, true, MaxUDPSize, trimFunc)
if err != nil {
return nil, err
}
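
The tcp and udp inputs now share the same two-step pattern: derive a trim.Func from the user-facing preserve flags, then pass it to Multiline.Build in place of the flags themselves. A condensed sketch of that flow, with hypothetical flag values standing in for a decoded config:

package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	// Stand-ins for c.PreserveLeadingWhitespaces / c.PreserveTrailingWhitespaces.
	preserveLeading, preserveTrailing := false, true

	trimFunc := trim.Whitespace(preserveLeading, preserveTrailing)
	// In the PR this func becomes the final Build argument, e.g.:
	//   splitFunc, err := c.Multiline.Build(enc, true, MaxUDPSize, trimFunc)
	fmt.Printf("%q\n", trimFunc([]byte("  body  "))) // "body  ": only the leading run is trimmed
}
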
10 changes: 6 additions & 4 deletions pkg/stanza/tokenize/flusher.go
@@ -6,6 +6,8 @@ package tokenize // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
import (
"bufio"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const DefaultFlushPeriod = 500 * time.Millisecond
@@ -24,13 +26,13 @@ func NewFlusherConfig() FlusherConfig {
}

// Wrap a bufio.SplitFunc with a flusher
- func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc) bufio.SplitFunc {
+ func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc {
f := &flusher{
lastDataChange: time.Now(),
forcePeriod: c.Period,
previousDataLength: 0,
}
- return f.splitFunc(splitFunc)
+ return f.splitFunc(splitFunc, trimFunc)
}

// flusher keeps information about flush state
@@ -71,7 +73,7 @@ func (f *flusher) shouldFlush() bool {
return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
}

- func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
+ func (f *flusher) splitFunc(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)

@@ -91,7 +93,7 @@ func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
if f.shouldFlush() {
// Inform flusher that we just flushed
f.flushed()
- token = trimWhitespacesFunc(data)
+ token = trimFunc(data)
advance = len(data)
return
}
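
The flusher no longer hard-codes whitespace trimming on its force-flush path; whatever trim.Func the caller hands to Wrap is applied to the flushed remainder. A minimal sketch of wrapping a stock split function, assuming the default flush period; the scanner wiring is illustrative:

package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	flusherCfg := tokenize.NewFlusherConfig()
	// The trim.Func is injected here rather than baked into the flusher.
	splitFunc := flusherCfg.Wrap(bufio.ScanLines, trim.Whitespace(false, false))

	scanner := bufio.NewScanner(strings.NewReader("one\ntwo\n"))
	scanner.Split(splitFunc)
	for scanner.Scan() {
		fmt.Println(scanner.Text()) // "one", then "two"
	}
}
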
62 changes: 11 additions & 51 deletions pkg/stanza/tokenize/multiline.go
@@ -10,6 +10,8 @@ import (
"regexp"

"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

// Multiline consists of splitFunc and variables needed to perform force flush
@@ -32,12 +34,12 @@ type MultilineConfig struct {
}

// Build will build a Multiline operator.
- func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, maxLogSize int) (bufio.SplitFunc, error) {
- return c.getSplitFunc(enc, flushAtEOF, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces)
+ func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) {
+ return c.getSplitFunc(enc, flushAtEOF, maxLogSize, trimFunc)
}

// getSplitFunc returns split function for bufio.Scanner basing on configured pattern
- func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) {
+ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) {
endPattern := c.LineEndPattern
startPattern := c.LineStartPattern

@@ -54,7 +56,7 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) {
case enc == encoding.Nop:
return NoSplitFunc(maxLogSize), nil
case endPattern == "" && startPattern == "":
- splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
+ splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, trimFunc)
if err != nil {
return nil, err
}
@@ -63,13 +65,13 @@
if err != nil {
return nil, fmt.Errorf("compile line end regex: %w", err)
}
- splitFunc = LineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
+ splitFunc = LineEndSplitFunc(re, flushAtEOF, trimFunc)
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %w", err)
}
- splitFunc = LineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
+ splitFunc = LineStartSplitFunc(re, flushAtEOF, trimFunc)
default:
return nil, fmt.Errorf("unreachable")
}
@@ -78,7 +80,7 @@

// LineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that start with a match to the regex pattern provided
- func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
+ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
firstLoc := re.FindIndex(data)
if firstLoc == nil {
@@ -132,7 +134,7 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {

// LineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
- func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
+ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
loc := re.FindIndex(data)
if loc == nil {
@@ -160,7 +162,7 @@ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {

// NewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
// never returning an token using EOF as a terminator
- func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) {
+ func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func) (bufio.SplitFunc, error) {
newline, err := encodedNewline(enc)
if err != nil {
return nil, err
@@ -224,45 +226,3 @@ func encodedCarriageReturn(enc encoding.Encoding) ([]byte, error) {
nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\r'}, true)
return out[:nDst], err
}

- type trimFunc func([]byte) []byte
-
- func noTrim(token []byte) []byte {
- return token
- }
-
- func trimLeadingWhitespacesFunc(data []byte) []byte {
- // TrimLeft to strip EOF whitespaces in case of using $ in regex
- // For some reason newline and carriage return are being moved to beginning of next log
- token := bytes.TrimLeft(data, "\r\n\t ")
- if token == nil {
- return []byte{}
- }
- return token
- }
-
- func trimTrailingWhitespacesFunc(data []byte) []byte {
- // TrimRight to strip all whitespaces from the end of log
- token := bytes.TrimRight(data, "\r\n\t ")
- if token == nil {
- return []byte{}
- }
- return token
- }
-
- func trimWhitespacesFunc(data []byte) []byte {
- return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data))
- }
-
- func getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) trimFunc {
- if preserveLeadingWhitespaces && preserveTrailingWhitespaces {
- return noTrim
- }
- if preserveLeadingWhitespaces {
- return trimTrailingWhitespacesFunc
- }
- if preserveTrailingWhitespaces {
- return trimLeadingWhitespacesFunc
- }
- return trimWhitespacesFunc
- }
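
Net effect for multiline.go: Build and getSplitFunc accept a ready-made trim.Func, and the trim helpers that used to sit at the bottom of this file (deleted above) move to the new package. A hedged sketch of the updated Build call, assuming UTF-8 and a line-start pattern; the sample input and panic-on-error are illustrative:

package main

import (
	"bufio"
	"fmt"
	"strings"

	"golang.org/x/text/encoding/unicode"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	cfg := tokenize.MultilineConfig{LineStartPattern: "^START"}

	// New signature: one trim.Func instead of two preserve-whitespace booleans.
	splitFunc, err := cfg.Build(unicode.UTF8, true, 0, trim.Whitespace(false, false))
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(strings.NewReader("START a\nSTART b\n"))
	scanner.Split(splitFunc)
	for scanner.Scan() {
		fmt.Printf("%q\n", scanner.Text()) // expect "START a", then "START b"
	}
}
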
22 changes: 14 additions & 8 deletions pkg/stanza/tokenize/multiline_test.go
@@ -17,8 +17,11 @@ import (
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize/tokenizetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

+ var noTrim = trim.Whitespace(true, true)

const (
// Those values has been experimentally figured out for windows
sleepDuration time.Duration = time.Millisecond * 80
@@ -215,10 +218,11 @@ func TestLineStartSplitFunc(t *testing.T) {
LineStartPattern: tc.Pattern,
}

- splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
+ trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
+ splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
- splitFunc = tc.Flusher.Wrap(splitFunc)
+ splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc)
}
t.Run(tc.Name, tc.Run(splitFunc))
}
@@ -426,10 +430,11 @@ func TestLineEndSplitFunc(t *testing.T) {
LineEndPattern: tc.Pattern,
}

- splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
+ trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
+ splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
- splitFunc = tc.Flusher.Wrap(splitFunc)
+ splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc)
}
t.Run(tc.Name, tc.Run(splitFunc))
}
@@ -591,10 +596,11 @@ func TestNewlineSplitFunc(t *testing.T) {
}

for _, tc := range testCases {
- splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, getTrimFunc(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces))
+ trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
+ splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
- splitFunc = tc.Flusher.Wrap(splitFunc)
+ splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc)
}
t.Run(tc.Name, tc.Run(splitFunc))
}
@@ -666,14 +672,14 @@ func TestNoopEncodingError(t *testing.T) {
LineEndPattern: "\n",
}

- _, err := cfg.getSplitFunc(encoding.Nop, false, 0, false, false)
+ _, err := cfg.getSplitFunc(encoding.Nop, false, 0, noTrim)
require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))

cfg = &MultilineConfig{
LineStartPattern: "\n",
}

- _, err = cfg.getSplitFunc(encoding.Nop, false, 0, false, false)
+ _, err = cfg.getSplitFunc(encoding.Nop, false, 0, noTrim)
require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))
}

17 changes: 9 additions & 8 deletions pkg/stanza/tokenize/splitter.go
@@ -7,15 +7,16 @@ import (
"bufio"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

// SplitterConfig consolidates MultilineConfig and FlusherConfig
type SplitterConfig struct {
- Encoding string `mapstructure:"encoding,omitempty"`
- Flusher FlusherConfig `mapstructure:",squash,omitempty"`
- Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
- PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
- PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
+ Encoding string `mapstructure:"encoding,omitempty"`
+ Flusher FlusherConfig `mapstructure:",squash,omitempty"`
+ Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
+ PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
+ PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
}

// NewSplitterConfig returns default SplitterConfig
@@ -33,11 +34,11 @@ func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) {
if err != nil {
return nil, err
}

- splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, maxLogSize)
+ trimFunc := trim.Whitespace(c.PreserveLeading, c.PreserveTrailing)
+ splitFunc, err := c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc)
if err != nil {
return nil, err
}

- return c.Flusher.Wrap(splitFunc), nil
+ return c.Flusher.Wrap(splitFunc, trimFunc), nil
}
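
The struct fields are renamed (PreserveLeadingWhitespaces becomes PreserveLeading, PreserveTrailingWhitespaces becomes PreserveTrailing), but the mapstructure tags keep the user-facing preserve_leading_whitespaces / preserve_trailing_whitespaces keys, so on-disk configs should be unaffected. A sketch of building a split function from the renamed fields, assuming the default splitter config's encoding resolves to UTF-8:

package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

func main() {
	cfg := tokenize.NewSplitterConfig()
	cfg.PreserveLeading = true // was cfg.PreserveLeadingWhitespaces before this PR

	splitFunc, err := cfg.Build(false, 1024)
	if err != nil {
		panic(err)
	}
	fmt.Println(splitFunc != nil) // true: a flusher-wrapped split function was built
}
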
48 changes: 48 additions & 0 deletions pkg/stanza/trim/trim.go
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package trim // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"

import (
"bytes"
)

type Func func([]byte) []byte

func Whitespace(preserveLeading, preserveTrailing bool) Func {
if preserveLeading && preserveTrailing {
return noTrim
}
if preserveLeading {
return trimTrailingWhitespacesFunc
}
if preserveTrailing {
return trimLeadingWhitespacesFunc
}
return trimWhitespacesFunc
}

func noTrim(token []byte) []byte {
return token
}

func trimLeadingWhitespacesFunc(data []byte) []byte {
// TrimLeft to strip EOF whitespaces in case of using $ in regex
// For some reason newline and carriage return are being moved to beginning of next log
token := bytes.TrimLeft(data, "\r\n\t ")

// TrimLeft will return nil if data is an empty slice
if token == nil {
return []byte{}
}
return token
}

func trimTrailingWhitespacesFunc(data []byte) []byte {
// TrimRight to strip all whitespaces from the end of log
return bytes.TrimRight(data, "\r\n\t ")
}

func trimWhitespacesFunc(data []byte) []byte {
return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data))
}
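
Taken together, the two flags select one of four behaviors. A small demonstration against the new package; the sample input is illustrative:

package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	in := []byte("\t hello \r\n")
	cases := []struct {
		name             string
		preserveLeading  bool
		preserveTrailing bool
	}{
		{"trim both (default)", false, false},
		{"preserve leading", true, false},
		{"preserve trailing", false, true},
		{"preserve both (no-op)", true, true},
	}
	for _, tc := range cases {
		f := trim.Whitespace(tc.preserveLeading, tc.preserveTrailing)
		fmt.Printf("%-22s %q\n", tc.name, f(in))
	}
}
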