diff --git a/.chloggen/pkg-stanza-rm-tokenize.yaml b/.chloggen/pkg-stanza-rm-tokenize.yaml index ff533bc5e3e8..d4677afc292f 100755 --- a/.chloggen/pkg-stanza-rm-tokenize.yaml +++ b/.chloggen/pkg-stanza-rm-tokenize.yaml @@ -15,7 +15,10 @@ issues: [26540] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. -subtext: +subtext: | + - Remove 'Multiline' struct + - Remove 'NewMultilineConfig' struct + - Rename 'MultilineConfig' to 'split.Config' # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index e56df81c03ff..376f3c86c6ee 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -53,7 +53,6 @@ func NewConfig() *Config { IncludeFileNameResolved: false, IncludeFilePathResolved: false, PollInterval: 200 * time.Millisecond, - Multiline: split.NewMultilineConfig(), Encoding: defaultEncoding, StartAt: "end", FingerprintSize: fingerprint.DefaultSize, @@ -66,22 +65,22 @@ func NewConfig() *Config { // Config is the configuration of a file input operator type Config struct { matcher.Criteria `mapstructure:",squash"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` - IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` - IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` - PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` - StartAt string `mapstructure:"start_at,omitempty"` - FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` - MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` - MaxBatches int `mapstructure:"max_batches,omitempty"` - DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` - Multiline split.MultilineConfig `mapstructure:"multiline,omitempty"` - TrimConfig trim.Config `mapstructure:",squash,omitempty"` - Encoding string `mapstructure:"encoding,omitempty"` - FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"` - Header *HeaderConfig `mapstructure:"header,omitempty"` + IncludeFileName bool `mapstructure:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` + IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` + IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` + PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` + StartAt string `mapstructure:"start_at,omitempty"` + FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` + MaxBatches int `mapstructure:"max_batches,omitempty"` + DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` + SplitConfig split.Config `mapstructure:"multiline,omitempty"` + TrimConfig trim.Config `mapstructure:",squash,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` + FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"` + 
Header *HeaderConfig `mapstructure:"header,omitempty"` } type HeaderConfig struct { @@ -101,7 +100,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, } // Ensure that splitter is buildable - factory := splitter.NewMultilineFactory(c.Multiline, enc, int(c.MaxLogSize), c.TrimConfig.Func(), c.FlushPeriod) + factory := splitter.NewMultilineFactory(c.SplitConfig, enc, int(c.MaxLogSize), c.TrimConfig.Func(), c.FlushPeriod) if _, err := factory.Build(); err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index f5fbc9efafe8..3ee03b7b751f 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -16,7 +16,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -280,9 +279,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_start_string", Expect: func() *mockOperatorConfig { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineStartPattern: "Start", - } + cfg.SplitConfig.LineStartPattern = "Start" return newMockOperatorConfig(cfg) }(), }, @@ -290,9 +287,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_start_special", Expect: func() *mockOperatorConfig { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineStartPattern: "%", - } + cfg.SplitConfig.LineStartPattern = "%" return newMockOperatorConfig(cfg) }(), }, @@ -300,9 +295,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_end_string", Expect: func() *mockOperatorConfig { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineEndPattern: "Start", - } + cfg.SplitConfig.LineEndPattern = "Start" return newMockOperatorConfig(cfg) }(), }, @@ -310,9 +303,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_end_special", Expect: func() *mockOperatorConfig { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineEndPattern: "%", - } + cfg.SplitConfig.LineEndPattern = "%" return newMockOperatorConfig(cfg) }(), }, @@ -427,127 +418,113 @@ func TestBuild(t *testing.T) { }{ { "Basic", - func(f *Config) {}, + func(cfg *Config) {}, require.NoError, - func(t *testing.T, f *Manager) { - require.Equal(t, f.pollInterval, 10*time.Millisecond) + func(t *testing.T, m *Manager) { + require.Equal(t, m.pollInterval, 10*time.Millisecond) }, }, { "BadIncludeGlob", - func(f *Config) { - f.Include = []string{"["} + func(cfg *Config) { + cfg.Include = []string{"["} }, require.Error, nil, }, { "BadExcludeGlob", - func(f *Config) { - f.Include = []string{"["} + func(cfg *Config) { + cfg.Include = []string{"["} }, require.Error, nil, }, { "MultilineConfiguredStartAndEndPatterns", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineEndPattern: "Exists", - LineStartPattern: "Exists", - } + func(cfg *Config) { + cfg.SplitConfig.LineEndPattern = "Exists" + cfg.SplitConfig.LineStartPattern = "Exists" }, require.Error, nil, }, { "MultilineConfiguredStartPattern", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineStartPattern: "START.*", - } + func(cfg *Config) { + cfg.SplitConfig.LineStartPattern = "START.*" }, require.NoError, func(t 
*testing.T, f *Manager) {}, }, { "MultilineConfiguredEndPattern", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineEndPattern: "END.*", - } + func(cfg *Config) { + cfg.SplitConfig.LineEndPattern = "END.*" }, require.NoError, func(t *testing.T, f *Manager) {}, }, { "InvalidEncoding", - func(f *Config) { - f.Encoding = "UTF-3233" + func(cfg *Config) { + cfg.Encoding = "UTF-3233" }, require.Error, nil, }, { "LineStartAndEnd", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineStartPattern: ".*", - LineEndPattern: ".*", - } + func(cfg *Config) { + cfg.SplitConfig.LineStartPattern = ".*" + cfg.SplitConfig.LineEndPattern = ".*" }, require.Error, nil, }, { "NoLineStartOrEnd", - func(f *Config) { - f.Multiline = split.MultilineConfig{} - }, + func(cfg *Config) {}, require.NoError, func(t *testing.T, f *Manager) {}, }, { "InvalidLineStartRegex", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineStartPattern: "(", - } + func(cfg *Config) { + cfg.SplitConfig.LineStartPattern = "(" }, require.Error, nil, }, { "InvalidLineEndRegex", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineEndPattern: "(", - } + func(cfg *Config) { + cfg.SplitConfig.LineEndPattern = "(" }, require.Error, nil, }, { "InvalidStartAtDelete", - func(f *Config) { - f.StartAt = "end" - f.DeleteAfterRead = true + func(cfg *Config) { + cfg.StartAt = "end" + cfg.DeleteAfterRead = true }, require.Error, nil, }, { "InvalidMaxBatches", - func(f *Config) { - f.MaxBatches = -1 + func(cfg *Config) { + cfg.MaxBatches = -1 }, require.Error, nil, }, { "ValidMaxBatches", - func(f *Config) { - f.MaxBatches = 6 + func(cfg *Config) { + cfg.MaxBatches = 6 }, require.NoError, func(t *testing.T, m *Manager) { @@ -556,16 +533,16 @@ func TestBuild(t *testing.T) { }, { "HeaderConfigNoFlag", - func(f *Config) { - f.Header = &HeaderConfig{} + func(cfg *Config) { + cfg.Header = &HeaderConfig{} }, require.Error, nil, }, { "BadOrderingCriteriaRegex", - func(f *Config) { - f.OrderingCriteria = matcher.OrderingCriteria{ + func(cfg *Config) { + cfg.OrderingCriteria = matcher.OrderingCriteria{ SortBy: []matcher.Sort{ { SortType: "numeric", @@ -579,8 +556,8 @@ func TestBuild(t *testing.T) { }, { "OrderingCriteriaTimestampMissingLayout", - func(f *Config) { - f.OrderingCriteria = matcher.OrderingCriteria{ + func(cfg *Config) { + cfg.OrderingCriteria = matcher.OrderingCriteria{ Regex: ".*", SortBy: []matcher.Sort{ { @@ -595,8 +572,8 @@ func TestBuild(t *testing.T) { }, { "GoodOrderingCriteriaTimestamp", - func(f *Config) { - f.OrderingCriteria = matcher.OrderingCriteria{ + func(cfg *Config) { + cfg.OrderingCriteria = matcher.OrderingCriteria{ Regex: ".*", SortBy: []matcher.Sort{ { @@ -649,32 +626,32 @@ func TestBuildWithSplitFunc(t *testing.T) { }{ { "Basic", - func(f *Config) {}, + func(cfg *Config) {}, require.NoError, - func(t *testing.T, f *Manager) { - require.Equal(t, f.pollInterval, 10*time.Millisecond) + func(t *testing.T, m *Manager) { + require.Equal(t, m.pollInterval, 10*time.Millisecond) }, }, { "BadIncludeGlob", - func(f *Config) { - f.Include = []string{"["} + func(cfg *Config) { + cfg.Include = []string{"["} }, require.Error, nil, }, { "BadExcludeGlob", - func(f *Config) { - f.Include = []string{"["} + func(cfg *Config) { + cfg.Include = []string{"["} }, require.Error, nil, }, { "InvalidEncoding", - func(f *Config) { - f.Encoding = "UTF-3233" + func(cfg *Config) { + cfg.Encoding = "UTF-3233" }, require.Error, nil, @@ -731,19 +708,19 @@ func TestBuildWithHeader(t *testing.T) { }{ { 
"InvalidHeaderConfig", - func(f *Config) { - f.Header = &HeaderConfig{} - f.StartAt = "beginning" + func(cfg *Config) { + cfg.Header = &HeaderConfig{} + cfg.StartAt = "beginning" }, require.Error, nil, }, { "HeaderConfigWithStartAtEnd", - func(f *Config) { + func(cfg *Config) { regexCfg := regex.NewConfig() regexCfg.Regex = "^(?P.*)" - f.Header = &HeaderConfig{ + cfg.Header = &HeaderConfig{ Pattern: "^#", MetadataOperators: []operator.Config{ { @@ -751,17 +728,17 @@ func TestBuildWithHeader(t *testing.T) { }, }, } - f.StartAt = "end" + cfg.StartAt = "end" }, require.Error, nil, }, { "ValidHeaderConfig", - func(f *Config) { + func(cfg *Config) { regexCfg := regex.NewConfig() regexCfg.Regex = "^(?P.*)" - f.Header = &HeaderConfig{ + cfg.Header = &HeaderConfig{ Pattern: "^#", MetadataOperators: []operator.Config{ { @@ -769,11 +746,11 @@ func TestBuildWithHeader(t *testing.T) { }, }, } - f.StartAt = "beginning" + cfg.StartAt = "beginning" }, require.NoError, - func(t *testing.T, f *Manager) { - require.NotNil(t, f.readerFactory.headerConfig.SplitFunc) + func(t *testing.T, m *Manager) { + require.NotNil(t, m.readerFactory.headerConfig.SplitFunc) }, }, } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 121cafa4fdc9..9fb82199dab6 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -24,7 +24,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -547,7 +546,6 @@ func TestNoNewline(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - cfg.Multiline = split.NewMultilineConfig() cfg.FlushPeriod = time.Nanosecond operator, emitCalls := buildTestManager(t, cfg) diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go index 7c945e438cc0..07ed5c969991 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline.go @@ -15,7 +15,7 @@ import ( ) type multilineFactory struct { - multilineCfg split.MultilineConfig + multilineCfg split.Config encoding encoding.Encoding maxLogSize int trimFunc trim.Func @@ -25,7 +25,7 @@ type multilineFactory struct { var _ Factory = (*multilineFactory)(nil) func NewMultilineFactory( - multilineCfg split.MultilineConfig, + multilineCfg split.Config, encoding encoding.Encoding, maxLogSize int, trimFunc trim.Func, @@ -42,7 +42,7 @@ func NewMultilineFactory( // Build builds Multiline Splitter struct func (f *multilineFactory) Build() (bufio.SplitFunc, error) { - splitFunc, err := f.multilineCfg.Build(f.encoding, false, f.maxLogSize, f.trimFunc) + splitFunc, err := f.multilineCfg.Func(f.encoding, false, f.maxLogSize, f.trimFunc) if err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go index 1dfcd5199651..10785480be23 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go @@ -18,23 +18,22 @@ import ( func TestMultilineBuild(t *testing.T) { tests := []struct { name string - multilineCfg split.MultilineConfig + multilineCfg 
split.Config encoding encoding.Encoding maxLogSize int flushPeriod time.Duration wantErr bool }{ { - name: "default configuration", - multilineCfg: split.NewMultilineConfig(), - encoding: unicode.UTF8, - maxLogSize: 1024, - flushPeriod: 100 * time.Millisecond, - wantErr: false, + name: "default configuration", + encoding: unicode.UTF8, + maxLogSize: 1024, + flushPeriod: 100 * time.Millisecond, + wantErr: false, }, { name: "Multiline error", - multilineCfg: split.MultilineConfig{ + multilineCfg: split.Config{ LineStartPattern: "START", LineEndPattern: "END", }, diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 90b418e78730..b9f59b5d8856 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -24,7 +24,7 @@ import ( func TestPersistFlusher(t *testing.T) { flushPeriod := 100 * time.Millisecond - f, emitChan := testReaderFactory(t, split.NewMultilineConfig(), defaultMaxLogSize, flushPeriod) + f, emitChan := testReaderFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod) temp := openTemp(t, t.TempDir()) fp, err := f.newFingerprint(temp) @@ -110,7 +110,7 @@ func TestTokenization(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - f, emitChan := testReaderFactory(t, split.NewMultilineConfig(), defaultMaxLogSize, defaultFlushPeriod) + f, emitChan := testReaderFactory(t, split.Config{}, defaultMaxLogSize, defaultFlushPeriod) temp := openTemp(t, t.TempDir()) _, err := temp.Write(tc.fileContent) @@ -140,7 +140,7 @@ func TestTokenizationTooLong(t *testing.T) { []byte("aaa"), } - f, emitChan := testReaderFactory(t, split.NewMultilineConfig(), 10, defaultFlushPeriod) + f, emitChan := testReaderFactory(t, split.Config{}, 10, defaultFlushPeriod) temp := openTemp(t, t.TempDir()) _, err := temp.Write(fileContent) @@ -170,9 +170,9 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { []byte("2023-01-01 2"), } - mCfg := split.NewMultilineConfig() - mCfg.LineStartPattern = `\d+-\d+-\d+` - f, emitChan := testReaderFactory(t, mCfg, 15, defaultFlushPeriod) + sCfg := split.Config{} + sCfg.LineStartPattern = `\d+-\d+-\d+` + f, emitChan := testReaderFactory(t, sCfg, 15, defaultFlushPeriod) temp := openTemp(t, t.TempDir()) _, err := temp.Write(fileContent) @@ -195,7 +195,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { func TestHeaderFingerprintIncluded(t *testing.T) { fileContent := []byte("#header-line\naaa\n") - f, _ := testReaderFactory(t, split.NewMultilineConfig(), 10, defaultFlushPeriod) + f, _ := testReaderFactory(t, split.Config{}, 10, defaultFlushPeriod) regexConf := regex.NewConfig() regexConf.Regex = "^#(?P
.*)" @@ -223,7 +223,7 @@ func TestHeaderFingerprintIncluded(t *testing.T) { require.Equal(t, []byte("#header-line\naaa\n"), r.Fingerprint.FirstBytes) } -func testReaderFactory(t *testing.T, mCfg split.MultilineConfig, maxLogSize int, flushPeriod time.Duration) (*readerFactory, chan *emitParams) { +func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPeriod time.Duration) (*readerFactory, chan *emitParams) { emitChan := make(chan *emitParams, 100) enc, err := decode.LookupEncoding(defaultEncoding) trimFunc := trim.Whitespace @@ -236,7 +236,7 @@ func testReaderFactory(t *testing.T, mCfg split.MultilineConfig, maxLogSize int, emit: testEmitFunc(emitChan), }, fromBeginning: true, - splitterFactory: splitter.NewMultilineFactory(mCfg, enc, maxLogSize, trimFunc, flushPeriod), + splitterFactory: splitter.NewMultilineFactory(sCfg, enc, maxLogSize, trimFunc, flushPeriod), encoding: enc, }, emitChan } diff --git a/pkg/stanza/operator/input/file/config_test.go b/pkg/stanza/operator/input/file/config_test.go index 7aedf46116e0..429b7489e42c 100644 --- a/pkg/stanza/operator/input/file/config_test.go +++ b/pkg/stanza/operator/input/file/config_test.go @@ -13,7 +13,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -315,9 +314,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineStartPattern: "Start", - } + cfg.SplitConfig.LineStartPattern = "Start" return cfg }(), }, @@ -326,9 +323,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineStartPattern: "%", - } + cfg.SplitConfig.LineStartPattern = "%" return cfg }(), }, @@ -337,9 +332,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineEndPattern: "Start", - } + cfg.SplitConfig.LineEndPattern = "Start" return cfg }(), }, @@ -348,9 +341,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - cfg.Multiline = split.MultilineConfig{ - LineEndPattern: "%", - } + cfg.SplitConfig.LineEndPattern = "%" return cfg }(), }, @@ -451,7 +442,7 @@ func TestBuild(t *testing.T) { }{ { "Default", - func(f *Config) {}, + func(cfg *Config) {}, require.NoError, func(t *testing.T, f *Input) { require.Equal(t, f.OutputOperators[0], fakeOutput) @@ -459,94 +450,80 @@ func TestBuild(t *testing.T) { }, { "BadIncludeGlob", - func(f *Config) { - f.Include = []string{"["} + func(cfg *Config) { + cfg.Include = []string{"["} }, require.Error, nil, }, { "BadExcludeGlob", - func(f *Config) { - f.Include = []string{"["} + func(cfg *Config) { + cfg.Include = []string{"["} }, require.Error, nil, }, { "MultilineConfiguredStartAndEndPatterns", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineEndPattern: "Exists", - LineStartPattern: "Exists", - } + func(cfg *Config) { + cfg.SplitConfig.LineEndPattern = "Exists" + cfg.SplitConfig.LineStartPattern = "Exists" }, require.Error, nil, }, { "MultilineConfiguredStartPattern", - func(f *Config) { - 
f.Multiline = split.MultilineConfig{ - LineStartPattern: "START.*", - } + func(cfg *Config) { + cfg.SplitConfig.LineStartPattern = "START.*" }, require.NoError, func(t *testing.T, f *Input) {}, }, { "MultilineConfiguredEndPattern", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineEndPattern: "END.*", - } + func(cfg *Config) { + cfg.SplitConfig.LineEndPattern = "END.*" }, require.NoError, func(t *testing.T, f *Input) {}, }, { "InvalidEncoding", - func(f *Config) { - f.Encoding = "UTF-3233" + func(cfg *Config) { + cfg.Encoding = "UTF-3233" }, require.Error, nil, }, { "LineStartAndEnd", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineStartPattern: ".*", - LineEndPattern: ".*", - } + func(cfg *Config) { + cfg.SplitConfig.LineStartPattern = ".*" + cfg.SplitConfig.LineEndPattern = ".*" }, require.Error, nil, }, { "NoLineStartOrEnd", - func(f *Config) { - f.Multiline = split.MultilineConfig{} - }, + func(cfg *Config) {}, require.NoError, func(t *testing.T, f *Input) {}, }, { "InvalidLineStartRegex", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineStartPattern: "(", - } + func(cfg *Config) { + cfg.SplitConfig.LineStartPattern = "(" }, require.Error, nil, }, { "InvalidLineEndRegex", - func(f *Config) { - f.Multiline = split.MultilineConfig{ - LineEndPattern: "(", - } + func(cfg *Config) { + cfg.SplitConfig.LineEndPattern = "(" }, require.Error, nil, diff --git a/pkg/stanza/operator/input/syslog/config_test.go b/pkg/stanza/operator/input/syslog/config_test.go index 41e81ff98378..fc8354440db0 100644 --- a/pkg/stanza/operator/input/syslog/config_test.go +++ b/pkg/stanza/operator/input/syslog/config_test.go @@ -12,7 +12,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" ) func TestUnmarshal(t *testing.T) { @@ -38,8 +37,8 @@ func TestUnmarshal(t *testing.T) { cfg.TCP.ListenAddress = "10.0.0.1:9000" cfg.TCP.AddAttributes = true cfg.TCP.Encoding = "utf-16" - cfg.TCP.Multiline = split.NewMultilineConfig() - cfg.TCP.Multiline.LineStartPattern = "ABC" + cfg.TCP.SplitConfig.LineStartPattern = "ABC" + cfg.TCP.SplitConfig.LineEndPattern = "" cfg.TCP.TLS = &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ CertFile: "foo", @@ -62,8 +61,8 @@ func TestUnmarshal(t *testing.T) { cfg.UDP.ListenAddress = "10.0.0.1:9000" cfg.UDP.AddAttributes = true cfg.UDP.Encoding = "utf-16" - cfg.UDP.Multiline = split.NewMultilineConfig() - cfg.UDP.Multiline.LineStartPattern = "ABC" + cfg.UDP.SplitConfig.LineStartPattern = "ABC" + cfg.UDP.SplitConfig.LineEndPattern = "" return cfg }(), }, diff --git a/pkg/stanza/operator/input/tcp/config_test.go b/pkg/stanza/operator/input/tcp/config_test.go index e8aa7177bf02..07e7f9305bf6 100644 --- a/pkg/stanza/operator/input/tcp/config_test.go +++ b/pkg/stanza/operator/input/tcp/config_test.go @@ -10,7 +10,6 @@ import ( "go.opentelemetry.io/collector/config/configtls" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" ) func TestUnmarshal(t *testing.T) { @@ -32,8 +31,7 @@ func TestUnmarshal(t *testing.T) { cfg.ListenAddress = "10.0.0.1:9000" cfg.AddAttributes = true cfg.Encoding = "utf-8" - cfg.Multiline = 
split.NewMultilineConfig() - cfg.Multiline.LineStartPattern = "ABC" + cfg.SplitConfig.LineStartPattern = "ABC" cfg.TLS = &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ CertFile: "foo", diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index e511c94bd413..081e727f6f91 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -55,7 +55,6 @@ func NewConfigWithID(operatorID string) *Config { InputConfig: helper.NewInputConfig(operatorID, operatorType), BaseConfig: BaseConfig{ OneLogPerPacket: false, - Multiline: split.NewMultilineConfig(), Encoding: "utf-8", }, } @@ -75,7 +74,7 @@ type BaseConfig struct { AddAttributes bool `mapstructure:"add_attributes,omitempty"` OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` Encoding string `mapstructure:"encoding,omitempty"` - Multiline split.MultilineConfig `mapstructure:"multiline,omitempty"` + SplitConfig split.Config `mapstructure:"multiline,omitempty"` TrimConfig trim.Config `mapstructure:",squash"` MultiLineBuilder MultiLineBuilderFunc } @@ -84,7 +83,7 @@ type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error) func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { trimFunc := c.TrimConfig.Func() - splitFunc, err := c.Multiline.Build(enc, true, int(c.MaxLogSize), trimFunc) + splitFunc, err := c.SplitConfig.Func(enc, true, int(c.MaxLogSize), trimFunc) if err != nil { return nil, err } diff --git a/pkg/stanza/operator/input/udp/config_test.go b/pkg/stanza/operator/input/udp/config_test.go index ba51eb1e031b..a12307069739 100644 --- a/pkg/stanza/operator/input/udp/config_test.go +++ b/pkg/stanza/operator/input/udp/config_test.go @@ -8,7 +8,6 @@ import ( "testing" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" ) func TestUnmarshal(t *testing.T) { @@ -29,8 +28,8 @@ func TestUnmarshal(t *testing.T) { cfg.ListenAddress = "10.0.0.1:9000" cfg.AddAttributes = true cfg.Encoding = "utf-8" - cfg.Multiline = split.NewMultilineConfig() - cfg.Multiline.LineStartPattern = "ABC" + cfg.SplitConfig.LineStartPattern = "ABC" + cfg.SplitConfig.LineEndPattern = "" return cfg }(), }, diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index 575711e93515..987db9419b36 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -45,9 +45,8 @@ func NewConfigWithID(operatorID string) *Config { BaseConfig: BaseConfig{ Encoding: "utf-8", OneLogPerPacket: false, - Multiline: split.MultilineConfig{ - LineStartPattern: "", - LineEndPattern: ".^", // Use never matching regex to not split data by default + SplitConfig: split.Config{ + LineEndPattern: ".^", // Use never matching regex to not split data by default }, }, } @@ -61,12 +60,12 @@ type Config struct { // BaseConfig is the details configuration of a udp input operator. 
type BaseConfig struct { - ListenAddress string `mapstructure:"listen_address,omitempty"` - OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` - AddAttributes bool `mapstructure:"add_attributes,omitempty"` - Encoding string `mapstructure:"encoding,omitempty"` - Multiline split.MultilineConfig `mapstructure:"multiline,omitempty"` - TrimConfig trim.Config `mapstructure:",squash"` + ListenAddress string `mapstructure:"listen_address,omitempty"` + OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` + AddAttributes bool `mapstructure:"add_attributes,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` + SplitConfig split.Config `mapstructure:"multiline,omitempty"` + TrimConfig trim.Config `mapstructure:",squash"` } // Build will build a udp input operator. @@ -92,7 +91,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { // Build multiline trimFunc := c.TrimConfig.Func() - splitFunc, err := c.Multiline.Build(enc, true, MaxUDPSize, trimFunc) + splitFunc, err := c.SplitConfig.Func(enc, true, MaxUDPSize, trimFunc) if err != nil { return nil, err } diff --git a/pkg/stanza/split/multiline.go b/pkg/stanza/split/split.go similarity index 86% rename from pkg/stanza/split/multiline.go rename to pkg/stanza/split/split.go index 84a176c9c679..6e0282f52d28 100644 --- a/pkg/stanza/split/multiline.go +++ b/pkg/stanza/split/split.go @@ -14,32 +14,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -// Multiline consists of splitFunc and variables needed to perform force flush -type Multiline struct { - SplitFunc bufio.SplitFunc -} - -// NewMultilineConfig creates a new Multiline config -func NewMultilineConfig() MultilineConfig { - return MultilineConfig{ - LineStartPattern: "", - LineEndPattern: "", - } -} - -// MultilineConfig is the configuration of a multiline helper -type MultilineConfig struct { +// Config is the configuration for a split func +type Config struct { LineStartPattern string `mapstructure:"line_start_pattern"` LineEndPattern string `mapstructure:"line_end_pattern"` } -// Build will build a Multiline operator. -func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { - return c.getSplitFunc(enc, flushAtEOF, maxLogSize, trimFunc) -} - -// getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { +// Func will return a bufio.SplitFunc based on the config +func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -72,8 +54,6 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, ma return nil, fmt.Errorf("compile line start regex: %w", err) } splitFunc = LineStartSplitFunc(re, flushAtEOF, trimFunc) - default: - return nil, fmt.Errorf("unreachable") } return splitFunc, nil } @@ -181,7 +161,6 @@ func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func if i := bytes.Index(data, newline); i >= 0 { // We have a full newline-terminated line. 
token = bytes.TrimSuffix(data[:i], carriageReturn) - return i + len(newline), trimFunc(token), nil } diff --git a/pkg/stanza/split/multiline_test.go b/pkg/stanza/split/split_test.go similarity index 68% rename from pkg/stanza/split/multiline_test.go rename to pkg/stanza/split/split_test.go index f5f7dac39ee1..372768553854 100644 --- a/pkg/stanza/split/multiline_test.go +++ b/pkg/stanza/split/split_test.go @@ -11,6 +11,7 @@ import ( "regexp" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/text/encoding" "golang.org/x/text/encoding/unicode" @@ -19,6 +20,63 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) +func TestConfigFunc(t *testing.T) { + maxLogSize := 100 + + t.Run("BothStartAndEnd", func(t *testing.T) { + cfg := &Config{ + LineStartPattern: "foo", + LineEndPattern: "bar", + } + + _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + assert.EqualError(t, err, "only one of line_start_pattern or line_end_pattern can be set") + }) + + t.Run("NopEncoding", func(t *testing.T) { + cfg := &Config{} + + f, err := cfg.Func(encoding.Nop, false, maxLogSize, trim.Nop) + assert.NoError(t, err) + + raw := splittest.GenerateBytes(maxLogSize * 2) + advance, token, err := f(raw, false) + assert.NoError(t, err) + assert.Equal(t, maxLogSize, advance) + assert.Equal(t, raw[:maxLogSize], token) + }) + + t.Run("Newline", func(t *testing.T) { + cfg := &Config{} + + f, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + assert.NoError(t, err) + + advance, token, err := f([]byte("foo\nbar\nbaz\n"), false) + assert.NoError(t, err) + assert.Equal(t, 4, advance) + assert.Equal(t, []byte("foo"), token) + }) + + t.Run("InvalidStartRegex", func(t *testing.T) { + cfg := &Config{ + LineStartPattern: "[", + } + + _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + assert.EqualError(t, err, "compile line start regex: error parsing regexp: missing closing ]: `[`") + }) + + t.Run("InvalidEndRegex", func(t *testing.T) { + cfg := &Config{ + LineEndPattern: "[", + } + + _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + assert.EqualError(t, err, "compile line end regex: error parsing regexp: missing closing ]: `[`") + }) +} + func TestLineStartSplitFunc(t *testing.T) { testCases := []splittest.TestCase{ { @@ -115,15 +173,12 @@ func TestLineStartSplitFunc(t *testing.T) { } for _, tc := range testCases { - cfg := &MultilineConfig{ - LineStartPattern: tc.Pattern, - } - + cfg := Config{LineStartPattern: tc.Pattern} trimFunc := trim.Config{ PreserveLeading: tc.PreserveLeadingWhitespaces, PreserveTrailing: tc.PreserveTrailingWhitespaces, }.Func() - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } @@ -146,6 +201,66 @@ func TestLineStartSplitFunc(t *testing.T) { require.Nil(t, token) }) }) + + t.Run("FlushAtEOF", func(t *testing.T) { + flushAtEOFCases := []splittest.TestCase{ + { + Name: "NoMatch", + Pattern: `^LOGSTART \d+`, + Input: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokens: []string{"LOGPART log1\nLOGPART log1\t \n"}, + }, + { + Name: "MatchThenNoMatch", + Pattern: `^LOGSTART \d+`, + Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line"), + ExpectedTokens: []string{ + "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \n", + "LOGSTART 17 log2\nLOGPART log2\nanother line", + }, + 
}, + } + for _, tc := range flushAtEOFCases { + cfg := &Config{ + LineStartPattern: `^LOGSTART \d+`, + } + splitFunc, err := cfg.Func(unicode.UTF8, true, 0, trim.Nop) + require.NoError(t, err) + t.Run(tc.Name, tc.Run(splitFunc)) + } + }) + + t.Run("ApplyTrimFunc", func(t *testing.T) { + cfg := Config{LineStartPattern: ` LOGSTART \d+ `} + input := []byte(" LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo") + + splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + require.NoError(t, err) + t.Run("TrimLeading", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + `LOGSTART 123 log1 `, + `LOGSTART 234 log2 `, + }}.Run(splitTrimLeading)) + + splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) + require.NoError(t, err) + t.Run("TrimTrailing", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + ` LOGSTART 123 log1`, + ` LOGSTART 234 log2`, + }}.Run(splitTrimTrailing)) + + splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) + require.NoError(t, err) + t.Run("TrimBoth", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + `LOGSTART 123 log1`, + `LOGSTART 234 log2`, + }}.Run(splitTrimBoth)) + }) } func TestLineEndSplitFunc(t *testing.T) { @@ -240,18 +355,61 @@ func TestLineEndSplitFunc(t *testing.T) { } for _, tc := range testCases { - cfg := &MultilineConfig{ - LineEndPattern: tc.Pattern, - } + cfg := Config{LineEndPattern: tc.Pattern} trimFunc := trim.Config{ PreserveLeading: tc.PreserveLeadingWhitespaces, PreserveTrailing: tc.PreserveTrailingWhitespaces, }.Func() - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } + + t.Run("FlushAtEOF", func(t *testing.T) { + cfg := &Config{ + LineEndPattern: `^LOGSTART \d+`, + } + splitFunc, err := cfg.Func(unicode.UTF8, true, 0, trim.Nop) + require.NoError(t, err) + splittest.TestCase{ + Name: "NoMatch", + Input: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokens: []string{"LOGPART log1\nLOGPART log1\t \n"}, + }.Run(splitFunc)(t) + }) + + t.Run("ApplyTrimFunc", func(t *testing.T) { + cfg := Config{LineEndPattern: ` LOGEND `} + input := []byte(" log1 LOGEND log2 LOGEND ") + + splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + require.NoError(t, err) + t.Run("TrimLeading", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + `log1 LOGEND `, + `log2 LOGEND `, + }}.Run(splitTrimLeading)) + + splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) + require.NoError(t, err) + t.Run("TrimTrailing", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + ` log1 LOGEND`, + ` log2 LOGEND`, + }}.Run(splitTrimTrailing)) + + splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) + require.NoError(t, err) + t.Run("TrimBoth", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + `log1 LOGEND`, + `log2 LOGEND`, + }}.Run(splitTrimBoth)) + }) } func TestNewlineSplitFunc(t *testing.T) { @@ -380,6 +538,48 @@ func TestNewlineSplitFunc(t *testing.T) { require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } + + t.Run("FlushAtEOF", func(t *testing.T) { + splitFunc, err := Config{}.Func(unicode.UTF8, true, 0, trim.Nop) + require.NoError(t, err) + splittest.TestCase{ + Name: "FlushAtEOF", + Input: []byte("log1\nlog2"), + ExpectedTokens: []string{"log1", "log2"}, + }.Run(splitFunc)(t) + }) + + t.Run("ApplyTrimFunc", func(t *testing.T) { + 
cfg := &Config{} + input := []byte(" log1 \n log2 \n") + + splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + require.NoError(t, err) + t.Run("TrimLeading", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + `log1 `, + `log2 `, + }}.Run(splitTrimLeading)) + + splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) + require.NoError(t, err) + t.Run("TrimTrailing", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + ` log1`, + ` log2`, + }}.Run(splitTrimTrailing)) + + splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) + require.NoError(t, err) + t.Run("TrimBoth", splittest.TestCase{ + Input: input, + ExpectedTokens: []string{ + `log1`, + `log2`, + }}.Run(splitTrimBoth)) + }) } func TestNoSplitFunc(t *testing.T) { @@ -444,18 +644,14 @@ func TestNoSplitFunc(t *testing.T) { } func TestNoopEncodingError(t *testing.T) { - cfg := &MultilineConfig{ - LineEndPattern: "\n", - } + endCfg := Config{LineEndPattern: "\n"} - _, err := cfg.getSplitFunc(encoding.Nop, false, 0, trim.Nop) + _, err := endCfg.Func(encoding.Nop, false, 0, trim.Nop) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) - cfg = &MultilineConfig{ - LineStartPattern: "\n", - } + startCfg := Config{LineStartPattern: "\n"} - _, err = cfg.getSplitFunc(encoding.Nop, false, 0, trim.Nop) + _, err = startCfg.Func(encoding.Nop, false, 0, trim.Nop) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) } diff --git a/receiver/otlpjsonfilereceiver/file_test.go b/receiver/otlpjsonfilereceiver/file_test.go index 5b858cc32463..ea0bdec4266a 100644 --- a/receiver/otlpjsonfilereceiver/file_test.go +++ b/receiver/otlpjsonfilereceiver/file_test.go @@ -24,7 +24,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata" ) @@ -124,7 +123,6 @@ func testdataConfigYamlAsMap() *Config { IncludeFileNameResolved: false, IncludeFilePathResolved: false, PollInterval: 200 * time.Millisecond, - Multiline: split.NewMultilineConfig(), Encoding: "utf-8", StartAt: "end", FingerprintSize: 1000,
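
For context on the rename above: split.NewMultilineConfig() and the MultilineConfig.Build method are removed, the zero value of split.Config is now the default, and a bufio.SplitFunc is obtained from Config.Func. The fileconsumer, tcp, and udp configs expose the same options through a SplitConfig field that still unmarshals from the "multiline" key. The snippet below is a minimal sketch of calling the renamed API, using only identifiers visible in this diff; the pattern, max log size, and trim choice are illustrative placeholders rather than values taken from the patch.

package main

import (
	"fmt"

	"golang.org/x/text/encoding/unicode"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	// Before this change: cfg := split.NewMultilineConfig(); splitFunc, err := cfg.Build(...)
	// After this change: the zero value is the default config and Func builds the split function.
	cfg := split.Config{LineStartPattern: `^LOGSTART \d+`}

	// Placeholder arguments: UTF-8 encoding, no flush at EOF, 1 MiB max log size,
	// whitespace trimming. Setting both LineStartPattern and LineEndPattern returns an error.
	splitFunc, err := cfg.Func(unicode.UTF8, false, 1024*1024, trim.Whitespace)
	if err != nil {
		panic(err)
	}

	// splitFunc is a plain bufio.SplitFunc: feed it raw bytes and it yields one token per entry.
	advance, token, err := splitFunc([]byte("LOGSTART 1 foo\nLOGSTART 2 bar\n"), false)
	fmt.Println(advance, string(token), err)
}

In fileconsumer (and the tcp/udp inputs), the equivalent change for callers is assigning to cfg.SplitConfig instead of cfg.Multiline; the "multiline" mapstructure key is unchanged, so YAML configuration is unaffected.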