Skip to content

Commit

Permalink
Set a timeout for calls to input.Gather
Browse files Browse the repository at this point in the history
currently the input interface does not have any methods for killing a
running Gather call, so there is nothing we can do but log a "FATAL
ERROR" and move on. This will at least give some visibility into the
plugin that is acting up.

Open questions:

- should the telegraf process die and exit when this happens? This might
  be a better idea than leaving around the dead process.
- should the input interface have a Kill() method? I suspect not, since
  most inputs wouldn't have a way of killing themselves anyways.

closes #1230
  • Loading branch information
sparrc committed May 20, 2016
1 parent 56aee1c commit 7a483a0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
3 changes: 0 additions & 3 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"log"
"math"
"sync"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -22,8 +21,6 @@ func NewAccumulator(
}

type accumulator struct {
sync.Mutex

metrics chan telegraf.Metric

defaultTags map[string]string
Expand Down
37 changes: 30 additions & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ func (a *Agent) gatherParallel(metricC chan telegraf.Metric) error {
}
}

if err := input.Input.Gather(acc); err != nil {
log.Printf("Error in input [%s]: %s", input.Name, err)
}

gatherWithTimeout(input, acc, a.Config.Agent.Interval.Duration)
}(input)
}

Expand Down Expand Up @@ -176,9 +173,7 @@ func (a *Agent) gatherSeparate(
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)

if err := input.Input.Gather(acc); err != nil {
log.Printf("Error in input [%s]: %s", input.Name, err)
}
gatherWithTimeout(input, acc, input.Config.Interval)

elapsed := time.Since(start)
if !a.Config.Agent.Quiet {
Expand All @@ -199,6 +194,34 @@ func (a *Agent) gatherSeparate(
}
}

// gatherWithTimeout gathers from the given input, with the given timeout.
// if Gather() times out, unfortunately there isn't much it can do, and so it
// simply returns and logs a "FATAL ERROR"
func gatherWithTimeout(
input *internal_models.RunningInput,
acc *accumulator,
timeout time.Duration,
) {
timer := time.NewTimer(timeout)
done := make(chan error)
go func() {
done <- input.Input.Gather(acc)
}()

select {
case err := <-done:
if err != nil {
log.Printf("ERROR in input [%s]: %s", input.Name, err)
}
return
case <-timer.C:
log.Printf("FATAL ERROR: input [%s] took longer than interval (%s)"+
" to run. Leaving behind a hung process and moving on.",
input.Name, timeout)
return
}
}

// Test verifies that we can 'Gather' from all inputs with their configured
// Config struct
func (a *Agent) Test() error {
Expand Down

0 comments on commit 7a483a0

Please sign in to comment.