From f4c41b1110dc6732f941615762355cad4948f38a Mon Sep 17 00:00:00 2001 From: Namco <9165473+namco1992@users.noreply.github.com> Date: Tue, 28 Jan 2025 05:41:04 +0800 Subject: [PATCH] [chore][pkg/stanza] Fix the bug that the log emitter might hang when the receiver retry indefinitely (#37159) #### Description I was exploring options for backpressure the pipeline when the exporter fails. Inspired by https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29410#issuecomment-2277017007, I realized that I could enable the `retry_on_failure` on the receiver side, and have it retry indefinitely by setting `max_elapsed_time` to 0. ```yaml receivers: filelog: include: [ input.log ] retry_on_failure: enabled: true max_elapsed_time: 0 ``` With this config, the consumer will be blocked at the `ConsumeLogs` func in `consumerretry` when the exporter fails to consume the logs: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/12551d324375bd0c4647a8cdc7bd0f8c435c1034/internal/coreinternal/consumerretry/logs.go#L35 The func `flusher()` from the `LogEmitter` starts a loop and call the `consumerFunc` with `context.Background()`. When the `ConsumeLogs` is blocked by the retry, there is no way to cancel the retry, thus the `LogEmitter` will hang when I try to shut down the collector. In this PR, I created a ctx in the `Start` func, which will be cancelled later in the `Shutdown` func. The ctx is passed to the flusher and used for the flush in every `flushInterval`. However, I have to swap it with another ctx with timeout during shutdown to flush the remaining batch out one last time. That's the best approach I can think of for now, and I'm open to other suggestions. --------- Signed-off-by: Mengnan Gong Co-authored-by: Daniel Jaglowski --- pkg/stanza/operator/helper/emitter.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/pkg/stanza/operator/helper/emitter.go b/pkg/stanza/operator/helper/emitter.go index aa91b85c92be..4f0df8e81102 100644 --- a/pkg/stanza/operator/helper/emitter.go +++ b/pkg/stanza/operator/helper/emitter.go @@ -17,7 +17,7 @@ import ( // LogEmitter is a stanza operator that emits log entries to the consumer callback function `consumerFunc` type LogEmitter struct { OutputOperator - closeChan chan struct{} + cancel context.CancelFunc stopOnce sync.Once batchMux sync.Mutex batch []*entry.Entry @@ -65,7 +65,6 @@ func NewLogEmitter(set component.TelemetrySettings, consumerFunc func(context.Co op, _ := NewOutputConfig("log_emitter", "log_emitter").Build(set) e := &LogEmitter{ OutputOperator: op, - closeChan: make(chan struct{}), maxBatchSize: defaultMaxBatchSize, batch: make([]*entry.Entry, 0, defaultMaxBatchSize), flushInterval: defaultFlushInterval, @@ -79,15 +78,21 @@ func NewLogEmitter(set component.TelemetrySettings, consumerFunc func(context.Co // Start starts the goroutine(s) required for this operator func (e *LogEmitter) Start(_ operator.Persister) error { + ctx, cancel := context.WithCancel(context.Background()) + e.cancel = cancel + e.wg.Add(1) - go e.flusher() + go e.flusher(ctx) return nil } // Stop will close the log channel and stop running goroutines func (e *LogEmitter) Stop() error { e.stopOnce.Do(func() { - close(e.closeChan) + // the cancel func could be nil if the emitter is never started. + if e.cancel != nil { + e.cancel() + } e.wg.Wait() }) @@ -120,7 +125,7 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry { } // flusher flushes the current batch every flush interval. Intended to be run as a goroutine -func (e *LogEmitter) flusher() { +func (e *LogEmitter) flusher(ctx context.Context) { defer e.wg.Done() ticker := time.NewTicker(e.flushInterval) @@ -130,12 +135,16 @@ func (e *LogEmitter) flusher() { select { case <-ticker.C: if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 { - e.consumerFunc(context.Background(), oldBatch) + e.consumerFunc(ctx, oldBatch) } - case <-e.closeChan: + case <-ctx.Done(): + // Create a new context with timeout for the final flush + flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // flush currently batched entries if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 { - e.consumerFunc(context.Background(), oldBatch) + e.consumerFunc(flushCtx, oldBatch) } return }