Skip to content

Commit

Permalink
chore: commit, archive module
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Sep 10, 2024
1 parent 06e4a45 commit 431ab41
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 47 deletions.
9 changes: 2 additions & 7 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Config struct {
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
PollsToArchive int `mapstructure:"-"`
PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up
AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"`
}

Expand Down Expand Up @@ -183,12 +183,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2)
}

var a archive.Archive
if c.PollsToArchive <= 0 {
a = archive.NewNopArchive()
} else {
a = archive.NewArchive(c.PollsToArchive)
}
a := archive.NewDefaultArchive() // TODO: once archiving is set up, update this.

telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
Expand Down
64 changes: 49 additions & 15 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Manager struct {
pollsToArchive int

telemetryBuilder *metadata.TelemetryBuilder

unmatchedFiles []*archive.FileRecord
}

func (m *Manager) Start(persister operator.Persister) error {
Expand Down Expand Up @@ -147,7 +149,7 @@ func (m *Manager) poll(ctx context.Context) {
}
}
// rotate at end of every poll()
m.tracker.EndPoll()
m.tracker.EndPoll(m.archive)
}

func (m *Manager) consume(ctx context.Context, paths []string) {
Expand Down Expand Up @@ -201,6 +203,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReaders(ctx context.Context, paths []string) {
m.unmatchedFiles = make([]*archive.FileRecord, 0)
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
Expand All @@ -219,17 +222,54 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) {
continue
}

r, err := m.newReader(ctx, file, fp)
r, matchFound, err := m.newReader(ctx, file, fp)
if err != nil {
m.set.Logger.Error("Failed to create reader", zap.Error(err))
continue
}
if matchFound {
m.tracker.Add(r)
} else {
m.unmatchedFiles = append(m.unmatchedFiles, archive.NewFileRecord(file, fp))
}
}

m.tracker.Add(r)
records, err := m.archive.Match(m.unmatchedFiles)
if err != nil {
m.set.Logger.Error("Errors encountered while reading the archive", zap.Error(err))
}
INNER:
for _, record := range records {
// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
if r := m.tracker.GetCurrentFile(record.Fingerprint); r != nil {
m.set.Logger.Debug("Skipping duplicate file", zap.String("path", r.GetFileName()))
// re-add the reader as Match() removes duplicates
m.tracker.Add(r)
record.File.Close()
continue INNER
}
var reader *reader.Reader
var err error
if record.Metadata != nil {
// match is found if record.Metadata exists
reader, err = m.readerFactory.NewReaderFromMetadata(record.File, record.Metadata)
} else {
// empty record.Metada i.e. a new file
reader, err = m.readerFactory.NewReader(record.File, record.Fingerprint)
}
if err != nil {
m.set.Logger.Error("Failed to create reader", zap.Error(err))
continue INNER
}
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
m.tracker.Add(reader)
m.set.Logger.Info("Started watching file", zap.String("path", reader.GetFileName()))
}

}

func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, bool, error) {
// Check previous poll cycle for match
if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
if oldReader.GetFileName() != file.Name() {
Expand All @@ -245,25 +285,19 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
zap.String("rotated_path", file.Name()))
}
}
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
r, err := m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
return r, true, err
}

// Check for closed files for match
if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
r, err := m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
if err != nil {
return nil, err
return nil, false, err
}
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
return r, nil
return r, true, nil
}

// If we don't match any previously known files, create a new reader from scratch
m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
r, err := m.readerFactory.NewReader(file, fp)
if err != nil {
return nil, err
}
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
return r, nil
return nil, false, nil
}
43 changes: 34 additions & 9 deletions pkg/stanza/fileconsumer/internal/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
package archive // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"os"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand All @@ -14,29 +15,53 @@ const _ = "knownFiles"

type Archive interface {
SetStorageClient(operator.Persister)
Match(*fingerprint.Fingerprint) (*reader.Metadata, error)

Match([]*FileRecord) ([]*FileRecord, error)
Write([]*reader.Metadata) error
}

type FileRecord struct {
File *os.File
Fingerprint *fingerprint.Fingerprint

// Metadata is populated if a match is found in storage
// For new files, Metadata would remain empty
Metadata *reader.Metadata
}

func NewFileRecord(file *os.File, fp *fingerprint.Fingerprint) *FileRecord {
return &FileRecord{
File: file,
Fingerprint: fp,
}
}

type archive struct {
persister operator.Persister
pollsToArchive int
index int
_ *fileset.Fileset[*reader.Metadata]
persister operator.Persister
}

func NewArchive(pollsToArchive int) Archive {
return &archive{pollsToArchive: pollsToArchive}
func NewArchive() Archive {
return &archive{}
}

func (a *archive) SetStorageClient(persister operator.Persister) {
a.persister = persister
}

func (a *archive) Match(_ *fingerprint.Fingerprint) (*reader.Metadata, error) {
// The Match function processes unmatched files by performing the following steps
// 1. Access the storage key and retrieve the existing metadata.
// 2. Iterate through the unmatched files:
// a. If a corresponding record is found in storage, update the record's Metadata with the retrieved metadata
// b. If no matching record is found, skip to the next file.
// c. Update the storage key with updated metadata
// 3. Unmatched files will be updated with the following details:
// a. Files with a matching record will have their Metadata updated to the known metadata.
// b. New files will have an empty Metadata record.
func (a *archive) Match(_ []*FileRecord) ([]*FileRecord, error) {
// TODO:
// Add logic to go through the storage and return a match.
// Also update the storage if match found.

return nil, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@
package archive // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

type nopArchive struct{}
type defaultArchive struct {
}

func NewNopArchive() Archive {
return &nopArchive{}
func NewDefaultArchive() Archive {
return &defaultArchive{}
}

func (a *nopArchive) SetStorageClient(_ operator.Persister) {
func (a *defaultArchive) SetStorageClient(_ operator.Persister) {
}

func (a *nopArchive) Match(_ *fingerprint.Fingerprint) (*reader.Metadata, error) {
return nil, nil
func (a *defaultArchive) Match(unmatchedFiles []*FileRecord) ([]*FileRecord, error) {
// Default archiving returns the files as it is
return unmatchedFiles, nil
}

func (a *nopArchive) Write(_ []*reader.Metadata) error {
func (a *defaultArchive) Write(_ []*reader.Metadata) error {
// discard the old offsets by default
return nil
}
5 changes: 0 additions & 5 deletions pkg/stanza/fileconsumer/internal/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func (set *Fileset[T]) Add(readers ...T) {
set.readers = append(set.readers, readers...)
}

func (set *Fileset[T]) Reset(readers ...T) {
// reset the underlying array.
set.readers = readers
}

func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint, cmp func(a, b *fingerprint.Fingerprint) bool) T {
var val T
for idx, r := range set.readers {
Expand Down
13 changes: 10 additions & 3 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/archive"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
Expand All @@ -23,7 +24,7 @@ type Tracker interface {
CurrentPollFiles() []*reader.Reader
PreviousPollFiles() []*reader.Reader
ClosePreviousFiles() int
EndPoll()
EndPoll(archive archive.Archive)
EndConsume() int
TotalReaders() int
}
Expand Down Expand Up @@ -110,11 +111,17 @@ func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
return
}

func (t *fileTracker) EndPoll() {
func (t *fileTracker) EndPoll(archive archive.Archive) {
// shift the filesets at end of every poll() call
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
oldFileset := t.knownFiles[2]
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)

err := archive.Write(oldFileset.Get())
if err != nil {
t.set.Logger.Error("Error faced while archiving", zap.Error(err))
}
}

func (t *fileTracker) TotalReaders() int {
Expand Down Expand Up @@ -176,6 +183,6 @@ func (t *noStateTracker) PreviousPollFiles() []*reader.Reader { return nil }

func (t *noStateTracker) ClosePreviousFiles() int { return 0 }

func (t *noStateTracker) EndPoll() {}
func (t *noStateTracker) EndPoll(archive.Archive) {}

func (t *noStateTracker) TotalReaders() int { return 0 }

0 comments on commit 431ab41

Please sign in to comment.