diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index 41f31a4e92..f55499f791 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -15,8 +15,6 @@ package allocation import ( - "fmt" - "net/url" "sync" "github.com/buraksezer/consistent" @@ -25,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) var _ Allocator = &consistentHashingAllocator{} @@ -47,7 +46,7 @@ type consistentHashingAllocator struct { collectors map[string]*Collector // targetItems is a map from a target item's hash to the target items allocated state - targetItems map[string]*TargetItem + targetItems map[string]*target.Item log logr.Logger } @@ -63,7 +62,7 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator { return &consistentHashingAllocator{ consistentHasher: consistentHasher, collectors: make(map[string]*Collector), - targetItems: make(map[string]*TargetItem), + targetItems: make(map[string]*target.Item), log: log, } } @@ -72,20 +71,14 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator { // This method is called from within SetTargets and SetCollectors, which acquire the needed lock. // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap. // INVARIANT: c.collectors must have at least 1 collector set. -func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) { +func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) { // Check if this is a reassignment, if so, decrement the previous collector's NumTargets - if previousColName, ok := c.collectors[target.CollectorName]; ok { + if previousColName, ok := c.collectors[tg.CollectorName]; ok { previousColName.NumTargets-- TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets)) } - colOwner := c.consistentHasher.LocateKey([]byte(target.Hash())) - targetItem := &TargetItem{ - JobName: target.JobName, - Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, - TargetURL: target.TargetURL, - Label: target.Label, - CollectorName: colOwner.String(), - } + colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash())) + targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, colOwner.String()) c.targetItems[targetItem.Hash()] = targetItem c.collectors[colOwner.String()].NumTargets++ TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets)) @@ -94,7 +87,7 @@ func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) // handleTargets receives the new and removed targets and reconciles the current state. // Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector. // Any net-new additions are assigned to the next available collector. -func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) { +func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Item]) { // Check for removals for k, target := range c.targetItems { // if the current target is in the removals list @@ -143,7 +136,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect // SetTargets accepts a list of targets that will be used to make // load balancing decisions. This method should be called when there are // new targets discovered or existing targets are shutdown. -func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) { +func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item) { timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName)) defer timer.ObserveDuration() @@ -186,10 +179,10 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec } // TargetItems returns a shallow copy of the targetItems map. -func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem { +func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item { c.m.RLock() defer c.m.RUnlock() - targetItemsCopy := make(map[string]*TargetItem) + targetItemsCopy := make(map[string]*target.Item) for k, v := range c.targetItems { targetItemsCopy[k] = v } diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index ee60f31609..4d373b6fdb 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -19,11 +19,9 @@ import ( "net/url" "github.com/prometheus/common/model" -) -type LinkJSON struct { - Link string `json:"_link"` -} + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" +) type collectorJSON struct { Link string `json:"_link"` @@ -35,11 +33,11 @@ type targetGroupJSON struct { Labels model.LabelSet `json:"labels"` } -func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allocator) map[string]collectorJSON { +func GetAllTargetsByJob(job string, cMap map[string][]target.Item, allocator Allocator) map[string]collectorJSON { displayData := make(map[string]collectorJSON) for _, j := range allocator.TargetItems() { if j.JobName == job { - var targetList []TargetItem + var targetList []target.Item targetList = append(targetList, cMap[j.CollectorName+j.JobName]...) var targetGroupList []targetGroupJSON @@ -58,9 +56,9 @@ func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allo return displayData } -func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator Allocator) []targetGroupJSON { +func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]target.Item, allocator Allocator) []targetGroupJSON { var tgs []targetGroupJSON - group := make(map[string]TargetItem) + group := make(map[string]target.Item) labelSet := make(map[string]model.LabelSet) if _, ok := allocator.Collectors()[collector]; ok { for _, targetItemArr := range cMap { diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index e4e7fb7b61..c3d6967d1d 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -20,6 +20,8 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) func TestGetAllTargetsByCollectorAndJob(t *testing.T) { @@ -30,7 +32,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { type args struct { collector string job string - cMap map[string][]TargetItem + cMap map[string][]target.Item allocator Allocator } var tests = []struct { @@ -43,7 +45,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{}, + cMap: map[string][]target.Item{}, allocator: baseAllocator, }, want: nil, @@ -53,9 +55,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]target.Item{ "test-collectortest-job": { - TargetItem{ + target.Item{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -81,9 +83,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]target.Item{ "test-collectortest-job": { - TargetItem{ + target.Item{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -93,7 +95,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, }, "test-collectortest-job2": { - TargetItem{ + target.Item{ JobName: "test-job2", Label: model.LabelSet{ "test-label": "test-value", @@ -119,9 +121,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]target.Item{ "test-collectortest-job": { - TargetItem{ + target.Item{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -132,7 +134,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, }, "test-collectortest-job2": { - TargetItem{ + target.Item{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -165,9 +167,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]target.Item{ "test-collectortest-job": { - TargetItem{ + target.Item{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -176,7 +178,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { TargetURL: "test-url", CollectorName: "test-collector", }, - TargetItem{ + target.Item{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index bed1dbd51f..033c7bf52b 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -15,11 +15,10 @@ package allocation import ( - "fmt" - "net/url" "sync" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" @@ -46,16 +45,16 @@ type leastWeightedAllocator struct { // collectors is a map from a Collector's name to a Collector instance collectors map[string]*Collector // targetItems is a map from a target item's hash to the target items allocated state - targetItems map[string]*TargetItem + targetItems map[string]*target.Item log logr.Logger } // TargetItems returns a shallow copy of the targetItems map. -func (allocator *leastWeightedAllocator) TargetItems() map[string]*TargetItem { +func (allocator *leastWeightedAllocator) TargetItems() map[string]*target.Item { allocator.m.RLock() defer allocator.m.RUnlock() - targetItemsCopy := make(map[string]*TargetItem) + targetItemsCopy := make(map[string]*target.Item) for k, v := range allocator.targetItems { targetItemsCopy[k] = v } @@ -94,15 +93,9 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector { // This method is called from within SetTargets and SetCollectors, which acquire the needed lock. // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap. // INVARIANT: allocator.collectors must have at least 1 collector set. -func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetItem) { +func (allocator *leastWeightedAllocator) addTargetToTargetItems(tg *target.Item) { chosenCollector := allocator.findNextCollector() - targetItem := &TargetItem{ - JobName: target.JobName, - Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, - TargetURL: target.TargetURL, - Label: target.Label, - CollectorName: chosenCollector.Name, - } + targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, chosenCollector.Name) allocator.targetItems[targetItem.Hash()] = targetItem chosenCollector.NumTargets++ TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets)) @@ -111,7 +104,7 @@ func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetIt // handleTargets receives the new and removed targets and reconciles the current state. // Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector. // Any net-new additions are assigned to the next available collector. -func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*TargetItem]) { +func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*target.Item]) { // Check for removals for k, target := range allocator.targetItems { // if the current target is in the removals list @@ -160,7 +153,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col // SetTargets accepts a list of targets that will be used to make // load balancing decisions. This method should be called when there are // new targets discovered or existing targets are shutdown. -func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetItem) { +func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*target.Item) { timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName)) defer timer.ObserveDuration() @@ -206,6 +199,6 @@ func newLeastWeightedAllocator(log logr.Logger) Allocator { return &leastWeightedAllocator{ log: log, collectors: make(map[string]*Collector), - targetItems: make(map[string]*TargetItem), + targetItems: make(map[string]*target.Item), } } diff --git a/cmd/otel-allocator/allocation/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go index 3b6209ddd5..2812541966 100644 --- a/cmd/otel-allocator/allocation/least_weighted_test.go +++ b/cmd/otel-allocator/allocation/least_weighted_test.go @@ -24,6 +24,8 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) var logger = logf.Log.WithName("unit-tests") @@ -35,8 +37,8 @@ func colIndex(index, numCols int) int { return index % numCols } -func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*TargetItem { - toReturn := map[string]*TargetItem{} +func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*target.Item { + toReturn := map[string]*target.Item{} for i := startingIndex; i < n+startingIndex; i++ { collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors)) label := model.LabelSet{ @@ -44,7 +46,7 @@ func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*Ta "i": model.LabelValue(strconv.Itoa(i)), "total": model.LabelValue(strconv.Itoa(n + startingIndex)), } - newTarget := NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", label, collector) + newTarget := target.NewItem(fmt.Sprintf("test-job-%d", i), "test-url", label, collector) toReturn[newTarget.Hash()] = newTarget } return toReturn @@ -124,10 +126,10 @@ func TestAllocationCollision(t *testing.T) { secondLabels := model.LabelSet{ "test": "test2", } - firstTarget := NewTargetItem("sample-name", "0.0.0.0:8000", firstLabels, "") - secondTarget := NewTargetItem("sample-name", "0.0.0.0:8000", secondLabels, "") + firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "") + secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "") - targetList := map[string]*TargetItem{ + targetList := map[string]*target.Item{ firstTarget.Hash(): firstTarget, secondTarget.Hash(): secondTarget, } diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 2808c32b5d..24d3b9b4bf 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -17,13 +17,13 @@ package allocation import ( "errors" "fmt" - "net/url" "github.com/buraksezer/consistent" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/common/model" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" ) type AllocatorProvider func(log logr.Logger) Allocator @@ -64,33 +64,11 @@ func Register(name string, provider AllocatorProvider) error { type Allocator interface { SetCollectors(collectors map[string]*Collector) - SetTargets(targets map[string]*TargetItem) - TargetItems() map[string]*TargetItem + SetTargets(targets map[string]*target.Item) + TargetItems() map[string]*target.Item Collectors() map[string]*Collector } -type TargetItem struct { - JobName string - Link LinkJSON - TargetURL string - Label model.LabelSet - CollectorName string -} - -func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *TargetItem { - return &TargetItem{ - JobName: jobName, - Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, - TargetURL: targetURL, - Label: label, - CollectorName: collectorName, - } -} - -func (t TargetItem) Hash() string { - return t.JobName + t.TargetURL + t.Label.Fingerprint().String() -} - var _ consistent.Member = Collector{} // Collector Creates a struct that holds Collector information. diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index 9cdadde887..d8298096a6 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -25,7 +25,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) @@ -84,7 +84,7 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C return m.manager.ApplyConfig(discoveryCfg) } -func (m *Manager) Watch(fn func(targets map[string]*allocation.TargetItem)) { +func (m *Manager) Watch(fn func(targets map[string]*target.Item)) { log := m.log.WithValues("component", "opentelemetry-targetallocator") go func() { @@ -94,14 +94,14 @@ func (m *Manager) Watch(fn func(targets map[string]*allocation.TargetItem)) { log.Info("Service Discovery watch event stopped: discovery manager closed") return case tsets := <-m.manager.SyncCh(): - targets := map[string]*allocation.TargetItem{} + targets := map[string]*target.Item{} for jobName, tgs := range tsets { var count float64 = 0 for _, tg := range tgs { for _, t := range tg.Targets { count++ - item := &allocation.TargetItem{ + item := &target.Item{ JobName: jobName, TargetURL: string(t[model.AddressLabel]), Label: t.Merge(tg.Labels), diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index afea787176..56fd1aa4e4 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -28,8 +28,8 @@ import ( "github.com/stretchr/testify/assert" ctrl "sigs.k8s.io/controller-runtime" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) @@ -47,7 +47,7 @@ func TestMain(m *testing.M) { manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger()) results = make(chan []string) - manager.Watch(func(targets map[string]*allocation.TargetItem) { + manager.Watch(func(targets map[string]*target.Item) { var result []string for _, t := range targets { result = append(result, t.TargetURL) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 85e1ca4c45..1dffa9d3d7 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -39,6 +39,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) @@ -229,9 +230,9 @@ func (s *server) ScrapeConfigsHandler(w http.ResponseWriter, r *http.Request) { } func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { - displayData := make(map[string]allocation.LinkJSON) + displayData := make(map[string]target.LinkJSON) for _, v := range s.allocator.TargetItems() { - displayData[v.JobName] = allocation.LinkJSON{Link: v.Link.Link} + displayData[v.JobName] = target.LinkJSON{Link: v.Link.Link} } s.jsonHandler(w, displayData) } @@ -250,7 +251,7 @@ func (s *server) PrometheusMiddleware(next http.Handler) http.Handler { func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { q := r.URL.Query()["collector_id"] - var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem + var compareMap = make(map[string][]target.Item) // CollectorName+jobName -> TargetItem for _, v := range s.allocator.TargetItems() { compareMap[v.CollectorName+v.JobName] = append(compareMap[v.CollectorName+v.JobName], *v) } diff --git a/cmd/otel-allocator/target/target.go b/cmd/otel-allocator/target/target.go new file mode 100644 index 0000000000..ab68589fe9 --- /dev/null +++ b/cmd/otel-allocator/target/target.go @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package target + +import ( + "fmt" + "net/url" + + "github.com/prometheus/common/model" +) + +// This package contains common structs and methods that relate to scrape targets. +type LinkJSON struct { + Link string `json:"_link"` +} + +type Item struct { + JobName string + Link LinkJSON + TargetURL string + Label model.LabelSet + CollectorName string +} + +func (t Item) Hash() string { + return t.JobName + t.TargetURL + t.Label.Fingerprint().String() +} + +func NewItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *Item { + return &Item{ + JobName: jobName, + Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, + TargetURL: targetURL, + Label: label, + CollectorName: collectorName, + } +}