[chore][pkg/stanza] Fix the bug that the log emitter might hang when the receiver retries indefinitely (open-telemetry#37159)

#### Description
I was exploring options for applying backpressure to the pipeline when the exporter fails. Inspired by
open-telemetry#29410 (comment),
I realized that I could enable `retry_on_failure` on the receiver side and have it retry indefinitely by setting `max_elapsed_time` to 0:
```yaml
receivers:
   filelog:
     include: [ input.log ]
     retry_on_failure:
       enabled: true
       max_elapsed_time: 0
```
With this config, the consumer is blocked in the `ConsumeLogs` func in `consumerretry` when the exporter fails to consume the logs:

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/12551d324375bd0c4647a8cdc7bd0f8c435c1034/internal/coreinternal/consumerretry/logs.go#L35
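
For context, the retry consumer behaves roughly like the sketch below. This is a simplified illustration with made-up names (`logsConsumer`, `retryingConsumer`), not the actual `consumerretry` implementation:

```go
package sketch

import (
	"context"
	"time"
)

// logsConsumer is a minimal stand-in for the collector's consumer interface.
type logsConsumer interface {
	ConsumeLogs(ctx context.Context, logs any) error
}

// retryingConsumer mimics the shape of consumerretry: keep calling the next
// consumer until it succeeds or the context is cancelled.
type retryingConsumer struct {
	next    logsConsumer
	backoff time.Duration
}

func (rc *retryingConsumer) ConsumeLogs(ctx context.Context, logs any) error {
	for {
		if err := rc.next.ConsumeLogs(ctx, logs); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			// The only way out besides success. With a plain
			// context.Background() this branch can never fire, so the
			// caller blocks for as long as the downstream keeps failing.
			return ctx.Err()
		case <-time.After(rc.backoff):
			// wait and retry (the real code uses exponential backoff,
			// bounded by max_elapsed_time; 0 means retry forever)
		}
	}
}
```

With `max_elapsed_time: 0`, such a loop only exits on success or on context cancellation, which is why the context handed to it matters.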

The `flusher()` func of the `LogEmitter` starts a loop and calls the `consumerFunc` with `context.Background()`. When `ConsumeLogs` is blocked by the retry, there is no way to cancel it, so the `LogEmitter` hangs when I try to shut down the collector.

In this PR, I created a ctx in the `Start` func, which is cancelled later in the `Stop` func. The ctx is passed to the flusher and used for the flush on every `flushInterval` tick. However, that ctx is already cancelled by the time the shutdown flush runs, so I have to swap in another ctx with a timeout to flush the remaining batch one last time. That's the best approach I can think of for now, and I'm open to other suggestions.
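
To illustrate why the already-cancelled ctx can't simply be reused for that final flush, here is a tiny hypothetical example (names like `doWithRetry` and `demo` are made up for illustration, not part of this change):

```go
package sketch

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithRetry stands in for any cancellation-aware downstream call, such as
// the retry loop sketched earlier: it gives up as soon as its context is done.
func doWithRetry(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

func demo() {
	// The flusher's ctx has already been cancelled by Stop at this point.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	// Final flush with the cancelled ctx: aborted immediately.
	fmt.Println(errors.Is(doWithRetry(ctx), context.Canceled)) // prints: true

	// Final flush with a fresh, bounded ctx: gets up to 5s to complete.
	flushCtx, flushCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer flushCancel()
	fmt.Println(doWithRetry(flushCtx)) // prints: <nil>
}
```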

---------

Signed-off-by: Mengnan Gong <[email protected]>
Co-authored-by: Daniel Jaglowski <[email protected]>
2 people authored and chengchuanpeng committed Jan 27, 2025
1 parent d106775 commit f4c41b1
Showing 1 changed file with 17 additions and 8 deletions.
pkg/stanza/operator/helper/emitter.go (25 changes: 17 additions & 8 deletions)

```diff
@@ -17,7 +17,7 @@ import (
 // LogEmitter is a stanza operator that emits log entries to the consumer callback function `consumerFunc`
 type LogEmitter struct {
 	OutputOperator
-	closeChan     chan struct{}
+	cancel        context.CancelFunc
 	stopOnce      sync.Once
 	batchMux      sync.Mutex
 	batch         []*entry.Entry
@@ -65,7 +65,6 @@ func NewLogEmitter(set component.TelemetrySettings, consumerFunc func(context.Co
 	op, _ := NewOutputConfig("log_emitter", "log_emitter").Build(set)
 	e := &LogEmitter{
 		OutputOperator: op,
-		closeChan:      make(chan struct{}),
 		maxBatchSize:   defaultMaxBatchSize,
 		batch:          make([]*entry.Entry, 0, defaultMaxBatchSize),
 		flushInterval:  defaultFlushInterval,
@@ -79,15 +78,21 @@ func NewLogEmitter(set component.TelemetrySettings, consumerFunc func(context.Co
 
 // Start starts the goroutine(s) required for this operator
 func (e *LogEmitter) Start(_ operator.Persister) error {
+	ctx, cancel := context.WithCancel(context.Background())
+	e.cancel = cancel
+
 	e.wg.Add(1)
-	go e.flusher()
+	go e.flusher(ctx)
 	return nil
 }
 
 // Stop will close the log channel and stop running goroutines
 func (e *LogEmitter) Stop() error {
 	e.stopOnce.Do(func() {
-		close(e.closeChan)
+		// the cancel func could be nil if the emitter is never started.
+		if e.cancel != nil {
+			e.cancel()
+		}
 		e.wg.Wait()
 	})
 
@@ -120,7 +125,7 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry {
 }
 
 // flusher flushes the current batch every flush interval. Intended to be run as a goroutine
-func (e *LogEmitter) flusher() {
+func (e *LogEmitter) flusher(ctx context.Context) {
 	defer e.wg.Done()
 
 	ticker := time.NewTicker(e.flushInterval)
@@ -130,12 +135,16 @@
 		select {
 		case <-ticker.C:
 			if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
-				e.consumerFunc(context.Background(), oldBatch)
+				e.consumerFunc(ctx, oldBatch)
 			}
-		case <-e.closeChan:
+		case <-ctx.Done():
+			// Create a new context with timeout for the final flush
+			flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			defer cancel()
+			// flush currently batched entries
 			if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
-				e.consumerFunc(context.Background(), oldBatch)
+				e.consumerFunc(flushCtx, oldBatch)
 			}
 			return
 		}
```
