From 7a483a0484bfa77b3beebe16854e1deb90d19527 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 19 May 2016 16:36:58 +0100 Subject: [PATCH] Set a timeout for calls to input.Gather currently the input interface does not have any methods for killing a running Gather call, so there is nothing we can do but log a "FATAL ERROR" and move on. This will at least give some visibility into the plugin that is acting up. Open questions: - should the telegraf process die and exit when this happens? This might be a better idea than leaving around the dead process. - should the input interface have a Kill() method? I suspect not, since most inputs wouldn't have a way of killing themselves anyways. closes #1230 --- agent/accumulator.go | 3 --- agent/agent.go | 37 ++++++++++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/agent/accumulator.go b/agent/accumulator.go index 6b2ffde2d4374..10897aa6a84a0 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "math" - "sync" "time" "github.com/influxdata/telegraf" @@ -22,8 +21,6 @@ func NewAccumulator( } type accumulator struct { - sync.Mutex - metrics chan telegraf.Metric defaultTags map[string]string diff --git a/agent/agent.go b/agent/agent.go index 60f2d63c63798..45b3a9bcc128f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -136,10 +136,7 @@ func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error { } } - if err := input.Input.Gather(acc); err != nil { - log.Printf("Error in input [%s]: %s", input.Name, err) - } - + gatherWithTimeout(input, acc, a.Config.Agent.Interval.Duration) }(input) } @@ -176,9 +173,7 @@ func (a *Agent) gatherSeparate( acc.SetDebug(a.Config.Agent.Debug) acc.setDefaultTags(a.Config.Tags) - if err := input.Input.Gather(acc); err != nil { - log.Printf("Error in input [%s]: %s", input.Name, err) - } + gatherWithTimeout(input, acc, input.Config.Interval) elapsed := time.Since(start) if !a.Config.Agent.Quiet { @@ -199,6 +194,34 @@ func (a *Agent) gatherSeparate( } } +// gatherWithTimeout gathers from the given input, with the given timeout. +// if Gather() times out, unfortunately there isn't much it can do, and so it +// simply returns and logs a "FATAL ERROR" +func gatherWithTimeout( + input *internal_models.RunningInput, + acc *accumulator, + timeout time.Duration, +) { + timer := time.NewTimer(timeout) + done := make(chan error) + go func() { + done <- input.Input.Gather(acc) + }() + + select { + case err := <-done: + if err != nil { + log.Printf("ERROR in input [%s]: %s", input.Name, err) + } + return + case <-timer.C: + log.Printf("FATAL ERROR: input [%s] took longer than interval (%s)"+ + " to run. Leaving behind a hung process and moving on.", + input.Name, timeout) + return + } +} + // Test verifies that we can 'Gather' from all inputs with their configured // Config struct func (a *Agent) Test() error {