Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds periodical http config reloads and change detection #10102

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,35 @@ func TestAgent_OmitHostname(t *testing.T) {
func TestAgent_LoadPlugin(t *testing.T) {
c := config.NewConfig()
c.InputFilters = []string{"mysql"}
err := c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err := c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ := NewAgent(c)
assert.Equal(t, 1, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 0, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"mysql", "foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 1, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"mysql", "redis"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Inputs))

c = config.NewConfig()
c.InputFilters = []string{"mysql", "foo", "redis", "bar"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Inputs))
Expand All @@ -59,50 +59,50 @@ func TestAgent_LoadPlugin(t *testing.T) {
func TestAgent_LoadOutput(t *testing.T) {
c := config.NewConfig()
c.OutputFilters = []string{"influxdb"}
err := c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err := c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ := NewAgent(c)
assert.Equal(t, 2, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"kafka"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 1, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 0, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "kafka"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
assert.Equal(t, 3, len(c.Outputs))
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))

c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
_, err = c.LoadConfig("../config/testdata/telegraf-agent.toml", nil)
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
Expand Down
87 changes: 81 additions & 6 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"

"github.com/lthibault/jitterbug"
"gopkg.in/tomb.v1"

ctls "github.com/influxdata/telegraf/plugins/common/tls"
)

type sliceFlags []string
Expand Down Expand Up @@ -56,6 +60,18 @@ var fTestWait = flag.Int("test-wait", 0, "wait up to this many seconds for servi
var fConfigs sliceFlags
var fConfigDirs sliceFlags
var fWatchConfig = flag.String("watch-config", "", "Monitoring config changes [notify, poll]")

var fHTTPConfWatchInterval = flag.Duration("watch-interval", 0, "Interval to monitor http based config files ( default 0 = deactivated)")
var fHTTPConfWatchJitter = flag.Duration("watch-jitter", time.Second*3, "time variation to ensure avoid all agents downloading the config file from the server hosting it at the same time (default 10s)")
var fHTTPConfRetryInterval = flag.Duration("watch-retry-interval", time.Second*20, "time in seconds to retry download config if download failed (default 20s)")
var fHTTPConfMaxRetries = flag.Int("watch-max-retries", 3, "number of retries to download config file if previously failed (default 3)")
var fHTTPConfTLSCert = flag.String("watch-tls-cert", "", "Certificate File path for TLS Config on HTTP(S) Config downloads")
var fHTTPConfTLSKey = flag.String("watch-tls-key", "", "Certificate Key File path for TLS Config on HTTP(S) Config downloads")
var fHTTPConfTLSKeyPwd = flag.String("watch-tls-key-pwd", "", "Certificate Key File password")
var fHTTPConfTLSCA = flag.String("watch-tls-ca", "", "CA File path for TLS Config on HTTP(S) Config downloads")
var fHTTPConfSSLInsecureSkipVerify = flag.Bool("watch-insecure-skip-verify", false, "If set this flag we use TLS but skip chain & host verification")
var fHTTPConfSNI = flag.String("watch-tls-sni", "", "indicates which hostname it is attempting to connect to at the start of the TLS handshaking process")

var fVersion = flag.Bool("version", false, "display the version and exit")
var fSampleConfig = flag.Bool("sample-config", false,
"print out full sample configuration")
Expand Down Expand Up @@ -126,6 +142,25 @@ func reloadLoop(
}
}
}
if *fHTTPConfWatchInterval != 0 {
log.Printf("I! Reloading Telegraf config from HTTP Server")
settings = &config.HTTPLoadSettings{
ReloadInterval: *fHTTPConfWatchInterval,
ReloadJitter: *fHTTPConfWatchJitter,
MaxRetries: *fHTTPConfMaxRetries,
RetryInterval: *fHTTPConfRetryInterval,
ClientConf: &ctls.ClientConfig{
TLSCA: *fHTTPConfTLSCA,
TLSCert: *fHTTPConfTLSCert,
TLSKey: *fHTTPConfTLSKey,
TLSKeyPwd: *fHTTPConfTLSKeyPwd,
InsecureSkipVerify: *fHTTPConfSSLInsecureSkipVerify,
ServerName: *fHTTPConfSNI,
},
CacheData: make(map[string]*config.HTTPCacheInfo),
}
go watchRemoteConfig(signals, settings)
}
go func() {
select {
case sig := <-signals:
Expand All @@ -140,9 +175,46 @@ func reloadLoop(
}
}()

err := runAgent(ctx, inputFilters, outputFilters)
if err != nil && err != context.Canceled {
log.Fatalf("E! [telegraf] Error running agent: %v", err)
if *fHTTPConfWatchInterval == 0 {
//working with file based config, should exit
err := runAgent(ctx, inputFilters, outputFilters)
if err != nil && err != context.Canceled {
log.Printf("E! [telegraf] Error running agent: %v", err)
return
}
} else {
// working with http remote base config and with watch interval
// should wait until config server is ok, the watchRemoteConfig
// background process will detect and context will be canceled then.
err := runAgent(ctx, inputFilters, outputFilters)
if err != nil && err != context.Canceled {
log.Printf("W! [telegraf] Error when running agent: %v [Waiting for Config Served OK]", err)
<-ctx.Done()
log.Printf("D! Context Canceled ")
}
}
}
}

func watchRemoteConfig(signals chan os.Signal, settings *config.HTTPLoadSettings) {
t := jitterbug.New(settings.ReloadInterval, &jitterbug.Norm{Stdev: settings.ReloadJitter})
log.Printf("D! config reload Period set each [%s] with Jitter [%s]", settings.ReloadInterval, settings.ReloadJitter)
for tick := range t.C {
log.Printf("I! retrieving new HTTP based remote config at Tick[%s]", tick.Format(time.RFC1123))

c := config.NewConfig()
for _, fConfig := range fConfigs {
modified, err := c.LoadConfig(fConfig, settings)
if err != nil {
log.Printf("E! can not get remote config file %s: Error %s", fConfig, err)
continue
}
if modified {
log.Printf("I! Remote Config [%s] Modified !! ", fConfig)
signals <- syscall.SIGHUP
return
}
log.Printf("I! Remote Config [%s] Not Modified ", fConfig)
}
}
}
Expand Down Expand Up @@ -187,6 +259,8 @@ func watchLocalConfig(signals chan os.Signal, fConfig string) {
signals <- syscall.SIGHUP
}

var settings *config.HTTPLoadSettings

func runAgent(ctx context.Context,
inputFilters []string,
outputFilters []string,
Expand All @@ -200,20 +274,21 @@ func runAgent(ctx context.Context,
var err error
// providing no "config" flag should load default config
if len(fConfigs) == 0 {
err = c.LoadConfig("")
_, err = c.LoadConfig("", nil) // config settings not needed on default config file
if err != nil {
return err
}
}

for _, fConfig := range fConfigs {
err = c.LoadConfig(fConfig)
_, err := c.LoadConfig(fConfig, settings)
if err != nil {
return err
}
}

for _, fConfigDirectory := range fConfigDirs {
err = c.LoadDirectory(fConfigDirectory)
err = c.LoadDirectory(fConfigDirectory) //settings not needed when
if err != nil {
return err
}
Expand Down
Loading