Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add TCP StatsD listener support #71

Merged
merged 14 commits into from
Aug 1, 2017
Merged
237 changes: 139 additions & 98 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package main

import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"hash/fnv"
"io"
"net"
"regexp"
"strconv"
Expand Down Expand Up @@ -289,10 +291,6 @@ func NewExporter(mapper *metricMapper, addSuffix bool) *Exporter {
}
}

type StatsDListener struct {
conn *net.UDPConn
}

func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (Event, error) {
switch statType {
case "c":
Expand Down Expand Up @@ -321,17 +319,6 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma
}
}

func (l *StatsDListener) Listen(e chan<- Events) {
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
l.handlePacket(buf[0:n], e)
}
}

func parseDogStatsDTagsToLabels(component string) map[string]string {
labels := map[string]string{}
networkStats.WithLabelValues("dogstatsd_tags").Inc()
Expand All @@ -351,105 +338,159 @@ func parseDogStatsDTagsToLabels(component string) map[string]string {
return labels
}

func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
lines := strings.Split(string(packet), "\n")
events := Events{}
for _, line := range lines {
if line == "" {
continue
}
func lineToEvents(line string) (events Events) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the named return variable here as it adds stutter to the function signature and use explicit returns below. It's not a go idiom to use naked returns in such long functions.

Named returned values should only be used on public funcs and methods
when it contributes to the documentation.

https://go-review.googlesource.com/c/20024/

if line == "" {
return
}

elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
networkStats.WithLabelValues("malformed_line").Inc()
log.Errorln("Bad line from StatsD:", line)
elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
networkStats.WithLabelValues("malformed_line").Inc()
log.Errorln("Bad line from StatsD:", line)
return
}
metric := elements[0]
var samples []string
if strings.Contains(elements[1], "|#") {
// using datadog extensions, disable multi-metrics
samples = elements[1:]
} else {
samples = strings.Split(elements[1], ":")
}
samples:
for _, sample := range samples {
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
networkStats.WithLabelValues("malformed_component").Inc()
log.Errorln("Bad component on line:", line)
continue
}
metric := elements[0]
var samples []string
if strings.Contains(elements[1], "|#") {
// using datadog extensions, disable multi-metrics
samples = elements[1:]
} else {
samples = strings.Split(elements[1], ":")
valueStr, statType := components[0], components[1]

var relative = false
if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 {
relative = true
}
samples:
for _, sample := range samples {
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
networkStats.WithLabelValues("malformed_component").Inc()
log.Errorln("Bad component on line:", line)
continue
}
valueStr, statType := components[0], components[1]

var relative = false
if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 {
relative = true
}
value, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
log.Errorf("Bad value %s on line: %s", valueStr, line)
networkStats.WithLabelValues("malformed_value").Inc()
continue
}

value, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
log.Errorf("Bad value %s on line: %s", valueStr, line)
networkStats.WithLabelValues("malformed_value").Inc()
continue
multiplyEvents := 1
labels := map[string]string{}
if len(components) >= 3 {
for _, component := range components[2:] {
if len(component) == 0 {
log.Errorln("Empty component on line: ", line)
networkStats.WithLabelValues("malformed_component").Inc()
continue samples
}
}

multiplyEvents := 1
labels := map[string]string{}
if len(components) >= 3 {
for _, component := range components[2:] {
if len(component) == 0 {
log.Errorln("Empty component on line: ", line)
networkStats.WithLabelValues("malformed_component").Inc()
continue samples
for _, component := range components[2:] {
switch component[0] {
case '@':
if statType != "c" && statType != "ms" {
log.Errorln("Illegal sampling factor for non-counter metric on line", line)
networkStats.WithLabelValues("illegal_sample_factor").Inc()
continue
}
}

for _, component := range components[2:] {
switch component[0] {
case '@':
if statType != "c" && statType != "ms" {
log.Errorln("Illegal sampling factor for non-counter metric on line", line)
networkStats.WithLabelValues("illegal_sample_factor").Inc()
continue
}
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil {
log.Errorf("Invalid sampling factor %s on line %s", component[1:], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc()
}
if samplingFactor == 0 {
samplingFactor = 1
}

if statType == "c" {
value /= samplingFactor
} else if statType == "ms" {
multiplyEvents = int(1 / samplingFactor)
}
case '#':
labels = parseDogStatsDTagsToLabels(component)
default:
log.Errorf("Invalid sampling factor or tag section %s on line %s", components[2], line)
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil {
log.Errorf("Invalid sampling factor %s on line %s", component[1:], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc()
continue
}
}
}
if samplingFactor == 0 {
samplingFactor = 1
}

for i := 0; i < multiplyEvents; i++ {
event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil {
log.Errorf("Error building event on line %s: %s", line, err)
networkStats.WithLabelValues("illegal_event").Inc()
if statType == "c" {
value /= samplingFactor
} else if statType == "ms" {
multiplyEvents = int(1 / samplingFactor)
}
case '#':
labels = parseDogStatsDTagsToLabels(component)
default:
log.Errorf("Invalid sampling factor or tag section %s on line %s", components[2], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc()
continue
}
events = append(events, event)
}
networkStats.WithLabelValues("legal").Inc()
}

for i := 0; i < multiplyEvents; i++ {
event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil {
log.Errorf("Error building event on line %s: %s", line, err)
networkStats.WithLabelValues("illegal_event").Inc()
continue
}
events = append(events, event)
}
networkStats.WithLabelValues("legal").Inc()
}
return
}

type StatsDListener struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this StatsDUDPListener for consistency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought UDP is primary and TCP is secondary originally.

done rename.

conn *net.UDPConn
}

func (l *StatsDListener) Listen(e chan<- Events) {
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
l.handlePacket(buf[0:n], e)
}
}

func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
lines := strings.Split(string(packet), "\n")
events := Events{}
for _, line := range lines {
events = append(events, lineToEvents(line)...)
}
e <- events
}

type StatsDTCPListener struct {
conn *net.TCPListener
}

func (l *StatsDTCPListener) Listen(e chan<- Events) {
for {
c, err := l.conn.AcceptTCP()
if err != nil {
log.Fatalf("AcceptTCP failed: %v", err)
}
go l.handleConn(c, e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before calling handleconn (or immediately in it) -- we should probably call SetReadBuffer -- to be consistent with UDP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UDP needs a 64K buffer, since it will recv all data in one packet.
but for TCP, i guess the underlying bufio is manage the buffer for us.

}
}

func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
defer c.Close()

r := bufio.NewReader(c)
for {
line, isPrefix, err := r.ReadLine()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadBytes (https://golang.org/pkg/bufio/#Reader.ReadBytes) would be a better method to use-- from the docs: ReadLine is a low-level line-reading primitive. Most callers should use ReadBytes('\n') or ReadString('\n') instead or use a Scanner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadBytes will read till delimiter or eof, so i think ReadLine is more safe for bogus input with really long line, or no line delimiter at all. :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, LGTM then :) Lets merge it!

if err != nil {
if err != io.EOF {
log.Errorf("Read %s failed: %v", c.RemoteAddr(), err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be cool to have metrics for these errors so that operators can set up alerts on misbehaving clients.

}
break
}
if isPrefix {
log.Errorf("Read %s failed: line too long", c.RemoteAddr())
break
}
e <- lineToEvents(string(line))
}
}
61 changes: 48 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func init() {
var (
listenAddress = flag.String("web.listen-address", ":9102", "The address on which to expose the web interface and generated Prometheus metrics.")
metricsEndpoint = flag.String("web.telemetry-path", "/metrics", "Path under which to expose metrics.")
statsdListenAddress = flag.String("statsd.listen-address", ":9125", "The UDP address on which to receive statsd metric lines.")
statsdListenAddress = flag.String("statsd.listen-address", ":9125", "The UDP/TCP address on which to receive statsd metric lines.")
statsdListenUDP = flag.Bool("statsd.listen-udp", true, "Whether to receive UDP statsd metrics.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of boolean flags and introducing a dependency between flags, it'd be simpler to just have to flags with the addresses for each protocol, like -statsd.listen-address-udp=":9125" -statsd.listen-address-tcp=":9125". An empty value signals disables the server on that protocol.

Copy link
Contributor Author

@jwfang jwfang Jul 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doing it like what you mentioned have two conerns for me:

  1. a bit uncomfortable for setting something to "" to disable it; and since we're default both UDP/TCP to on, use have to explicitly disable it;
  2. the flags changes will break the user interface, from statsd.listen-address to statsd.listen-udp.

i have no strong feeling for 1; and for 2, if that's OK, i will do the flags changes.

EDIT: i think we can also support statsd.listen-address as statsd.listen-udp.

statsdListenTCP = flag.Bool("statsd.listen-tcp", false, "Whether to receive TCP statsd metrics.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be true by default.

mappingConfig = flag.String("statsd.mapping-config", "", "Metric mapping configuration file name.")
readBuffer = flag.Int("statsd.read-buffer", 0, "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.")
addSuffix = flag.Bool("statsd.add-suffix", true, "Add the metric type (counter/gauge/timer) as suffix to the generated Prometheus metric (NOT recommended, but set by default for backward compatibility).")
Expand All @@ -55,7 +57,7 @@ func serveHTTP() {
log.Fatal(http.ListenAndServe(*listenAddress, nil))
}

func udpAddrFromString(addr string) *net.UDPAddr {
func ipPortFromString(addr string) (*net.IPAddr, int) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
log.Fatal("Bad StatsD listening address", addr)
Expand All @@ -74,13 +76,27 @@ func udpAddrFromString(addr string) *net.UDPAddr {
log.Fatalf("Bad port %s: %s", portStr, err)
}

return ip, port
}

func udpAddrFromString(addr string) *net.UDPAddr {
ip, port := ipPortFromString(addr)
return &net.UDPAddr{
IP: ip.IP,
Port: port,
Zone: ip.Zone,
}
}

func tcpAddrFromString(addr string) *net.TCPAddr {
ip, port := ipPortFromString(addr)
return &net.TCPAddr{
IP: ip.IP,
Port: port,
Zone: ip.Zone,
}
}

func watchConfig(fileName string, mapper *metricMapper) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand Down Expand Up @@ -122,34 +138,53 @@ func main() {
os.Exit(0)
}

if !*statsdListenUDP && !*statsdListenTCP {
log.Fatalln("At least one of UDP/TCP listeners should be specified.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a must then.

}

if *addSuffix {
log.Warnln("Warning: Using -statsd.add-suffix is discouraged. We recommend explicitly naming metrics appropriately in the mapping configuration.")
}

log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info())
log.Infoln("Build context", version.BuildContext())
log.Infoln("Accepting StatsD Traffic on", *statsdListenAddress)
log.Infof("Accepting StatsD Traffic on %s, UDP %v, TCP %v", *statsdListenAddress, *statsdListenUDP, *statsdListenTCP)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not longer printing the address it's listening on, but true/false, but would be fixed with the comment above.

log.Infoln("Accepting Prometheus Requests on", *listenAddress)

go serveHTTP()

events := make(chan Events, 1024)
defer close(events)

listenAddr := udpAddrFromString(*statsdListenAddress)
conn, err := net.ListenUDP("udp", listenAddr)
if err != nil {
log.Fatal(err)
if *statsdListenUDP {
listenAddr := udpAddrFromString(*statsdListenAddress)
conn, err := net.ListenUDP("udp", listenAddr)
if err != nil {
log.Fatal(err)
}

if *readBuffer != 0 {
err = conn.SetReadBuffer(*readBuffer)
if err != nil {
log.Fatal("Error setting UDP read buffer:", err)
}
}

l := &StatsDListener{conn: conn}
go l.Listen(events)
}

if *readBuffer != 0 {
err = conn.SetReadBuffer(*readBuffer)
if *statsdListenTCP {
tcpListenAddr := tcpAddrFromString(*statsdListenAddress)
tconn, err := net.ListenTCP("tcp", tcpListenAddr)
if err != nil {
log.Fatal("Error setting UDP read buffer:", err)
log.Fatal(err)
}
}
defer tconn.Close()

l := &StatsDListener{conn: conn}
go l.Listen(events)
tl := &StatsDTCPListener{conn: tconn}
go tl.Listen(events)
}

mapper := &metricMapper{}
if *mappingConfig != "" {
Expand Down