Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] Fully decompose the tokenize package #26241

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Major refactoring of tokenize package
  • Loading branch information
djaglowski committed Sep 12, 2023
commit 1646f4b5c90d1a0cd9f53e7de50a8cbf482b13ff
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback
}

// Ensure that splitter is buildable
factory := splitter.NewCustomFactory(splitFunc, c.FlushPeriod)
factory := splitter.NewCustomSplitFuncFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod)
if _, err := factory.SplitFunc(); err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/stanza/fileconsumer/internal/splitter/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ import (

type customFactory struct {
splitFunc bufio.SplitFunc
trimFunc trim.Func
flushPeriod time.Duration
}

var _ Factory = (*customFactory)(nil)

func NewCustomFactory(splitFunc bufio.SplitFunc, flushPeriod time.Duration) Factory {
func NewCustomSplitFuncFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration) Factory {
return &customFactory{
splitFunc: splitFunc,
trimFunc: trimFunc,
flushPeriod: flushPeriod,
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/stanza/fileconsumer/internal/splitter/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,29 @@ import (
"time"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func TestCustomFactory(t *testing.T) {
tests := []struct {
name string
splitter bufio.SplitFunc
splitFunc bufio.SplitFunc
flushPeriod time.Duration
wantErr bool
}{
{
name: "default configuration",
splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) {
splitFunc: func(data []byte, atEOF bool) (advance int, token []byte, err error) {
return len(data), data, nil
},
flushPeriod: 100 * time.Millisecond,
flushPeriod: 500 * time.Millisecond,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewCustomFactory(tt.splitter, tt.flushPeriod)
factory := NewCustomSplitFuncFactory(tt.splitFunc, trim.Whitespace, tt.flushPeriod)
got, err := factory.SplitFunc()
if (err != nil) != tt.wantErr {
t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSplitFuncFactory(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewSplitFuncFactory(tt.splitConfig, tt.encoding, tt.maxLogSize, trim.Nop, tt.flushPeriod)
factory := NewSplitFuncFactory(tt.splitConfig, tt.encoding, tt.maxLogSize, trim.Whitespace, tt.flushPeriod)
got, err := factory.SplitFunc()
if (err != nil) != tt.wantErr {
t.Errorf("SplitFunc() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
7 changes: 3 additions & 4 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ type BaseConfig struct {

type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
trimFunc := c.TrimConfig.Func()
splitFunc, err := c.SplitConfig.Func(enc, true, int(c.MaxLogSize), trimFunc)
func (c Config) defaultSplitFuncBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
splitFunc, err := c.SplitConfig.Func(enc, true, int(c.MaxLogSize), c.TrimConfig.Func())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +120,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

if c.SplitFuncBuilder == nil {
c.SplitFuncBuilder = c.defaultMultilineBuilder
c.SplitFuncBuilder = c.defaultSplitFuncBuilder
}

// Build split func
Expand Down
113 changes: 51 additions & 62 deletions pkg/stanza/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ func TestConfigFunc(t *testing.T) {
_, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop)
assert.EqualError(t, err, "compile line end regex: error parsing regexp: missing closing ]: `[`")
})

t.Run("EncodeNewlineDstTooShort", func(t *testing.T) {
cfg := &Config{}
enc := errEncoding{}
_, err := cfg.Func(&enc, false, maxLogSize, trim.Nop)
assert.EqualError(t, err, "strange encoding")
})
}

func TestLineStartSplitFunc(t *testing.T) {
Expand All @@ -92,17 +99,17 @@ func TestLineStartSplitFunc(t *testing.T) {
Pattern: `LOGSTART \d+ `,
Input: []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`),
ExpectedTokens: []string{
`LOGSTART 123 log1`,
`LOGSTART 234 log2`,
`LOGSTART 123 log1 `,
`LOGSTART 234 log2 `,
},
},
{
Name: "TwoLogsLineStart",
Pattern: `^LOGSTART \d+ `,
Input: []byte("LOGSTART 123 LOGSTART 345 log1\nLOGSTART 234 log2\nLOGSTART 345 foo"),
ExpectedTokens: []string{
"LOGSTART 123 LOGSTART 345 log1",
"LOGSTART 234 log2",
"LOGSTART 123 LOGSTART 345 log1\n",
"LOGSTART 234 log2\n",
},
},
{
Expand All @@ -115,7 +122,7 @@ func TestLineStartSplitFunc(t *testing.T) {
Pattern: `LOGSTART \d+ `,
Input: []byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`),
ExpectedTokens: []string{
`part that doesn't match`,
`part that doesn't match `,
`LOGSTART 123 part that matches`,
},
},
Expand Down Expand Up @@ -157,28 +164,24 @@ func TestLineStartSplitFunc(t *testing.T) {
ExpectedError: errors.New("bufio.Scanner: token too long"),
},
{
Name: "MultipleMultilineLogs",
Name: "MultiplesplitLogs",
Pattern: `^LOGSTART \d+`,
Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line\nLOGSTART 43 log5"),
ExpectedTokens: []string{
"LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1",
"LOGSTART 17 log2\nLOGPART log2\nanother line",
"LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \n",
"LOGSTART 17 log2\nLOGPART log2\nanother line\n",
},
},
{
Name: "LogsWithoutFlusher",
Name: "NoMatch",
Pattern: `^LOGSTART \d+`,
Input: []byte("LOGPART log1\nLOGPART log1\t \n"),
},
}

for _, tc := range testCases {
cfg := Config{LineStartPattern: tc.Pattern}
trimFunc := trim.Config{
PreserveLeading: tc.PreserveLeadingWhitespaces,
PreserveTrailing: tc.PreserveTrailingWhitespaces,
}.Func()
splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc)
cfg := &Config{LineStartPattern: tc.Pattern}
splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trim.Nop)
require.NoError(t, err)
t.Run(tc.Name, tc.Run(splitFunc))
}
Expand Down Expand Up @@ -288,7 +291,7 @@ func TestLineEndSplitFunc(t *testing.T) {
Input: []byte("log1 LOGEND LOGEND\nlog2 LOGEND\n"),
ExpectedTokens: []string{
"log1 LOGEND LOGEND",
"log2 LOGEND",
"\nlog2 LOGEND",
},
},
{
Expand Down Expand Up @@ -339,29 +342,24 @@ func TestLineEndSplitFunc(t *testing.T) {
ExpectedError: errors.New("bufio.Scanner: token too long"),
},
{
Name: "MultipleMultilineLogs",
Name: "MultiplesplitLogs",
Pattern: `^LOGEND.*$`,
Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t \nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2\nLOGSTART 43 log5"),
ExpectedTokens: []string{
"LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1",
"LOGSTART 17 log2\nLOGPART log2\nLOGEND log2",
"LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t ",
"\nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2",
},
},
{
Name: "LogsWithoutFlusher",
Name: "NoMatch",
Pattern: `^LOGEND.*$`,
Input: []byte("LOGPART log1\nLOGPART log1\t \n"),
},
}

for _, tc := range testCases {
cfg := Config{LineEndPattern: tc.Pattern}

trimFunc := trim.Config{
PreserveLeading: tc.PreserveLeadingWhitespaces,
PreserveTrailing: tc.PreserveTrailingWhitespaces,
}.Func()
splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc)
splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trim.Nop)
require.NoError(t, err)
t.Run(tc.Name, tc.Run(splitFunc))
}
Expand Down Expand Up @@ -484,7 +482,7 @@ func TestNewlineSplitFunc(t *testing.T) {
Input: []byte("LOGPART log1"),
},
{
Name: "DefaultFlusherSplits",
Name: "DefaultSplits",
Input: []byte("log1\nlog2\n"),
ExpectedTokens: []string{
"log1",
Expand All @@ -499,42 +497,10 @@ func TestNewlineSplitFunc(t *testing.T) {
"LOGEND 333",
},
},
{
Name: "PreserveLeadingWhitespaces",
Input: []byte("\n LOGEND 333 \nAnother one "),
ExpectedTokens: []string{
"",
" LOGEND 333",
},
PreserveLeadingWhitespaces: true,
},
{
Name: "PreserveTrailingWhitespaces",
Input: []byte("\n LOGEND 333 \nAnother one "),
ExpectedTokens: []string{
"",
"LOGEND 333 ",
},
PreserveTrailingWhitespaces: true,
},
{
Name: "PreserveBothLeadingAndTrailingWhitespaces",
Input: []byte("\n LOGEND 333 \nAnother one "),
ExpectedTokens: []string{
"",
" LOGEND 333 ",
},
PreserveLeadingWhitespaces: true,
PreserveTrailingWhitespaces: true,
},
}

for _, tc := range testCases {
trimFunc := trim.Config{
PreserveLeading: tc.PreserveLeadingWhitespaces,
PreserveTrailing: tc.PreserveTrailingWhitespaces,
}.Func()
splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc)
splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trim.Nop)
require.NoError(t, err)
t.Run(tc.Name, tc.Run(splitFunc))
}
Expand Down Expand Up @@ -623,6 +589,7 @@ func TestNoSplitFunc(t *testing.T) {
"stuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmn",
},
},

{
Name: "EOFBeforeMaxLogSize",
Input: func() []byte {
Expand All @@ -645,12 +612,10 @@ func TestNoSplitFunc(t *testing.T) {

// TestNoopEncodingError verifies that Config.Func rejects the nop encoding
// whenever a line_start_pattern or line_end_pattern is configured, since nop
// encoding cannot encode the newline bytes those patterns rely on.
func TestNoopEncodingError(t *testing.T) {
	endCfg := Config{LineEndPattern: "\n"}
	_, err := endCfg.Func(encoding.Nop, false, 0, trim.Nop)
	// require.Equal takes (t, expected, actual); the expected error goes first.
	require.Equal(t, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"), err)

	startCfg := Config{LineStartPattern: "\n"}
	_, err = startCfg.Func(encoding.Nop, false, 0, trim.Nop)
	require.Equal(t, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"), err)
}
Expand Down Expand Up @@ -689,6 +654,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) {
{0, 108, 0, 111, 0, 103, 0, 50}, // log2
},
},

{
"MultiCarriageReturnUTF16",
unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
Expand Down Expand Up @@ -732,3 +698,26 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) {
})
}
}

// TODO: the only error case is an encoding that requires more than 10 bytes
// to represent a newline or carriage return. That is unlikely enough that it
// may not warrant error handling in more than one place. Consider moving the
// error case into decoder.LookupEncoding instead.

// errEncoding is a test encoding whose encoder always fails to transform.
type errEncoding struct{}

// errEncoder is the always-failing transformer backing errEncoding.
type errEncoder struct{}

// NewEncoder returns an encoder whose Transform calls always fail.
func (e errEncoding) NewEncoder() *encoding.Encoder {
	enc := &encoding.Encoder{Transformer: &errEncoder{}}
	return enc
}

// NewDecoder returns a zero-value decoder; decoding is not exercised by tests.
func (e errEncoding) NewDecoder() *encoding.Decoder {
	dec := &encoding.Decoder{}
	return dec
}

// Transform reports no progress and always returns an error.
func (e errEncoder) Transform(_, _ []byte, _ bool) (int, int, error) {
	return 0, 0, errors.New("strange encoding")
}

// Reset is a no-op; the encoder carries no state between calls.
func (e errEncoder) Reset() {}
16 changes: 7 additions & 9 deletions pkg/stanza/split/splittest/splittest.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,13 @@ func (r *testReader) splitFunc(split bufio.SplitFunc) bufio.SplitFunc {
}

type TestCase struct {
Name string
Pattern string
Input []byte
ExpectedTokens []string
ExpectedError error
Sleep time.Duration
AdditionalIterations int
PreserveLeadingWhitespaces bool
PreserveTrailingWhitespaces bool
Name string
Pattern string
Input []byte
ExpectedTokens []string
ExpectedError error
Sleep time.Duration
AdditionalIterations int
}

func (tc TestCase) Run(split bufio.SplitFunc) func(t *testing.T) {
Expand Down