From 82d0db2f68914112c76442788c36e447381d21a3 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Wed, 13 Sep 2023 12:57:19 -0600 Subject: [PATCH] [pkg/stanza] Extract trim func from split package (#26536) Follows https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/26241 Previously, split funcs were responsible for applying trim funcs. This PR increases composability by applying trim funcs as a wrapper around split funcs. One nuance that was surfaced here is that the newline split func was not handling the case where a line starts with a newline. When this happens, we need to tell the scanner to advance, but we still want to return a `""` token, rather than nil. This is covered by existing tests, but previously it was "fixed" by the trim func which would return an empty slice when the token was nil. Now, the newline split func will explicitly handle this case, while the trim func will return the original value if it is nil or empty. --- .chloggen/pkg-stanza-extract-trim-split.yaml | 30 +++ pkg/stanza/fileconsumer/config.go | 2 +- .../fileconsumer/internal/header/config.go | 10 +- .../fileconsumer/internal/splitter/custom.go | 6 +- .../internal/splitter/custom_test.go | 62 ++++++- .../internal/splitter/multiline.go | 9 +- .../internal/splitter/multiline_test.go | 26 ++- pkg/stanza/flush/flush.go | 10 +- pkg/stanza/flush/flush_test.go | 6 +- pkg/stanza/operator/input/tcp/tcp.go | 12 +- pkg/stanza/operator/input/udp/udp.go | 5 +- pkg/stanza/split/split.go | 40 ++-- pkg/stanza/split/split_test.go | 175 ++++++------------ pkg/stanza/split/splittest/splittest.go | 20 +- pkg/stanza/trim/trim.go | 28 ++- pkg/stanza/trim/trim_test.go | 88 +++++++-- 16 files changed, 328 insertions(+), 201 deletions(-) create mode 100755 .chloggen/pkg-stanza-extract-trim-split.yaml diff --git a/.chloggen/pkg-stanza-extract-trim-split.yaml b/.chloggen/pkg-stanza-extract-trim-split.yaml new file mode 100755 index 000000000000..651ae0e0b097 --- /dev/null +++ b/.chloggen/pkg-stanza-extract-trim-split.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make trim func composable + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26536] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + - Adds trim.WithFunc to allow trim funcs to wrap bufio.SplitFuncs. + - Removes trim.Func from split.Config.Func. Use trim.WithFunc instead. + - Removes trim.Func from flush.WithPeriod. Use trim.WithFunc instead. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index c0158b4a5d2b..d5416d8da88a 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -119,7 +119,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback } // Ensure that splitter is buildable - factory := splitter.NewCustomFactory(splitFunc, c.FlushPeriod) + factory := splitter.NewCustomFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod) if _, err := factory.SplitFunc(); err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/internal/header/config.go b/pkg/stanza/fileconsumer/internal/header/config.go index c28ba2ac103a..618d1b1fa714 100644 --- a/pkg/stanza/fileconsumer/internal/header/config.go +++ b/pkg/stanza/fileconsumer/internal/header/config.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) type Config struct { @@ -69,13 +70,16 @@ func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encod return nil, fmt.Errorf("failed to compile `pattern`: %w", err) } - splitFunc, err := split.NewlineSplitFunc(enc, false, func(b []byte) []byte { - return bytes.Trim(b, "\r\n") - }) + splitFunc, err := split.NewlineSplitFunc(enc, false) if err != nil { return nil, fmt.Errorf("failed to create split func: %w", err) } + var trimFunc trim.Func = func(b []byte) []byte { + return bytes.Trim(b, "\r\n") + } + splitFunc = trim.WithFunc(splitFunc, trimFunc) + return &Config{ regex: regex, SplitFunc: splitFunc, diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go index 52fe9125e627..6cb8afced95f 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go @@ -13,19 +13,21 @@ import ( type customFactory struct { splitFunc bufio.SplitFunc + trimFunc trim.Func flushPeriod time.Duration } var _ Factory = (*customFactory)(nil) -func NewCustomFactory(splitFunc bufio.SplitFunc, flushPeriod time.Duration) Factory { +func NewCustomFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration) Factory { return &customFactory{ splitFunc: splitFunc, + trimFunc: trimFunc, flushPeriod: flushPeriod, } } // SplitFunc builds a bufio.SplitFunc based on the configuration func (f *customFactory) SplitFunc() (bufio.SplitFunc, error) { - return flush.WithPeriod(f.splitFunc, trim.Nop, f.flushPeriod), nil + return trim.WithFunc(flush.WithPeriod(f.splitFunc, f.flushPeriod), f.trimFunc), nil } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go index a03d7e0f290b..1b65b08d0097 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go @@ -9,10 +9,12 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestCustom(t *testing.T) { - factory := NewCustomFactory(bufio.ScanLines, 0) + factory := NewCustomFactory(bufio.ScanLines, trim.Nop, 0) splitFunc, err := factory.SplitFunc() assert.NoError(t, err) assert.NotNil(t, splitFunc) @@ -35,9 +37,33 @@ func TestCustom(t *testing.T) { assert.Nil(t, token) } +func TestCustomWithTrim(t *testing.T) { + factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, 0) + splitFunc, err := factory.SplitFunc() + assert.NoError(t, err) + assert.NotNil(t, splitFunc) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := splitFunc(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = splitFunc(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +} + func TestCustomWithFlush(t *testing.T) { flushPeriod := 100 * time.Millisecond - factory := NewCustomFactory(bufio.ScanLines, flushPeriod) + factory := NewCustomFactory(bufio.ScanLines, trim.Nop, flushPeriod) splitFunc, err := factory.SplitFunc() assert.NoError(t, err) assert.NotNil(t, splitFunc) @@ -66,3 +92,35 @@ func TestCustomWithFlush(t *testing.T) { assert.Equal(t, 7, advance) assert.Equal(t, []byte(" extra "), token) } + +func TestCustomWithFlushTrim(t *testing.T) { + flushPeriod := 100 * time.Millisecond + factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, flushPeriod) + splitFunc, err := factory.SplitFunc() + assert.NoError(t, err) + assert.NotNil(t, splitFunc) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := splitFunc(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = splitFunc(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) + + time.Sleep(2 * flushPeriod) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 7, advance) + assert.Equal(t, []byte("extra"), token) // Ensure trim applies to flushed token +} diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go index ae770350ed80..917bf2aeddb7 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline.go @@ -42,9 +42,14 @@ func NewSplitFuncFactory( // SplitFunc builds a bufio.SplitFunc based on the configuration func (f *splitFuncFactory) SplitFunc() (bufio.SplitFunc, error) { - splitFunc, err := f.splitConfig.Func(f.encoding, false, f.maxLogSize, f.trimFunc) + splitFunc, err := f.splitConfig.Func(f.encoding, false, f.maxLogSize) if err != nil { return nil, err } - return flush.WithPeriod(splitFunc, f.trimFunc, f.flushPeriod), nil + splitFunc = flush.WithPeriod(splitFunc, f.flushPeriod) + if f.encoding == encoding.Nop { + // Special case where we should never trim + return splitFunc, nil + } + return trim.WithFunc(splitFunc, f.trimFunc), nil } diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go index d0207f964762..89b773ad802d 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go @@ -49,6 +49,30 @@ func TestSplitFunc(t *testing.T) { assert.Nil(t, token) } +func TestSplitFuncWithTrim(t *testing.T) { + factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Whitespace, 0) + splitFunc, err := factory.SplitFunc() + assert.NoError(t, err) + assert.NotNil(t, splitFunc) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := splitFunc(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = splitFunc(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +} + func TestSplitFuncWithFlush(t *testing.T) { flushPeriod := 100 * time.Millisecond factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Nop, flushPeriod) @@ -81,7 +105,7 @@ func TestSplitFuncWithFlush(t *testing.T) { assert.Equal(t, []byte(" extra "), token) } -func TestSplitFuncWithTrim(t *testing.T) { +func TestSplitFuncWithFlushTrim(t *testing.T) { flushPeriod := 100 * time.Millisecond factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Whitespace, flushPeriod) splitFunc, err := factory.SplitFunc() diff --git a/pkg/stanza/flush/flush.go b/pkg/stanza/flush/flush.go index f42e18c82370..4197f527972d 100644 --- a/pkg/stanza/flush/flush.go +++ b/pkg/stanza/flush/flush.go @@ -6,12 +6,10 @@ package flush // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "bufio" "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) // Wrap a bufio.SplitFunc with a flusher -func WithPeriod(splitFunc bufio.SplitFunc, trimFunc trim.Func, period time.Duration) bufio.SplitFunc { +func WithPeriod(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc { if period <= 0 { return splitFunc } @@ -20,7 +18,7 @@ func WithPeriod(splitFunc bufio.SplitFunc, trimFunc trim.Func, period time.Durat forcePeriod: period, previousDataLength: 0, } - return f.splitFunc(splitFunc, trimFunc) + return f.splitFunc(splitFunc) } // flusher keeps information about flush state @@ -61,7 +59,7 @@ func (f *flusher) shouldFlush() bool { return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 } -func (f *flusher) splitFunc(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc { +func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { advance, token, err = splitFunc(data, atEOF) @@ -81,7 +79,7 @@ func (f *flusher) splitFunc(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio if f.shouldFlush() { // Inform flusher that we just flushed f.flushed() - token = trimFunc(data) + token = data advance = len(data) return } diff --git a/pkg/stanza/flush/flush_test.go b/pkg/stanza/flush/flush_test.go index 25d3aec0212b..140308274033 100644 --- a/pkg/stanza/flush/flush_test.go +++ b/pkg/stanza/flush/flush_test.go @@ -9,8 +9,6 @@ import ( "time" "github.com/stretchr/testify/assert" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestFlusher(t *testing.T) { @@ -22,7 +20,7 @@ func TestFlusher(t *testing.T) { // always use atEOF=false. flushPeriod := 100 * time.Millisecond - f := WithPeriod(bufio.ScanWords, trim.Nop, flushPeriod) + f := WithPeriod(bufio.ScanWords, flushPeriod) content := []byte("foo bar hellowo") @@ -64,7 +62,7 @@ func TestNoFlushPeriod(t *testing.T) { // In other words, we should expect exactly the behavior of bufio.ScanWords. flushPeriod := time.Duration(0) - f := WithPeriod(bufio.ScanWords, trim.Nop, flushPeriod) + f := WithPeriod(bufio.ScanWords, flushPeriod) content := []byte("foo bar hellowo") diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index 2a618e7d1054..2dd553e77512 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -81,13 +81,8 @@ type BaseConfig struct { type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error) -func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { - trimFunc := c.TrimConfig.Func() - splitFunc, err := c.SplitConfig.Func(enc, true, int(c.MaxLogSize), trimFunc) - if err != nil { - return nil, err - } - return splitFunc, nil +func (c Config) defaultSplitFuncBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { + return c.SplitConfig.Func(enc, true, int(c.MaxLogSize)) } // Build will build a tcp input operator. @@ -121,7 +116,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } if c.SplitFuncBuilder == nil { - c.SplitFuncBuilder = c.defaultMultilineBuilder + c.SplitFuncBuilder = c.defaultSplitFuncBuilder } // Build split func @@ -129,6 +124,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { if err != nil { return nil, err } + splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func()) var resolver *helper.IPResolver if c.AddAttributes { diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index 0bf1c78f76fe..e45ee9b8d498 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -89,11 +89,12 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { return nil, err } - // Build SplitFunc - splitFunc, err := c.SplitConfig.Func(enc, true, MaxUDPSize, c.TrimConfig.Func()) + // Build split func + splitFunc, err := c.SplitConfig.Func(enc, true, MaxUDPSize) if err != nil { return nil, err } + splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func()) var resolver *helper.IPResolver if c.AddAttributes { diff --git a/pkg/stanza/split/split.go b/pkg/stanza/split/split.go index 6e0282f52d28..fe449141da36 100644 --- a/pkg/stanza/split/split.go +++ b/pkg/stanza/split/split.go @@ -10,8 +10,6 @@ import ( "regexp" "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) // Config is the configuration for a split func @@ -21,7 +19,7 @@ type Config struct { } // Func will return a bufio.SplitFunc based on the config -func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { +func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -38,7 +36,7 @@ func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, tri case enc == encoding.Nop: return NoSplitFunc(maxLogSize), nil case endPattern == "" && startPattern == "": - splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, trimFunc) + splitFunc, err = NewlineSplitFunc(enc, flushAtEOF) if err != nil { return nil, err } @@ -47,26 +45,26 @@ func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, tri if err != nil { return nil, fmt.Errorf("compile line end regex: %w", err) } - splitFunc = LineEndSplitFunc(re, flushAtEOF, trimFunc) + splitFunc = LineEndSplitFunc(re, flushAtEOF) case startPattern != "": re, err := regexp.Compile("(?m)" + c.LineStartPattern) if err != nil { return nil, fmt.Errorf("compile line start regex: %w", err) } - splitFunc = LineStartSplitFunc(re, flushAtEOF, trimFunc) + splitFunc = LineStartSplitFunc(re, flushAtEOF) } return splitFunc, nil } // LineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc { +func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { firstLoc := re.FindIndex(data) if firstLoc == nil { // Flush if no more data is expected if len(data) != 0 && atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } @@ -78,7 +76,7 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) if firstMatchStart != 0 { // the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data advance = firstMatchStart - token = trimFunc(data[0:firstMatchStart]) + token = data[0:firstMatchStart] // return if non-matching pattern is not only whitespaces if token != nil { @@ -93,7 +91,7 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) // Flush if no more data is expected if atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } @@ -105,8 +103,8 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) } secondMatchStart := secondLoc[0] + secondLocOfset - advance = secondMatchStart // start scanning at the beginning of the second match - token = trimFunc(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match + advance = secondMatchStart // start scanning at the beginning of the second match + token = data[firstMatchStart:secondMatchStart] // the token begins at the first match, and ends at the beginning of the second match err = nil return } @@ -114,13 +112,13 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) // LineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc { +func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { loc := re.FindIndex(data) if loc == nil { // Flush if no more data is expected if len(data) != 0 && atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } @@ -134,7 +132,7 @@ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bu } advance = loc[1] - token = trimFunc(data[:loc[1]]) + token = data[:loc[1]] err = nil return } @@ -142,7 +140,7 @@ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bu // NewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func) (bufio.SplitFunc, error) { +func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { newline, err := encodedNewline(enc) if err != nil { return nil, err @@ -158,15 +156,19 @@ func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func return 0, nil, nil } - if i := bytes.Index(data, newline); i >= 0 { + i := bytes.Index(data, newline) + if i == 0 { + return len(newline), []byte{}, nil + } + if i >= 0 { // We have a full newline-terminated line. token = bytes.TrimSuffix(data[:i], carriageReturn) - return i + len(newline), trimFunc(token), nil + return i + len(newline), token, nil } // Flush if no more data is expected if atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } diff --git a/pkg/stanza/split/split_test.go b/pkg/stanza/split/split_test.go index 372768553854..ace7112896d8 100644 --- a/pkg/stanza/split/split_test.go +++ b/pkg/stanza/split/split_test.go @@ -24,19 +24,14 @@ func TestConfigFunc(t *testing.T) { maxLogSize := 100 t.Run("BothStartAndEnd", func(t *testing.T) { - cfg := &Config{ - LineStartPattern: "foo", - LineEndPattern: "bar", - } - - _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + cfg := &Config{LineStartPattern: "foo", LineEndPattern: "bar"} + _, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.EqualError(t, err, "only one of line_start_pattern or line_end_pattern can be set") }) t.Run("NopEncoding", func(t *testing.T) { cfg := &Config{} - - f, err := cfg.Func(encoding.Nop, false, maxLogSize, trim.Nop) + f, err := cfg.Func(encoding.Nop, false, maxLogSize) assert.NoError(t, err) raw := splittest.GenerateBytes(maxLogSize * 2) @@ -48,8 +43,7 @@ func TestConfigFunc(t *testing.T) { t.Run("Newline", func(t *testing.T) { cfg := &Config{} - - f, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + f, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.NoError(t, err) advance, token, err := f([]byte("foo\nbar\nbaz\n"), false) @@ -59,20 +53,14 @@ func TestConfigFunc(t *testing.T) { }) t.Run("InvalidStartRegex", func(t *testing.T) { - cfg := &Config{ - LineStartPattern: "[", - } - - _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + cfg := &Config{LineStartPattern: "["} + _, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.EqualError(t, err, "compile line start regex: error parsing regexp: missing closing ]: `[`") }) t.Run("InvalidEndRegex", func(t *testing.T) { - cfg := &Config{ - LineEndPattern: "[", - } - - _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + cfg := &Config{LineEndPattern: "["} + _, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.EqualError(t, err, "compile line end regex: error parsing regexp: missing closing ]: `[`") }) } @@ -92,8 +80,8 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `LOGSTART \d+ `, Input: []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`), ExpectedTokens: []string{ - `LOGSTART 123 log1`, - `LOGSTART 234 log2`, + `LOGSTART 123 log1 `, + `LOGSTART 234 log2 `, }, }, { @@ -101,8 +89,8 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `^LOGSTART \d+ `, Input: []byte("LOGSTART 123 LOGSTART 345 log1\nLOGSTART 234 log2\nLOGSTART 345 foo"), ExpectedTokens: []string{ - "LOGSTART 123 LOGSTART 345 log1", - "LOGSTART 234 log2", + "LOGSTART 123 LOGSTART 345 log1\n", + "LOGSTART 234 log2\n", }, }, { @@ -115,7 +103,7 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `LOGSTART \d+ `, Input: []byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`), ExpectedTokens: []string{ - `part that doesn't match`, + `part that doesn't match `, `LOGSTART 123 part that matches`, }, }, @@ -161,30 +149,26 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `^LOGSTART \d+`, Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line\nLOGSTART 43 log5"), ExpectedTokens: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1", - "LOGSTART 17 log2\nLOGPART log2\nanother line", + "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \n", + "LOGSTART 17 log2\nLOGPART log2\nanother line\n", }, }, { - Name: "LogsWithoutFlusher", + Name: "NoMatch", Pattern: `^LOGSTART \d+`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, } for _, tc := range testCases { - cfg := Config{LineStartPattern: tc.Pattern} - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) + cfg := &Config{LineStartPattern: tc.Pattern} + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, trim.Nop) + splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -221,21 +205,21 @@ func TestLineStartSplitFunc(t *testing.T) { }, } for _, tc := range flushAtEOFCases { - cfg := &Config{ - LineStartPattern: `^LOGSTART \d+`, - } - splitFunc, err := cfg.Func(unicode.UTF8, true, 0, trim.Nop) + cfg := &Config{LineStartPattern: `^LOGSTART \d+`} + splitFunc, err := cfg.Func(unicode.UTF8, true, 0) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } }) + // TODO move to internal/splitter? t.Run("ApplyTrimFunc", func(t *testing.T) { - cfg := Config{LineStartPattern: ` LOGSTART \d+ `} + cfg := &Config{LineStartPattern: ` LOGSTART \d+ `} input := []byte(" LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo") - - splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + + splitTrimLeading := trim.WithFunc(splitFunc, trim.Leading) t.Run("TrimLeading", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -243,8 +227,7 @@ func TestLineStartSplitFunc(t *testing.T) { `LOGSTART 234 log2 `, }}.Run(splitTrimLeading)) - splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) - require.NoError(t, err) + splitTrimTrailing := trim.WithFunc(splitFunc, trim.Trailing) t.Run("TrimTrailing", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -252,8 +235,7 @@ func TestLineStartSplitFunc(t *testing.T) { ` LOGSTART 234 log2`, }}.Run(splitTrimTrailing)) - splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) - require.NoError(t, err) + splitTrimBoth := trim.WithFunc(splitFunc, trim.Whitespace) t.Run("TrimBoth", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -288,7 +270,7 @@ func TestLineEndSplitFunc(t *testing.T) { Input: []byte("log1 LOGEND LOGEND\nlog2 LOGEND\n"), ExpectedTokens: []string{ "log1 LOGEND LOGEND", - "log2 LOGEND", + "\nlog2 LOGEND", }, }, { @@ -339,38 +321,31 @@ func TestLineEndSplitFunc(t *testing.T) { ExpectedError: errors.New("bufio.Scanner: token too long"), }, { - Name: "MultipleMultilineLogs", + Name: "MultiplesplitLogs", Pattern: `^LOGEND.*$`, Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t \nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2\nLOGSTART 43 log5"), ExpectedTokens: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1", - "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", + "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t ", + "\nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2", }, }, { - Name: "LogsWithoutFlusher", + Name: "NoMatch", Pattern: `^LOGEND.*$`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, } for _, tc := range testCases { - cfg := Config{LineEndPattern: tc.Pattern} - - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) + cfg := &Config{LineEndPattern: tc.Pattern} + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } t.Run("FlushAtEOF", func(t *testing.T) { - cfg := &Config{ - LineEndPattern: `^LOGSTART \d+`, - } - splitFunc, err := cfg.Func(unicode.UTF8, true, 0, trim.Nop) + cfg := &Config{LineEndPattern: `^LOGSTART \d+`} + splitFunc, err := cfg.Func(unicode.UTF8, true, 0) require.NoError(t, err) splittest.TestCase{ Name: "NoMatch", @@ -379,12 +354,14 @@ func TestLineEndSplitFunc(t *testing.T) { }.Run(splitFunc)(t) }) + // TODO move to internal/splitter? t.Run("ApplyTrimFunc", func(t *testing.T) { - cfg := Config{LineEndPattern: ` LOGEND `} + cfg := &Config{LineEndPattern: ` LOGEND `} input := []byte(" log1 LOGEND log2 LOGEND ") - - splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + + splitTrimLeading := trim.WithFunc(splitFunc, trim.Leading) t.Run("TrimLeading", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -392,8 +369,7 @@ func TestLineEndSplitFunc(t *testing.T) { `log2 LOGEND `, }}.Run(splitTrimLeading)) - splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) - require.NoError(t, err) + splitTrimTrailing := trim.WithFunc(splitFunc, trim.Trailing) t.Run("TrimTrailing", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -401,8 +377,7 @@ func TestLineEndSplitFunc(t *testing.T) { ` log2 LOGEND`, }}.Run(splitTrimTrailing)) - splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) - require.NoError(t, err) + splitTrimBoth := trim.WithFunc(splitFunc, trim.Whitespace) t.Run("TrimBoth", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -484,7 +459,7 @@ func TestNewlineSplitFunc(t *testing.T) { Input: []byte("LOGPART log1"), }, { - Name: "DefaultFlusherSplits", + Name: "DefaultSplits", Input: []byte("log1\nlog2\n"), ExpectedTokens: []string{ "log1", @@ -499,48 +474,16 @@ func TestNewlineSplitFunc(t *testing.T) { "LOGEND 333", }, }, - { - Name: "PreserveLeadingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - " LOGEND 333", - }, - PreserveLeadingWhitespaces: true, - }, - { - Name: "PreserveTrailingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - "LOGEND 333 ", - }, - PreserveTrailingWhitespaces: true, - }, - { - Name: "PreserveBothLeadingAndTrailingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - " LOGEND 333 ", - }, - PreserveLeadingWhitespaces: true, - PreserveTrailingWhitespaces: true, - }, } for _, tc := range testCases { - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc) + splitFunc, err := NewlineSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } t.Run("FlushAtEOF", func(t *testing.T) { - splitFunc, err := Config{}.Func(unicode.UTF8, true, 0, trim.Nop) + splitFunc, err := Config{}.Func(unicode.UTF8, true, 0) require.NoError(t, err) splittest.TestCase{ Name: "FlushAtEOF", @@ -549,12 +492,14 @@ func TestNewlineSplitFunc(t *testing.T) { }.Run(splitFunc)(t) }) + // // TODO move to internal/splitter? t.Run("ApplyTrimFunc", func(t *testing.T) { cfg := &Config{} input := []byte(" log1 \n log2 \n") - - splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + + splitTrimLeading := trim.WithFunc(splitFunc, trim.Leading) t.Run("TrimLeading", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -562,8 +507,7 @@ func TestNewlineSplitFunc(t *testing.T) { `log2 `, }}.Run(splitTrimLeading)) - splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) - require.NoError(t, err) + splitTrimTrailing := trim.WithFunc(splitFunc, trim.Trailing) t.Run("TrimTrailing", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -571,8 +515,9 @@ func TestNewlineSplitFunc(t *testing.T) { ` log2`, }}.Run(splitTrimTrailing)) - splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) + splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + splitTrimBoth = trim.WithFunc(splitTrimBoth, trim.Whitespace) t.Run("TrimBoth", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -644,14 +589,12 @@ func TestNoSplitFunc(t *testing.T) { } func TestNoopEncodingError(t *testing.T) { - endCfg := Config{LineEndPattern: "\n"} - - _, err := endCfg.Func(encoding.Nop, false, 0, trim.Nop) + endCfg := &Config{LineEndPattern: "\n"} + _, err := endCfg.Func(encoding.Nop, false, 0) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) - startCfg := Config{LineStartPattern: "\n"} - - _, err = startCfg.Func(encoding.Nop, false, 0, trim.Nop) + startCfg := &Config{LineStartPattern: "\n"} + _, err = startCfg.Func(encoding.Nop, false, 0) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) } @@ -712,7 +655,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewlineSplitFunc(tc.encoding, false, trim.Nop) + splitFunc, err := NewlineSplitFunc(tc.encoding, false) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) diff --git a/pkg/stanza/split/splittest/splittest.go b/pkg/stanza/split/splittest/splittest.go index 1ea3ebc2c3ca..b784b1b68dec 100644 --- a/pkg/stanza/split/splittest/splittest.go +++ b/pkg/stanza/split/splittest/splittest.go @@ -71,18 +71,16 @@ func (r *testReader) splitFunc(split bufio.SplitFunc) bufio.SplitFunc { } type TestCase struct { - Name string - Pattern string - Input []byte - ExpectedTokens []string - ExpectedError error - Sleep time.Duration - AdditionalIterations int - PreserveLeadingWhitespaces bool - PreserveTrailingWhitespaces bool + Name string + Pattern string + Input []byte + ExpectedTokens []string + ExpectedError error + Sleep time.Duration + AdditionalIterations int } -func (tc TestCase) Run(split bufio.SplitFunc) func(t *testing.T) { +func (tc TestCase) Run(splitFunc bufio.SplitFunc) func(t *testing.T) { reader := newTestReader(tc.Input) return func(t *testing.T) { @@ -94,7 +92,7 @@ func (tc TestCase) Run(split bufio.SplitFunc) func(t *testing.T) { } reader.Reset() scanner := bufio.NewScanner(reader) - scanner.Split(reader.splitFunc(split)) + scanner.Split(reader.splitFunc(splitFunc)) for { ok := scanner.Scan() if !ok { diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go index 90118dfc6543..20e48e3fe3da 100644 --- a/pkg/stanza/trim/trim.go +++ b/pkg/stanza/trim/trim.go @@ -4,11 +4,22 @@ package trim // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" import ( + "bufio" "bytes" ) type Func func([]byte) []byte +func WithFunc(splitFunc bufio.SplitFunc, trimFunc Func) bufio.SplitFunc { + if trimFunc == nil { + return splitFunc + } + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + return advance, trimFunc(token), err + } +} + type Config struct { PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"` PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` @@ -27,27 +38,24 @@ func (c Config) Func() Func { return Whitespace } -func Nop(token []byte) []byte { +var Nop Func = func(token []byte) []byte { return token } -func Leading(data []byte) []byte { - // TrimLeft to strip EOF whitespaces in case of using $ in regex - // For some reason newline and carriage return are being moved to beginning of next log +var Leading Func = func(data []byte) []byte { token := bytes.TrimLeft(data, "\r\n\t ") - - // TrimLeft will return nil if data is an empty slice if token == nil { - return []byte{} + // TrimLeft sometimes overwrites something with nothing. + // We need to override this behavior in order to preserve empty tokens. + return data } return token } -func Trailing(data []byte) []byte { - // TrimRight to strip all whitespaces from the end of log +var Trailing Func = func(data []byte) []byte { return bytes.TrimRight(data, "\r\n\t ") } -func Whitespace(data []byte) []byte { +var Whitespace Func = func(data []byte) []byte { return Leading(Trailing(data)) } diff --git a/pkg/stanza/trim/trim_test.go b/pkg/stanza/trim/trim_test.go index 114a645853ee..5db6cf44f77f 100644 --- a/pkg/stanza/trim/trim_test.go +++ b/pkg/stanza/trim/trim_test.go @@ -4,6 +4,7 @@ package trim import ( + "bufio" "testing" "github.com/stretchr/testify/assert" @@ -15,36 +16,50 @@ func TestTrim(t *testing.T) { name string preserveLeading bool preserveTrailing bool - input string - expect string + input []byte + expect []byte }{ { name: "preserve both", preserveLeading: true, preserveTrailing: true, - input: " hello world ", - expect: " hello world ", + input: []byte(" hello world "), + expect: []byte(" hello world "), }, { name: "preserve leading", preserveLeading: true, preserveTrailing: false, - input: " hello world ", - expect: " hello world", + input: []byte(" hello world "), + expect: []byte(" hello world"), }, { name: "preserve trailing", preserveLeading: false, preserveTrailing: true, - input: " hello world ", - expect: "hello world ", + input: []byte(" hello world "), + expect: []byte("hello world "), }, { name: "preserve neither", preserveLeading: false, preserveTrailing: false, - input: " hello world ", - expect: "hello world", + input: []byte(" hello world "), + expect: []byte("hello world"), + }, + { + name: "trim leading returns nil when given nil", + preserveLeading: false, + preserveTrailing: true, + input: nil, + expect: nil, + }, + { + name: "trim leading returns []byte when given []byte", + preserveLeading: false, + preserveTrailing: true, + input: []byte{}, + expect: []byte{}, }, } @@ -54,10 +69,55 @@ func TestTrim(t *testing.T) { PreserveLeading: tc.preserveLeading, PreserveTrailing: tc.preserveTrailing, }.Func() - assert.Equal(t, []byte(tc.expect), trimFunc([]byte(tc.input))) - - // Also test that regardless of configuration, an empty []byte in gives an empty []byte out - assert.Equal(t, []byte{}, trimFunc([]byte{})) + assert.Equal(t, tc.expect, trimFunc(tc.input)) }) } } + +func TestWithFunc(t *testing.T) { + scanAndTrimLines := WithFunc(bufio.ScanLines, Config{ + PreserveLeading: false, + PreserveTrailing: false, + }.Func()) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := scanAndTrimLines(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = scanAndTrimLines(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = scanAndTrimLines(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +} + +func TestWithNilTrimFunc(t *testing.T) { + // Same test as above, but pass nil instead of a trim func + // In other words, we should expect exactly the behavior of bufio.ScanLines. + + scanLines := WithFunc(bufio.ScanLines, nil) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := scanLines(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte(" hello "), token) + + advance, token, err = scanLines(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte(" world "), token) + + advance, token, err = scanLines(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +}