From 8d1a55203ed13ec94dc4fb226000631039b2272a Mon Sep 17 00:00:00 2001 From: Soren Mathiasen Date: Fri, 16 Mar 2018 10:43:51 +0200 Subject: [PATCH 1/2] Add scraping for Prometheus endpoint in Kubernetes This will allow users to add Prometheus annotations in services in Kubernetes, and have telegraf scan for them and add them to the list of endpoints to collect metrics from --- Godeps | 16 +++ plugins/inputs/prometheus/README.md | 16 +++ plugins/inputs/prometheus/kubernetes.go | 116 +++++++++++++++++++ plugins/inputs/prometheus/kubernetes_test.go | 83 +++++++++++++ plugins/inputs/prometheus/prometheus.go | 54 +++++++-- plugins/inputs/prometheus/prometheus_test.go | 4 + 6 files changed, 279 insertions(+), 10 deletions(-) create mode 100644 plugins/inputs/prometheus/kubernetes.go create mode 100644 plugins/inputs/prometheus/kubernetes_test.go diff --git a/Godeps b/Godeps index 4b5dd401221ef..cd08b6a28e905 100644 --- a/Godeps +++ b/Godeps @@ -1,4 +1,8 @@ collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 +github.com/PuerkitoBio/purell fd18e053af8a4ff11039269006e8037ff374ce0e +github.com/PuerkitoBio/urlesc de5bf2ad457846296e2031421a34e2568e304e35 +github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65 +github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 @@ -17,15 +21,22 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483 +github.com/emicklei/go-restful 2dd44038f0b95ae693b266c5f87593b5d2fdd78d github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 +github.com/go-openapi/jsonpointer 779f45308c19820f1a69e9a4cd965f496e0da10f +github.com/go-openapi/jsonreference 36d33bfe519efae5632669801b180bf1a245da3b +github.com/go-openapi/spec a4fa9574c7aa73b2fc54e251eb9524d0482bb592 +github.com/go-openapi/swag cf0bdb963811675a4d7e74901cefc7411a1df939 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 +github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998 github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc +github.com/google/gofuzz 24818f796faf91cd76ec7bddd72458fbced7a6c1 github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 @@ -39,6 +50,7 @@ github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 +github.com/mailru/easyjson 5f62e4f3afa2f576dc86531b7df4d966b19ef8f8 github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 @@ -93,3 +105,7 @@ gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655 gopkg.in/olivere/elastic.v5 3113f9b9ad37509fe5f8a0e5e91c96fdc4435e26 gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6 +k8s.io/api 5584376ceeffeb13a2e98b5e9f0e9dab37de4bab +k8s.io/apimachinery 18a564baac720819100827c16fdebcadb05b2d0d +k8s.io/client-go a3c9a30b35fc56f709f4428827b807c90af99fc3 +k8s.io/kube-openapi 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 \ No newline at end of file diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index ac7405014a8ce..66bfb681a3c55 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -14,6 +14,12 @@ in Prometheus format. ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + # Scrape Kubernetes service for prometheus annotations. + # prometheus.io/scrape: Enable scraping for this service + # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + # prometheus.io/port: If port is not 9102 use this annotation + # kubernetes_scraping = true + ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -37,6 +43,16 @@ by looking up all A records assigned to the hostname as described in This method can be used to locate all [Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). +#### Kubernetes scraping + +Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes +pods. +Currently the following annotation are supported: + +* `prometheus.io/scrape` Enable scraping for this pod +* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default metrics). +* `prometheus.io/port` Used to override the port, the default value is 9102 + #### Bearer Token If set, the file specified by the `bearer_token` parameter will be read on diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go new file mode 100644 index 0000000000000..b30221c54f308 --- /dev/null +++ b/plugins/inputs/prometheus/kubernetes.go @@ -0,0 +1,116 @@ +package prometheus + +import ( + "fmt" + "log" + "strings" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" +) + +func start(p *Prometheus) error { + config, err := rest.InClusterConfig() + if err != nil { + return err + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return err + } + watchlist := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything()) + _, controller := cache.NewInformer( + watchlist, + &v1.Pod{}, + time.Second*0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + registerPod(pod, p) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + unregisterPod(pod, p) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + podPod := oldObj.(*v1.Pod) + newPod := newObj.(*v1.Pod) + unregisterPod(podPod, p) + registerPod(newPod, p) + }, + }, + ) + + go controller.Run(wait.NeverStop) + return nil +} + +func registerPod(pod *v1.Pod, p *Prometheus) { + url := scrapeURL(pod) + if url != nil { + log.Printf("Will scrape metrics from %v\n", *url) + p.lock.Lock() + // add annotation as metrics tags + tags := pod.GetAnnotations() + tags["pod_name"] = pod.Name + tags["namespace"] = pod.Namespace + // add labels as metrics tags + for k, v := range pod.GetLabels() { + tags[k] = v + } + p.KubernetesPods = append(p.KubernetesPods, Target{url: *url, tags: tags}) + p.lock.Unlock() + } +} + +func scrapeURL(pod *v1.Pod) *string { + scrape := pod.ObjectMeta.Annotations["prometheus.io/scrape"] + if pod.Status.PodIP == "" { + // return as if scrape was disabled, we will be notified again once the pod + // has an IP + return nil + } + if scrape == "true" { + path := pod.ObjectMeta.Annotations["prometheus.io/path"] + port := pod.ObjectMeta.Annotations["prometheus.io/port"] + if port == "" { + port = "9102" // default + } + if path == "" { + path = "/metrics" + } + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + + ip := pod.Status.PodIP + x := fmt.Sprintf("http://%v:%v%v", ip, port, path) + return &x + } + return nil +} + +func unregisterPod(pod *v1.Pod, p *Prometheus) { + url := scrapeURL(pod) + if url != nil { + p.lock.Lock() + defer p.lock.Unlock() + log.Printf("Registred a delete request for %v in namespace '%v'\n", pod.Name, pod.Namespace) + var result []Target + for _, v := range p.KubernetesPods { + if v.url != *url { + result = append(result, v) + } else { + log.Printf("Will stop scraping for %v\n", *url) + } + + } + p.KubernetesPods = result + } +} diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go new file mode 100644 index 0000000000000..2e3caba38e06e --- /dev/null +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -0,0 +1,83 @@ +package prometheus + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestScrapeURLNoAnnotations(t *testing.T) { + p := &v1.Pod{} + p.Annotations = map[string]string{} + url := scrapeURL(p) + assert.Nil(t, url) +} +func TestScrapeURLAnnotationsNoScrape(t *testing.T) { + p := &v1.Pod{} + p.Name = "myPod" + p.Annotations = map[string]string{"prometheus.io/scrape": "false"} + url := scrapeURL(p) + assert.Nil(t, url) +} +func TestScrapeURLAnnotations(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) +} +func TestScrapeURLAnnotationsCustomPort(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) +} +func TestScrapeURLAnnotationsCustomPath(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} + url := scrapeURL(p) + assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) +} + +func TestAddPod(t *testing.T) { + prom := &Prometheus{lock: &sync.Mutex{}} + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + assert.Equal(t, 1, len(prom.KubernetesPods)) +} +func TestAddMultiplePods(t *testing.T) { + prom := &Prometheus{lock: &sync.Mutex{}} + + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + p.Name = "Pod2" + registerPod(p, prom) + assert.Equal(t, 2, len(prom.KubernetesPods)) +} +func TestDeletePods(t *testing.T) { + prom := &Prometheus{lock: &sync.Mutex{}} + + p := pod() + p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + unregisterPod(p, prom) + assert.Equal(t, 0, len(prom.KubernetesPods)) +} + +func pod() *v1.Pod { + p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default"}} + p.Status.PodIP = "127.0.0.1" + p.Name = "myPod" + return p +} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 2a8a6b284b206..1a7bfe12b5c14 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -18,6 +18,10 @@ import ( const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` +type Target struct { + url string + tags map[string]string +} type Prometheus struct { // An array of urls to scrape metrics from. URLs []string `toml:"urls"` @@ -25,6 +29,11 @@ type Prometheus struct { // An array of Kubernetes services to scrape metrics from. KubernetesServices []string + // Should we scrape Kubernetes services for prometheus annotations + KubernetesScraping bool `toml:"kubernetes_scraping"` + lock *sync.Mutex + KubernetesPods []Target + // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` @@ -48,6 +57,12 @@ var sampleConfig = ` ## An array of Kubernetes services to scrape metrics from. # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] + + # Scrape Kubernetes pods for prometheus annotations. + # prometheus.io/scrape: Enable scraping for this pod + # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + # prometheus.io/port: If port is not 9102 use this annotation + # kubernetes_scraping = true ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -96,6 +111,7 @@ type URLAndAddress struct { OriginalURL *url.URL URL *url.URL Address string + Tags map[string]string } func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { @@ -109,11 +125,25 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL}) } + p.lock.Lock() + defer p.lock.Unlock() + // loop through all pods scraped via the prometheus annotation on the pods + for _, pod := range p.KubernetesPods { + URL, err := url.Parse(pod.url) + if err != nil { + log.Printf("prometheus: Could not parse url %s, skipping it. Error: %s", pod.url, err) + continue + } + podURL := p.AddressToURL(URL, URL.Hostname()) + allURLs = append(allURLs, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: pod.tags}) + } + for _, service := range p.KubernetesServices { URL, err := url.Parse(service) if err != nil { return nil, err } + resolvedAddresses, err := net.LookupHost(URL.Hostname()) if err != nil { log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err) @@ -157,15 +187,6 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error { return nil } -var tr = &http.Transport{ - ResponseHeaderTimeout: time.Duration(3 * time.Second), -} - -var client = &http.Client{ - Transport: tr, - Timeout: time.Duration(4 * time.Second), -} - func (p *Prometheus) createHttpClient() (*http.Client, error) { tlsCfg, err := internal.GetTLSConfig( p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify) @@ -226,6 +247,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error if u.Address != "" { tags["address"] = u.Address } + for k, v := range u.Tags { + tags[k] = v + } switch metric.Type() { case telegraf.Counter: @@ -244,8 +268,18 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error return nil } +// Start will start the Kubernetes scraping if enabled in the configuration +func (p *Prometheus) Start(a telegraf.Accumulator) error { + if p.KubernetesScraping { + return start(p) + } + return nil +} + +func (p *Prometheus) Stop() {} + func init() { inputs.Add("prometheus", func() telegraf.Input { - return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}} + return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, lock: &sync.Mutex{}} }) } diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 9a2982ff989bf..b16e020451684 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -5,6 +5,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "sync" "testing" "time" @@ -37,6 +38,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { defer ts.Close() p := &Prometheus{ + lock: &sync.Mutex{}, URLs: []string{ts.URL}, } @@ -60,6 +62,7 @@ func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { defer ts.Close() p := &Prometheus{ + lock: &sync.Mutex{}, KubernetesServices: []string{ts.URL}, } u, _ := url.Parse(ts.URL) @@ -89,6 +92,7 @@ func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) { defer ts.Close() p := &Prometheus{ + lock: &sync.Mutex{}, URLs: []string{ts.URL}, KubernetesServices: []string{"http://random.telegraf.local:88/metrics"}, } From 7cdc0e7001d7808256083f2fb03f50eab0ae5ae8 Mon Sep 17 00:00:00 2001 From: Soren Mathiasen Date: Mon, 30 Apr 2018 16:47:42 +0200 Subject: [PATCH 2/2] fixup! Add scraping for Prometheus endpoint in Kubernetes --- Godeps | 18 +-- plugins/inputs/prometheus/README.md | 2 +- plugins/inputs/prometheus/kubernetes.go | 161 ++++++++++++------- plugins/inputs/prometheus/kubernetes_test.go | 48 +++--- plugins/inputs/prometheus/prometheus.go | 36 ++--- 5 files changed, 147 insertions(+), 118 deletions(-) diff --git a/Godeps b/Godeps index cd08b6a28e905..07421e929ce7e 100644 --- a/Godeps +++ b/Godeps @@ -1,8 +1,4 @@ collectd.org 2ce144541b8903101fb8f1483cc0497a68798122 -github.com/PuerkitoBio/purell fd18e053af8a4ff11039269006e8037ff374ce0e -github.com/PuerkitoBio/urlesc de5bf2ad457846296e2031421a34e2568e304e35 -github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65 -github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07 @@ -21,22 +17,17 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3 github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98 github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483 -github.com/emicklei/go-restful 2dd44038f0b95ae693b266c5f87593b5d2fdd78d +github.com/ericchiang/k8s 677cf3318ef83bf681a38821f81a233a9be09641 +github.com/ghodss/yaml 0ca9ea5df5451ffdf184b4428c902747c2c11cd7 github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 -github.com/go-openapi/jsonpointer 779f45308c19820f1a69e9a4cd965f496e0da10f -github.com/go-openapi/jsonreference 36d33bfe519efae5632669801b180bf1a245da3b -github.com/go-openapi/spec a4fa9574c7aa73b2fc54e251eb9524d0482bb592 -github.com/go-openapi/swag cf0bdb963811675a4d7e74901cefc7411a1df939 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438 github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8 -github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998 github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93 github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc -github.com/google/gofuzz 24818f796faf91cd76ec7bddd72458fbced7a6c1 github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034 @@ -50,7 +41,6 @@ github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 -github.com/mailru/easyjson 5f62e4f3afa2f576dc86531b7df4d966b19ef8f8 github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 @@ -105,7 +95,3 @@ gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655 gopkg.in/olivere/elastic.v5 3113f9b9ad37509fe5f8a0e5e91c96fdc4435e26 gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8 gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6 -k8s.io/api 5584376ceeffeb13a2e98b5e9f0e9dab37de4bab -k8s.io/apimachinery 18a564baac720819100827c16fdebcadb05b2d0d -k8s.io/client-go a3c9a30b35fc56f709f4428827b807c90af99fc3 -k8s.io/kube-openapi 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1 \ No newline at end of file diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md index 66bfb681a3c55..0d91ae469abd1 100644 --- a/plugins/inputs/prometheus/README.md +++ b/plugins/inputs/prometheus/README.md @@ -18,7 +18,7 @@ in Prometheus format. # prometheus.io/scrape: Enable scraping for this service # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. # prometheus.io/port: If port is not 9102 use this annotation - # kubernetes_scraping = true + # monitor_kubernetes_pods = true ## Use bearer token for authorization # bearer_token = /path/to/bearer/token diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index b30221c54f308..102ec9e0dd576 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -1,84 +1,129 @@ package prometheus import ( + "context" "fmt" + "io/ioutil" "log" + "net/url" + "os" "strings" - "time" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/rest" + "github.com/ericchiang/k8s" + corev1 "github.com/ericchiang/k8s/apis/core/v1" + "github.com/ghodss/yaml" ) -func start(p *Prometheus) error { - config, err := rest.InClusterConfig() +// loadClient parses a kubeconfig from a file and returns a Kubernetes +// client. It does not support extensions or client auth providers. +func loadClient(kubeconfigPath string) (*k8s.Client, error) { + data, err := ioutil.ReadFile(kubeconfigPath) if err != nil { - return err + return nil, fmt.Errorf("read kubeconfig: %v", err) } - clientset, err := kubernetes.NewForConfig(config) + + // Unmarshal YAML into a Kubernetes config object. + var config k8s.Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("unmarshal kubeconfig: %v", err) + } + return k8s.NewClient(&config) +} + +func start(p *Prometheus) error { + client, err := k8s.NewInClusterClient() if err != nil { - return err + client, err = loadClient(fmt.Sprintf("%v/.kube/config", os.Getenv("HOME"))) + if err != nil { + log.Fatal(err) + } } - watchlist := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything()) - _, controller := cache.NewInformer( - watchlist, - &v1.Pod{}, - time.Second*0, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - registerPod(pod, p) - }, - DeleteFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - unregisterPod(pod, p) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - podPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - unregisterPod(podPod, p) - registerPod(newPod, p) - }, - }, - ) + type payload struct { + eventype string + pod *corev1.Pod + } + in := make(chan payload) + go func() { + var pod corev1.Pod + watcher, err := client.Watch(context.Background(), "", &pod) + if err != nil { + log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err) + } + defer watcher.Close() + + for { + cm := new(corev1.Pod) + eventType, err := watcher.Next(cm) + if err != nil { + log.Println() + } + in <- payload{eventType, cm} + } + }() + + go func() { + for { + select { + case <-p.done: + log.Printf("I! [inputs.prometheus] shutting dow\n") + return + case payload := <-in: + cm := payload.pod + eventType := payload.eventype + + if err != nil { + log.Printf("E! [inputs.prometheus] watcher encountered and error: %v", err) + break + } + switch eventType { + case k8s.EventAdded: + registerPod(cm, p) + case k8s.EventDeleted: + unregisterPod(cm, p) + case k8s.EventModified: + } + } + } + }() - go controller.Run(wait.NeverStop) return nil } -func registerPod(pod *v1.Pod, p *Prometheus) { - url := scrapeURL(pod) - if url != nil { - log.Printf("Will scrape metrics from %v\n", *url) +func registerPod(pod *corev1.Pod, p *Prometheus) { + targetURL := scrapeURL(pod) + if targetURL != nil { + log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL) p.lock.Lock() // add annotation as metrics tags - tags := pod.GetAnnotations() - tags["pod_name"] = pod.Name - tags["namespace"] = pod.Namespace + tags := pod.GetMetadata().GetAnnotations() + tags["pod_name"] = pod.GetMetadata().GetName() + tags["namespace"] = pod.GetMetadata().GetNamespace() // add labels as metrics tags - for k, v := range pod.GetLabels() { + for k, v := range pod.GetMetadata().GetLabels() { tags[k] = v } - p.KubernetesPods = append(p.KubernetesPods, Target{url: *url, tags: tags}) + URL, err := url.Parse(*targetURL) + if err != nil { + log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", targetURL, err) + return + } + podURL := p.AddressToURL(URL, URL.Hostname()) + p.kubernetesPods = append(p.kubernetesPods, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: tags}) p.lock.Unlock() } } -func scrapeURL(pod *v1.Pod) *string { - scrape := pod.ObjectMeta.Annotations["prometheus.io/scrape"] - if pod.Status.PodIP == "" { +func scrapeURL(pod *corev1.Pod) *string { + scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] + if pod.Status.GetPodIP() == "" { // return as if scrape was disabled, we will be notified again once the pod // has an IP + log.Println("pod doesn't have an IP") return nil } if scrape == "true" { - path := pod.ObjectMeta.Annotations["prometheus.io/path"] - port := pod.ObjectMeta.Annotations["prometheus.io/port"] + path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] + port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] if port == "" { port = "9102" // default } @@ -89,28 +134,28 @@ func scrapeURL(pod *v1.Pod) *string { path = "/" + path } - ip := pod.Status.PodIP + ip := pod.Status.GetPodIP() x := fmt.Sprintf("http://%v:%v%v", ip, port, path) return &x } return nil } -func unregisterPod(pod *v1.Pod, p *Prometheus) { +func unregisterPod(pod *corev1.Pod, p *Prometheus) { url := scrapeURL(pod) if url != nil { p.lock.Lock() defer p.lock.Unlock() - log.Printf("Registred a delete request for %v in namespace '%v'\n", pod.Name, pod.Namespace) - var result []Target - for _, v := range p.KubernetesPods { - if v.url != *url { + log.Printf("D! [inputs.prometheus] registred a delete request for %v in namespace %v\n", pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) + var result []URLAndAddress + for _, v := range p.kubernetesPods { + if v.URL.String() != *url { result = append(result, v) } else { - log.Printf("Will stop scraping for %v\n", *url) + log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url) } } - p.KubernetesPods = result + p.kubernetesPods = result } } diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 2e3caba38e06e..4e85c198d2534 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -5,45 +5,46 @@ import ( "testing" "github.com/stretchr/testify/assert" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v1 "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" ) func TestScrapeURLNoAnnotations(t *testing.T) { - p := &v1.Pod{} - p.Annotations = map[string]string{} + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.GetMetadata().Annotations = map[string]string{} url := scrapeURL(p) assert.Nil(t, url) } func TestScrapeURLAnnotationsNoScrape(t *testing.T) { - p := &v1.Pod{} - p.Name = "myPod" - p.Annotations = map[string]string{"prometheus.io/scrape": "false"} + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} + p.Metadata.Name = str("myPod") + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"} url := scrapeURL(p) assert.Nil(t, url) } func TestScrapeURLAnnotations(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) } func TestScrapeURLAnnotationsCustomPort(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) } func TestScrapeURLAnnotationsCustomPath(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) } func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} url := scrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) } @@ -51,33 +52,38 @@ func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { func TestAddPod(t *testing.T) { prom := &Prometheus{lock: &sync.Mutex{}} p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - assert.Equal(t, 1, len(prom.KubernetesPods)) + assert.Equal(t, 1, len(prom.kubernetesPods)) } func TestAddMultiplePods(t *testing.T) { prom := &Prometheus{lock: &sync.Mutex{}} p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) - p.Name = "Pod2" + p.Metadata.Name = str("Pod2") registerPod(p, prom) - assert.Equal(t, 2, len(prom.KubernetesPods)) + assert.Equal(t, 2, len(prom.kubernetesPods)) } func TestDeletePods(t *testing.T) { prom := &Prometheus{lock: &sync.Mutex{}} p := pod() - p.Annotations = map[string]string{"prometheus.io/scrape": "true"} + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) unregisterPod(p, prom) - assert.Equal(t, 0, len(prom.KubernetesPods)) + assert.Equal(t, 0, len(prom.kubernetesPods)) } func pod() *v1.Pod { - p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default"}} - p.Status.PodIP = "127.0.0.1" - p.Name = "myPod" + p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}} + p.Status.PodIP = str("127.0.0.1") + p.Metadata.Name = str("myPod") + p.Metadata.Namespace = str("default") return p } + +func str(x string) *string { + return &x +} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 1a7bfe12b5c14..13b485e07d771 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -18,10 +18,6 @@ import ( const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` -type Target struct { - url string - tags map[string]string -} type Prometheus struct { // An array of urls to scrape metrics from. URLs []string `toml:"urls"` @@ -29,11 +25,6 @@ type Prometheus struct { // An array of Kubernetes services to scrape metrics from. KubernetesServices []string - // Should we scrape Kubernetes services for prometheus annotations - KubernetesScraping bool `toml:"kubernetes_scraping"` - lock *sync.Mutex - KubernetesPods []Target - // Bearer Token authorization file path BearerToken string `toml:"bearer_token"` @@ -49,6 +40,12 @@ type Prometheus struct { InsecureSkipVerify bool client *http.Client + + // Should we scrape Kubernetes services for prometheus annotations + MonitorPods bool `toml:"monitor_kubernetes_pods"` + lock *sync.Mutex + kubernetesPods []URLAndAddress + done chan struct{} } var sampleConfig = ` @@ -62,7 +59,7 @@ var sampleConfig = ` # prometheus.io/scrape: Enable scraping for this pod # prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. # prometheus.io/port: If port is not 9102 use this annotation - # kubernetes_scraping = true + # monitor_kubernetes_pods = true ## Use bearer token for authorization # bearer_token = /path/to/bearer/token @@ -128,15 +125,7 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { p.lock.Lock() defer p.lock.Unlock() // loop through all pods scraped via the prometheus annotation on the pods - for _, pod := range p.KubernetesPods { - URL, err := url.Parse(pod.url) - if err != nil { - log.Printf("prometheus: Could not parse url %s, skipping it. Error: %s", pod.url, err) - continue - } - podURL := p.AddressToURL(URL, URL.Hostname()) - allURLs = append(allURLs, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: pod.tags}) - } + allURLs = append(allURLs, p.kubernetesPods...) for _, service := range p.KubernetesServices { URL, err := url.Parse(service) @@ -270,16 +259,19 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error // Start will start the Kubernetes scraping if enabled in the configuration func (p *Prometheus) Start(a telegraf.Accumulator) error { - if p.KubernetesScraping { + if p.MonitorPods { return start(p) } return nil } -func (p *Prometheus) Stop() {} +func (p *Prometheus) Stop() { + close(p.done) +} func init() { inputs.Add("prometheus", func() telegraf.Input { - return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, lock: &sync.Mutex{}} + return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, + lock: &sync.Mutex{}} }) }