Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for once mode; run processors and aggregators during test. #7474

Merged
merged 7 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 191 additions & 68 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,107 +142,230 @@ func (a *Agent) Run(ctx context.Context) error {
return nil
}

// Test runs the inputs once and prints the output to stdout in line protocol.
func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
var wg sync.WaitGroup
metricC := make(chan telegraf.Metric)
nulC := make(chan telegraf.Metric)
defer func() {
close(metricC)
close(nulC)
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()

func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
outputF := func(src <-chan telegraf.Metric) {
s := influx.NewSerializer()
s.SetFieldSortOrder(influx.SortFields)
for metric := range metricC {

for metric := range src {
octets, err := s.Serialize(metric)
if err == nil {
fmt.Print("> ", string(octets))
}
metric.Reject()
}
}()
}

wg.Add(1)
go func() {
defer wg.Done()
for range nulC {
err := a.test(ctx, wait, outputF)
if err != nil {
return err
}

if models.GlobalGatherErrors.Get() != 0 {
return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}
return nil

}
func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
outputF := func(src <-chan telegraf.Metric) {
interval := a.Config.Agent.FlushInterval.Duration

ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
for _, output := range a.Config.Outputs {
interval := interval
// Overwrite agent flush_interval if this plugin has its own.
if output.Config.FlushInterval != 0 {
interval = output.Config.FlushInterval
}

jitter := 0 * time.Second

ticker := NewRollingTicker(interval, jitter)
defer ticker.Stop()

wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()
a.flushLoop(ctx, output, ticker)
}(output)
}
}()

hasServiceInputs := false
for _, input := range a.Config.Inputs {
if _, ok := input.Input.(telegraf.ServiceInput); ok {
hasServiceInputs = true
break
for metric := range src {
for i, output := range a.Config.Outputs {
if i == len(a.Config.Outputs)-1 {
output.AddMetric(metric)
} else {
output.AddMetric(metric.Copy())
}
}
}

cancel()
wg.Wait()
}

err := a.test(ctx, wait, outputF)
if err != nil {
return err
}

if models.GlobalGatherErrors.Get() != 0 {
return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}

unsent := 0
for _, output := range a.Config.Outputs {
unsent += output.BufferLength()
}

return fmt.Errorf("output plugins unable to send %d metrics", unsent)
}

func (a *Agent) test(ctx context.Context, wait time.Duration, outputF func(<-chan telegraf.Metric)) error {
log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}

if hasServiceInputs {
log.Printf("D! [agent] Starting service inputs")
err := a.startServiceInputs(ctx, metricC)
if err != nil {
return err
}
log.Printf("D! [agent] Connecting outputs")
err = a.connectOutputs(ctx)
if err != nil {
return err
}

hasErrors := false
for _, input := range a.Config.Inputs {
select {
case <-ctx.Done():
return nil
default:
break
inputC := make(chan telegraf.Metric, 100)
procC := make(chan telegraf.Metric, 100)
outputC := make(chan telegraf.Metric, 100)

startTime := time.Now()

log.Printf("D! [agent] Starting service inputs")
err = a.startServiceInputs(ctx, inputC)
if err != nil {
return err
}

var wg sync.WaitGroup

src := inputC
dst := inputC

nul := make(chan telegraf.Metric)
wg.Add(1)
go func() {
defer wg.Done()
for range nul {
}
}()

acc := NewAccumulator(input, metricC)
acc.SetPrecision(a.Precision())
wg.Add(1)
go func(dst chan telegraf.Metric) {
defer wg.Done()

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
acc.AddError(err)
hasErrors = true
}
var wg sync.WaitGroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have variable shadowing here. It'd be more clear to give this a different name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for _, input := range a.Config.Inputs {
wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()
jitter := 0 * time.Second
interval := a.Config.Agent.Interval.Duration

// Overwrite agent interval if this plugin has its own.
if input.Config.Interval != 0 {
interval = input.Config.Interval
}

ticker := NewUnalignedTicker(interval, jitter)
defer ticker.Stop()
<-ticker.Elapsed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we want to collect immediately for the -test and -once command line arguments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed


switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
nulAcc.AddError(err)
}

time.Sleep(500 * time.Millisecond)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lost the comment here, but it'd still be helpful. Very wtf inducing. Or.. This might go well in a separate function, then you could name it gatherOnceAndDiscard or something.


acc := NewAccumulator(input, dst)
acc.SetPrecision(a.Precision())

if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
}
}(input)

time.Sleep(500 * time.Millisecond)
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
default:
if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
hasErrors = true
}
}
}
wg.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just clarifying that this is a wait on the internal wg, not the external wg.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a function for running the inputs in test mode, but yeah it was.


internal.SleepContext(ctx, wait)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do -test and -once really need to sleep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the --test-wait cli option, which I am supporting here in --once mode as well. This option can be used to let service inputs run for a bit, allowing you to test plugins like influxdb_listener.


if hasServiceInputs {
log.Printf("D! [agent] Waiting for service inputs")
internal.SleepContext(ctx, waitDuration)
log.Printf("D! [agent] Stopping service inputs")
a.stopServiceInputs()

close(nul)
close(dst)
log.Printf("D! [agent] Input channel closed")
}(dst)

src = dst

if len(a.Config.Processors) > 0 {
dst = procC

wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()

err := a.runProcessors(src, dst)
if err != nil {
log.Printf("E! [agent] Error running processors: %v", err)
}
close(dst)
log.Printf("D! [agent] Processor channel closed")
}(src, dst)

src = dst
}

if hasErrors {
return fmt.Errorf("One or more input plugins had an error")
if len(a.Config.Aggregators) > 0 {
dst = outputC

wg.Add(1)
go func(src, dst chan telegraf.Metric) {
defer wg.Done()

err := a.runAggregators(startTime, src, dst)
if err != nil {
log.Printf("E! [agent] Error running aggregators: %v", err)
}
close(dst)
log.Printf("D! [agent] Output channel closed")
}(src, dst)

src = dst
}

wg.Add(1)
go func(src <-chan telegraf.Metric) {
defer wg.Done()
outputF(src)
}(src)

wg.Wait()

log.Printf("D! [agent] Closing outputs")
a.closeOutputs()

log.Printf("D! [agent] Stopped Successfully")

return nil
}

Expand Down
10 changes: 8 additions & 2 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var fServiceDisplayName = flag.String("service-display-name", "Telegraf Data Col
var fRunAsConsole = flag.Bool("console", false, "run as console application (windows only)")
var fPlugins = flag.String("plugin-directory", "",
"path to directory containing external plugins")
var fRunOnce = flag.Bool("once", false, "run one gather and exit")

var (
version string
Expand Down Expand Up @@ -169,9 +170,14 @@ func runAgent(ctx context.Context,

logger.SetupLogging(logConfig)

if *fRunOnce {
wait := time.Duration(*fTestWait) * time.Second
return ag.Once(ctx, wait)
}

if *fTest || *fTestWait != 0 {
testWaitDuration := time.Duration(*fTestWait) * time.Second
return ag.Test(ctx, testWaitDuration)
wait := time.Duration(*fTestWait) * time.Second
return ag.Test(ctx, wait)
}

log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
Expand Down
7 changes: 3 additions & 4 deletions internal/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ The commands & flags are:
Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs'
--sample-config print out full sample configuration
--test enable test mode: gather metrics, print them out,
and exit. Note: Test mode only runs inputs, not
processors, aggregators, or outputs
--once enable once mode: gather metrics once, write them, and exit
--test enable test mode: gather metrics once and print them
--test-wait wait up to this many seconds for service
inputs to complete in test mode
inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit

Expand Down
7 changes: 3 additions & 4 deletions internal/usage_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ The commands & flags are:
--section-filter filter config sections to output, separator is :
Valid values are 'agent', 'global_tags', 'outputs',
'processors', 'aggregators' and 'inputs'
--test enable test mode: gather metrics, print them out,
and exit. Note: Test mode only runs inputs, not
processors, aggregators, or outputs
--once enable once mode: gather metrics once, write them, and exit
--test enable test mode: gather metrics once and print them
--test-wait wait up to this many seconds for service
inputs to complete in test mode
inputs to complete in test or once mode
--usage <plugin> print usage for a plugin, ie, 'telegraf --usage mysql'
--version display the version and exit

Expand Down
4 changes: 4 additions & 0 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,7 @@ func (r *RunningOutput) LogBufferStatus() {
func (r *RunningOutput) Log() telegraf.Logger {
return r.log
}

func (r *RunningOutput) BufferLength() int {
return r.buffer.Len()
}