From 5670330752ee374b222116293f79c9403447be09 Mon Sep 17 00:00:00 2001
From: Jacques Heunis
Date: Sat, 23 May 2020 13:09:47 +0100
Subject: [PATCH] Add a --once flag that will run the entire pipeline exactly once and exit.

The use-case for this is any situation where it is preferable to manage
metric posting from some external service (such as cron).
---
 agent/agent.go            | 203 ++++++++++++++++++++++++++++++++++++++
 cmd/telegraf/telegraf.go  |   5 ++
 internal/usage.go         |   1 +
 internal/usage_windows.go |   1 +
 4 files changed, 210 insertions(+)

diff --git a/agent/agent.go b/agent/agent.go
index 9ac51471a7587..a394df22eb673 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -142,6 +142,209 @@ func (a *Agent) Run(ctx context.Context) error {
 	return nil
 }
 
+// RunOnce initializes the plugins, gathers every configured input exactly once,
+// sends the metrics through the processors and aggregators, writes the result
+// to every output and returns. It is meant for setups where an external
+// scheduler such as cron decides when metrics are posted.
+//
+// Service inputs are not supported in this mode; configuring one is an error.
+// When writing to an output fails, the error is logged and the first such
+// error is returned once all outputs have finished.
+func (a *Agent) RunOnce(ctx context.Context) error {
+	log.Printf("D! [agent] Initializing plugins")
+	err := a.initPlugins()
+	if err != nil {
+		return err
+	}
+
+	// Reject unsupported configurations before connecting, so that an early
+	// return never leaves an output connection open.
+	for _, input := range a.Config.Inputs {
+		if _, ok := input.Input.(telegraf.ServiceInput); ok {
+			return fmt.Errorf("Error: input '%s' is a service input and is configured. Service "+
+				"inputs are not supported when running with '--once'", input.Config.Name)
+		}
+	}
+
+	log.Printf("D! [agent] Connecting outputs")
+	err = a.connectOutputs(ctx)
+	if err != nil {
+		return err
+	}
+	defer a.closeOutputs()
+
+	inputChan := make(chan telegraf.Metric)
+	nullChan := make(chan telegraf.Metric)
+	aggregateChan := make(chan telegraf.Metric)
+
+	// Discard everything sent to nullChan (the warm-up gathers below).
+	go func() {
+		for range nullChan {
+		}
+	}()
+
+	var inputWait sync.WaitGroup
+	for _, input := range a.Config.Inputs {
+		inputWait.Add(1)
+		go func(input *models.RunningInput) {
+			defer inputWait.Done()
+			acc := NewAccumulator(input, inputChan)
+			acc.SetPrecision(a.Precision())
+
+			// Special instructions for some inputs. cpu, for example, needs to be
+			// run twice in order to return cpu usage percentages.
+			switch input.Config.Name {
+			case "cpu", "mongodb", "procstat":
+				nulAcc := NewAccumulator(input, nullChan)
+				nulAcc.SetPrecision(a.Precision())
+				if err := input.Input.Gather(nulAcc); err != nil {
+					acc.AddError(err)
+					return
+				}
+
+				time.Sleep(500 * time.Millisecond)
+			}
+
+			if err := input.Input.Gather(acc); err != nil {
+				acc.AddError(err)
+			}
+		}(input)
+	}
+
+	var aggregateWait sync.WaitGroup
+	aggregateWait.Add(1)
+	go func() {
+		defer aggregateWait.Done()
+
+		// Give every aggregator a window that spans all plausible metric
+		// timestamps. The upper bound is a fixed far-future date: the largest
+		// int64 Unix second has no corresponding time.Time value, so using it
+		// produces a bogus window end.
+		since := time.Unix(0, 0)
+		until := time.Date(9999, time.December, 31, 23, 59, 59, 0, time.UTC)
+		for _, agg := range a.Config.Aggregators {
+			agg.UpdateWindow(since, until)
+		}
+
+		for metric := range inputChan {
+			log.Printf("D! [agent] Received input")
+			for _, processed := range a.applyProcessors(metric) {
+				var dropOriginal bool
+				for _, agg := range a.Config.Aggregators {
+					if ok := agg.Add(processed); ok {
+						dropOriginal = true
+					}
+				}
+
+				if dropOriginal {
+					processed.Drop()
+					continue
+				}
+				aggregateChan <- processed
+			}
+		}
+
+		// The inputs are finished: push the aggregated metrics downstream.
+		for _, agg := range a.Config.Aggregators {
+			acc := NewAccumulator(agg, aggregateChan)
+			acc.SetPrecision(a.Precision())
+			agg.Aggregator.Push(acc)
+		}
+	}()
+
+	// buffer is only appended to by the goroutine below and only read after
+	// bufferWait.Wait() returns.
+	var buffer []telegraf.Metric
+	var bufferWait sync.WaitGroup
+	bufferWait.Add(1)
+	go func() {
+		defer bufferWait.Done()
+		for aggMetric := range aggregateChan {
+			buffer = append(buffer, a.applyProcessors(aggMetric)...)
+		}
+	}()
+
+	inputWait.Wait()
+	close(inputChan)
+	close(nullChan)
+	aggregateWait.Wait()
+	close(aggregateChan)
+	bufferWait.Wait()
+
+	batchSize := a.Config.Agent.MetricBatchSize
+	if batchSize == 0 {
+		batchSize = models.DEFAULT_METRIC_BATCH_SIZE
+	}
+
+	// Every output goroutine owns one slot of writeErrs, so write errors are
+	// recorded without a data race.
+	writeErrs := make([]error, len(a.Config.Outputs))
+	var outputWait sync.WaitGroup
+	for i, output := range a.Config.Outputs {
+		outputWait.Add(1)
+		go func(i int, output *models.RunningOutput) {
+			defer outputWait.Done()
+
+			// write sends one batch and remembers the first failure.
+			write := func(batch []telegraf.Metric) {
+				err := output.Output.Write(batch)
+				if err == nil {
+					return
+				}
+				log.Printf("E! [agent] Error writing to output: %v", err)
+				if writeErrs[i] == nil {
+					writeErrs[i] = err
+				}
+			}
+
+			batch := make([]telegraf.Metric, 0, batchSize)
+			for _, bufferMetric := range buffer {
+				if ok := output.Config.Filter.Select(bufferMetric); !ok {
+					continue
+				}
+
+				metric := bufferMetric.Copy()
+				output.Config.Filter.Modify(metric)
+				if len(metric.FieldList()) == 0 {
+					continue
+				}
+
+				if len(output.Config.NameOverride) > 0 {
+					metric.SetName(output.Config.NameOverride)
+				}
+
+				if len(output.Config.NamePrefix) > 0 {
+					metric.AddPrefix(output.Config.NamePrefix)
+				}
+
+				if len(output.Config.NameSuffix) > 0 {
+					metric.AddSuffix(output.Config.NameSuffix)
+				}
+
+				batch = append(batch, metric)
+				if len(batch) == batchSize {
+					write(batch)
+					batch = batch[:0]
+				}
+			}
+
+			if len(batch) > 0 {
+				write(batch)
+			}
+		}(i, output)
+	}
+
+	outputWait.Wait()
+
+	for _, err := range writeErrs {
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // Test runs the inputs once and prints the output to stdout in line protocol.
 func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
 	var wg sync.WaitGroup
diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go
index 4f51bc2e13ca9..7a372a2f92c4c 100644
--- a/cmd/telegraf/telegraf.go
+++ b/cmd/telegraf/telegraf.go
@@ -37,6 +37,7 @@ var fQuiet = flag.Bool("quiet", false,
 	"run in quiet mode")
 var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit. Note: Test mode only runs inputs, not processors, aggregators, or outputs")
 var fTestWait = flag.Int("test-wait", 0, "wait up to this many seconds for service inputs to complete in test mode")
+var fOnce = flag.Bool("once", false, "run each input/processor/aggregator/output once and exit")
 var fConfig = flag.String("config", "", "configuration file to load")
 var fConfigDirectory = flag.String("config-directory", "",
 	"directory containing additional *.conf files")
@@ -180,6 +181,10 @@ func runAgent(ctx context.Context,
 	log.Printf("I! Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
 	log.Printf("I! Tags enabled: %s", c.ListTags())
 
+	if *fOnce {
+		return ag.RunOnce(ctx)
+	}
+
 	if *fPidfile != "" {
 		f, err := os.OpenFile(*fPidfile, os.O_CREATE|os.O_WRONLY, 0644)
 		if err != nil {
diff --git a/internal/usage.go b/internal/usage.go
index b0df62a6f4b20..8d019438c4e3b 100644
--- a/internal/usage.go
+++ b/internal/usage.go
@@ -37,6 +37,7 @@ The commands & flags are:
                                  processors, aggregators, or outputs
   --test-wait                    wait up to this many seconds for service
                                  inputs to complete in test mode
+  --once                         run each input/processor/aggregator/output once and exit
   --usage                        print usage for a plugin, ie,
                                  'telegraf --usage mysql'
   --version                      display the version and exit
diff --git a/internal/usage_windows.go b/internal/usage_windows.go
index e205d6c1f23d5..d2933cb4070c7 100644
--- a/internal/usage_windows.go
+++ b/internal/usage_windows.go
@@ -34,6 +34,7 @@ The commands & flags are:
                                  processors, aggregators, or outputs
   --test-wait                    wait up to this many seconds for service
                                  inputs to complete in test mode
+  --once                         run each input/processor/aggregator/output once and exit
   --usage                        print usage for a plugin, ie,
                                  'telegraf --usage mysql'
   --version                      display the version and exit