Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for once mode; run processors and aggregators during test. #7474

Merged
merged 7 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 210 additions & 68 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,108 +142,250 @@ func (a *Agent) Run(ctx context.Context) error {
return nil
}

// Test runs the inputs once and prints the output to stdout in line protocol.
func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
var wg sync.WaitGroup
metricC := make(chan telegraf.Metric)
nulC := make(chan telegraf.Metric)
defer func() {
close(metricC)
close(nulC)
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()

// Test runs the inputs, processors and aggregators for a single gather and
// writes the metrics to stdout.
func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
outputF := func(src <-chan telegraf.Metric) {
s := influx.NewSerializer()
s.SetFieldSortOrder(influx.SortFields)
for metric := range metricC {

for metric := range src {
octets, err := s.Serialize(metric)
if err == nil {
fmt.Print("> ", string(octets))
}
metric.Reject()
}
}()
}

wg.Add(1)
go func() {
defer wg.Done()
for range nulC {
err := a.test(ctx, wait, outputF)
if err != nil {
return err
}

if models.GlobalGatherErrors.Get() != 0 {
return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}
return nil

}

// Once runs the full agent for a single gather.
func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
outputF := func(src <-chan telegraf.Metric) {
interval := a.Config.Agent.FlushInterval.Duration

ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
for _, output := range a.Config.Outputs {
interval := interval
// Overwrite agent flush_interval if this plugin has its own.
if output.Config.FlushInterval != 0 {
interval = output.Config.FlushInterval
}

jitter := 0 * time.Second

ticker := NewRollingTicker(interval, jitter)
defer ticker.Stop()

wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()
a.flushLoop(ctx, output, ticker)
}(output)
}
}()

hasServiceInputs := false
for _, input := range a.Config.Inputs {
if _, ok := input.Input.(telegraf.ServiceInput); ok {
hasServiceInputs = true
break
for metric := range src {
for i, output := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
output.AddMetric(metric)
} else {
output.AddMetric(metric.Copy())
}
}
}

cancel()
wg.Wait()
}

err := a.test(ctx, wait, outputF)
if err != nil {
return err
}

if models.GlobalGatherErrors.Get() != 0 {
return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}

unsent := 0
for _, output := range a.Config.Outputs {
unsent += output.BufferLength()
}
if unsent != 0 {
return fmt.Errorf("output plugins unable to send %d metrics", unsent)
}
return nil
}

// Test runs the agent and performs a single gather sending output to the
// outputF. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) test(ctx context.Context, wait time.Duration, outputF func(<-chan telegraf.Metric)) error {
log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}

if hasServiceInputs {
log.Printf("D! [agent] Starting service inputs")
err := a.startServiceInputs(ctx, metricC)
if err != nil {
return err
}
log.Printf("D! [agent] Connecting outputs")
err = a.connectOutputs(ctx)
if err != nil {
return err
}

hasErrors := false
for _, input := range a.Config.Inputs {
select {
case <-ctx.Done():
return nil
default:
break
}
inputC := make(chan telegraf.Metric, 100)
procC := make(chan telegraf.Metric, 100)
outputC := make(chan telegraf.Metric, 100)

acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Precision())
startTime := time.Now()

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
acc.AddError(err)
hasErrors = true
var wg sync.WaitGroup

src := inputC
dst := inputC

wg.Add(1)
go func(dst chan telegraf.Metric) {
defer wg.Done()

a.testRunInputs(ctx, wait, dst)

close(dst)
log.Printf("D! [agent] Input channel closed")
}(dst)

src = dst

if len(a.Config.Processors) > 0 {
dst = procC

wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()

err := a.runProcessors(src, dst)
if err != nil {
log.Printf("E! [agent] Error running processors: %v", err)
}
close(dst)
log.Printf("D! [agent] Processor channel closed")
}(src, dst)

time.Sleep(500 * time.Millisecond)
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
src = dst
}

if len(a.Config.Aggregators) > 0 {
dst = outputC

wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()

err := a.runAggregators(startTime, src, dst)
if err != nil {
log.Printf("E! [agent] Error running aggregators: %v", err)
}
default:
if err := input.Input.Gather(acc); err != nil {
close(dst)
log.Printf("D! [agent] Output channel closed")
}(src, dst)

src = dst
}

wg.Add(1)
go func(src <-chan telegraf.Metric) {
defer wg.Done()
outputF(src)
}(src)

wg.Wait()

log.Printf("D! [agent] Closing outputs")
a.closeOutputs()

log.Printf("D! [agent] Stopped Successfully")

return nil
}

func (a *Agent) testRunInputs(
ctx context.Context,
wait time.Duration,
dst chan<- telegraf.Metric,
) {
log.Printf("D! [agent] Starting service inputs")
for _, input := range a.Config.Inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not subject to timestamp rounding.
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)

err := si.Start(acc)
if err != nil {
acc.AddError(err)
hasErrors = true
si.Stop()
continue
}
}
}

if hasServiceInputs {
log.Printf("D! [agent] Waiting for service inputs")
internal.SleepContext(ctx, waitDuration)
log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()
}
nul := make(chan telegraf.Metric)
go func() {
for range nul {
}
}()

var wg sync.WaitGroup
for _, input := range a.Config.Inputs {
wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()

// Run plugins that require multiple gathers to calculate rate
// and delta metrics twice.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
nulAcc.AddError(err)
}

time.Sleep(500 * time.Millisecond)
}

if hasErrors {
return fmt.Errorf("One or more input plugins had an error")
acc := NewAccumulator(input, dst)
acc.SetPrecision(a.Precision())

if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
}
}(input)
}
return nil
wg.Wait()
close(nul)

internal.SleepContext(ctx, wait)

log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()

}

// runInputs starts and triggers the periodic gather for Inputs.
Expand Down
10 changes: 8 additions & 2 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var fServiceDisplayName = flag.String("service-display-name", "Telegraf Data Col
var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)")
var fPlugins = flag.String("plugin-directory", "",
"path to directory containing external plugins")
var fRunOnce = flag.Bool("once", false, "run one gather and exit")

var (
version string
Expand Down Expand Up @@ -169,9 +170,14 @@ func runAgent(ctx context.Context,

logger.SetupLogging(logConfig)

if *fRunOnce {
wait := time.Duration(*fTestWait) * time.Second
return ag.Once(ctx, wait)
}

if *fTest || *fTestWait != 0 {
testWaitDuration := time.Duration(*fTestWait) * time.Second
return ag.Test(ctx, testWaitDuration)
wait := time.Duration(*fTestWait) * time.Second
return ag.Test(ctx, wait)
}

log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
Expand Down
7 changes: 3 additions & 4 deletions internal/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ The commands & flags are:
Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs'
--sample-config print out full sample configuration
--test enable test mode: gather metrics, print them out,
and exit. Note: Test mode only runs inputs, not
processors, aggregators, or outputs
--once enable once mode: gather metrics once, write them, and exit
--test enable test mode: gather metrics once and print them
--test-wait wait up to this many seconds for service
inputs to complete in test mode
inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit

Expand Down
7 changes: 3 additions & 4 deletions internal/usage_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ The commands & flags are:
--section-filter filter config sections to output, separator is :
Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs'
--test enable test mode: gather metrics, print them out,
and exit. Note: Test mode only runs inputs, not
processors, aggregators, or outputs
--once enable once mode: gather metrics once, write them, and exit
--test enable test mode: gather metrics once and print them
--test-wait wait up to this many seconds for service
inputs to complete in test mode
inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit

Expand Down
4 changes: 4 additions & 0 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,7 @@ func (r *RunningOutput) LogBufferStatus() {
func (r *RunningOutput) Log() telegraf.Logger {
return r.log
}

func (r *RunningOutput) BufferLength() int {
return r.buffer.Len()
}