Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][fileconsumer/archive] - add signature methods for archive feature #35098

Merged
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ea3f57d
chore: add initial structure and signature methods
VihasMakwana Aug 6, 2024
17ec1aa
fix: add license
VihasMakwana Aug 6, 2024
26deb01
fix: lint
VihasMakwana Aug 6, 2024
fe1912c
fix: lint
VihasMakwana Aug 6, 2024
33566b0
fix: check
VihasMakwana Aug 6, 2024
5acb848
chore: add write method, update interface
VihasMakwana Aug 19, 2024
06e4a45
Merge branch 'main' into create-archive-storage
VihasMakwana Aug 22, 2024
431ab41
chore: commit, archive module
VihasMakwana Aug 22, 2024
077ab35
chore: cleanup, simplify the PR
VihasMakwana Sep 13, 2024
e3cdd5d
fix: initial commit, second PR
VihasMakwana Sep 16, 2024
d88d5ae
chore: remove stanza.go
VihasMakwana Sep 17, 2024
d9e5992
Format the comment
VihasMakwana Sep 17, 2024
c68817a
chore: use options
VihasMakwana Sep 17, 2024
39d9e86
fix: lint
VihasMakwana Sep 27, 2024
a6a010e
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Sep 27, 2024
3490e7d
fix: bug
VihasMakwana Sep 27, 2024
87c2d2a
chore: rename function
VihasMakwana Sep 27, 2024
b668554
chore: rename function
VihasMakwana Sep 27, 2024
7adf335
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Sep 27, 2024
924e002
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Sep 30, 2024
3f762ed
chore: cleanup tracker and use options
VihasMakwana Sep 30, 2024
66824c6
fix: move function before loading
VihasMakwana Sep 30, 2024
ef2e53a
chore: log the error
VihasMakwana Sep 30, 2024
f009e71
chore: lint, ci
VihasMakwana Sep 30, 2024
79ce0e3
chore: remove redundant argument
VihasMakwana Sep 30, 2024
0dd23be
Merge branch 'cleanup-tracker-options' into create-archive-storage-temp
VihasMakwana Oct 2, 2024
cae05c7
chore: remove redundant code
VihasMakwana Oct 2, 2024
25d923b
chore: add new no tracking
VihasMakwana Oct 2, 2024
4bc2150
fix: pass persister instead of m.persister
VihasMakwana Oct 2, 2024
9006e3f
chore: remove options
VihasMakwana Oct 3, 2024
4413322
chore: fix tests
VihasMakwana Oct 3, 2024
637bfa5
Merge branch 'main' into create-archive-storage-temp
VihasMakwana Oct 3, 2024
27cc4e1
chore: move checkpointing to file.go
VihasMakwana Oct 9, 2024
9eaa7e6
Revert "chore: move checkpointing to file.go"
VihasMakwana Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -87,6 +86,7 @@ type Config struct {
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
PollsToArchive int `mapstructure:"-"` // TODO: activate this config once archiving is set up
AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"`
}

Expand Down Expand Up @@ -174,13 +174,6 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
AcquireFSLock: c.AcquireFSLock,
}

var t tracker.Tracker
if o.noTracking {
t = tracker.NewNoStateTracker(set, c.MaxConcurrentFiles/2)
} else {
t = tracker.NewFileTracker(set, c.MaxConcurrentFiles/2)
}

telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
if err != nil {
return nil, err
Expand All @@ -192,8 +185,8 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
tracker: t,
telemetryBuilder: telemetryBuilder,
noTracking: o.noTracking,
}, nil
}

Expand Down
30 changes: 25 additions & 5 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ type Manager struct {
readerFactory reader.Factory
fileMatcher *matcher.Matcher
tracker tracker.Tracker
noTracking bool

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int
pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int
pollsToArchive int

telemetryBuilder *metadata.TelemetryBuilder
}
Expand All @@ -47,6 +49,9 @@ func (m *Manager) Start(persister operator.Persister) error {
m.set.Logger.Warn("finding files", zap.Error(err))
}

// instantiate the tracker
m.instantiateTracker()
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved

if persister != nil {
m.persister = persister
offsets, err := checkpoint.Load(ctx, m.persister)
Expand All @@ -58,6 +63,8 @@ func (m *Manager) Start(persister operator.Persister) error {
m.readerFactory.FromBeginning = true
m.tracker.LoadMetadata(offsets)
}
} else if m.pollsToArchive > 0 {
m.set.Logger.Error("archiving is not supported in memory, please use a storage extension")
}

// Start polling goroutine
Expand All @@ -73,7 +80,9 @@ func (m *Manager) Stop() error {
m.cancel = nil
}
m.wg.Wait()
m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
if m.tracker != nil {
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
}
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.set.Logger.Error("save offsets", zap.Error(err))
Expand Down Expand Up @@ -261,3 +270,14 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
return r, nil
}

func (m *Manager) instantiateTracker() {
opts := []tracker.OptionFunc{tracker.WithMaxBatchFiles(m.maxBatchFiles)}
if m.noTracking {
opts = append(opts, tracker.WithNoTracking())
}
if m.pollsToArchive > 0 {
opts = append(opts, tracker.WithPollsToArchive(m.pollsToArchive), tracker.WithPersister(m.persister))
}
m.tracker = tracker.NewFileTracker(m.set, opts...)
}
6 changes: 5 additions & 1 deletion pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ const knownFilesKey = "knownFiles"

// Save syncs the most recent set of files to the database
func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error {
return SaveKey(ctx, persister, rmds, knownFilesKey)
}

func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

Expand All @@ -34,7 +38,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta
}
}

if err := persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
errs = append(errs, fmt.Errorf("persist known files: %w", err))
}

Expand Down
109 changes: 95 additions & 14 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// Interface for tracking files that are being consumed.
Expand Down Expand Up @@ -37,20 +42,74 @@ type fileTracker struct {
currentPollFiles *fileset.Fileset[*reader.Reader]
previousPollFiles *fileset.Fileset[*reader.Reader]
knownFiles []*fileset.Fileset[*reader.Metadata]

// persister is to be used to store offsets older than 3 poll cycles.
// These offsets will be stored on disk
persister operator.Persister

pollsToArchive int
archiveIndex int
}

type option struct {
noTracking bool
maxBatchFiles int
pollsToArchive int
persister operator.Persister
}

type OptionFunc func(*option)

func WithMaxBatchFiles(maxBatchFiles int) OptionFunc {
return func(fto *option) {
fto.maxBatchFiles = maxBatchFiles
}
}

func WithPollsToArchive(pollsToArchive int) OptionFunc {
return func(fto *option) {
fto.pollsToArchive = pollsToArchive
}
}

func WithPersister(persister operator.Persister) OptionFunc {
return func(fto *option) {
fto.persister = persister
}
}

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int) Tracker {
func WithNoTracking() OptionFunc {
return func(fto *option) {
fto.noTracking = true
}
}

func NewFileTracker(set component.TelemetrySettings, opts ...OptionFunc) Tracker {
option := &option{}
for _, opt := range opts {
opt(option)
}
if option.noTracking {
return &noStateTracker{
set: set,
maxBatchFiles: option.maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](option.maxBatchFiles),
}
}
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
knownFiles[i] = fileset.New[*reader.Metadata](option.maxBatchFiles)
}
set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
return &fileTracker{
set: set,
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
maxBatchFiles: option.maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](option.maxBatchFiles),
previousPollFiles: fileset.New[*reader.Reader](option.maxBatchFiles),
knownFiles: knownFiles,
pollsToArchive: option.pollsToArchive,
persister: option.persister,
archiveIndex: 0,
}
}

Expand Down Expand Up @@ -113,6 +172,9 @@ func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
func (t *fileTracker) EndPoll() {
// shift the filesets at end of every poll() call
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]

// Instead of throwing it away, archive it.
t.archive(t.knownFiles[2])
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}
Expand All @@ -125,6 +187,34 @@ func (t *fileTracker) TotalReaders() int {
return total
}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// We make use of a ring buffer, where each set of files is stored under a specific index.
// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
// Separate storage keys knownFilesArchive0, knownFilesArchive1, ..., knownFilesArchiveN, roll over back to knownFilesArchive0

// Archiving: ┌─────────────────────on-disk archive─────────────────────────┐
// | ┌───┐ ┌───┐ ┌──────────────────┐ |
// index | ▶ │ 0 │ ▶ │ 1 │ ▶ ... ▶ │ polls_to_archive │ |
// | ▲ └───┘ └───┘ └──────────────────┘ |
// | ▲ ▲ ▼ |
// | ▲ │ Roll over overriting older offsets, if any ◀ |
// └──────│──────────────────────────────────────────────────────┘
// │
// │
// │
// start
// index

if t.pollsToArchive <= 0 || t.persister == nil {
return
}
Comment on lines +164 to +166
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to worry about these cases? Aren't we checking them and instantiating a NoStateTracker when these would ever be relevant?

Copy link
Contributor Author

@VihasMakwana VihasMakwana Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NoStateTracker will be created when WithNoTracking option is selected.
We can create a normal tracker with archiving disabled, by setting pollsToArchive: 0.
A tracker with archiving enabled will have pollsToArchive > 0.

The check t.persister == nil is just a precautionary check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright. Maybe in another PR we can clean this up. IMO we have starting to have too many special cases to handle and the easiest solution to that is move to a pattern where the nils pieces are nops instead. Then the implementation doesn't need to worry about details so much.

key := fmt.Sprintf("knownFiles%d", t.archiveIndex)
if err := checkpoint.SaveKey(context.Background(), t.persister, metadata.Get(), key); err != nil {
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// noStateTracker only tracks the current polled files. Once the poll is
// complete and telemetry is consumed, the tracked files are closed. The next
// poll will create fresh readers with no previously tracked offsets.
Expand All @@ -134,15 +224,6 @@ type noStateTracker struct {
currentPollFiles *fileset.Fileset[*reader.Reader]
}

func NewNoStateTracker(set component.TelemetrySettings, maxBatchFiles int) Tracker {
set.Logger = set.Logger.With(zap.String("tracker", "noStateTracker"))
return &noStateTracker{
set: set,
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
}
}

func (t *noStateTracker) Add(reader *reader.Reader) {
// add a new reader for tracking
t.currentPollFiles.Add(reader)
Expand Down
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
)

func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest.Sink) {
Expand All @@ -20,6 +21,7 @@ func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest
func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink, opts ...Option) *Manager {
set := componenttest.NewNopTelemetrySettings()
input, err := cfg.Build(set, sink.Callback, opts...)
input.tracker = tracker.NewFileTracker(set)
require.NoError(t, err)
t.Cleanup(func() { input.tracker.ClosePreviousFiles() })
return input
Expand Down