From 76cf475040a4cb249ba230654284a5261328d87e Mon Sep 17 00:00:00 2001 From: Charlie Le Date: Sun, 19 Jan 2025 13:36:31 -0800 Subject: [PATCH] feat: add HTML response handling for job and target endpoints Signed-off-by: Charlie Le --- .chloggen/otel-targetallocator-html.yaml | 16 + cmd/otel-allocator/allocation/allocator.go | 1 + cmd/otel-allocator/server/server.go | 454 ++++++++++++++- cmd/otel-allocator/server/server_test.go | 631 ++++++++++++++++++++- cmd/otel-allocator/target/target.go | 8 + 5 files changed, 1104 insertions(+), 6 deletions(-) create mode 100644 .chloggen/otel-targetallocator-html.yaml diff --git a/.chloggen/otel-targetallocator-html.yaml b/.chloggen/otel-targetallocator-html.yaml new file mode 100644 index 0000000000..caffeb8b09 --- /dev/null +++ b/.chloggen/otel-targetallocator-html.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action) +component: target allocator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adds support for HTML output in the target allocator." + +# One or more tracking issues related to the change +issues: [3622] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go index b0a9125ba9..bfeecd6c58 100644 --- a/cmd/otel-allocator/allocation/allocator.go +++ b/cmd/otel-allocator/allocation/allocator.go @@ -144,6 +144,7 @@ func (a *allocator) GetTargetsForCollectorAndJob(collector string, job string) [ } // TargetItems returns a shallow copy of the targetItems map. +// The key is the target item's hash, and the value is the target item. func (a *allocator) TargetItems() map[string]*target.Item { a.m.RLock() defer a.m.RUnlock() diff --git a/cmd/otel-allocator/server/server.go b/cmd/otel-allocator/server/server.go index 2e9df9a8b0..a2cb671ea8 100644 --- a/cmd/otel-allocator/server/server.go +++ b/cmd/otel-allocator/server/server.go @@ -15,13 +15,17 @@ package server import ( + "bytes" "context" "crypto/tls" "encoding/json" "fmt" + "html/template" "net/http" "net/http/pprof" "net/url" + "sort" + "strconv" "strings" "sync" "time" @@ -105,8 +109,13 @@ func (s *Server) setRouter(router *gin.Engine) { router.UnescapePathValues = false router.Use(s.PrometheusMiddleware) + router.GET("/", s.IndexHandler) + router.GET("/collector", s.CollectorHTMLHandler) + router.GET("/job", s.JobHTMLHandler) + router.GET("/target", s.TargetHTMLHandler) + router.GET("/targets", s.TargetsHTMLHandler) router.GET("/scrape_configs", s.ScrapeConfigsHandler) - router.GET("/jobs", s.JobHandler) + router.GET("/jobs", s.JobsHandler) router.GET("/jobs/:job_id/targets", s.TargetsHandler) router.GET("/metrics", gin.WrapH(promhttp.Handler())) router.GET("/livez", s.LivenessProbeHandler) @@ -272,11 +281,15 @@ func (s *Server) ReadinessProbeHandler(c *gin.Context) { } } -func (s *Server) JobHandler(c *gin.Context) { +func (s *Server) JobsHandler(c *gin.Context) { displayData := make(map[string]linkJSON) for _, v := range s.allocator.TargetItems() { displayData[v.JobName] = linkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))} } + if strings.Contains(c.Request.Header.Get("Accept"), "text/html") { + s.JobsHTMLHandler(c) + return + } s.jsonHandler(c.Writer, displayData) } @@ -291,6 +304,402 @@ func (s *Server) PrometheusMiddleware(c *gin.Context) { timer.ObserveDuration() } +func header(data ...string) string { + return "" + strings.Join(data, "") + "\n" +} + +func row(data ...string) string { + return "" + strings.Join(data, "") + "\n" +} + +// IndexHandler displays the main page of the allocator. It shows the number of jobs and targets. +// It also displays a table with the collectors and the number of jobs and targets for each collector. +// The collector names are links to the respective pages. The table is sorted by collector name. +func (s *Server) IndexHandler(c *gin.Context) { + c.Writer.Header().Set("Content-Type", "text/html") + var b bytes.Buffer + b.WriteString(` + +

OpenTelemetry Target Allocator

+`) + + fmt.Fprint(&b, "\n") + fmt.Fprint(&b, header("Category", "Count")) + fmt.Fprint(&b, row(jobsAnchorLink(), strconv.Itoa(s.getJobCount()))) + fmt.Fprint(&b, row(targetsAnchorLink(), strconv.Itoa(len(s.allocator.TargetItems())))) + fmt.Fprint(&b, "
\n") + + fmt.Fprint(&b, "\n") + fmt.Fprint(&b, header("Collector", "Job Count", "Target Count")) + + // Sort the collectors by name to ensure consistent order + collectorNames := []string{} + for k := range s.allocator.Collectors() { + collectorNames = append(collectorNames, k) + } + sort.Strings(collectorNames) + + for _, colName := range collectorNames { + jobCount := strconv.Itoa(s.getJobCountForCollector(colName)) + targetCount := strconv.Itoa(s.getTargetCountForCollector(colName)) + fmt.Fprint(&b, row(collectorAnchorLink(colName), jobCount, targetCount)) + } + b.WriteString(`
+ +`) + + _, err := c.Writer.Write(b.Bytes()) + if err != nil { + s.logger.Error(err, "failed to write response") + c.Status(http.StatusInternalServerError) + } + + c.Status(http.StatusOK) +} + +func targetsAnchorLink() string { + return `Targets` +} + +// TargetsHTMLHandler displays the targets in a table format. Each target is a row in the table. +// The table has four columns: Job, Target, Collector, and Endpoint Slice. +// The Job, Target, and Collector columns are links to the respective pages. +func (s *Server) TargetsHTMLHandler(c *gin.Context) { + c.Writer.Header().Set("X-Content-Type-Options", "nosniff") + c.Writer.Header().Set("Content-Type", "text/html; charset=utf-8") + + var b bytes.Buffer + b.WriteString(` + +

Targets

+ +`) + fmt.Fprint(&b, header("Job", "Target", "Collector", "Endpoint Slice")) + for _, v := range s.sortedTargetItems() { + fmt.Fprint(&b, row(jobAnchorLink(v.JobName), targetAnchorLink(v), collectorAnchorLink(v.CollectorName), v.GetEndpointSliceName())) + } + + b.WriteString(`
+ +`) + + _, err := c.Writer.Write(b.Bytes()) + if err != nil { + s.logger.Error(err, "failed to write response") + c.Status(http.StatusInternalServerError) + } + + c.Status(http.StatusOK) +} + +func targetAnchorLink(t *target.Item) string { + return fmt.Sprintf("%s", t.Hash(), t.TargetURL) +} + +// TargetHTMLHandler displays information about a target in a table format. +// There are two tables: one for high-level target information and another for the target's labels. +func (s *Server) TargetHTMLHandler(c *gin.Context) { + c.Writer.Header().Set("X-Content-Type-Options", "nosniff") + c.Writer.Header().Set("Content-Type", "text/html; charset=utf-8") + + targetHash := c.Request.URL.Query().Get("target_hash") + if targetHash == "" { + c.Status(http.StatusBadRequest) + _, err := c.Writer.WriteString(` + +

Bad Request

+

Expected target_hash in the query string

+

Example: /target?target_hash=my-target-42

+ +`) + if err != nil { + s.logger.Error(err, "failed to write response") + } + return + } + + target, found := s.allocator.TargetItems()[targetHash] + if !found { + c.Status(http.StatusNotFound) + t, err := template.New("unknown_target").Parse(` + +

Unknown Target: {{.}}

+ +`) + if err != nil { + s.logger.Error(err, "failed to parse template") + } + err = t.Execute(c.Writer, targetHash) + if err != nil { + s.logger.Error(err, "failed to write response") + } + return + } + + var b bytes.Buffer + b.WriteString(` + +

Target: ` + target.TargetURL + `

+ +`) + + fmt.Fprint(&b, row("Collector", target.CollectorName)) + fmt.Fprint(&b, row("Job", target.JobName)) + if namespace := target.Labels.Get("__meta_kubernetes_namespace"); namespace != "" { + fmt.Fprint(&b, row("Namespace", namespace)) + } + if service := target.Labels.Get("__meta_kubernetes_service_name"); service != "" { + fmt.Fprint(&b, row("Service Name", service)) + } + if port := target.Labels.Get("__meta_kubernetes_service_port"); port != "" { + fmt.Fprint(&b, row("Service Port", port)) + } + if podName := target.Labels.Get("__meta_kubernetes_pod_name"); podName != "" { + fmt.Fprint(&b, row("Pod Name", podName)) + } + if container := target.Labels.Get("__meta_kubernetes_pod_container_name"); container != "" { + fmt.Fprint(&b, row("Container Name", container)) + } + if containerPortName := target.Labels.Get("__meta_kubernetes_pod_container_port_name"); containerPortName != "" { + fmt.Fprint(&b, row("Container Port Name", containerPortName)) + } + if node := target.GetNodeName(); node != "" { + fmt.Fprint(&b, row("Node Name", node)) + } + if endpointSliceName := target.GetEndpointSliceName(); endpointSliceName != "" { + fmt.Fprint(&b, row("Endpoint Slice Name", endpointSliceName)) + } + + b.WriteString(`
+

Target Labels

+ +`) + fmt.Fprint(&b, header("Label", "Value")) + for _, l := range target.Labels { + fmt.Fprint(&b, row(l.Name, l.Value)) + } + b.WriteString(`
+ +`) + _, err := c.Writer.Write(b.Bytes()) + if err != nil { + s.logger.Error(err, "failed to write response") + c.Status(http.StatusInternalServerError) + } + + c.Status(http.StatusOK) +} + +func jobsAnchorLink() string { + return `Jobs` +} + +// JobsHTMLHandler displays the jobs in a table format. Each job is a row in the table. +// The table has two columns: Job and Target Count. The Job column is a link to the job's targets. +func (s *Server) JobsHTMLHandler(c *gin.Context) { + c.Writer.Header().Set("X-Content-Type-Options", "nosniff") + c.Writer.Header().Set("Content-Type", "text/html; charset=utf-8") + + var b bytes.Buffer + b.WriteString(` + +

Jobs

+ +`) + fmt.Fprint(&b, header("Job", "Target Count")) + + jobs := make(map[string]int) + for _, v := range s.allocator.TargetItems() { + jobs[v.JobName]++ + } + + // Sort the jobs by name to ensure consistent order + jobNames := make([]string, 0, len(jobs)) + for k := range jobs { + jobNames = append(jobNames, k) + } + sort.Strings(jobNames) + + for _, j := range jobNames { + fmt.Fprint(&b, row(jobAnchorLink(j), strconv.Itoa(jobs[j]))) + } + + b.WriteString(`
+ +`) + + _, err := c.Writer.Write(b.Bytes()) + if err != nil { + s.logger.Error(err, "failed to write response") + c.Status(http.StatusInternalServerError) + } + + c.Status(http.StatusOK) +} + +func jobAnchorLink(jobId string) string { + return fmt.Sprintf("%s", jobId, jobId) +} +func (s *Server) JobHTMLHandler(c *gin.Context) { + c.Writer.Header().Set("X-Content-Type-Options", "nosniff") + c.Writer.Header().Set("Content-Type", "text/html; charset=utf-8") + + jobIdValues := c.Request.URL.Query()["job_id"] + if len(jobIdValues) != 1 { + c.Status(http.StatusBadRequest) + return + } + jobId := jobIdValues[0] + + var b bytes.Buffer + t, err := template.New("job").Parse(` + +

Job: {{.}}

+ +`) + if err != nil { + s.logger.Error(err, "failed to parse template") + return + } + err = t.Execute(&b, jobId) + if err != nil { + s.logger.Error(err, "failed to execute template") + return + } + fmt.Fprint(&b, header("Collector", "Target Count")) + + // Filter targets by job + targets := map[string]*target.Item{} + for k, v := range s.allocator.TargetItems() { + if v.JobName == jobId { + targets[k] = v + } + } + + colNames := []string{} + for _, col := range s.allocator.Collectors() { + colNames = append(colNames, col.Name) + } + sort.Strings(colNames) + + for _, colName := range colNames { + count := 0 + for _, target := range targets { + if target.CollectorName == colName { + count++ + } + } + fmt.Fprint(&b, row(collectorAnchorLink(colName), strconv.Itoa(count))) + } + b.WriteString(`
+ +`) + fmt.Fprint(&b, header("Collector", "Target")) + for _, v := range colNames { + for _, t := range targets { + if t.CollectorName == v { + fmt.Fprint(&b, row(collectorAnchorLink(v), targetAnchorLink(t))) + } + } + } + b.WriteString(`
+ +`) + _, err = c.Writer.Write(b.Bytes()) + if err != nil { + s.logger.Error(err, "failed to write response") + c.Status(http.StatusInternalServerError) + } + + c.Status(http.StatusOK) +} + +func collectorAnchorLink(collectorId string) string { + return fmt.Sprintf("%s", collectorId, collectorId) +} + +func (s *Server) CollectorHTMLHandler(c *gin.Context) { + c.Writer.Header().Set("X-Content-Type-Options", "nosniff") + c.Writer.Header().Set("Content-Type", "text/html; charset=utf-8") + collectorIdValues := c.Request.URL.Query()["collector_id"] + collectorId := "" + if len(collectorIdValues) == 1 { + collectorId = collectorIdValues[0] + } + + if collectorId == "" { + c.Status(http.StatusBadRequest) + _, err := c.Writer.WriteString(` + +

Bad Request

+

Expected collector_id in the query string

+

Example: /collector?collector_id=my-collector-42

+ +`) + if err != nil { + s.logger.Error(err, "failed to write response") + } + return + } + + found := false + for _, v := range s.allocator.Collectors() { + if v.Name == collectorId { + found = true + break + } + } + if !found { + c.Status(http.StatusNotFound) + t, err := template.New("unknown_collector").Parse(` + +

Unknown Collector: {{.}}

+ +`) + if err != nil { + s.logger.Error(err, "failed to parse template") + } + err = t.Execute(c.Writer, collectorId) + if err != nil { + s.logger.Error(err, "failed to write response") + } + return + } + + var b bytes.Buffer + t, err := template.New("collector").Parse(` + +

Collector: {{.}}

+ +`) + if err != nil { + s.logger.Error(err, "failed to parse template") + return + } + err = t.Execute(&b, collectorId) + if err != nil { + s.logger.Error(err, "failed to execute template") + return + } + + fmt.Fprint(&b, header("Job", "Target", "Endpoint Slice")) + for _, v := range s.sortedTargetItems() { + if v.CollectorName == collectorId { + fmt.Fprint(&b, row(jobAnchorLink(v.JobName), targetAnchorLink(v), v.GetEndpointSliceName())) + } + } + b.WriteString(`
+ +`) + _, err = c.Writer.Write(b.Bytes()) + if err != nil { + s.logger.Error(err, "failed to write response") + c.Status(http.StatusInternalServerError) + } + + c.Status(http.StatusOK) +} + func (s *Server) TargetsHandler(c *gin.Context) { q := c.Request.URL.Query()["collector_id"] @@ -313,7 +722,6 @@ func (s *Server) TargetsHandler(c *gin.Context) { } s.jsonHandler(c.Writer, targets) } - } func (s *Server) errorHandler(w http.ResponseWriter, err error) { @@ -329,6 +737,46 @@ func (s *Server) jsonHandler(w http.ResponseWriter, data interface{}) { } } +// sortedTargetItems returns a sorted list of target items by its hash. +func (s *Server) sortedTargetItems() []*target.Item { + targetItems := make([]*target.Item, 0, len(s.allocator.TargetItems())) + for _, v := range s.allocator.TargetItems() { + targetItems = append(targetItems, v) + } + sort.Slice(targetItems, func(i, j int) bool { + return targetItems[i].Hash() < targetItems[j].Hash() + }) + return targetItems +} + +func (s *Server) getJobCount() int { + jobs := make(map[string]struct{}) + for _, v := range s.allocator.TargetItems() { + jobs[v.JobName] = struct{}{} + } + return len(jobs) +} + +func (s *Server) getJobCountForCollector(collector string) int { + jobs := make(map[string]struct{}) + for _, v := range s.allocator.TargetItems() { + if v.CollectorName == collector { + jobs[v.JobName] = struct{}{} + } + } + return len(jobs) +} + +func (s *Server) getTargetCountForCollector(collector string) int { + count := 0 + for _, v := range s.allocator.TargetItems() { + if v.CollectorName == collector { + count++ + } + } + return count +} + // GetAllTargetsByJob is a relatively expensive call that is usually only used for debugging purposes. func GetAllTargetsByJob(allocator allocation.Allocator, job string) map[string]collectorJSON { displayData := make(map[string]collectorJSON) diff --git a/cmd/otel-allocator/server/server_test.go b/cmd/otel-allocator/server/server_test.go index 4bc403251c..d7fab605e9 100644 --- a/cmd/otel-allocator/server/server_test.go +++ b/cmd/otel-allocator/server/server_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "strings" "testing" "time" @@ -48,9 +49,10 @@ var ( testJobLabelSetTwo = labels.Labels{ {Name: "test_label", Value: "test-value2"}, } - baseTargetItem = target.NewItem("test-job", "test-url", baseLabelSet, "test-collector") - secondTargetItem = target.NewItem("test-job", "test-url", baseLabelSet, "test-collector") - testJobTargetItemTwo = target.NewItem("test-job", "test-url2", testJobLabelSetTwo, "test-collector2") + baseTargetItem = target.NewItem("test-job", "test-url", baseLabelSet, "test-collector") + secondTargetItem = target.NewItem("test-job", "test-url", baseLabelSet, "test-collector") + testJobTargetItemTwo = target.NewItem("test-job", "test-url2", testJobLabelSetTwo, "test-collector2") + testJobTwoTargetItemTwo = target.NewItem("test-job2", "test-url3", testJobLabelSetTwo, "test-collector2") ) func TestServer_LivenessProbeHandler(t *testing.T) { @@ -617,6 +619,629 @@ func TestServer_JobHandler(t *testing.T) { }) } } +func TestServer_JobsHandler_HTML(t *testing.T) { + tests := []struct { + description string + targetItems map[string]*target.Item + expectedCode int + expectedJobs string + }{ + { + description: "nil jobs", + targetItems: nil, + expectedCode: http.StatusOK, + expectedJobs: ` + +

Jobs

+ + +
JobTarget Count
+ +`, + }, + { + description: "empty jobs", + targetItems: map[string]*target.Item{}, + expectedCode: http.StatusOK, + expectedJobs: ` + +

Jobs

+ + +
JobTarget Count
+ +`, + }, + { + description: "one job", + targetItems: map[string]*target.Item{ + "targetitem": target.NewItem("job1", "", labels.Labels{}, ""), + }, + expectedCode: http.StatusOK, + expectedJobs: ` + +

Jobs

+ + + +
JobTarget Count
job11
+ +`, + }, + { + description: "multiple jobs", + targetItems: map[string]*target.Item{ + "a": target.NewItem("job1", "1.1.1.1:8080", labels.Labels{}, ""), + "b": target.NewItem("job2", "1.1.1.2:8080", labels.Labels{}, ""), + "c": target.NewItem("job3", "1.1.1.3:8080", labels.Labels{}, ""), + "d": target.NewItem("job3", "1.1.1.4:8080", labels.Labels{}, ""), + "e": target.NewItem("job3", "1.1.1.5:8080", labels.Labels{}, "")}, + expectedCode: http.StatusOK, + expectedJobs: ` + +

Jobs

+ + + + + +
JobTarget Count
job11
job21
job33
+ +`, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + listenAddr := ":8080" + a := &mockAllocator{targetItems: tc.targetItems} + s := NewServer(logger, a, listenAddr) + a.SetCollectors(map[string]*allocation.Collector{ + "test-collector": {Name: "test-collector"}, + "test-collector2": {Name: "test-collector2"}, + }) + request := httptest.NewRequest("GET", "/jobs", nil) + request.Header.Set("Accept", "text/html") + w := httptest.NewRecorder() + + s.server.Handler.ServeHTTP(w, request) + result := w.Result() + + assert.Equal(t, tc.expectedCode, result.StatusCode) + bodyBytes, err := io.ReadAll(result.Body) + require.NoError(t, err) + assert.Equal(t, tc.expectedJobs, string(bodyBytes)) + }) + } +} + +func TestServer_JobHandler_HTML(t *testing.T) { + consistentHashing, _ := allocation.New("consistent-hashing", logger) + type args struct { + job string + cMap map[string]*target.Item + allocator allocation.Allocator + } + type want struct { + items string + errString string + } + tests := []struct { + name string + args args + want want + }{ + { + name: "Empty target map", + args: args{ + job: "test-job", + cMap: map[string]*target.Item{}, + allocator: consistentHashing, + }, + want: want{ + items: ` + +

Job: test-job

+ + + + +
CollectorTarget Count
test-collector0
test-collector20
+ + +
CollectorTarget
+ +`}, + }, + { + name: "Single entry target map", + args: args{ + job: "test-job", + cMap: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + }, + allocator: consistentHashing, + }, + want: want{ + items: ` + +

Job: test-job

+ + + + +
CollectorTarget Count
test-collector0
test-collector21
+ + + +
CollectorTarget
test-collector2test-url
+ +`, + }, + }, + { + name: "Multiple entry target map", + args: args{ + job: "test-job", + cMap: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + testJobTwoTargetItemTwo.Hash(): testJobTwoTargetItemTwo, + }, + allocator: consistentHashing, + }, + want: want{ + items: ` + +

Job: test-job

+ + + + +
CollectorTarget Count
test-collector0
test-collector21
+ + + +
CollectorTarget
test-collector2test-url
+ +`, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listenAddr := ":8080" + s := NewServer(logger, tt.args.allocator, listenAddr) + tt.args.allocator.SetCollectors(map[string]*allocation.Collector{ + "test-collector": {Name: "test-collector"}, + "test-collector2": {Name: "test-collector2"}, + }) + tt.args.allocator.SetTargets(tt.args.cMap) + request := httptest.NewRequest("GET", fmt.Sprintf("/job?job_id=%s", tt.args.job), nil) + request.Header.Set("Accept", "text/html") + w := httptest.NewRecorder() + + s.server.Handler.ServeHTTP(w, request) + result := w.Result() + + assert.Equal(t, http.StatusOK, result.StatusCode) + body := result.Body + bodyBytes, err := io.ReadAll(body) + assert.NoError(t, err) + if len(tt.want.errString) != 0 { + assert.EqualError(t, err, tt.want.errString) + return + } + assert.Equal(t, tt.want.items, string(bodyBytes)) + }) + } +} + +func TestServer_IndexHandler(t *testing.T) { + allocator, _ := allocation.New("consistent-hashing", logger) + tests := []struct { + description string + allocator allocation.Allocator + targetItems map[string]*target.Item + expectedHTML string + }{ + { + description: "Empty target map", + targetItems: map[string]*target.Item{}, + allocator: allocator, + expectedHTML: strings.Trim(` + + +

OpenTelemetry Target Allocator

+ + + + +
CategoryCount
Jobs0
Targets0
+ + + + +
CollectorJob CountTarget Count
test-collector100
test-collector200
+ + +`, "\n"), + }, + { + description: "Single entry target map", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + }, + allocator: allocator, + expectedHTML: strings.Trim(` + + +

OpenTelemetry Target Allocator

+ + + + +
CategoryCount
Jobs1
Targets1
+ + + + +
CollectorJob CountTarget Count
test-collector111
test-collector200
+ + +`, "\n"), + }, + { + description: "Multiple entry target map", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + testJobTargetItemTwo.Hash(): testJobTargetItemTwo, + testJobTwoTargetItemTwo.Hash(): testJobTwoTargetItemTwo, + }, + allocator: allocator, + expectedHTML: strings.Trim(` + + +

OpenTelemetry Target Allocator

+ + + + +
CategoryCount
Jobs2
Targets3
+ + + + +
CollectorJob CountTarget Count
test-collector122
test-collector211
+ + +`, "\n"), + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + listenAddr := ":8080" + s := NewServer(logger, tc.allocator, listenAddr) + tc.allocator.SetCollectors(map[string]*allocation.Collector{ + "test-collector1": {Name: "test-collector1"}, + "test-collector2": {Name: "test-collector2"}, + }) + tc.allocator.SetTargets(tc.targetItems) + request := httptest.NewRequest("GET", "/", nil) + request.Header.Set("Accept", "text/html") + w := httptest.NewRecorder() + + s.server.Handler.ServeHTTP(w, request) + result := w.Result() + + assert.Equal(t, http.StatusOK, result.StatusCode) + body := result.Body + bodyBytes, err := io.ReadAll(body) + assert.NoError(t, err) + assert.Equal(t, tc.expectedHTML, string(bodyBytes)) + }) + } +} +func TestServer_TargetsHTMLHandler(t *testing.T) { + allocator, _ := allocation.New("consistent-hashing", logger) + tests := []struct { + description string + allocator allocation.Allocator + targetItems map[string]*target.Item + expectedHTML string + }{ + { + description: "Empty target map", + targetItems: map[string]*target.Item{}, + allocator: allocator, + expectedHTML: ` + +

Targets

+ + +
JobTargetCollectorEndpoint Slice
+ +`, + }, + { + description: "Single entry target map", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + }, + allocator: allocator, + expectedHTML: ` + +

Targets

+ + + +
JobTargetCollectorEndpoint Slice
test-jobtest-urltest-collector1
+ +`, + }, + { + description: "Multiple entry target map", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + testJobTargetItemTwo.Hash(): testJobTargetItemTwo, + testJobTwoTargetItemTwo.Hash(): testJobTwoTargetItemTwo, + }, + allocator: allocator, + expectedHTML: ` + +

Targets

+ + + + + +
JobTargetCollectorEndpoint Slice
test-job2test-url3test-collector1
test-jobtest-url2test-collector2
test-jobtest-urltest-collector1
+ +`, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + listenAddr := ":8080" + s := NewServer(logger, tc.allocator, listenAddr) + tc.allocator.SetCollectors(map[string]*allocation.Collector{ + "test-collector1": {Name: "test-collector1"}, + "test-collector2": {Name: "test-collector2"}, + }) + tc.allocator.SetTargets(tc.targetItems) + request := httptest.NewRequest("GET", "/targets", nil) + request.Header.Set("Accept", "text/html") + w := httptest.NewRecorder() + + s.server.Handler.ServeHTTP(w, request) + result := w.Result() + + assert.Equal(t, http.StatusOK, result.StatusCode) + body := result.Body + bodyBytes, err := io.ReadAll(body) + assert.NoError(t, err) + assert.Equal(t, tc.expectedHTML, string(bodyBytes)) + }) + } +} + +func TestServer_CollectorHandler(t *testing.T) { + allocator, _ := allocation.New("consistent-hashing", logger) + tests := []struct { + description string + collectorId string + allocator allocation.Allocator + targetItems map[string]*target.Item + expectedCode int + expectedHTML string + }{ + { + description: "Empty target map", + collectorId: "test-collector", + targetItems: map[string]*target.Item{}, + allocator: allocator, + expectedCode: http.StatusOK, + expectedHTML: ` + +

Collector: test-collector

+ + +
JobTargetEndpoint Slice
+ +`, + }, + { + description: "Single entry target map", + collectorId: "test-collector2", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + }, + allocator: allocator, + expectedCode: http.StatusOK, + expectedHTML: ` + +

Collector: test-collector2

+ + + +
JobTargetEndpoint Slice
test-jobtest-url
+ +`, + }, + { + description: "Multiple entry target map", + collectorId: "test-collector2", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + testJobTwoTargetItemTwo.Hash(): testJobTwoTargetItemTwo, + }, + allocator: allocator, + expectedCode: http.StatusOK, + expectedHTML: ` + +

Collector: test-collector2

+ + + +
JobTargetEndpoint Slice
test-jobtest-url
+ +`, + }, + { + description: "Multiple entry target map, collector id is empty", + collectorId: "", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + testJobTwoTargetItemTwo.Hash(): testJobTwoTargetItemTwo, + }, + allocator: allocator, + expectedCode: http.StatusBadRequest, + expectedHTML: ` + +

Bad Request

+

Expected collector_id in the query string

+

Example: /collector?collector_id=my-collector-42

+ +`, + }, + { + description: "Multiple entry target map, unknown collector id", + collectorId: "unknown-collector-1", + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + testJobTwoTargetItemTwo.Hash(): testJobTwoTargetItemTwo, + }, + allocator: allocator, + expectedCode: http.StatusNotFound, + expectedHTML: ` + +

Unknown Collector: unknown-collector-1

+ +`, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + listenAddr := ":8080" + s := NewServer(logger, tc.allocator, listenAddr) + tc.allocator.SetCollectors(map[string]*allocation.Collector{ + "test-collector": {Name: "test-collector"}, + "test-collector2": {Name: "test-collector2"}, + }) + tc.allocator.SetTargets(tc.targetItems) + request := httptest.NewRequest("GET", "/collector", nil) + request.Header.Set("Accept", "text/html") + request.URL.RawQuery = "collector_id=" + tc.collectorId + w := httptest.NewRecorder() + + s.server.Handler.ServeHTTP(w, request) + result := w.Result() + + assert.Equal(t, tc.expectedCode, result.StatusCode) + body := result.Body + bodyBytes, err := io.ReadAll(body) + assert.NoError(t, err) + assert.Equal(t, tc.expectedHTML, string(bodyBytes)) + }) + } +} + +func TestServer_TargetHTMLHandler(t *testing.T) { + allocator, _ := allocation.New("consistent-hashing", logger) + tests := []struct { + description string + targetHash string + allocator allocation.Allocator + targetItems map[string]*target.Item + expectedCode int + expectedHTML string + }{ + { + description: "Missing target hash", + targetHash: "", + targetItems: map[string]*target.Item{}, + allocator: allocator, + expectedCode: http.StatusBadRequest, + expectedHTML: ` + +

Bad Request

+

Expected target_hash in the query string

+

Example: /target?target_hash=my-target-42

+ +`, + }, + { + description: "Single entry target map", + targetHash: baseTargetItem.Hash(), + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + }, + allocator: allocator, + expectedCode: http.StatusOK, + expectedHTML: ` + +

Target: test-url

+ + + +
Collectortest-collector2
Jobtest-job
+

Target Labels

+ + + +
LabelValue
test_labeltest-value
+ +`, + }, + { + description: "Multiple entry target map", + targetHash: testJobTwoTargetItemTwo.Hash(), + targetItems: map[string]*target.Item{ + baseTargetItem.Hash(): baseTargetItem, + testJobTwoTargetItemTwo.Hash(): testJobTwoTargetItemTwo, + }, + allocator: allocator, + expectedCode: http.StatusOK, + expectedHTML: ` + +

Target: test-url3

+ + + +
Collectortest-collector
Jobtest-job2
+

Target Labels

+ + + +
LabelValue
test_labeltest-value2
+ +`, + }, + } + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + listenAddr := ":8080" + s := NewServer(logger, tc.allocator, listenAddr) + tc.allocator.SetCollectors(map[string]*allocation.Collector{ + "test-collector": {Name: "test-collector"}, + "test-collector2": {Name: "test-collector2"}, + }) + tc.allocator.SetTargets(tc.targetItems) + request := httptest.NewRequest("GET", "/target", nil) + request.Header.Set("Accept", "text/html") + request.URL.RawQuery = "target_hash=" + tc.targetHash + w := httptest.NewRecorder() + + s.server.Handler.ServeHTTP(w, request) + result := w.Result() + + assert.Equal(t, tc.expectedCode, result.StatusCode) + body := result.Body + bodyBytes, err := io.ReadAll(body) + assert.NoError(t, err) + assert.Equal(t, tc.expectedHTML, string(bodyBytes)) + }) + } +} + func TestServer_Readiness(t *testing.T) { tests := []struct { description string diff --git a/cmd/otel-allocator/target/target.go b/cmd/otel-allocator/target/target.go index 5a157bc11d..9914961cf0 100644 --- a/cmd/otel-allocator/target/target.go +++ b/cmd/otel-allocator/target/target.go @@ -31,9 +31,11 @@ var ( } endpointSliceTargetKindLabel = "__meta_kubernetes_endpointslice_address_target_kind" endpointSliceTargetNameLabel = "__meta_kubernetes_endpointslice_address_target_name" + endpointSliceName = "__meta_kubernetes_endpointslice_name" relevantLabelNames = append(nodeLabels, endpointSliceTargetKindLabel, endpointSliceTargetNameLabel) ) +// Item represents a target to be scraped. type Item struct { JobName string TargetURL string @@ -61,6 +63,12 @@ func (t *Item) GetNodeName() string { return relevantLabels.Get(endpointSliceTargetNameLabel) } +// GetEndpointSliceName returns the name of the EndpointSlice that the target is part of. +// If the target is not part of an EndpointSlice, it returns an empty string. +func (t *Item) GetEndpointSliceName() string { + return t.Labels.Get(endpointSliceName) +} + // NewItem Creates a new target item. // INVARIANTS: // * Item fields must not be modified after creation.