Commit
Merge branch 'main' into nv-prune-spanmetrics-histograms
MovieStoreGuy authored Oct 4, 2023
2 parents 926f2a3 + eca1a29 · commit 3745bed
Showing 13 changed files with 335 additions and 318 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/generate-weekly-report.yml
@@ -12,7 +12,7 @@ jobs:
   get_issues:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - run: npm install js-yaml
        working-directory: ./.github/workflows/scripts
       - uses: actions/github-script@v6
11 changes: 11 additions & 0 deletions pkg/stanza/attrs/attrs.go
@@ -0,0 +1,11 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package attrs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
+
+const (
+    LogFileName         = "log.file.name"
+    LogFilePath         = "log.file.path"
+    LogFileNameResolved = "log.file.name_resolved"
+    LogFilePathResolved = "log.file.path_resolved"
+)
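The new attrs package gives the log-file attribute keys a single exported home; the same four strings were previously unexported constants in fileconsumer/file.go, which this commit deletes (see below). A minimal sketch of the intended call-site pattern — the fileAttributes helper and its plain map are illustrative, only the attrs constants come from this commit:

package main

import (
    "fmt"

    "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
)

// fileAttributes builds the attribute map a reader might attach to an emitted
// log record, using the shared keys instead of repeated string literals.
func fileAttributes(name, path string) map[string]any {
    return map[string]any{
        attrs.LogFileName: name,
        attrs.LogFilePath: path,
    }
}

func main() {
    fmt.Println(fileAttributes("app.log", "/var/log/app.log"))
}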
31 changes: 16 additions & 15 deletions pkg/stanza/fileconsumer/config.go
@@ -17,6 +17,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
@@ -163,29 +164,29 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
 	return &Manager{
 		SugaredLogger: logger.With("component", "fileconsumer"),
 		cancel:        func() {},
-		readerFactory: readerFactory{
+		readerFactory: reader.Factory{
 			SugaredLogger: logger.With("component", "fileconsumer"),
-			readerConfig: &readerConfig{
-				fingerprintSize:         int(c.FingerprintSize),
-				maxLogSize:              int(c.MaxLogSize),
-				emit:                    emit,
-				includeFileName:         c.IncludeFileName,
-				includeFilePath:         c.IncludeFilePath,
-				includeFileNameResolved: c.IncludeFileNameResolved,
-				includeFilePathResolved: c.IncludeFilePathResolved,
+			Config: &reader.Config{
+				FingerprintSize:         int(c.FingerprintSize),
+				MaxLogSize:              int(c.MaxLogSize),
+				Emit:                    emit,
+				IncludeFileName:         c.IncludeFileName,
+				IncludeFilePath:         c.IncludeFilePath,
+				IncludeFileNameResolved: c.IncludeFileNameResolved,
+				IncludeFilePathResolved: c.IncludeFilePathResolved,
 			},
-			fromBeginning:   startAtBeginning,
-			splitterFactory: factory,
-			encoding:        enc,
-			headerConfig:    hCfg,
+			FromBeginning:   startAtBeginning,
+			SplitterFactory: factory,
+			Encoding:        enc,
+			HeaderConfig:    hCfg,
 		},
 		fileMatcher: fileMatcher,
-		roller:      newRoller(int(c.FingerprintSize)),
+		roller:      newRoller(),
 		pollInterval:    c.PollInterval,
 		maxBatchFiles:   c.MaxConcurrentFiles / 2,
 		maxBatches:      c.MaxBatches,
 		deleteAfterRead: c.DeleteAfterRead,
-		knownFiles:      make([]*reader, 0, 10),
+		knownFiles:      make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
 		seenPaths:       make(map[string]struct{}, 100),
 	}, nil
 }
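The rename pattern running through this hunk is the point of the refactor: the unexported readerFactory and readerConfig move into the new internal/reader package as the exported Factory and Config. A rough sketch of the shapes this call site implies — the field types are inferred from the values assigned above and from the packages imported here, not copied from the new file:

package reader

import (
    "golang.org/x/text/encoding"

    "go.uber.org/zap"

    "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
    "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
    "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
)

// Config carries the per-reader settings formerly held by the unexported
// readerConfig; every field set by the literal in buildManager is exported.
type Config struct {
    FingerprintSize         int
    MaxLogSize              int
    Emit                    emit.Callback
    IncludeFileName         bool
    IncludeFilePath         bool
    IncludeFileNameResolved bool
    IncludeFilePathResolved bool
}

// Factory replaces the unexported readerFactory; the assumed types for
// SplitterFactory, Encoding, and HeaderConfig follow the factory, enc, and
// hCfg arguments visible above.
type Factory struct {
    *zap.SugaredLogger
    *Config
    FromBeginning   bool
    SplitterFactory splitter.Factory
    Encoding        encoding.Encoding
    HeaderConfig    *header.Config
}

Exporting these types is what lets file.go (below) keep only orchestration logic while the reader internals live behind the internal/reader boundary.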
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config_test.go
@@ -767,7 +767,7 @@ func TestBuildWithHeader(t *testing.T) {
 			},
 			require.NoError,
 			func(t *testing.T, m *Manager) {
-				require.NotNil(t, m.readerFactory.headerConfig.SplitFunc)
+				require.NotNil(t, m.readerFactory.HeaderConfig.SplitFunc)
 			},
 		},
 	}
74 changes: 26 additions & 48 deletions pkg/stanza/fileconsumer/file.go
@@ -15,23 +15,17 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 )
 
-const (
-	logFileName         = "log.file.name"
-	logFilePath         = "log.file.path"
-	logFileNameResolved = "log.file.name_resolved"
-	logFilePathResolved = "log.file.path_resolved"
-)
-
 type Manager struct {
 	*zap.SugaredLogger
 	wg     sync.WaitGroup
 	cancel context.CancelFunc
 
-	readerFactory readerFactory
+	readerFactory reader.Factory
 	fileMatcher   *matcher.Matcher
 	roller        roller
 	persister     operator.Persister
@@ -41,7 +35,7 @@ type Manager struct {
 	maxBatchFiles   int
 	deleteAfterRead bool
 
-	knownFiles []*reader
+	knownFiles []*reader.Reader
 	seenPaths  map[string]struct{}
 
 	currentFps []*fingerprint.Fingerprint
@@ -75,7 +69,6 @@ func (m *Manager) Stop() error {
 	for _, reader := range m.knownFiles {
 		reader.Close()
 	}
-	m.knownFiles = nil
 	m.cancel = nil
 	return nil
 }
@@ -103,12 +96,6 @@ func (m *Manager) startPoller(ctx context.Context) {
 
 // poll checks all the watched paths for new entries
 func (m *Manager) poll(ctx context.Context) {
-	// Increment the generation on all known readers
-	// This is done here because the next generation is about to start
-	for i := 0; i < len(m.knownFiles); i++ {
-		m.knownFiles[i].generation++
-	}
-
 	// Used to keep track of the number of batches processed in this poll cycle
 	batchesProcessed := 0
 
@@ -136,7 +123,7 @@
 
 func (m *Manager) consume(ctx context.Context, paths []string) {
 	m.Debug("Consuming files")
-	readers := make([]*reader, 0, len(paths))
+	readers := make([]*reader.Reader, 0, len(paths))
 	for _, path := range paths {
 		r := m.makeReader(path)
 		if r != nil {
@@ -152,11 +139,11 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	var wg sync.WaitGroup
 	for _, r := range readers {
 		wg.Add(1)
-		go func(r *reader) {
+		go func(r *reader.Reader) {
 			defer wg.Done()
 			r.ReadToEnd(ctx)
 			// Delete a file if deleteAfterRead is enabled and we reached the end of the file
-			if m.deleteAfterRead && r.eof {
+			if m.deleteAfterRead && r.EOF {
 				r.Delete()
 			}
 		}(r)
@@ -165,9 +152,9 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 
 	// Save off any files that were not fully read
 	if m.deleteAfterRead {
-		unfinished := make([]*reader, 0, len(readers))
+		unfinished := make([]*reader.Reader, 0, len(readers))
 		for _, r := range readers {
-			if !r.eof {
+			if !r.EOF {
 				unfinished = append(unfinished, r)
 			}
 		}
@@ -180,7 +167,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	}
 
 	// Any new files that appear should be consumed entirely
-	m.readerFactory.fromBeginning = true
+	m.readerFactory.FromBeginning = true
 
 	m.roller.roll(ctx, readers)
 	m.saveCurrent(readers)
@@ -190,7 +177,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 
 func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
 	if _, ok := m.seenPaths[path]; !ok {
-		if m.readerFactory.fromBeginning {
+		if m.readerFactory.FromBeginning {
 			m.Infow("Started watching file", "path", path)
 		} else {
 			m.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
@@ -203,7 +190,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
 		return nil, nil
 	}
 
-	fp, err := m.readerFactory.newFingerprint(file)
+	fp, err := m.readerFactory.NewFingerprint(file)
 	if err != nil {
 		if err = file.Close(); err != nil {
 			m.Debugw("problem closing file", zap.Error(err))
@@ -233,7 +220,7 @@ func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
 // makeReader takes a file path, then creates a reader,
 // discarding any that have a duplicate fingerprint to other files that have already
 // been read this polling interval
-func (m *Manager) makeReader(path string) *reader {
+func (m *Manager) makeReader(path string) *reader.Reader {
 	// Open the files first to minimize the time between listing and opening
 	fp, file := m.makeFingerprint(path)
 	if fp == nil {
@@ -265,33 +252,26 @@ func (m *Manager) clearCurrentFingerprints() {
 // saveCurrent adds the readers from this polling interval to the list of
 // known files, then forgets the oldest readers when the number of known
 // files would exceed the list's capacity.
-func (m *Manager) saveCurrent(readers []*reader) {
-	// Add readers from the current, completed poll interval to the list of known files
-	m.knownFiles = append(m.knownFiles, readers...)
-
-	// Clear out old readers. They are sorted such that they are oldest first,
-	// so we can just find the first reader whose generation is less than our
-	// max, and keep every reader after that
-	for i := 0; i < len(m.knownFiles); i++ {
-		reader := m.knownFiles[i]
-		if reader.generation <= 3 {
-			m.knownFiles = m.knownFiles[i:]
-			break
-		}
-	}
+func (m *Manager) saveCurrent(readers []*reader.Reader) {
+	forgetNum := len(m.knownFiles) + len(readers) - cap(m.knownFiles)
+	if forgetNum > 0 {
+		m.knownFiles = append(m.knownFiles[forgetNum:], readers...)
+		return
+	}
+	m.knownFiles = append(m.knownFiles, readers...)
 }

-func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
+func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
 	// Check if the new path has the same fingerprint as an old path
 	if oldReader, ok := m.findFingerprintMatch(fp); ok {
-		return m.readerFactory.copy(oldReader, file)
+		return m.readerFactory.Copy(oldReader, file)
 	}
 
 	// If we don't match any previously known files, create a new reader from scratch
-	return m.readerFactory.newReader(file, fp)
+	return m.readerFactory.NewReader(file, fp)
 }
 
-func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader, bool) {
+func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) {
 	// Iterate backwards to match newest first
 	for i := len(m.knownFiles) - 1; i >= 0; i-- {
 		oldReader := m.knownFiles[i]
@@ -320,7 +300,7 @@ func (m *Manager) syncLastPollFiles(ctx context.Context) {
 
 	// Encode each known file
 	for _, fileReader := range m.knownFiles {
-		if err := enc.Encode(fileReader.readerMetadata); err != nil {
+		if err := enc.Encode(fileReader.Metadata); err != nil {
 			m.Errorw("Failed to encode known files", zap.Error(err))
 		}
 	}
@@ -338,7 +318,6 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
 	}
 
 	if encoded == nil {
-		m.knownFiles = make([]*reader, 0, 10)
 		return nil
 	}
 
@@ -352,13 +331,12 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
 
 	if knownFileCount > 0 {
 		m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
-		m.readerFactory.fromBeginning = true
+		m.readerFactory.FromBeginning = true
 	}
 
 	// Decode each of the known files
-	m.knownFiles = make([]*reader, 0, knownFileCount)
 	for i := 0; i < knownFileCount; i++ {
-		rmd := &readerMetadata{}
+		rmd := new(reader.Metadata)
 		if err = dec.Decode(rmd); err != nil {
 			return err
 		}
@@ -378,7 +356,7 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
 		}
 
 		// This reader won't be used for anything other than metadata reference, so just wrap the metadata
-		m.knownFiles = append(m.knownFiles, &reader{readerMetadata: rmd})
+		m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd})
 	}
 
 	return nil
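The most consequential change in this file is how knownFiles is pruned. The old code stamped every reader with a generation counter, bumped it once per poll cycle, and discarded readers after three generations; the new saveCurrent treats knownFiles as a bounded buffer (its initial capacity is 10*MaxConcurrentFiles, set in config.go above) and forgets the oldest readers only when appending would overflow the current capacity. A standalone sketch of the eviction rule taken from the hunk above, with plain strings standing in for *reader.Reader values:

package main

import "fmt"

// saveCurrent mirrors the eviction logic added in this commit: when appending
// the new readers would exceed the slice's current capacity, the oldest
// entries are forgotten first.
func saveCurrent(known, readers []string) []string {
    forgetNum := len(known) + len(readers) - cap(known)
    if forgetNum > 0 {
        return append(known[forgetNum:], readers...)
    }
    return append(known, readers...)
}

func main() {
    known := make([]string, 0, 4) // the real code starts at cap 10*MaxConcurrentFiles
    known = saveCurrent(known, []string{"a", "b", "c"})
    known = saveCurrent(known, []string{"d", "e"})
    fmt.Println(known) // [b c d e] — "a" was the oldest and has been dropped
}

One consequence visible elsewhere in the diff: Stop() no longer nils out knownFiles, and loadLastPollFiles no longer re-allocates it, since the buffer now manages its own size.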