[chore][pkg/stanza] Fix the bug that the log emitter might hang when the receiver retries indefinitely #37159
Conversation
Signed-off-by: Mengnan Gong <[email protected]>
```go
	return nil
}

// Stop will close the log channel and stop running goroutines
func (e *LogEmitter) Stop() error {
	e.stopOnce.Do(func() {
		close(e.closeChan)
		// the cancel func could be nil if the emitter is never started.
		if e.cancel != nil { // completion assumed; the diff is truncated here
			e.cancel()
		}
	})
	return nil
}
```
I realized that quite a few unit tests call `Stop()` without actually starting the emitter, and that would fail because the context is not created if `Start()` is never called, so I put a defensive check here.

I could also move the context creation to `NewLogEmitter`, which feels a bit less "natural". I also think some of the lifecycle tests should call `Start()`, but I didn't touch that in this PR to keep it focused. I can leave a TODO or create a separate PR to fix it. I am open to suggestions. A hypothetical test of the failing pattern is sketched below.
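For illustration, this is roughly the kind of lifecycle test being described, a minimal sketch in which the constructor signature is an assumption, not the actual API:

```go
import (
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"
)

// Hypothetical lifecycle test: Stop() must be safe even when Start() was
// never called, hence the defensive nil check on the cancel func.
// NewLogEmitter's signature here is assumed.
func TestStopWithoutStart(t *testing.T) {
	e := NewLogEmitter(zaptest.NewLogger(t).Sugar())
	require.NoError(t, e.Stop()) // must not panic or hang
}
```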
Thanks for the fix @namco1992
Thanks for the review @djaglowski. I think it's not changelog-worthy so I didn't add one, but I suppose I added the `[chore]` prefix for that reason.
Description

I was exploring options for applying backpressure to the pipeline when the exporter fails. Inspired by #29410 (comment), I realized that I could enable `retry_on_failure` on the receiver side and have it retry indefinitely by setting `max_elapsed_time` to 0:

```yaml
receivers:
  filelog:
    include: [ input.log ]
    retry_on_failure:
      enabled: true
      max_elapsed_time: 0
```

With this config, the consumer blocks at the `ConsumeLogs` func in `consumerretry` when the exporter fails to consume the logs:
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/12551d324375bd0c4647a8cdc7bd0f8c435c1034/internal/coreinternal/consumerretry/logs.go#L35
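To make the blocking behavior concrete, here is a simplified, self-contained sketch of an indefinite retry loop in the style of `consumerretry`; the backoff policy and error handling are assumptions, not the actual source. The key point is that the loop's only early exit is context cancellation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// consumeWithRetry retries consume until it succeeds or ctx is cancelled.
// A simplified stand-in for consumerretry's ConsumeLogs loop.
func consumeWithRetry(ctx context.Context, payload string, consume func(context.Context, string) error) error {
	delay := 100 * time.Millisecond
	for {
		if err := consume(ctx, payload); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			// The only escape hatch is context cancellation. A caller
			// that passes context.Background() can never trigger this.
			return ctx.Err()
		case <-time.After(delay):
			// With max_elapsed_time: 0 there is no retry deadline,
			// so the loop keeps going until the context is cancelled.
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := consumeWithRetry(ctx, "batch", func(context.Context, string) error {
		return errors.New("exporter unavailable") // always fails, like a down exporter
	})
	fmt.Println(err) // context deadline exceeded
}
```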
The func `flusher()` from the `LogEmitter` starts a loop and calls the `consumerFunc` with `context.Background()`. When `ConsumeLogs` is blocked by the retry, there is no way to cancel the retry, so the `LogEmitter` hangs when I try to shut down the collector.
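For illustration, the pre-fix flusher roughly followed this pattern, a sketch in which the field and helper names (`e.flush`, `e.flushInterval`, `e.closeChan`, `e.wg`) are assumptions based on the description:

```go
// Simplified pre-fix flusher: every flush runs with context.Background(),
// which can never be cancelled, so a flush stuck inside an indefinite
// ConsumeLogs retry blocks shutdown forever.
func (e *LogEmitter) flusher() {
	defer e.wg.Done()
	ticker := time.NewTicker(e.flushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			e.flush(context.Background())
		case <-e.closeChan:
			return
		}
	}
}
```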
In this PR, I created a ctx in the `Start` func, which is cancelled later in the `Shutdown` func. The ctx is passed to the flusher and used for the flush on every `flushInterval`. However, I have to swap it with another ctx with a timeout during shutdown to flush the remaining batch one last time. That's the best approach I can think of for now, and I'm open to other suggestions.
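A minimal sketch of this approach, under the assumption of struct fields like `e.cancel` and a `flushTimeout` constant; the real implementation (including the exact `Start`/`Stop` signatures) may differ:

```go
import (
	"context"
	"sync"
	"time"
)

// Assumed shape of the emitter for this sketch; the real struct differs.
type LogEmitter struct {
	cancel        context.CancelFunc
	closeChan     chan struct{}
	stopOnce      sync.Once
	wg            sync.WaitGroup
	flushInterval time.Duration
	flush         func(context.Context) // stands in for flushing the batch
}

const flushTimeout = 5 * time.Second // assumed shutdown flush deadline

// Start creates a cancellable context and hands it to the flusher, so a
// flush blocked inside a ConsumeLogs retry can be interrupted later.
func (e *LogEmitter) Start() error {
	ctx, cancel := context.WithCancel(context.Background())
	e.cancel = cancel
	e.wg.Add(1)
	go e.flusher(ctx)
	return nil
}

// flusher now flushes with the cancellable context instead of
// context.Background().
func (e *LogEmitter) flusher(ctx context.Context) {
	defer e.wg.Done()
	ticker := time.NewTicker(e.flushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			e.flush(ctx)
		case <-ctx.Done():
			return
		}
	}
}

// Stop cancels the flusher's context, waits for it to exit, then swaps in
// a fresh timeout-bound context to flush the remaining batch one last time.
func (e *LogEmitter) Stop() error {
	e.stopOnce.Do(func() {
		close(e.closeChan)
		if e.cancel != nil {
			e.cancel()
		}
		e.wg.Wait()
		flushCtx, cancel := context.WithTimeout(context.Background(), flushTimeout)
		defer cancel()
		e.flush(flushCtx)
	})
	return nil
}
```

The design choice worth noting is the context swap in `Stop`: the long-lived context must be cancelled to unblock a stuck retry, but the final flush still needs a live context, so it gets its own bounded one.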