Skip to content

Commit

Permalink
fixup! Add scraping for Prometheus endpoint in Kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
Soren Mathiasen committed May 7, 2018
1 parent 8d1a552 commit e959ef5
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 117 deletions.
16 changes: 0 additions & 16 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
collectd.org 2ce144541b8903101fb8f1483cc0497a68798122
github.com/PuerkitoBio/purell fd18e053af8a4ff11039269006e8037ff374ce0e
github.com/PuerkitoBio/urlesc de5bf2ad457846296e2031421a34e2568e304e35
github.com/Shopify/sarama 3b1b38866a79f06deddf0487d5c27ba0697ccd65
github.com/Sirupsen/logrus 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d
github.com/aerospike/aerospike-client-go 95e1ad7791bdbca44707fedbb29be42024900d9c
github.com/amir/raidman c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
github.com/apache/thrift 4aaa92ece8503a6da9bc6701604f69acf2b99d07
Expand All @@ -21,22 +17,15 @@ github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/go-xerial-snappy bb955e01b9346ac19dc29eb16586c90ded99a98c
github.com/eapache/queue 44cc805cf13205b55f69e14bcb69867d1ae92f98
github.com/eclipse/paho.mqtt.golang aff15770515e3c57fc6109da73d42b0d46f7f483
github.com/emicklei/go-restful 2dd44038f0b95ae693b266c5f87593b5d2fdd78d
github.com/go-logfmt/logfmt 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
github.com/go-openapi/jsonpointer 779f45308c19820f1a69e9a4cd965f496e0da10f
github.com/go-openapi/jsonreference 36d33bfe519efae5632669801b180bf1a245da3b
github.com/go-openapi/spec a4fa9574c7aa73b2fc54e251eb9524d0482bb592
github.com/go-openapi/swag cf0bdb963811675a4d7e74901cefc7411a1df939
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
github.com/gobwas/glob bea32b9cd2d6f55753d94a28e959b13f0244797a
github.com/go-ini/ini 9144852efba7c4daf409943ee90767da62d55438
github.com/gogo/protobuf 7b6c6391c4ff245962047fc1e2c6e08b1cdfa0e8
github.com/golang/glog 23def4e6c14b4da8ac2ed8007337bc5eb5007998
github.com/golang/protobuf 8ee79997227bf9b34611aee7946ae64735e6fd93
github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7
github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7
github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc
github.com/google/gofuzz 24818f796faf91cd76ec7bddd72458fbced7a6c1
github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea
github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836
github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
Expand All @@ -50,7 +39,6 @@ github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d
github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413
github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893
github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142
github.com/mailru/easyjson 5f62e4f3afa2f576dc86531b7df4d966b19ef8f8
github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c
github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34
github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1
Expand Down Expand Up @@ -105,7 +93,3 @@ gopkg.in/mgo.v2 3f83fa5005286a7fe593b055f0d7771a7dce4655
gopkg.in/olivere/elastic.v5 3113f9b9ad37509fe5f8a0e5e91c96fdc4435e26
gopkg.in/tomb.v1 dd632973f1e7218eb1089048e0798ec9ae7dceb8
gopkg.in/yaml.v2 4c78c975fe7c825c6d1466c42be594d1d6f3aba6
k8s.io/api 5584376ceeffeb13a2e98b5e9f0e9dab37de4bab
k8s.io/apimachinery 18a564baac720819100827c16fdebcadb05b2d0d
k8s.io/client-go a3c9a30b35fc56f709f4428827b807c90af99fc3
k8s.io/kube-openapi 39a7bf85c140f972372c2a0d1ee40adbf0c8bfe1
2 changes: 1 addition & 1 deletion plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ in Prometheus format.
# prometheus.io/scrape: Enable scraping for this service
# prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
# prometheus.io/port: If port is not 9102 use this annotation
# kubernetes_scraping = true
# monitor_kubernetes_pods = true

## Use bearer token for authorization
# bearer_token = /path/to/bearer/token
Expand Down
139 changes: 81 additions & 58 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,107 @@
package prometheus

import (
"context"
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"strings"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"
)

func start(p *Prometheus) error {
config, err := rest.InClusterConfig()
// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
data, err := ioutil.ReadFile(kubeconfigPath)
if err != nil {
return err
return nil, fmt.Errorf("read kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)

// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("unmarshal kubeconfig: %v", err)
}
return k8s.NewClient(&config)
}

func start(p *Prometheus) error {
client, err := k8s.NewInClusterClient()
if err != nil {
return err
client, err = loadClient(fmt.Sprintf("%v/.kube/config", os.Getenv("HOME")))
if err != nil {
log.Fatal(err)
}
}
watchlist := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
_, controller := cache.NewInformer(
watchlist,
&v1.Pod{},
time.Second*0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
registerPod(pod, p)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
unregisterPod(pod, p)
},
UpdateFunc: func(oldObj, newObj interface{}) {
podPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
unregisterPod(podPod, p)
registerPod(newPod, p)
},
},
)

go controller.Run(wait.NeverStop)
go func() {
var pod corev1.Pod
watcher, err := client.Watch(context.Background(), "", &pod)
if err != nil {
log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err)
}
defer watcher.Close()

for {
cm := new(corev1.Pod)
eventType, err := watcher.Next(cm)
if err != nil {
log.Printf("E! [inputs.prometheus] watcher encountered and error: %v", err)
break
}
switch eventType {
case k8s.EventAdded:
registerPod(cm, p)
case k8s.EventDeleted:
unregisterPod(cm, p)
case k8s.EventModified:
}
}
}()

return nil
}

func registerPod(pod *v1.Pod, p *Prometheus) {
url := scrapeURL(pod)
if url != nil {
log.Printf("Will scrape metrics from %v\n", *url)
func registerPod(pod *corev1.Pod, p *Prometheus) {
targetURL := scrapeURL(pod)
if targetURL != nil {
log.Printf("I! [inputs.prometheus] will scrape metrics from %v\n", *targetURL)
p.lock.Lock()
// add annotation as metrics tags
tags := pod.GetAnnotations()
tags["pod_name"] = pod.Name
tags["namespace"] = pod.Namespace
tags := pod.GetMetadata().GetAnnotations()
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetLabels() {
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
p.KubernetesPods = append(p.KubernetesPods, Target{url: *url, tags: tags})
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %q: %v", targetURL, err)
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.kubernetesPods = append(p.kubernetesPods, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: tags})
p.lock.Unlock()
}
}

func scrapeURL(pod *v1.Pod) *string {
scrape := pod.ObjectMeta.Annotations["prometheus.io/scrape"]
if pod.Status.PodIP == "" {
func scrapeURL(pod *corev1.Pod) *string {
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if pod.Status.GetPodIP() == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
log.Println("pod doesn't have an IP")
return nil
}
if scrape == "true" {
path := pod.ObjectMeta.Annotations["prometheus.io/path"]
port := pod.ObjectMeta.Annotations["prometheus.io/port"]
path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]
if port == "" {
port = "9102" // default
}
Expand All @@ -89,28 +112,28 @@ func scrapeURL(pod *v1.Pod) *string {
path = "/" + path
}

ip := pod.Status.PodIP
ip := pod.Status.GetPodIP()
x := fmt.Sprintf("http://%v:%v%v", ip, port, path)
return &x
}
return nil
}

func unregisterPod(pod *v1.Pod, p *Prometheus) {
func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := scrapeURL(pod)
if url != nil {
p.lock.Lock()
defer p.lock.Unlock()
log.Printf("Registred a delete request for %v in namespace '%v'\n", pod.Name, pod.Namespace)
var result []Target
for _, v := range p.KubernetesPods {
if v.url != *url {
log.Printf("D! [inputs.prometheus] registred a delete request for %v in namespace %v\n", pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
var result []URLAndAddress
for _, v := range p.kubernetesPods {
if v.URL.String() != *url {
result = append(result, v)
} else {
log.Printf("Will stop scraping for %v\n", *url)
log.Printf("D! [inputs.prometheus] will stop scraping for %v\n", *url)
}

}
p.KubernetesPods = result
p.kubernetesPods = result
}
}
48 changes: 27 additions & 21 deletions plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,79 +5,85 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)

func TestScrapeURLNoAnnotations(t *testing.T) {
p := &v1.Pod{}
p.Annotations = map[string]string{}
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.GetMetadata().Annotations = map[string]string{}
url := scrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{}
p.Name = "myPod"
p.Annotations = map[string]string{"prometheus.io/scrape": "false"}
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := scrapeURL(p)
assert.Nil(t, url)
}
func TestScrapeURLAnnotations(t *testing.T) {
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
}
func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"}
url := scrapeURL(p)
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url)
}

func TestAddPod(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}
p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
assert.Equal(t, 1, len(prom.KubernetesPods))
assert.Equal(t, 1, len(prom.kubernetesPods))
}
func TestAddMultiplePods(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
p.Name = "Pod2"
p.Metadata.Name = str("Pod2")
registerPod(p, prom)
assert.Equal(t, 2, len(prom.KubernetesPods))
assert.Equal(t, 2, len(prom.kubernetesPods))
}
func TestDeletePods(t *testing.T) {
prom := &Prometheus{lock: &sync.Mutex{}}

p := pod()
p.Annotations = map[string]string{"prometheus.io/scrape": "true"}
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
registerPod(p, prom)
unregisterPod(p, prom)
assert.Equal(t, 0, len(prom.KubernetesPods))
assert.Equal(t, 0, len(prom.kubernetesPods))
}

func pod() *v1.Pod {
p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default"}}
p.Status.PodIP = "127.0.0.1"
p.Name = "myPod"
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}}
p.Status.PodIP = str("127.0.0.1")
p.Metadata.Name = str("myPod")
p.Metadata.Namespace = str("default")
return p
}

func str(x string) *string {
return &x
}
Loading

0 comments on commit e959ef5

Please sign in to comment.