Skip to content

Commit

Permalink
Improve timeout in input plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreF authored and sparrc committed Mar 1, 2016
1 parent ea7cbc7 commit 7416d6e
Show file tree
Hide file tree
Showing 21 changed files with 184 additions and 21 deletions.
5 changes: 4 additions & 1 deletion plugins/inputs/apache/apache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{Transport: tr}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := client.Get(addr.String())
Expand Down
12 changes: 11 additions & 1 deletion plugins/inputs/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"strings"
"sync"
"time"
)

// Schema:
Expand Down Expand Up @@ -112,9 +113,18 @@ func (c *CouchDB) Gather(accumulator telegraf.Accumulator) error {

}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host string) error {

response, error := http.Get(host)
response, error := client.Get(host)
if error != nil {
return error
}
Expand Down
8 changes: 7 additions & 1 deletion plugins/inputs/disque/disque.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
Expand All @@ -30,6 +31,8 @@ var sampleConfig = `
servers = ["localhost"]
`

var defaultTimeout = 5 * time.Second

func (r *Disque) SampleConfig() string {
return sampleConfig
}
Expand Down Expand Up @@ -107,7 +110,7 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
addr.Host = addr.Host + ":" + defaultPort
}

c, err := net.Dial("tcp", addr.Host)
c, err := net.DialTimeout("tcp", addr.Host, defaultTimeout)
if err != nil {
return fmt.Errorf("Unable to connect to disque server '%s': %s", addr.Host, err)
}
Expand All @@ -132,6 +135,9 @@ func (g *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
g.c = c
}

// Extend connection
g.c.SetDeadline(time.Now().Add(defaultTimeout))

g.c.Write([]byte("info\r\n"))

r := bufio.NewReader(g.c)
Expand Down
7 changes: 6 additions & 1 deletion plugins/inputs/dovecot/dovecot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var sampleConfig = `
domains = []
`

var defaultTimeout = time.Second * time.Duration(5)

func (d *Dovecot) SampleConfig() string { return sampleConfig }

const defaultPort = "24242"
Expand Down Expand Up @@ -74,12 +76,15 @@ func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, doms map[s
return fmt.Errorf("Error: %s on url %s\n", err, addr)
}

c, err := net.Dial("tcp", addr)
c, err := net.DialTimeout("tcp", addr, defaultTimeout)
if err != nil {
return fmt.Errorf("Unable to connect to dovecot server '%s': %s", addr, err)
}
defer c.Close()

// Extend connection
c.SetDeadline(time.Now().Add(defaultTimeout))

c.Write([]byte("EXPORT\tdomain\n\n"))
var buf bytes.Buffer
io.Copy(&buf, c)
Expand Down
7 changes: 6 additions & 1 deletion plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ type Elasticsearch struct {

// NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{client: http.DefaultClient}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &Elasticsearch{client: client}
}

// SampleConfig returns sample configuration for this plugin.
Expand Down
7 changes: 5 additions & 2 deletions plugins/inputs/haproxy/haproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {

func (g *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
if g.client == nil {

client := &http.Client{}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
g.client = client
}

Expand Down
7 changes: 6 additions & 1 deletion plugins/inputs/httpjson/httpjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ func (h *HttpJson) sendRequest(serverURL string) (string, float64, error) {

func init() {
inputs.Add("httpjson", func() telegraf.Input {
return &HttpJson{client: RealHTTPClient{client: &http.Client{}}}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &HttpJson{client: RealHTTPClient{client: client}}
})
}
12 changes: 11 additions & 1 deletion plugins/inputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -70,6 +71,15 @@ type point struct {
Values map[string]interface{} `json:"values"`
}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

// Gathers data from a particular URL
// Parameters:
// acc : The telegraf Accumulator to use
Expand All @@ -81,7 +91,7 @@ func (i *InfluxDB) gatherURL(
acc telegraf.Accumulator,
url string,
) error {
resp, err := http.Get(url)
resp, err := client.Get(url)
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion plugins/inputs/jolokia/jolokia.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -160,6 +161,11 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {

func init() {
inputs.Add("jolokia", func() telegraf.Input {
return &Jolokia{jClient: &JolokiaClientImpl{client: &http.Client{}}}
tr := &http.Transport{ResponseHeaderTimeout: time.Duration(3 * time.Second)}
client := &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}
return &Jolokia{jClient: &JolokiaClientImpl{client: client}}
})
}
6 changes: 5 additions & 1 deletion plugins/inputs/mailchimp/chimp_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"regexp"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -120,7 +121,10 @@ func (a *ChimpAPI) GetReport(campaignID string) (Report, error) {
}

func runChimp(api *ChimpAPI, params ReportsParams) ([]byte, error) {
client := &http.Client{Transport: api.Transport}
client := &http.Client{
Transport: api.Transport,
Timeout: time.Duration(4 * time.Second),
}

var b bytes.Buffer
req, err := http.NewRequest("GET", api.url.String(), &b)
Expand Down
12 changes: 11 additions & 1 deletion plugins/inputs/mesos/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -261,6 +262,15 @@ func (m *Mesos) removeGroup(j *map[string]interface{}) {
}
}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

// This should not belong to the object
func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {
var jsonOut map[string]interface{}
Expand All @@ -282,7 +292,7 @@ func (m *Mesos) gatherMetrics(a string, acc telegraf.Accumulator) error {

ts := strconv.Itoa(m.Timeout) + "ms"

resp, err := http.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)
resp, err := client.Get("http://" + a + "/metrics/snapshot?timeout=" + ts)

if err != nil {
return err
Expand Down
23 changes: 23 additions & 0 deletions plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package mysql

import (
"database/sql"
"net/url"
"strconv"
"strings"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/influxdata/telegraf"
Expand All @@ -26,6 +28,8 @@ var sampleConfig = `
servers = ["tcp(127.0.0.1:3306)/"]
`

var defaultTimeout = time.Second * time.Duration(5)

func (m *Mysql) SampleConfig() string {
return sampleConfig
}
Expand Down Expand Up @@ -122,6 +126,10 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
serv = ""
}

serv, err := dsnAddTimeout(serv)
if err != nil {
return err
}
db, err := sql.Open("mysql", serv)
if err != nil {
return err
Expand Down Expand Up @@ -207,6 +215,21 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
return nil
}

func dsnAddTimeout(dsn string) (string, error) {
u, err := url.Parse(dsn)
if err != nil {
return "", err
}
v := u.Query()

// Only override timeout if not already defined
if _, ok := v["timeout"]; ok == false {
v.Add("timeout", defaultTimeout.String())
u.RawQuery = v.Encode()
}
return u.String(), nil
}

func init() {
inputs.Add("mysql", func() telegraf.Input {
return &Mysql{}
Expand Down
35 changes: 35 additions & 0 deletions plugins/inputs/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,38 @@ func TestMysqlParseDSN(t *testing.T) {
}
}
}

func TestMysqlDNSAddTimeout(t *testing.T) {
tests := []struct {
input string
output string
}{
{
"",
"?timeout=5s",
},
{
"127.0.0.1",
"127.0.0.1?timeout=5s",
},
{
"tcp(192.168.1.1:3306)/",
"tcp(192.168.1.1:3306)/?timeout=5s",
},
{
"root:passwd@tcp(192.168.1.1:3306)/?tls=false",
"root:passwd@tcp(192.168.1.1:3306)/?timeout=5s&tls=false",
},
{
"root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s",
"root:passwd@tcp(192.168.1.1:3306)/?tls=false&timeout=10s",
},
}

for _, test := range tests {
output, _ := parseDSN(test.input)
if output != test.output {
t.Errorf("Expected %s, got %s\n", test.output, output)
}
}
}
5 changes: 4 additions & 1 deletion plugins/inputs/nginx/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{Transport: tr}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

func (n *Nginx) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := client.Get(addr.String())
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/nsq/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{Transport: tr}
var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

func (n *NSQ) gatherEndpoint(e string, acc telegraf.Accumulator) error {
u, err := buildURL(e)
Expand Down
12 changes: 11 additions & 1 deletion plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"sync"
"time"
)

type Prometheus struct {
Expand Down Expand Up @@ -51,8 +52,17 @@ func (g *Prometheus) Gather(acc telegraf.Accumulator) error {
return outerr
}

var tr = &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}

var client = &http.Client{
Transport: tr,
Timeout: time.Duration(4 * time.Second),
}

func (g *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
resp, err := http.Get(url)
resp, err := client.Get(url)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
Expand Down
Loading

0 comments on commit 7416d6e

Please sign in to comment.