diff --git a/.gitignore b/.gitignore
index a473671..b3a18c4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,5 @@
 .goxc.json
 .goxc.local.json
+
+# output binary
+kafka-statsd
diff --git a/Dockerfile b/Dockerfile
index 499ea4b..d2487d7 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,10 @@
-FROM golang:onbuild
+FROM golang:1.12 AS build
 
-ENTRYPOINT ["/go/bin/app"]
+WORKDIR /app
+COPY go.mod go.sum main.go ./
+RUN go build -v
+
+FROM gcr.io/distroless/base
+COPY --from=build /app/kafka-statsd kafka-statsd
+ENTRYPOINT ["/kafka-statsd"]
 CMD ["--help"]
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..b303a08
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,12 @@
+module github.com/travisjeffery/kafka-statsd
+
+require (
+    github.com/Shopify/sarama v1.21.0
+    github.com/alecthomas/kingpin v2.2.6+incompatible
+    github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
+    github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
+    github.com/pkg/errors v0.8.1
+    github.com/segmentio/go-log v1.9.0
+    github.com/stretchr/testify v1.3.0 // indirect
+    gopkg.in/alexcesaro/statsd.v2 v2.0.0
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..2516c76
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,38 @@
+github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
+github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/Shopify/sarama v1.21.0 h1:0GKs+e8mn1RRUzfg9oUXv3v7ZieQLmOZF/bfnmmGhM8=
+github.com/Shopify/sarama v1.21.0/go.mod h1:yuqtN/pe8cXRWG5zPaO7hCfNJp5MwmkoJEoLjkm5tCQ=
+github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
+github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/alecthomas/kingpin v2.2.6+incompatible h1:5svnBTFgJjZvGKyYBtMB0+m5wvrbUHiqye8wRJMlnYI=
+github.com/alecthomas/kingpin v2.2.6+incompatible/go.mod h1:59OFYbFVLKQKq+mqrL6Rw5bR0c3ACQaawgXx0QYndlE=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
+github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
+github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
+github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
+github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
+github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/segmentio/go-log v1.9.0 h1:1dhZF9aIsQG3sK2To2l41bWcipIqPa+mXMEvc0Yx9zk=
+github.com/segmentio/go-log v1.9.0/go.mod h1:OaWNxWOTRpA/fIXTfr2FmimINNi+0MyArbeI1FLjOkY=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
+gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
diff --git a/main.go b/main.go
index 08f505e..fc03875 100644
--- a/main.go
+++ b/main.go
@@ -4,113 +4,424 @@ import (
     "fmt"
     "os"
     "os/signal"
-    "strings"
+    "strconv"
     "time"
+    "strings"
+    "sync"
+
     "github.com/Shopify/sarama"
-    "github.com/quipo/statsd"
+    "github.com/alecthomas/kingpin"
+    "github.com/pkg/errors"
     "github.com/segmentio/go-log"
-    "github.com/wvanbergen/kazoo-go"
-    "gopkg.in/alecthomas/kingpin.v2"
+    "gopkg.in/alexcesaro/statsd.v2"
 )
 
 var (
-    zkAddrs      = kingpin.Flag("zookeeper-addrs", "Zookeeper addresses (e.g. host1:2181,host2:2181)").Short('z').String()
-    statsdAddr   = kingpin.Flag("statsd-addr", "Statsd address").Short('s').String()
-    statsdPrefix = kingpin.Flag("statsd-prefix", "Statsd prefix").Short('p').String()
-    interval     = kingpin.Flag("refresh-interval", "Interval to refresh offset lag in seconds").Short('i').Default("5").Int()
-    useTags      = kingpin.Flag("use-tags", "Use tags if your StatsD client supports them (like DataDog and InfluxDB)").Default("false").Bool()
-    includeTags  = kingpin.Flag("include-tags", "Tags to include, if you want to include a host name or datacenter for example.").Strings()
+    brokers      = kingpin.Flag("broker", "A kafka broker to connect to. Specify multiple times for multiple brokers. (e.g. host1:9092)").HintOptions("host1:9092").Short('b').Envar("KSTATSD_BROKERS").Required().Strings()
+    statsdAddr   = kingpin.Flag("statsd-addr", "Statsd address").Short('s').Default("127.0.0.1").Envar("KSTATSD_STATSD_ADDR").String()
+    statsdPort   = kingpin.Flag("statsd-port", "Statsd port").Short('P').Default("8125").Envar("KSTATSD_STATSD_PORT").String()
+    statsdPrefix = kingpin.Flag("statsd-prefix", "Statsd prefix").Short('p').Envar("KSTATSD_STATSD_PREFIX").String()
+    interval     = kingpin.Flag("refresh-interval", "Interval to refresh offset lag in seconds").Short('i').Default("5").Envar("KSTATSD_INTERVAL").Int()
+    tagType      = kingpin.Flag("tag-format", "Format to use when encoding tags (Options: none, influxdb, datadog)").HintOptions(statsdTagOptionsEnum()...).Default("none").Envar("KSTATSD_USE_TAGS").Enum(statsdTagOptionsEnum()...)
+    includeTags  = kingpin.Flag("tag", "Tags to include. Specify multiple times for multiple tags. (e.g. tagname:value)").HintOptions("tagname:value").Envar("KSTATSD_TAGS").Strings()
 )
 
-func main() {
-    kingpin.Parse()
+var statsdTagFormat = map[string]statsd.TagFormat{
+    "influxdb": statsd.InfluxDB,
+    "datadog":  statsd.Datadog,
+    "none":     0,
+}
 
-    statsdClient := statsd.NewStatsdClient(*statsdAddr, *statsdPrefix)
-    err := statsdClient.CreateSocket()
-    if err != nil {
-        log.Error("%s", err)
-        return
+func statsdTagOptionsEnum() []string {
+    res := make([]string, 0, len(statsdTagFormat))
+    for k := range statsdTagFormat {
+        res = append(res, k)
     }
-    stats := statsd.NewStatsdBuffer(time.Second, statsdClient)
-    defer stats.Close()
+    return res
+}
 
-    var zookeeperNodes []string
-    zookeeperNodes, chroot := kazoo.ParseConnectionString(*zkAddrs)
+// newStatsdClient builds a statsd client from the configured address, prefix, tags, and tag format.
+func newStatsdClient() (*statsd.Client, error) {
+    tags := make([]string, 0, len(*includeTags)*2)
+    for _, tag := range *includeTags {
+        splitTag := strings.SplitN(tag, ":", 2)
+        tags = append(tags, splitTag...)
+    }
 
-    var kz *kazoo.Kazoo
-    conf := kazoo.NewConfig()
-    conf.Chroot = chroot
-    if kz, err = kazoo.NewKazoo(zookeeperNodes, conf); err != nil {
-        log.Error("%s", err)
-        return
+    opts := []statsd.Option{
+        statsd.Address(strings.Join([]string{*statsdAddr, *statsdPort}, ":")),
+        statsd.ErrorHandler(func(err error) {
+            log.Error("Statsd error: %s", err)
+        }),
+    }
+
+    if *statsdPrefix != "" {
+        opts = append(opts, statsd.Prefix(*statsdPrefix))
+    }
+
+    if len(tags) > 0 {
+        opts = append(opts, statsd.Tags(tags...))
+    }
+
+    tagFormat := statsdTagFormat[*tagType]
+    if tagFormat != 0 {
+        opts = append(opts, statsd.TagsFormat(tagFormat))
     }
-    defer kz.Close()
 
-    brokers, err := kz.BrokerList()
+    return statsd.New(opts...)
+}
+
+func isTaggedReporting() bool {
+    // There's probably a better way to do this than string comparison
+    return *tagType != "none"
+}
+
+func main() {
+    kingpin.Parse()
+
+    statsdClient, err := newStatsdClient()
     if err != nil {
-        log.Error("%s", err)
+        log.Error("Error creating statsd client: %s", err)
         return
     }
+    defer statsdClient.Close()
 
-    client, err := sarama.NewClient(brokers, nil)
+    client, err := sarama.NewClient(*brokers, nil)
     if err != nil {
-        log.Error("%s, err")
+        log.Error("Error connecting to Kafka (client): %s", err)
         return
     }
-    defer client.Close()
+    defer func() {
+        err := client.Close()
+        if err != nil {
+            log.Error("Error closing kafka client connection: %s", err)
+        }
+    }()
 
-    consumerGroupList, err := kz.Consumergroups()
+    config := sarama.NewConfig()
+    config.Version = sarama.V2_1_0_0
+    admin, err := sarama.NewClusterAdmin(*brokers, config)
     if err != nil {
-        log.Error("%s", err)
+        log.Error("Error connecting to Kafka (admin): %s", err)
         return
     }
+    defer func() {
+        err := admin.Close()
+        if err != nil {
+            log.Error("Error closing kafka admin connection: %s", err)
+        }
+    }()
 
     ticker := time.NewTicker(time.Duration(*interval) * time.Second)
 
     signals := make(chan os.Signal, 1)
     signal.Notify(signals, os.Interrupt)
 
+    log.Info("Starting consumer offset daemon")
+
     for {
         select {
         case <-ticker.C:
-            log.Info("Refreshing offset lag")
-
-            for _, cg := range consumerGroupList {
-                offsets, err := cg.FetchAllOffsets()
-                if err != nil {
-                    log.Error("%s", err)
-                    return
-                }
-                for topic, m := range offsets {
-                    for partitionID, cgOffset := range m {
-                        tOffset, err := client.GetOffset(topic, partitionID, sarama.OffsetNewest)
-                        if err != nil {
-                            log.Error("%s", err)
-                            return
-                        }
-                        lag := tOffset - cgOffset
-
-                        log.Info("Topic: %s, Partition: %d, Consumer Group: %s, Lag: %d", topic, partitionID, cg.Name, lag)
-                        if *useTags {
-                            var tags []string
-                            tags = append(tags, "topic="+topic)
-                            tags = append(tags, fmt.Sprintf("partition=%d", partitionID))
-                            tags = append(tags, "consumer_group="+cg.Name)
-                            if includeTags != nil {
-                                for _, t := range *includeTags {
-                                    tags = append(tags, t)
-                                }
-                            }
-                            stats.Gauge(fmt.Sprintf("consumer_lag,%s", strings.Join(tags, ",")), lag)
-                        } else {
-                            stats.Gauge(fmt.Sprintf("topic.%s.partition.%d.consumer_group.%s.lag", topic, partitionID, cg.Name), lag)
-                        }
-                    }
-                }
+            log.Info("Refetching consumer offset lag")
+            err := refreshAndReportMetrics(statsdClient, client, admin)
+            if err != nil {
+                log.Error("Error occurred while refreshing cluster lag: %s", err)
             }
+
         case <-signals:
             log.Info("Got interrupt signal, exiting.")
            return
         }
     }
 }
+
+// refreshAndReportMetrics takes a snapshot of the cluster and reports partition offsets and consumer lag to statsd.
+func refreshAndReportMetrics(statsdClient *statsd.Client, client sarama.Client, admin sarama.ClusterAdmin) error {
+    // No need to thread statsd client interaction, the statsd client does buffered/batch sending for us
+
+    clusterState, err := collectClusterState(client, admin)
+    if err != nil {
+        return errors.Wrap(err, "collecting cluster state")
+    }
+
+    for topic, parts := range clusterState.TopicOffsets {
+        log.Debug("Reporting offsets for topic %s (parts: %v)", topic, parts)
+        for partition, position := range parts {
+            stats := statsdClient.Clone(
+                statsd.Tags("topic", topic),
+                statsd.Tags("partition", strconv.FormatInt(int64(partition), 10)),
+            )
+            if isTaggedReporting() {
+                stats.Gauge("partition.offset", position)
+            } else {
+                key := fmt.Sprintf("topic.%s.partition.%d.offset", topic, partition)
+                stats.Gauge(key, position)
+            }
+        }
+    }
+
+    for group, topicMap := range clusterState.ConsumerOffsets {
+        log.Debug("Reporting offsets for consumer group %s (parts: %v)", group, topicMap)
+
+        for topic, partitionMap := range topicMap {
+            for partition, consumerOffset := range partitionMap {
+                stats := statsdClient.Clone(
+                    statsd.Tags("topic", topic),
+                    statsd.Tags("partition", strconv.FormatInt(int64(partition), 10)),
+                    statsd.Tags("consumer_group", group),
+                )
+
+                topicOffset := clusterState.TopicOffsets[topic][partition]
+                lag := topicOffset - consumerOffset
+
+                if isTaggedReporting() {
+                    stats.Gauge("consumer.offset", consumerOffset)
+                    stats.Gauge("consumer.lag", lag)
+                } else {
+                    offsetKey := fmt.Sprintf("topic.%s.partition.%d.consumer_group.%s.offset", topic, partition, group)
+                    stats.Gauge(offsetKey, consumerOffset)
+
+                    lagKey := fmt.Sprintf("topic.%s.partition.%d.consumer_group.%s.lag", topic, partition, group)
+                    stats.Gauge(lagKey, lag)
+                }
+            }
+        }
+    }
+
+    return nil
+}
+
+// getConsumerGroups returns the names of all consumer groups known to the cluster.
+func getConsumerGroups(admin sarama.ClusterAdmin) ([]string, error) {
+    cgMap, err := admin.ListConsumerGroups()
+    if err != nil {
+        return nil, errors.Wrap(err, "ListConsumerGroups")
+    }
+    consumerGroups := make([]string, 0, len(cgMap))
+    for k := range cgMap {
+        // skip the special kafka topic that tracks consumer offsets
+        if k != "__consumer_offsets" {
+            consumerGroups = append(consumerGroups, k)
+        }
+    }
+    return consumerGroups, nil
+}
+
+// getConsumerGroupOffsets fetches committed offsets for each consumer group, keyed by group, topic, and partition.
+func getConsumerGroupOffsets(admin sarama.ClusterAdmin, partitions map[string][]int32, groups []string) (map[string]map[string]map[int32]int64, error) {
+    type res struct {
+        group string
+        value *sarama.OffsetFetchResponse
+        err   error
+    }
+    responses := make(chan *res, len(groups))
+    var wg sync.WaitGroup
+
+    for _, group := range groups {
+        wg.Add(1)
+        go func(grp string) {
+            defer wg.Done()
+            response, err := admin.ListConsumerGroupOffsets(grp, partitions)
+            log.Debug("admin.ListConsumerGroupOffsets(%s, partitions) = %v, %v", grp, response, err)
+            responses <- &res{
+                group: grp,
+                value: response,
+                err:   err,
+            }
+        }(group)
+    }
+
+    wg.Wait()
+    close(responses)
+
+    // Map consumer group -> topic -> partition -> consumer offset
+    result := make(map[string]map[string]map[int32]int64, len(groups))
+    for r := range responses {
+        if r.err != nil {
+            return nil, r.err
+        }
+
+        topicMap, ok := result[r.group]
+        if !ok {
+            topicMap = make(map[string]map[int32]int64, len(r.value.Blocks))
+            result[r.group] = topicMap
+        }
+
+        for topic, block := range r.value.Blocks {
+            partitionMap := make(map[int32]int64, len(block))
+            topicMap[topic] = partitionMap
+            for partitionId, offsetBlock := range block {
+                partitionMap[partitionId] = offsetBlock.Offset
+            }
+        }
+    }
+
+    return result, nil
+}
+
+// getOffsetsFromTopicAndPartitions fetches the newest offset for each partition of a single topic.
+func getOffsetsFromTopicAndPartitions(client sarama.Client, topic string, partitions []int32) (map[int32]int64, error) {
+    result := map[int32]int64{}
+
+    type response struct {
+        partition int32
+        offset    int64
+        err       error
+    }
+
+    ch := make(chan *response, len(partitions))
+    var wg sync.WaitGroup
+
+    for _, partition := range partitions {
+        wg.Add(1)
+        go func(partition int32) {
+            defer wg.Done()
+            offset, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
+            ch <- &response{
+                partition: partition,
+                offset:    offset,
+                err:       err,
+            }
+        }(partition)
+    }
+
+    wg.Wait()
+    close(ch)
+
+    var lastErr error
+    for res := range ch {
+        if res.err != nil {
+            lastErr = res.err
+        } else {
+            result[res.partition] = res.offset
+        }
+    }
+
+    if lastErr != nil {
+        return nil, errors.Wrap(lastErr, "GetOffset")
+    }
+
+    return result, nil
+}
+
+// getTopicPartitionOffsets fetches the newest offset for every partition of every topic.
+func getTopicPartitionOffsets(client sarama.Client, topics map[string][]int32) (map[string]map[int32]int64, error) {
+    type response struct {
+        topic string
+        pts   map[int32]int64
+        err   error
+    }
+    ch := make(chan *response, len(topics))
+    var wg sync.WaitGroup
+    wg.Add(len(topics))
+
+    for topic, partitions := range topics {
+        go func(topic string, partitions []int32) {
+            defer wg.Done()
+            parts, err := getOffsetsFromTopicAndPartitions(client, topic, partitions)
+            ch <- &response{
+                topic: topic,
+                pts:   parts,
+                err:   err,
+            }
+        }(topic, partitions)
+    }
+
+    wg.Wait()
+    close(ch)
+
+    result := make(map[string]map[int32]int64, len(topics))
+    for res := range ch {
+        if res.err != nil {
+            return nil, res.err
+        }
+        result[res.topic] = res.pts
+    }
+    return result, nil
+}
+
+// getTopicPartitions lists the partition IDs of each topic.
+func getTopicPartitions(client sarama.Client, topics []string) (map[string][]int32, error) {
+    type response struct {
+        topic string
+        pts   []int32
+        err   error
+    }
+    ch := make(chan *response, len(topics))
+    var wg sync.WaitGroup
+    wg.Add(len(topics))
+
+    for _, topic := range topics {
+        go func(topic string) {
+            defer wg.Done()
+            pts, err := client.Partitions(topic)
+            ch <- &response{
+                topic: topic,
+                pts:   pts,
+                err:   err,
+            }
+        }(topic)
+    }
+
+    wg.Wait()
+    close(ch)
+
+    partitions := make(map[string][]int32, len(topics))
+    for res := range ch {
+        if res.err != nil {
+            return nil, res.err
+        }
+        partitions[res.topic] = res.pts
+    }
+    return partitions, nil
+}
+
+// ClusterState is a snapshot recording of the current kafka cluster's state
+type ClusterState struct {
+    // Mapping of consumer groups to topics to partitions to consumer offsets
+    ConsumerOffsets map[string]map[string]map[int32]int64
+
+    // Map of topics to partitions to latest partition offset
+    TopicOffsets map[string]map[int32]int64
+}
+
+// collectClusterState gathers the current state about consumers and topics in the cluster
+func collectClusterState(client sarama.Client, admin sarama.ClusterAdmin) (*ClusterState, error) {
+    topics, err := client.Topics()
+    if err != nil {
+        return nil, errors.Wrap(err, "Topics")
+    }
+
+    partitions, err := getTopicPartitions(client, topics)
+    if err != nil {
+        return nil, errors.Wrap(err, "getTopicPartitions")
+    }
+
+    var wg sync.WaitGroup
+    wg.Add(2)
+
+    // Collect each goroutine's error in its own variable so the two goroutines never write the same variable concurrently.
+    var cgErr, topicErr error
+
+    cs := &ClusterState{}
+    go func() {
+        defer wg.Done()
+        var cgs []string
+        var e error
+        cgs, e = getConsumerGroups(admin)
+        if e != nil {
+            cgErr = errors.Wrap(e, "getConsumerGroups")
+            return
+        }
+        cs.ConsumerOffsets, e = getConsumerGroupOffsets(admin, partitions, cgs)
+        if e != nil {
+            cgErr = errors.Wrap(e, "getConsumerGroupOffsets")
+        }
+    }()
+
+    go func() {
+        defer wg.Done()
+        var e error
+        cs.TopicOffsets, e = getTopicPartitionOffsets(client, partitions)
+        if e != nil {
+            topicErr = errors.Wrap(e, "getTopicPartitionOffsets")
+        }
+    }()
+    wg.Wait()
+
+    if cgErr != nil {
+        return nil, errors.Wrap(cgErr, "collectClusterState")
+    }
+    if topicErr != nil {
+        return nil, errors.Wrap(topicErr, "collectClusterState")
+    }
+
+    return cs, nil
+}