-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add scraping for Prometheus endpoint in Kubernetes #3901
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package prometheus | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"strings" | ||
"time" | ||
|
||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/tools/cache" | ||
|
||
"k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/fields" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/rest" | ||
) | ||
|
||
func start(p *Prometheus) error { | ||
config, err := rest.InClusterConfig() | ||
if err != nil { | ||
return err | ||
} | ||
clientset, err := kubernetes.NewForConfig(config) | ||
if err != nil { | ||
return err | ||
} | ||
watchlist := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything()) | ||
_, controller := cache.NewInformer( | ||
watchlist, | ||
&v1.Pod{}, | ||
time.Second*0, | ||
cache.ResourceEventHandlerFuncs{ | ||
AddFunc: func(obj interface{}) { | ||
pod := obj.(*v1.Pod) | ||
registerPod(pod, p) | ||
}, | ||
DeleteFunc: func(obj interface{}) { | ||
pod := obj.(*v1.Pod) | ||
unregisterPod(pod, p) | ||
}, | ||
UpdateFunc: func(oldObj, newObj interface{}) { | ||
podPod := oldObj.(*v1.Pod) | ||
newPod := newObj.(*v1.Pod) | ||
unregisterPod(podPod, p) | ||
registerPod(newPod, p) | ||
}, | ||
}, | ||
) | ||
|
||
go controller.Run(wait.NeverStop) | ||
return nil | ||
} | ||
|
||
func registerPod(pod *v1.Pod, p *Prometheus) { | ||
url := scrapeURL(pod) | ||
if url != nil { | ||
log.Printf("Will scrape metrics from %v\n", *url) | ||
p.lock.Lock() | ||
glinton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// add annotation as metrics tags | ||
tags := pod.GetAnnotations() | ||
tags["pod_name"] = pod.Name | ||
tags["namespace"] = pod.Namespace | ||
// add labels as metrics tags | ||
for k, v := range pod.GetLabels() { | ||
tags[k] = v | ||
} | ||
p.KubernetesPods = append(p.KubernetesPods, Target{url: *url, tags: tags}) | ||
p.lock.Unlock() | ||
} | ||
} | ||
|
||
func scrapeURL(pod *v1.Pod) *string { | ||
scrape := pod.ObjectMeta.Annotations["prometheus.io/scrape"] | ||
if pod.Status.PodIP == "" { | ||
// return as if scrape was disabled, we will be notified again once the pod | ||
// has an IP | ||
return nil | ||
} | ||
if scrape == "true" { | ||
glinton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
path := pod.ObjectMeta.Annotations["prometheus.io/path"] | ||
port := pod.ObjectMeta.Annotations["prometheus.io/port"] | ||
if port == "" { | ||
port = "9102" // default | ||
} | ||
if path == "" { | ||
path = "/metrics" | ||
} | ||
if !strings.HasPrefix(path, "/") { | ||
path = "/" + path | ||
} | ||
|
||
ip := pod.Status.PodIP | ||
x := fmt.Sprintf("http://%v:%v%v", ip, port, path) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider using url.URL to create the url, which would avoid the need to fixup the path above and potentially handle other cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Always using http seems like a potential issue to me. What if the service is using https? |
||
return &x | ||
} | ||
return nil | ||
} | ||
|
||
func unregisterPod(pod *v1.Pod, p *Prometheus) { | ||
url := scrapeURL(pod) | ||
if url != nil { | ||
glinton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
p.lock.Lock() | ||
defer p.lock.Unlock() | ||
log.Printf("Registred a delete request for %v in namespace '%v'\n", pod.Name, pod.Namespace) | ||
var result []Target | ||
for _, v := range p.KubernetesPods { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should use a |
||
if v.url != *url { | ||
result = append(result, v) | ||
} else { | ||
log.Printf("Will stop scraping for %v\n", *url) | ||
} | ||
|
||
} | ||
p.KubernetesPods = result | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package prometheus | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
func TestScrapeURLNoAnnotations(t *testing.T) { | ||
p := &v1.Pod{} | ||
p.Annotations = map[string]string{} | ||
url := scrapeURL(p) | ||
assert.Nil(t, url) | ||
} | ||
func TestScrapeURLAnnotationsNoScrape(t *testing.T) { | ||
p := &v1.Pod{} | ||
p.Name = "myPod" | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "false"} | ||
url := scrapeURL(p) | ||
assert.Nil(t, url) | ||
} | ||
func TestScrapeURLAnnotations(t *testing.T) { | ||
p := pod() | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} | ||
url := scrapeURL(p) | ||
assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) | ||
} | ||
func TestScrapeURLAnnotationsCustomPort(t *testing.T) { | ||
p := pod() | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} | ||
url := scrapeURL(p) | ||
assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) | ||
} | ||
func TestScrapeURLAnnotationsCustomPath(t *testing.T) { | ||
p := pod() | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} | ||
url := scrapeURL(p) | ||
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) | ||
} | ||
|
||
func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { | ||
p := pod() | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} | ||
url := scrapeURL(p) | ||
assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) | ||
} | ||
|
||
func TestAddPod(t *testing.T) { | ||
prom := &Prometheus{lock: &sync.Mutex{}} | ||
p := pod() | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} | ||
registerPod(p, prom) | ||
assert.Equal(t, 1, len(prom.KubernetesPods)) | ||
} | ||
func TestAddMultiplePods(t *testing.T) { | ||
prom := &Prometheus{lock: &sync.Mutex{}} | ||
|
||
p := pod() | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} | ||
registerPod(p, prom) | ||
p.Name = "Pod2" | ||
registerPod(p, prom) | ||
assert.Equal(t, 2, len(prom.KubernetesPods)) | ||
} | ||
func TestDeletePods(t *testing.T) { | ||
prom := &Prometheus{lock: &sync.Mutex{}} | ||
|
||
p := pod() | ||
p.Annotations = map[string]string{"prometheus.io/scrape": "true"} | ||
registerPod(p, prom) | ||
unregisterPod(p, prom) | ||
assert.Equal(t, 0, len(prom.KubernetesPods)) | ||
} | ||
|
||
func pod() *v1.Pod { | ||
p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default"}} | ||
p.Status.PodIP = "127.0.0.1" | ||
p.Name = "myPod" | ||
return p | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,13 +18,22 @@ import ( | |
|
||
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3` | ||
|
||
// Target is a scrape endpoint together with the extra tags that are attached
// to the metrics gathered from it.
type Target struct {
	// url is the endpoint's URL in string form.
	url string
	// tags are additional metric tags for this endpoint.
	tags map[string]string
}
type Prometheus struct { | ||
// An array of urls to scrape metrics from. | ||
URLs []string `toml:"urls"` | ||
|
||
// An array of Kubernetes services to scrape metrics from. | ||
KubernetesServices []string | ||
|
||
// Should we scrape Kubernetes services for prometheus annotations | ||
KubernetesScraping bool `toml:"kubernetes_scraping"` | ||
lock *sync.Mutex | ||
KubernetesPods []Target | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this be set in the config file? If not, make it unexported and move it to the bottom of the struct too, please. |
||
|
||
// Bearer Token authorization file path | ||
BearerToken string `toml:"bearer_token"` | ||
|
||
|
@@ -48,6 +57,12 @@ var sampleConfig = ` | |
|
||
## An array of Kubernetes services to scrape metrics from. | ||
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] | ||
|
||
# Scrape Kubernetes pods for prometheus annotations. | ||
# prometheus.io/scrape: Enable scraping for this pod | ||
# prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. | ||
# prometheus.io/port: If port is not 9102 use this annotation | ||
# kubernetes_scraping = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
## Use bearer token for authorization | ||
# bearer_token = /path/to/bearer/token | ||
|
@@ -96,6 +111,7 @@ type URLAndAddress struct { | |
OriginalURL *url.URL | ||
URL *url.URL | ||
Address string | ||
Tags map[string]string | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It might be a good idea to rename this struct; perhaps we could merge it with the Target struct? |
||
|
||
func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { | ||
|
@@ -109,11 +125,25 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { | |
|
||
allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL}) | ||
} | ||
p.lock.Lock() | ||
defer p.lock.Unlock() | ||
// loop through all pods scraped via the prometheus annotation on the pods | ||
for _, pod := range p.KubernetesPods { | ||
URL, err := url.Parse(pod.url) | ||
if err != nil { | ||
log.Printf("prometheus: Could not parse url %s, skipping it. Error: %s", pod.url, err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see that this is based on some existing log messages, but could you do me a favor and update them to look more like:
|
||
continue | ||
} | ||
podURL := p.AddressToURL(URL, URL.Hostname()) | ||
allURLs = append(allURLs, URLAndAddress{URL: podURL, Address: URL.Hostname(), OriginalURL: URL, Tags: pod.tags}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we merge |
||
} | ||
|
||
for _, service := range p.KubernetesServices { | ||
URL, err := url.Parse(service) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
resolvedAddresses, err := net.LookupHost(URL.Hostname()) | ||
if err != nil { | ||
log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", URL.Host, err) | ||
|
@@ -157,15 +187,6 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error { | |
return nil | ||
} | ||
|
||
var tr = &http.Transport{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! |
||
ResponseHeaderTimeout: time.Duration(3 * time.Second), | ||
} | ||
|
||
var client = &http.Client{ | ||
Transport: tr, | ||
Timeout: time.Duration(4 * time.Second), | ||
} | ||
|
||
func (p *Prometheus) createHttpClient() (*http.Client, error) { | ||
tlsCfg, err := internal.GetTLSConfig( | ||
p.SSLCert, p.SSLKey, p.SSLCA, p.InsecureSkipVerify) | ||
|
@@ -226,6 +247,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error | |
if u.Address != "" { | ||
tags["address"] = u.Address | ||
} | ||
for k, v := range u.Tags { | ||
tags[k] = v | ||
} | ||
|
||
switch metric.Type() { | ||
case telegraf.Counter: | ||
|
@@ -244,8 +268,18 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error | |
return nil | ||
} | ||
|
||
// Start will start the Kubernetes scraping if enabled in the configuration | ||
func (p *Prometheus) Start(a telegraf.Accumulator) error { | ||
if p.KubernetesScraping { | ||
return start(p) | ||
} | ||
return nil | ||
} | ||
|
||
// Stop is currently a no-op.
// NOTE(review): the pod watcher launched by Start is not stopped here, so it
// keeps running after Stop returns — confirm whether it should be shut down.
func (p *Prometheus) Stop() {}
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We need to stop the Kubernetes pod monitor here. The easiest way to test this is by sending a SIGHUP: the config could be completely different after a reload. |
||
|
||
func init() { | ||
inputs.Add("prometheus", func() telegraf.Input { | ||
return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}} | ||
return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}, lock: &sync.Mutex{}} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: can you wrap this line? |
||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For these log at debug level: "
D! [inputs.prometheus] adding %q to scrape targets
"