Skip to content

Commit

Permalink
chore: move checkpointing to file.go
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Oct 9, 2024
1 parent 637bfa5 commit 080bf7a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 51 deletions.
10 changes: 9 additions & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -174,6 +175,13 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
AcquireFSLock: c.AcquireFSLock,
}

var t tracker.Tracker
if o.noTracking {
t = tracker.NewNoStateTracker(set, c.MaxConcurrentFiles/2)
} else {
t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2, c.PollsToArchive)
}

telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
Expand All @@ -185,8 +193,8 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
tracker: t,
telemetryBuilder: telemetryBuilder,
noTracking: o.noTracking,
}, nil
}

Expand Down
24 changes: 7 additions & 17 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type Manager struct {
readerFactory reader.Factory
fileMatcher *matcher.Matcher
tracker tracker.Tracker
noTracking bool

pollInterval time.Duration
persister operator.Persister
Expand All @@ -49,9 +48,6 @@ func (m *Manager) Start(persister operator.Persister) error {
m.set.Logger.Warn("finding files", zap.Error(err))
}

// instantiate the tracker
m.instantiateTracker(persister)

if persister != nil {
m.persister = persister
offsets, err := checkpoint.Load(ctx, m.persister)
Expand Down Expand Up @@ -80,9 +76,7 @@ func (m *Manager) Stop() error {
m.cancel = nil
}
m.wg.Wait()
if m.tracker != nil {
m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
}
m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.set.Logger.Error("save offsets", zap.Error(err))
Expand Down Expand Up @@ -148,6 +142,12 @@ func (m *Manager) poll(ctx context.Context) {
m.set.Logger.Error("save offsets", zap.Error(err))
}
}
archiveMetadata, key := m.tracker.GetArchiveMetadata()
if archiveMetadata != nil {
if err := checkpoint.SaveKey(context.Background(), m.persister, metadata, key); err != nil {
m.set.Logger.Error("save offsets", zap.Error(err))
}
}
}
// rotate at end of every poll()
m.tracker.EndPoll()
Expand Down Expand Up @@ -270,13 +270,3 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
return r, nil
}

// instantiateTracker initializes m.tracker according to the manager's
// configuration: a stateless tracker when tracking is disabled, otherwise a
// file tracker that persists offsets via the given persister.
func (m *Manager) instantiateTracker(persister operator.Persister) {
	if m.noTracking {
		m.tracker = tracker.NewNoStateTracker(m.set, m.maxBatchFiles)
		return
	}
	m.tracker = tracker.NewFileTracker(m.set, m.maxBatchFiles, m.pollsToArchive, persister)
}
45 changes: 12 additions & 33 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@
package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// Interface for tracking files that are being consumed.
Expand All @@ -24,6 +21,7 @@ type Tracker interface {
GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader
GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata
GetMetadata() []*reader.Metadata
GetArchiveMetadata() ([]*reader.Metadata, string)
LoadMetadata(metadata []*reader.Metadata)
CurrentPollFiles() []*reader.Reader
PreviousPollFiles() []*reader.Reader
Expand All @@ -43,15 +41,11 @@ type fileTracker struct {
previousPollFiles *fileset.Fileset[*reader.Reader]
knownFiles []*fileset.Fileset[*reader.Metadata]

// persister is to be used to store offsets older than 3 poll cycles.
// These offsets will be stored on disk
persister operator.Persister

pollsToArchive int
archiveIndex int
}

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
Expand All @@ -64,7 +58,6 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
knownFiles: knownFiles,
pollsToArchive: pollsToArchive,
persister: persister,
archiveIndex: 0,
}
}
Expand Down Expand Up @@ -128,9 +121,6 @@ func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
// EndPoll rotates the knownFiles filesets at the end of every poll() call:
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]. The oldest set
// (previously knownFiles[2]) is discarded here; callers that need to archive
// it must retrieve it via GetArchiveMetadata before calling EndPoll.
func (t *fileTracker) EndPoll() {
	// shift the filesets at end of every poll() call
	// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
	copy(t.knownFiles[1:], t.knownFiles)
	t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}
Expand All @@ -143,32 +133,19 @@ func (t *fileTracker) TotalReaders() int {
return total
}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
func (t *fileTracker) GetArchiveMetadata() ([]*reader.Metadata, string) {
// return offsets that are about to be discarded, along with a key for storing them on disk.

// We make use of a ring buffer, where each set of files is stored under a specific index.
// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
// Separate storage keys knownFilesArchive0, knownFilesArchive1, ..., knownFilesArchiveN, roll over back to knownFilesArchive0

// Archiving: ┌─────────────────────on-disk archive─────────────────────────┐
// | ┌───┐ ┌───┐ ┌──────────────────┐ |
// index | ▶ │ 0 │ ▶ │ 1 │ ▶ ... ▶ │ polls_to_archive │ |
// | ▲ └───┘ └───┘ └──────────────────┘ |
// | ▲ ▲ ▼ |
// | ▲ │ Roll over overriting older offsets, if any ◀ |
// └──────│──────────────────────────────────────────────────────┘
// │
// │
// │
// start
// index

if t.pollsToArchive <= 0 || t.persister == nil {
return
}
key := fmt.Sprintf("knownFiles%d", t.archiveIndex)
if err := checkpoint.SaveKey(context.Background(), t.persister, metadata.Get(), key); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
if t.pollsToArchive <= 0 || t.knownFiles[2] == nil {
// return if archiving is disabled or if there’s nothing available to archive.
return nil, ""
}
oldIndex := t.archiveIndex
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
return t.knownFiles[2].Get(), fmt.Sprintf("knownFiles%d", oldIndex)
}

// noStateTracker only tracks the current polled files. Once the poll is
Expand Down Expand Up @@ -225,3 +202,5 @@ func (t *noStateTracker) ClosePreviousFiles() int { return 0 }
// EndPoll is a no-op: noStateTracker keeps no state across polls, so there is
// nothing to rotate.
func (t *noStateTracker) EndPoll() {}

// TotalReaders always reports zero: noStateTracker does not track readers.
func (t *noStateTracker) TotalReaders() (total int) { return }

// GetArchiveMetadata returns no offsets and an empty key: with state tracking
// disabled there is nothing to archive.
func (t *noStateTracker) GetArchiveMetadata() (metadata []*reader.Metadata, key string) { return }

0 comments on commit 080bf7a

Please sign in to comment.