Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
Run scheduled flushes in background
Browse files Browse the repository at this point in the history
doing this unblocks incoming metrics while waiting for a flush to take
place.

we have to create a semaphore so that we can
'skip' flushes that try to run while a flush is already running.

closes influxdata#2262
  • Loading branch information
sparrc authored and mlinde201 committed Feb 6, 2017
1 parent a440d1e commit 80c73d0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
### Bugfixes

- [#2077](https://github.com/influxdata/telegraf/issues/2077): SQL Server Input - Arithmetic overflow error converting numeric to data type int.
- [#2262](https://github.com/influxdata/telegraf/issues/2262): Flush jitter can inhibit metric collection.

## v1.2 [2017-01-00]

Expand Down
15 changes: 13 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
}()

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
semaphore := make(chan struct{}, 1)
for {
select {
case <-shutdown:
Expand All @@ -295,8 +296,18 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
a.flush()
return nil
case <-ticker.C:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
go func() {
select {
case semaphore <- struct{}{}:
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
<-semaphore
default:
// skipping this flush because one is already happening
log.Println("W! Skipping a scheduled flush because there is" +
" already a flush ongoing.")
}
}()
case metric := <-metricC:
// NOTE potential bottleneck here as we put each metric through the
// processors serially.
Expand Down

0 comments on commit 80c73d0

Please sign in to comment.