diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 4d8e1f31..0a69b4c0 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -167,6 +167,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, firstCheck: true, cancel: func() {}, knownFiles: make([]*Reader, 0, 10), + roller: newRoller(), fingerprintSize: int(c.FingerprintSize), MaxLogSize: int(c.MaxLogSize), MaxConcurrentFiles: c.MaxConcurrentFiles, diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index f13d2f15..346583ee 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -47,10 +47,10 @@ type InputOperator struct { persister operator.Persister - knownFiles []*Reader - queuedMatches []string - maxBatchFiles int - lastPollReaders []*Reader + knownFiles []*Reader + queuedMatches []string + maxBatchFiles int + roller roller startAtBeginning bool @@ -70,6 +70,7 @@ func (f *InputOperator) Start(persister operator.Persister) error { f.firstCheck = true f.persister = persister + // Load offsets from disk if err := f.loadLastPollFiles(ctx); err != nil { return fmt.Errorf("read known files from database: %s", err) @@ -85,9 +86,7 @@ func (f *InputOperator) Start(persister operator.Persister) error { func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() - for _, reader := range f.lastPollReaders { - reader.Close() - } + f.roller.cleanup() for _, reader := range f.knownFiles { reader.Close() } @@ -148,27 +147,7 @@ func (f *InputOperator) poll(ctx context.Context) { readers := f.makeReaders(matches) f.firstCheck = false - // Detect files that have been rotated out of matching pattern - lostReaders := make([]*Reader, 0, len(f.lastPollReaders)) -OUTER: - for _, oldReader := range f.lastPollReaders { - for _, reader := range readers { - if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { - continue OUTER - } - } - lostReaders = append(lostReaders, oldReader) - } - var wg sync.WaitGroup - for _, reader := range lostReaders { - wg.Add(1) - go func(r *Reader) { - defer wg.Done() - r.ReadToEnd(ctx) - }(reader) - } - for _, reader := range readers { wg.Add(1) go func(r *Reader) { @@ -176,17 +155,9 @@ OUTER: r.ReadToEnd(ctx) }(reader) } - - // Wait until all the reader goroutines are finished wg.Wait() - // Close all files - for _, reader := range f.lastPollReaders { - reader.Close() - } - - f.lastPollReaders = readers - + f.roller.roll(ctx, readers) f.saveCurrent(readers) f.syncLastPollFiles(ctx) } diff --git a/operator/builtin/input/file/roller.go b/operator/builtin/input/file/roller.go new file mode 100644 index 00000000..21498fdb --- /dev/null +++ b/operator/builtin/input/file/roller.go @@ -0,0 +1,22 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import "context" + +type roller interface { + roll(context.Context, []*Reader) + cleanup() +} diff --git a/operator/builtin/input/file/roller_other.go b/operator/builtin/input/file/roller_other.go new file mode 100644 index 00000000..2dc80f87 --- /dev/null +++ b/operator/builtin/input/file/roller_other.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !windows +// +build !windows + +package file + +import ( + "context" + "sync" +) + +type detectLostFiles struct { + oldReaders []*Reader +} + +func newRoller() roller { + return &detectLostFiles{[]*Reader{}} +} + +func (r *detectLostFiles) roll(ctx context.Context, readers []*Reader) { + // Detect files that have been rotated out of matching pattern + lostReaders := make([]*Reader, 0, len(r.oldReaders)) +OUTER: + for _, oldReader := range r.oldReaders { + for _, reader := range readers { + if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { + continue OUTER + } + } + lostReaders = append(lostReaders, oldReader) + } + + var lostWG sync.WaitGroup + for _, reader := range lostReaders { + lostWG.Add(1) + go func(r *Reader) { + defer lostWG.Done() + r.ReadToEnd(ctx) + }(reader) + } + lostWG.Wait() + + for _, reader := range r.oldReaders { + reader.Close() + } + + r.oldReaders = readers +} + +func (r *detectLostFiles) cleanup() { + for _, reader := range r.oldReaders { + reader.Close() + } +} diff --git a/operator/builtin/input/file/roller_windows.go b/operator/builtin/input/file/roller_windows.go new file mode 100644 index 00000000..fbf44fe9 --- /dev/null +++ b/operator/builtin/input/file/roller_windows.go @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows +// +build windows + +package file + +import "context" + +type closeImmediately struct{} + +func newRoller() roller { + return &closeImmediately{} +} + +func (r *closeImmediately) roll(_ context.Context, readers []*Reader) { + for _, reader := range readers { + reader.Close() + } +} + +func (r *closeImmediately) cleanup() { + return +}