From 74968470d8041ba1bce1f252dc233ac6824a973d Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 22 Aug 2024 22:49:04 +0530 Subject: [PATCH] chore: commit, archive module --- pkg/stanza/fileconsumer/config.go | 9 +-- pkg/stanza/fileconsumer/file.go | 67 ++++++++++++++----- .../fileconsumer/internal/archive/archive.go | 43 +++++++++--- .../{archive_nop.go => archive_default.go} | 18 ++--- .../fileconsumer/internal/fileset/fileset.go | 5 -- .../fileconsumer/internal/tracker/tracker.go | 13 +++- 6 files changed, 108 insertions(+), 47 deletions(-) rename pkg/stanza/fileconsumer/internal/archive/{archive_nop.go => archive_default.go} (50%) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index c69cf6e19a157..ba05527473727 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -88,7 +88,7 @@ type Config struct { DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"` Compression string `mapstructure:"compression,omitempty"` - PollsToArchive int `mapstructure:"-"` + PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"` } @@ -183,12 +183,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2) } - var a archive.Archive - if c.PollsToArchive <= 0 { - a = archive.NewNopArchive() - } else { - a = archive.NewArchive(c.PollsToArchive) - } + a := archive.NewDefaultArchive() // TODO: once archiving is set up, update this. telemetryBuilder, err := metadata.NewTelemetryBuilder(set) if err != nil { diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index f4398b82c3ccf..df15c78663f50 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -40,6 +40,8 @@ type Manager struct { pollsToArchive int telemetryBuilder *metadata.TelemetryBuilder + + unmatchedFiles []*archive.FileRecord } func (m *Manager) Start(persister operator.Persister) error { @@ -147,7 +149,7 @@ func (m *Manager) poll(ctx context.Context) { } } // rotate at end of every poll() - m.tracker.EndPoll() + m.tracker.EndPoll(m.archive) } func (m *Manager) consume(ctx context.Context, paths []string) { @@ -201,6 +203,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval func (m *Manager) makeReaders(ctx context.Context, paths []string) { + m.unmatchedFiles = make([]*archive.FileRecord, 0) for _, path := range paths { fp, file := m.makeFingerprint(path) if fp == nil { @@ -219,17 +222,57 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) { continue } - r, err := m.newReader(ctx, file, fp) + r, matchFound, err := m.newReader(ctx, file, fp) if err != nil { m.set.Logger.Error("Failed to create reader", zap.Error(err)) continue } + if matchFound { + m.tracker.Add(r) + } else { + m.unmatchedFiles = append(m.unmatchedFiles, archive.NewFileRecord(file, fp)) + } + } - m.tracker.Add(r) + records, err := m.archive.Match(m.unmatchedFiles) + if err != nil { + m.set.Logger.Error("Errors encountered while reading the archive", zap.Error(err)) + } +INNER: + for _, record := range records { + // Exclude duplicate paths with the same content. 
This can happen when files are
+		// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
+		if r := m.tracker.GetCurrentFile(record.Fingerprint); r != nil {
+			m.set.Logger.Debug("Skipping duplicate file", zap.String("path", r.GetFileName()))
+			// re-add the reader, since Match() removes duplicate entries
+			m.tracker.Add(r)
+			record.File.Close()
+			continue INNER
+		}
+		var reader *reader.Reader
+		if record.Metadata != nil {
+			// record.Metadata is set, so a match was found in the archive
+			reader, err = m.readerFactory.NewReaderFromMetadata(record.File, record.Metadata)
+			if err != nil {
+				m.set.Logger.Error("Failed to create reader", zap.Error(err))
+				continue INNER
+			}
+		} else {
+			// record.Metadata is nil, i.e. this is a new file
+			reader, err = m.readerFactory.NewReader(record.File, record.Fingerprint)
+			if err != nil {
+				m.set.Logger.Error("Failed to create reader", zap.Error(err))
+				continue INNER
+			}
+		}
+		m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
+		m.tracker.Add(reader)
+		m.set.Logger.Info("Started watching file", zap.String("path", reader.GetFileName()))
 	}
+
 }
 
-func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
+func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, bool, error) {
 	// Check previous poll cycle for match
 	if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
 		if oldReader.GetFileName() != file.Name() {
@@ -245,25 +288,19 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
 			zap.String("rotated_path", file.Name()))
 		}
 	}
-	return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
+	r, err := m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
+	return r, true, err
 	}
 
 	// Check for closed files for match
 	if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
 		r, err := m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
 		if err != nil {
-			return nil, err
+			return nil, false, err
 		}
 		m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
-		return r, nil
+		return r, true, nil
 	}
 
-	// If we don't match any previously known files, create a new reader from scratch
-	m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
-	r, err := m.readerFactory.NewReader(file, fp)
-	if err != nil {
-		return nil, err
-	}
-	m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
-	return r, nil
+	return nil, false, nil
 }
diff --git a/pkg/stanza/fileconsumer/internal/archive/archive.go b/pkg/stanza/fileconsumer/internal/archive/archive.go
index 866698a323563..27f08d73460de 100644
--- a/pkg/stanza/fileconsumer/internal/archive/archive.go
+++ b/pkg/stanza/fileconsumer/internal/archive/archive.go
@@ -4,7 +4,8 @@
 package archive // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"
 
 import (
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
+	"os"
+
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
@@ -14,29 +15,53 @@ const _ = "knownFiles"
 
 type Archive interface {
 	SetStorageClient(operator.Persister)
-	Match(*fingerprint.Fingerprint) (*reader.Metadata, error)
+
+	Match([]*FileRecord) ([]*FileRecord, error)
 	Write([]*reader.Metadata) error
 }
 
+type FileRecord
 struct {
+	File        *os.File
+	Fingerprint *fingerprint.Fingerprint
+
+	// Metadata is populated if a match is found in storage.
+	// For new files, Metadata remains nil.
+	Metadata *reader.Metadata
+}
+
+func NewFileRecord(file *os.File, fp *fingerprint.Fingerprint) *FileRecord {
+	return &FileRecord{
+		File:        file,
+		Fingerprint: fp,
+	}
+}
+
 type archive struct {
-	persister      operator.Persister
-	pollsToArchive int
-	index          int
-	_              *fileset.Fileset[*reader.Metadata]
+	persister operator.Persister
 }
 
-func NewArchive(pollsToArchive int) Archive {
-	return &archive{pollsToArchive: pollsToArchive}
+func NewArchive() Archive {
+	return &archive{}
 }
 
 func (a *archive) SetStorageClient(persister operator.Persister) {
 	a.persister = persister
 }
 
-func (a *archive) Match(_ *fingerprint.Fingerprint) (*reader.Metadata, error) {
+// Match processes the unmatched files by performing the following steps:
+//  1. Access the storage key and retrieve the existing metadata.
+//  2. Iterate through the unmatched files:
+//     a. If a corresponding record is found in storage, update the record's Metadata with the retrieved metadata.
+//     b. If no matching record is found, skip to the next file.
+//     c. Update the storage key with the updated metadata.
+//  3. The unmatched files are returned with the following details:
+//     a. Files with a matching record have their Metadata set to the known metadata.
+//     b. New files keep a nil Metadata.
+func (a *archive) Match(_ []*FileRecord) ([]*FileRecord, error) {
 	// TODO:
 	// Add logic to go through the storage and return a match.
 	// Also update the storage if match found.
+	return nil, nil
 }
diff --git a/pkg/stanza/fileconsumer/internal/archive/archive_nop.go b/pkg/stanza/fileconsumer/internal/archive/archive_default.go
similarity index 50%
rename from pkg/stanza/fileconsumer/internal/archive/archive_nop.go
rename to pkg/stanza/fileconsumer/internal/archive/archive_default.go
index e5e29e5748f29..0b50a8db7881d 100644
--- a/pkg/stanza/fileconsumer/internal/archive/archive_nop.go
+++ b/pkg/stanza/fileconsumer/internal/archive/archive_default.go
@@ -4,24 +4,26 @@
 package archive // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"
 
 import (
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 )
 
-type nopArchive struct{}
+type defaultArchive struct {
+}
 
-func NewNopArchive() Archive {
-	return &nopArchive{}
+func NewDefaultArchive() Archive {
+	return &defaultArchive{}
 }
 
-func (a *nopArchive) SetStorageClient(_ operator.Persister) {
+func (a *defaultArchive) SetStorageClient(_ operator.Persister) {
 }
 
-func (a *nopArchive) Match(_ *fingerprint.Fingerprint) (*reader.Metadata, error) {
-	return nil, nil
+func (a *defaultArchive) Match(unmatchedFiles []*FileRecord) ([]*FileRecord, error) {
+	// The default archive returns the files unchanged, with no matches from storage.
+	return unmatchedFiles, nil
 }
 
-func (a *nopArchive) Write(_ []*reader.Metadata) error {
+func (a *defaultArchive) Write(_ []*reader.Metadata) error {
+	// The default archive discards the old offsets.
 	return nil
 }
diff --git a/pkg/stanza/fileconsumer/internal/fileset/fileset.go b/pkg/stanza/fileconsumer/internal/fileset/fileset.go
index 10c66711c9d3c..3d1cf50fb1433 100644
--- a/pkg/stanza/fileconsumer/internal/fileset/fileset.go
+++
 b/pkg/stanza/fileconsumer/internal/fileset/fileset.go
@@ -55,11 +55,6 @@ func (set *Fileset[T]) Add(readers ...T) {
 	set.readers = append(set.readers, readers...)
 }
 
-func (set *Fileset[T]) Reset(readers ...T) {
-	// reset the underlying array.
-	set.readers = readers
-}
-
 func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint, cmp func(a, b *fingerprint.Fingerprint) bool) T {
 	var val T
 	for idx, r := range set.readers {
diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
index 5039003a36edf..6a970387987f0 100644
--- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go
+++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -7,6 +7,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
@@ -23,7 +24,7 @@ type Tracker interface {
 	CurrentPollFiles() []*reader.Reader
 	PreviousPollFiles() []*reader.Reader
 	ClosePreviousFiles() int
-	EndPoll()
+	EndPoll(archive archive.Archive)
 	EndConsume() int
 	TotalReaders() int
 }
@@ -110,11 +111,17 @@ func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
 	return
 }
 
-func (t *fileTracker) EndPoll() {
+func (t *fileTracker) EndPoll(archive archive.Archive) {
 	// shift the filesets at end of every poll() call
 	// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
+	oldFileset := t.knownFiles[2]
 	copy(t.knownFiles[1:], t.knownFiles)
 	t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
+
+	err := archive.Write(oldFileset.Get())
+	if err != nil {
+		t.set.Logger.Error("Error while writing to the archive", zap.Error(err))
+	}
 }
 
 func (t *fileTracker) TotalReaders() int {
@@ -176,6 +183,6 @@ func (t *noStateTracker) PreviousPollFiles() []*reader.Reader { return nil }
 
 func (t *noStateTracker) ClosePreviousFiles() int { return 0 }
 
-func (t *noStateTracker) EndPoll() {}
+func (t *noStateTracker) EndPoll(archive.Archive) {}
 
 func (t *noStateTracker) TotalReaders() int { return 0 }
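
Note on the Match contract sketched in archive.go: the doc comment describes looking up stored metadata by fingerprint, attaching it to matching FileRecords, and leaving new files with a nil Metadata. The snippet below is a minimal, self-contained illustration of that matching step; it uses simplified stand-in types (simpleFingerprint, simpleMetadata, simpleRecord, and an in-memory slice in place of operator.Persister) and is not the repository's implementation, which remains a TODO in this patch.

package main

import (
	"bytes"
	"fmt"
)

// Stand-in types for illustration only. The real code would use
// fingerprint.Fingerprint, reader.Metadata, and operator.Persister.
type simpleFingerprint struct{ firstBytes []byte }

// startsWith reports whether fp begins with old's bytes, loosely mirroring
// how fingerprints tolerate a file that has grown since it was archived.
func (fp *simpleFingerprint) startsWith(old *simpleFingerprint) bool {
	return len(old.firstBytes) > 0 && bytes.HasPrefix(fp.firstBytes, old.firstBytes)
}

type simpleMetadata struct {
	fingerprint *simpleFingerprint
	offset      int64
}

type simpleRecord struct {
	fingerprint *simpleFingerprint
	metadata    *simpleMetadata // nil until a match is found
}

// match walks the steps from the doc comment against an in-memory "archive":
// retrieve the stored metadata, attach matches to the unmatched records, and
// drop consumed entries so they cannot be matched twice. The caller would then
// persist the remaining entries back to storage (step 2c).
func match(archived []*simpleMetadata, unmatched []*simpleRecord) []*simpleMetadata {
	remaining := archived
	for _, rec := range unmatched {
		for i, md := range remaining {
			if rec.fingerprint.startsWith(md.fingerprint) {
				rec.metadata = md // known file: resume from the stored offset
				remaining = append(remaining[:i], remaining[i+1:]...)
				break
			}
		}
		// no entry found: rec.metadata stays nil, i.e. a brand new file
	}
	return remaining
}

func main() {
	archived := []*simpleMetadata{
		{fingerprint: &simpleFingerprint{firstBytes: []byte("2024-08-22 INFO start")}, offset: 128},
	}
	unmatched := []*simpleRecord{
		{fingerprint: &simpleFingerprint{firstBytes: []byte("2024-08-22 INFO start of a rotated log")}},
		{fingerprint: &simpleFingerprint{firstBytes: []byte("a completely new file")}},
	}
	match(archived, unmatched)
	for _, rec := range unmatched {
		if rec.metadata != nil {
			fmt.Printf("known file, resuming at offset %d\n", rec.metadata.offset)
		} else {
			fmt.Println("new file, starting from the beginning")
		}
	}
}

In the real archive, retrieving the stored metadata and writing back the remaining entries would go through the Persister supplied via SetStorageClient, which is why that method is part of the Archive interface.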