Skip to content

Commit

Permalink
pkg/stanza: add a replay_file config option to support heartbeats
Browse files Browse the repository at this point in the history
This change adds a config option to allow users to replay static
telemetry at an interval (poll_interval is the configurable interval).

This is useful for some  usecases like:
- Sending a heartbeat log to a logging backend, as a signal that the
  logging agent is alive
- Sending static metadata as metrics to metrics backend. The metadata
  can be read using the `otlpjsonfilereceiver` that uses this package
  and config.

Signed-off-by: Ridwan Sharif <[email protected]>
  • Loading branch information
ridwanmsharif committed Apr 18, 2024
1 parent c542a03 commit 85749e2
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/ridwanmsharif_replay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a replay_file config option to support replaying static telemetry

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31533]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Config struct {
FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
ReplayFile bool `mapstructure:"replay_file,omitempty"`
}

type HeaderConfig struct {
Expand Down Expand Up @@ -161,6 +162,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Opt
Attributes: c.Resolver,
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
ReplayFile: c.ReplayFile,
}

return &Manager{
Expand Down
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Factory struct {
EmitFunc emit.Callback
Attributes attrs.Resolver
DeleteAtEOF bool
ReplayFile bool
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
Expand Down Expand Up @@ -71,6 +72,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
deleteAtEOF: f.DeleteAtEOF,
replayFile: f.ReplayFile,
}

if r.Fingerprint.Len() > r.fingerprintSize {
Expand Down
7 changes: 7 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink
FlushTimeout: cfg.flushPeriod,
EmitFunc: sink.Callback,
Attributes: cfg.attributes,
ReplayFile: cfg.replayFile,
}, sink
}

Expand All @@ -77,6 +78,7 @@ type testFactoryCfg struct {
flushPeriod time.Duration
sinkChanSize int
attributes attrs.Resolver
replayFile bool
}

func withFingerprintSize(size int) testFactoryOpt {
Expand Down Expand Up @@ -120,6 +122,11 @@ func fromEnd() testFactoryOpt {
c.fromBeginning = false
}
}
func withReplay(replay bool) testFactoryOpt {
return func(c *testFactoryCfg) {
c.replayFile = replay
}
}

func TestStartAt(t *testing.T) {
tempDir := t.TempDir()
Expand Down
4 changes: 4 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Reader struct {
processFunc emit.Callback
emitFunc emit.Callback
deleteAtEOF bool
replayFile bool
needsUpdateFingerprint bool
}

Expand All @@ -57,6 +58,9 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
if r.needsUpdateFingerprint {
r.updateFingerprint()
}
if r.replayFile {
r.Offset = 0
}
}()

s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
Expand Down
37 changes: 37 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,43 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
require.Equal(t, fingerprint.New([]byte("testlog1\n")), reader.Fingerprint)
}

func TestFileReader_ReplayTest(t *testing.T) {
testCases := []struct {
testName string
replayFile bool
}{
{"replay_enabeld", true},
{"replay_disabled", false},
}

for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)
tempCopy := filetest.OpenFile(t, temp.Name())

f, sink := testFactory(t, withReplay(tc.replayFile))
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)

reader, err := f.NewReader(tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

filetest.WriteString(t, temp, "testlog1\n")
reader.ReadToEnd(context.Background())
sink.ExpectToken(t, []byte("testlog1"))

reader.ReadToEnd(context.Background())
if tc.replayFile {
sink.ExpectToken(t, []byte("testlog1"))
} else {
sink.ExpectNoCalls(t)
}
})
}
}

// Test that a fingerprint:
// - Starts empty
// - Updates as a file is read
Expand Down
38 changes: 38 additions & 0 deletions receiver/otlpjsonfilereceiver/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,44 @@ func TestFileMetricsReceiver(t *testing.T) {
assert.NoError(t, err)
}

func TestFileMetricsReceiverWithReplay(t *testing.T) {
tempFolder := t.TempDir()
factory := NewFactory()
cfg := createDefaultConfig().(*Config)
cfg.Config.Include = []string{filepath.Join(tempFolder, "*")}
cfg.Config.StartAt = "beginning"
cfg.Config.ReplayFile = true
cfg.Config.PollInterval = 5 * time.Second

sink := new(consumertest.MetricsSink)
receiver, err := factory.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
assert.NoError(t, err)
err = receiver.Start(context.Background(), nil)
assert.NoError(t, err)

md := testdata.GenerateMetricsManyMetricsSameResource(5)
marshaler := &pmetric.JSONMarshaler{}
b, err := marshaler.MarshalMetrics(md)
assert.NoError(t, err)
b = append(b, '\n')
err = os.WriteFile(filepath.Join(tempFolder, "metrics.json"), b, 0600)
assert.NoError(t, err)

// Wait for the first poll to complete.
time.Sleep(cfg.Config.PollInterval + time.Second)
require.Len(t, sink.AllMetrics(), 1)
assert.EqualValues(t, md, sink.AllMetrics()[0])

// Reset the sink and assert that the next poll replays all the existing metrics.
sink.Reset()
time.Sleep(cfg.Config.PollInterval + time.Second)
require.Len(t, sink.AllMetrics(), 1)
assert.EqualValues(t, md, sink.AllMetrics()[0])

err = receiver.Shutdown(context.Background())
assert.NoError(t, err)
}

func TestFileLogsReceiver(t *testing.T) {
tempFolder := t.TempDir()
factory := NewFactory()
Expand Down

0 comments on commit 85749e2

Please sign in to comment.