Skip to content

Commit

Permalink
Utilize timeout in net_response plugin.
Browse files Browse the repository at this point in the history
Also changing the net_response and http_response plugins to only accept
duration strings for their timeout parameters. This is a breaking config
file change.

closes #1214
  • Loading branch information
sparrc committed May 23, 2016
1 parent c6699c3 commit eeec070
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 84 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Release Notes

- **Breaking Change**: net_response and http_response plugins timeouts will
now only be accepted as duration strings. ie, "2s" or "500ms".
- Input plugin Gathers will no longer be logged by default, but a Gather for
_each_ plugin will be logged in Debug mode.
- Debug mode will no longer print every point added to the accumulator. This
Expand All @@ -24,6 +26,7 @@ to "stdout".
- [#1228](https://github.com/influxdata/telegraf/pull/1228): Fix service plugin host tag overwrite.
- [#1198](https://github.com/influxdata/telegraf/pull/1198): http_response: override request Host header properly
- [#1230](https://github.com/influxdata/telegraf/issues/1230): Fix Telegraf process hangup due to a single plugin hanging.
- [#1214](https://github.com/influxdata/telegraf/issues/1214): Use TCP timeout argument in net_response plugin.

## v0.13 [2016-05-11]

Expand Down
9 changes: 4 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,15 @@ func (a *Agent) gatherer(
}

gatherWithTimeout(shutdown, input, acc, interval)

elapsed := time.Since(start)
if a.Config.Agent.Debug {
log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
}

if outerr != nil {
return outerr
}
if a.Config.Agent.Debug {
log.Printf("Input [%s] gathered metrics, (%s interval) in %s\n",
input.Name, interval, elapsed)
}

select {
case <-shutdown:
Expand Down
14 changes: 9 additions & 5 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@
# ## Server address (default http://localhost)
# address = "http://github.com"
# ## Set response_timeout (default 5 seconds)
# response_timeout = 5
# response_timeout = "5s"
# ## HTTP Request Method
# method = "GET"
# ## Whether to follow redirects from the server (defaults to false)
Expand Down Expand Up @@ -946,14 +946,15 @@
# protocol = "tcp"
# ## Server address (default localhost)
# address = "github.com:80"
# ## Set timeout (default 1.0 seconds)
# timeout = 1.0
# ## Set read timeout (default 1.0 seconds)
# read_timeout = 1.0
# ## Set timeout
# timeout = "1s"
#
# ## Optional string sent to the server
# # send = "ssh"
# ## Optional expected string in answer
# # expect = "ssh"
# ## Set read timeout (only used if expecting a response)
# read_timeout = "1s"


# # Read TCP metrics such as established, time wait and sockets counts.
Expand Down Expand Up @@ -1144,6 +1145,9 @@
# ## user as argument for pgrep (ie, pgrep -u <user>)
# # user = "nginx"
#
# ## override for process_name
# ## This is optional; default is sourced from /proc/<pid>/status
# # process_name = "bar"
# ## Field name prefix
# prefix = ""
# ## comment this out if you want raw cpu_time stats
Expand Down
16 changes: 8 additions & 8 deletions plugins/inputs/http_response/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ This input plugin will test HTTP/HTTPS connections.
### Configuration:

```
# List of UDP/TCP connections you want to check
# HTTP/HTTPS request given an address a method and a timeout
[[inputs.http_response]]
## Server address (default http://localhost)
address = "http://github.com"
## Set response_timeout (default 5 seconds)
response_timeout = 5
response_timeout = "5s"
## HTTP Request Method
method = "GET"
## HTTP Request Headers
[inputs.http_response.headers]
Host = github.com
## Whether to follow redirects from the server (defaults to false)
follow_redirects = true
## HTTP Request Headers (all values must be strings)
# [inputs.http_response.headers]
# Host = "github.com"
## Optional HTTP Request Body
body = '''
{'fake':'data'}
'''
# body = '''
# {'fake':'data'}
# '''
```

### Measurements & Fields:
Expand Down
13 changes: 7 additions & 6 deletions plugins/inputs/http_response/http_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

Expand All @@ -17,7 +18,7 @@ type HTTPResponse struct {
Address string
Body string
Method string
ResponseTimeout int
ResponseTimeout internal.Duration
Headers map[string]string
FollowRedirects bool
}
Expand All @@ -31,7 +32,7 @@ var sampleConfig = `
## Server address (default http://localhost)
address = "http://github.com"
## Set response_timeout (default 5 seconds)
response_timeout = 5
response_timeout = "5s"
## HTTP Request Method
method = "GET"
## Whether to follow redirects from the server (defaults to false)
Expand All @@ -57,7 +58,7 @@ var ErrRedirectAttempted = errors.New("redirect")
// timeout period and can follow redirects if specified
func CreateHttpClient(followRedirects bool, ResponseTimeout time.Duration) *http.Client {
client := &http.Client{
Timeout: time.Second * ResponseTimeout,
Timeout: ResponseTimeout,
}

if followRedirects == false {
Expand All @@ -73,7 +74,7 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
// Prepare fields
fields := make(map[string]interface{})

client := CreateHttpClient(h.FollowRedirects, time.Duration(h.ResponseTimeout))
client := CreateHttpClient(h.FollowRedirects, h.ResponseTimeout.Duration)

var body io.Reader
if h.Body != "" {
Expand Down Expand Up @@ -113,8 +114,8 @@ func (h *HTTPResponse) HTTPGather() (map[string]interface{}, error) {
// Gather gets all metric fields and tags and returns any errors it encounters
func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Set default values
if h.ResponseTimeout < 1 {
h.ResponseTimeout = 5
if h.ResponseTimeout.Duration < time.Second {
h.ResponseTimeout.Duration = time.Second * 5
}
// Check send and expected string
if h.Method == "" {
Expand Down
35 changes: 12 additions & 23 deletions plugins/inputs/net_response/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,30 @@ It can also check response text.
### Configuration:

```
# List of UDP/TCP connections you want to check
[[inputs.net_response]]
protocol = "tcp"
# Server address (default IP localhost)
address = "github.com:80"
# Set timeout (default 1.0)
timeout = 1.0
# Set read timeout (default 1.0)
read_timeout = 1.0
# String sent to the server
send = "ssh"
# Expected string in answer
expect = "ssh"
[[inputs.net_response]]
protocol = "tcp"
address = ":80"
# TCP or UDP 'ping' given url and collect response time in seconds
[[inputs.net_response]]
protocol = "udp"
# Server address (default IP localhost)
## Protocol, must be "tcp" or "udp"
protocol = "tcp"
## Server address (default localhost)
address = "github.com:80"
# Set timeout (default 1.0)
timeout = 1.0
# Set read timeout (default 1.0)
read_timeout = 1.0
# String sent to the server
## Set timeout
timeout = "1s"
## Optional string sent to the server
send = "ssh"
# Expected string in answer
## Optional expected string in answer
expect = "ssh"
## Set read timeout (only used if expecting a response)
read_timeout = "1s"
[[inputs.net_response]]
protocol = "udp"
address = "localhost:161"
timeout = 2.0
timeout = "2s"
```

### Measurements & Fields:
Expand Down
70 changes: 33 additions & 37 deletions plugins/inputs/net_response/net_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

// NetResponses struct
type NetResponse struct {
Address string
Timeout float64
ReadTimeout float64
Timeout internal.Duration
ReadTimeout internal.Duration
Send string
Expect string
Protocol string
Expand All @@ -31,29 +32,28 @@ var sampleConfig = `
protocol = "tcp"
## Server address (default localhost)
address = "github.com:80"
## Set timeout (default 1.0 seconds)
timeout = 1.0
## Set read timeout (default 1.0 seconds)
read_timeout = 1.0
## Set timeout
timeout = "1s"
## Optional string sent to the server
# send = "ssh"
## Optional expected string in answer
# expect = "ssh"
## Set read timeout (only used if expecting a response)
read_timeout = "1s"
`

func (_ *NetResponse) SampleConfig() string {
return sampleConfig
}

func (t *NetResponse) TcpGather() (map[string]interface{}, error) {
func (n *NetResponse) TcpGather() (map[string]interface{}, error) {
// Prepare fields
fields := make(map[string]interface{})
// Start Timer
start := time.Now()
// Resolving
tcpAddr, err := net.ResolveTCPAddr("tcp", t.Address)
// Connecting
conn, err := net.DialTCP("tcp", nil, tcpAddr)
conn, err := net.DialTimeout("tcp", n.Address, n.Timeout.Duration)
// Stop timer
responseTime := time.Since(start).Seconds()
// Handle error
Expand All @@ -62,17 +62,16 @@ func (t *NetResponse) TcpGather() (map[string]interface{}, error) {
}
defer conn.Close()
// Send string if needed
if t.Send != "" {
msg := []byte(t.Send)
if n.Send != "" {
msg := []byte(n.Send)
conn.Write(msg)
conn.CloseWrite()
// Stop timer
responseTime = time.Since(start).Seconds()
}
// Read string if needed
if t.Expect != "" {
if n.Expect != "" {
// Set read timeout
conn.SetReadDeadline(time.Now().Add(time.Duration(t.ReadTimeout) * time.Second))
conn.SetReadDeadline(time.Now().Add(n.ReadTimeout.Duration))
// Prepare reader
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
Expand All @@ -85,7 +84,7 @@ func (t *NetResponse) TcpGather() (map[string]interface{}, error) {
fields["string_found"] = false
} else {
// Looking for string in answer
RegEx := regexp.MustCompile(`.*` + t.Expect + `.*`)
RegEx := regexp.MustCompile(`.*` + n.Expect + `.*`)
find := RegEx.FindString(string(data))
if find != "" {
fields["string_found"] = true
Expand All @@ -99,27 +98,24 @@ func (t *NetResponse) TcpGather() (map[string]interface{}, error) {
return fields, nil
}

func (u *NetResponse) UdpGather() (map[string]interface{}, error) {
func (n *NetResponse) UdpGather() (map[string]interface{}, error) {
// Prepare fields
fields := make(map[string]interface{})
// Start Timer
start := time.Now()
// Resolving
udpAddr, err := net.ResolveUDPAddr("udp", u.Address)
LocalAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
// Connecting
conn, err := net.DialUDP("udp", LocalAddr, udpAddr)
conn, err := net.DialTimeout("udp", n.Address, n.Timeout.Duration)
defer conn.Close()
// Handle error
if err != nil {
return nil, err
}
// Send string
msg := []byte(u.Send)
msg := []byte(n.Send)
conn.Write(msg)
// Read string
// Set read timeout
conn.SetReadDeadline(time.Now().Add(time.Duration(u.ReadTimeout) * time.Second))
conn.SetReadDeadline(time.Now().Add(n.ReadTimeout.Duration))
// Read
buf := make([]byte, 1024)
_, _, err = conn.ReadFromUDP(buf)
Expand All @@ -130,7 +126,7 @@ func (u *NetResponse) UdpGather() (map[string]interface{}, error) {
return nil, err
} else {
// Looking for string in answer
RegEx := regexp.MustCompile(`.*` + u.Expect + `.*`)
RegEx := regexp.MustCompile(`.*` + n.Expect + `.*`)
find := RegEx.FindString(string(buf))
if find != "" {
fields["string_found"] = true
Expand All @@ -142,28 +138,28 @@ func (u *NetResponse) UdpGather() (map[string]interface{}, error) {
return fields, nil
}

func (c *NetResponse) Gather(acc telegraf.Accumulator) error {
func (n *NetResponse) Gather(acc telegraf.Accumulator) error {
// Set default values
if c.Timeout == 0 {
c.Timeout = 1.0
if n.Timeout.Duration == 0 {
n.Timeout.Duration = time.Second
}
if c.ReadTimeout == 0 {
c.ReadTimeout = 1.0
if n.ReadTimeout.Duration == 0 {
n.ReadTimeout.Duration = time.Second
}
// Check send and expected string
if c.Protocol == "udp" && c.Send == "" {
if n.Protocol == "udp" && n.Send == "" {
return errors.New("Send string cannot be empty")
}
if c.Protocol == "udp" && c.Expect == "" {
if n.Protocol == "udp" && n.Expect == "" {
return errors.New("Expected string cannot be empty")
}
// Prepare host and port
host, port, err := net.SplitHostPort(c.Address)
host, port, err := net.SplitHostPort(n.Address)
if err != nil {
return err
}
if host == "" {
c.Address = "localhost:" + port
n.Address = "localhost:" + port
}
if port == "" {
return errors.New("Bad port")
Expand All @@ -172,11 +168,11 @@ func (c *NetResponse) Gather(acc telegraf.Accumulator) error {
tags := map[string]string{"server": host, "port": port}
var fields map[string]interface{}
// Gather data
if c.Protocol == "tcp" {
fields, err = c.TcpGather()
if n.Protocol == "tcp" {
fields, err = n.TcpGather()
tags["protocol"] = "tcp"
} else if c.Protocol == "udp" {
fields, err = c.UdpGather()
} else if n.Protocol == "udp" {
fields, err = n.UdpGather()
tags["protocol"] = "udp"
} else {
return errors.New("Bad protocol")
Expand Down

0 comments on commit eeec070

Please sign in to comment.