diff --git a/.gitignore b/.gitignore index f7e9c2319b1..d46261d6155 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .idea .vscode *.test +*.out *.pprof /bin /cmd/tempo-cli/tempo-cli diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ffb6fa6afc..e58871a3e44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## main / unreleased - +* [ENHANCEMENT] The span multiplier now also sources its value from the resource attributes. [#4210](https://github.com/grafana/tempo/pull/4210) +* [FEATURE] Export cost attribution usage metrics from distributor [#4162](https://github.com/grafana/tempo/pull/4162) (@mdisibio) +* [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137) +* [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero) * [CHANGE] Add throughput and SLO metrics in the tags and tag values endpoints [#4148](https://github.com/grafana/tempo/pull/4148) (@electron0zero) * [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero) **BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default, diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index d81d3ec69a6..2780ed31364 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -243,6 +243,10 @@ func (t *App) initDistributor() (services.Service, error) { t.Server.HTTPRouter().Handle("/distributor/ring", distributor.DistributorRing) } + if usageHandler := distributor.UsageTrackerHandler(); usageHandler != nil { + t.Server.HTTPRouter().Handle("/usage_metrics", usageHandler) + } + return t.distributor, nil } diff --git a/docs/sources/tempo/api_docs/_index.md b/docs/sources/tempo/api_docs/_index.md index b89f3154688..4d85b0503ae 100644 --- a/docs/sources/tempo/api_docs/_index.md +++ b/docs/sources/tempo/api_docs/_index.md @@ -38,6 +38,7 @@ For externally supported GRPC API, [see below](#tempo-grpc-api). | Memberlist | Distributor, Ingester, Querier, Compactor | HTTP | `GET /memberlist` | | [Flush](#flush) | Ingester | HTTP | `GET,POST /flush` | | [Shutdown](#shutdown) | Ingester | HTTP | `GET,POST /shutdown` | +| [Usage Metrics](#usage-metrics) | Distributor | HTTP | `GET /usage_metrics` | | [Distributor ring status](#distributor-ring-status) (*) | Distributor | HTTP | `GET /distributor/ring` | | [Ingesters ring status](#ingesters-ring-status) | Distributor, Querier | HTTP | `GET /ingester/ring` | | [Metrics-generator ring status](#metrics-generator-ring-status) (*) | Distributor | HTTP | `GET /metrics-generator/ring` | @@ -684,6 +685,30 @@ ingester service. This is usually used at the time of scaling down a cluster. {{% /admonition %}} +### Usage metrics + +{{< admonition type="note" >}} +This endpoint is only available when one or more usage trackers are enabled in [the distributor]({{< relref "../configuration#distributor" >}}). +{{% /admonition %}} + +``` +GET /usage_metrics +``` + +Special metrics scrape endpoint that provides per-tenant metrics on ingested data. 
+Per-tenant grouping rules are configured in [the per-tenant overrides]({{< relref "../configuration#overrides" >}}).
+
+Example:
+```
+curl http://localhost:3200/usage_metrics
+# HELP tempo_usage_tracker_bytes_received_total bytes total received with these attributes
+# TYPE tempo_usage_tracker_bytes_received_total counter
+tempo_usage_tracker_bytes_received_total{service="auth-service",tenant="single-tenant",tracker="cost-attribution"} 96563
+tempo_usage_tracker_bytes_received_total{service="cache",tenant="single-tenant",tracker="cost-attribution"} 81904
+tempo_usage_tracker_bytes_received_total{service="gateway",tenant="single-tenant",tracker="cost-attribution"} 164751
+tempo_usage_tracker_bytes_received_total{service="identity-service",tenant="single-tenant",tracker="cost-attribution"} 85974
+tempo_usage_tracker_bytes_received_total{service="service-A",tenant="single-tenant",tracker="cost-attribution"} 92799
+```
+
 ### Distributor ring status
 
 {{< admonition type="note" >}}
diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md
index 3ab4252e737..e8141fd68d8 100644
--- a/docs/sources/tempo/configuration/_index.md
+++ b/docs/sources/tempo/configuration/_index.md
@@ -228,6 +228,19 @@ distributor:
   # defaults to 0 which means that by default ResourceExhausted is not retried. Set this to a duration such as `1s` to
   # instruct the client how to retry.
   [retry_after_on_resource_exhausted: | default = '0' ]
+
+  # Optional.
+  # Configures usage trackers in the distributor, which expose metrics of ingested traffic on /usage_metrics,
+  # grouped by configurable attributes.
+  usage:
+    cost_attribution:
+      # Enables the "cost-attribution" usage tracker. Per-tenant attributes are configured in overrides.
+      [enabled: | default = false]
+      # Maximum number of series per tenant.
+      [max_cardinality: | default = 10000]
+      # Interval after which a series is considered stale and will be deleted from the registry.
+      # Once a metrics series is deleted, it won't be emitted anymore, keeping active series low.
+      [stale_duration: | default = 15m0s]
 ```
 
 ## Ingester
@@ -472,7 +485,7 @@ metrics_generator:
     [collection_interval: | default = 15s]
 
     # Interval after which a series is considered stale and will be deleted from the registry.
-    # Once a metrics series is deleted it won't be emitted anymore, keeping active series low.
+    # Once a metrics series is deleted, it won't be emitted anymore, keeping active series low.
     [stale_duration: | default = 15m]
 
     # A list of labels that will be added to all generated metrics.
@@ -1719,6 +1732,13 @@ overrides:
         scope: # scope of the attribute. options: resource, span
       ]
+    # Cost attribution usage tracker configuration
+    cost_attribution:
+      # List of attributes to group ingested data by. Map value is optional. Can be used to rename and
+      # combine attributes.
+      dimensions:
+
+
 # Tenant-specific overrides settings configuration file. The empty string (default
 # value) disables using an overrides file.
 [per_tenant_override_config: | default = ""]
diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md
index fdb0e702a96..ec2f36c6965 100644
--- a/docs/sources/tempo/configuration/manifest.md
+++ b/docs/sources/tempo/configuration/manifest.md
@@ -18,7 +18,7 @@ go run ./cmd/tempo --storage.trace.backend=local --storage.trace.local.path=/var
 ## Complete configuration
 
 {{< admonition type="note" >}}
-This manifest was generated on 2024-10-11.
+This manifest was generated on 2024-10-21.
{{% /admonition %}} ```yaml @@ -186,6 +186,11 @@ distributor: receivers: {} override_ring_key: distributor forwarders: [] + usage: + cost_attribution: + enabled: false + max_cardinality: 10000 + stale_duration: 15m0s extend_writes: true retry_after_on_resource_exhausted: 0s ingester_client: diff --git a/modules/distributor/config.go b/modules/distributor/config.go index d814247a41a..14bb02fa41b 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -8,6 +8,7 @@ import ( ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/tempo/modules/distributor/forwarder" + "github.com/grafana/tempo/modules/distributor/usage" "github.com/grafana/tempo/pkg/util" ) @@ -37,8 +38,8 @@ type Config struct { LogReceivedSpans LogSpansConfig `yaml:"log_received_spans,omitempty"` LogDiscardedSpans LogSpansConfig `yaml:"log_discarded_spans,omitempty"` MetricReceivedSpans MetricReceivedSpansConfig `yaml:"metric_received_spans,omitempty"` - - Forwarders forwarder.ConfigList `yaml:"forwarders"` + Forwarders forwarder.ConfigList `yaml:"forwarders"` + Usage usage.Config `yaml:"usage,omitempty"` // disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true // note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica @@ -80,4 +81,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) f.BoolVar(&cfg.LogDiscardedSpans.Enabled, util.PrefixConfig(prefix, "log-discarded-spans.enabled"), false, "Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs.") f.BoolVar(&cfg.LogDiscardedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-discarded-spans.include-attributes"), false, "Enable to include span attributes in the logs.") f.BoolVar(&cfg.LogDiscardedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-discarded-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.") + + cfg.Usage.RegisterFlagsAndApplyDefaults(prefix, f) } diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 02b9a511f6e..87e57e271b5 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "math" + "net/http" "sync" "time" @@ -28,6 +29,7 @@ import ( "github.com/grafana/tempo/modules/distributor/forwarder" "github.com/grafana/tempo/modules/distributor/receiver" + "github.com/grafana/tempo/modules/distributor/usage" generator_client "github.com/grafana/tempo/modules/generator/client" ingester_client "github.com/grafana/tempo/modules/ingester/client" "github.com/grafana/tempo/modules/overrides" @@ -154,6 +156,8 @@ type Distributor struct { subservices *services.Manager subservicesWatcher *services.FailureWatcher + usage *usage.Tracker + logger log.Logger } @@ -214,6 +218,14 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi logger: logger, } + if cfg.Usage.CostAttribution.Enabled { + usage, err := usage.NewTracker(cfg.Usage.CostAttribution, "cost-attribution", o.CostAttributionDimensions, o.CostAttributionMaxCardinality) + if err != nil { + return nil, fmt.Errorf("creating usage tracker: %w", err) + } + d.usage = usage + } + var generatorsPoolFactory ring_client.PoolAddrFunc = func(addr string) (ring_client.PoolClient, error) { return generator_client.New(addr, generatorClientCfg) } @@ -328,6 +340,7 @@ 
func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te return &tempopb.PushResponse{}, nil } // check limits + // todo - usage tracker include discarded bytes? err = d.checkForRateLimits(size, spanCount, userID) if err != nil { return nil, err @@ -360,6 +373,11 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te statBytesReceived.Inc(int64(size)) statSpansReceived.Inc(int64(spanCount)) + // Usage tracking + if d.usage != nil { + d.usage.Observe(userID, batches) + } + keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount) if err != nil { overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID) @@ -498,6 +516,14 @@ func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckReques return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } +func (d *Distributor) UsageTrackerHandler() http.Handler { + if d.usage != nil { + return d.usage.Handler() + } + + return nil +} + // requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring // and traces to pass onto the ingesters. func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) { diff --git a/modules/distributor/usage/config.go b/modules/distributor/usage/config.go new file mode 100644 index 00000000000..b6496ecd50d --- /dev/null +++ b/modules/distributor/usage/config.go @@ -0,0 +1,30 @@ +package usage + +import ( + "flag" + "time" +) + +const ( + defaultMaxCardinality = uint64(10000) + defaultStaleDuration = 15 * time.Minute + defaultPurgePeriod = time.Minute +) + +type PerTrackerConfig struct { + Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"` + MaxCardinality uint64 `yaml:"max_cardinality,omitempty" json:"max_cardinality,omitempty"` + StaleDuration time.Duration `yaml:"stale_duration,omitempty" json:"stale_duration,omitempty"` +} + +type Config struct { + CostAttribution PerTrackerConfig `yaml:"cost_attribution,omitempty" json:"cost_attribution,omitempty"` +} + +func (c *Config) RegisterFlagsAndApplyDefaults(_ string, _ *flag.FlagSet) { + c.CostAttribution = PerTrackerConfig{ + Enabled: false, + MaxCardinality: defaultMaxCardinality, + StaleDuration: defaultStaleDuration, + } +} diff --git a/modules/distributor/usage/tracker.go b/modules/distributor/usage/tracker.go new file mode 100644 index 00000000000..27911b4a008 --- /dev/null +++ b/modules/distributor/usage/tracker.go @@ -0,0 +1,427 @@ +package usage + +import ( + "maps" + "math" + "math/bits" + "net/http" + "slices" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/prometheus/util/strutil" + + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" +) + +const ( + tenantLabel = "tenant" + trackerLabel = "tracker" + missingLabel = "__missing__" + overflowLabel = "__overflow__" +) + +type ( + tenantLabelsFunc func(string) map[string]string + tenantMaxFunc func(string) uint64 +) + +type bucket struct { + // Configuration + descr *prometheus.Desc // Configuration can change over time so it is captured with the bucket. 
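+	// labels holds this series' label values, in the same order as the tenant's sortedKeys.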
+ labels []string + + // Runtime data + bytes uint64 + lastUpdated int64 +} + +func (b *bucket) Inc(bytes uint64, unix int64) { + b.bytes += bytes + b.lastUpdated = unix +} + +type mapping struct { + from string + to int // Index into the values buffer +} + +type tenantUsage struct { + series map[uint64]*bucket + constLabels prometheus.Labels + + // Buffers for Observe + dimensions map[string]string // Originally configured dimensions + mapping []mapping // Mapping from attribute => final sanitized label. Typically few values and slice is faster than map + sortedKeys []string // So we can always iterate the buffer in order, this can be precomputed up front + buffer1 []string // Batch-level values + buffer2 []string // Span-level values + buffer3 []string // Last hashed values + overflow uint64 +} + +// GetBuffersForDimensions takes advantage of the fact that the configuration for a tracker +// changes slowly. Reuses buffers from the previous call when the dimensions are the same. +func (t *tenantUsage) GetBuffersForDimensions(dimensions map[string]string) ([]mapping, []string, []string, []string) { + if !maps.Equal(dimensions, t.dimensions) { + // The configuration changed. + + // Step 1 + // Gather all configured dimensions and their sanitized output + t.dimensions = dimensions + sanitizedDimensions := make(map[string]string, len(dimensions)) + for k, v := range dimensions { + // Get the final sanitized output label for this + // dimension. Dimensions are key-value pairs with + // optional value. If value is empty string, then + // we use the just the key. Regardless the output + // is always the sanitized version. + // Example: + // service.name="" => "service_name" + // service.name="foo.bar" => "foo_bar" + var sanitized string + if v == "" { + // The dimension is using default mapping + v = k + } + sanitized = strutil.SanitizeFullLabelName(v) + sanitizedDimensions[k] = sanitized + } + + // Step 2 + // Build the final list of sorted/distinct outputs + t.sortedKeys = t.sortedKeys[:0] + for _, v := range sanitizedDimensions { + if !slices.Contains(t.sortedKeys, v) { + t.sortedKeys = append(t.sortedKeys, v) + } + } + slices.Sort(t.sortedKeys) + + // Step 3 + // Prepare the mapping from raw attribute names to the final location of + // where it goes in the output buffers. This avoids another layer of indirection. + t.mapping = t.mapping[:0] + for k := range dimensions { + i := slices.Index(t.sortedKeys, sanitizedDimensions[k]) + t.mapping = append(t.mapping, mapping{ + from: k, + to: i, + }) + } + + // Step 4 + // Prepopulate the buffers and precompute the overflow bucket + t.buffer1 = make([]string, len(t.sortedKeys)) + t.buffer2 = make([]string, len(t.sortedKeys)) + t.buffer3 = make([]string, len(t.sortedKeys)) + for i := range t.sortedKeys { + t.buffer1[i] = overflowLabel + } + t.overflow = hash(t.sortedKeys, t.buffer1) + } + return t.mapping, t.buffer1, t.buffer2, t.buffer3 +} + +// func (t *tenantUsage) getSeries(labels, values []string, maxCardinality uint64) *bucket { +func (t *tenantUsage) getSeries(buffer []string, maxCardinality uint64) *bucket { + h := hash(t.sortedKeys, buffer) + + b := t.series[h] + if b == nil { + // Before creating a new series, check for cardinality limit. + if uint64(len(t.series)) >= maxCardinality { + // Overflow + // This tenant is at the maximum number of series. In this case all data + // goes into the final overflow bucket. It has the same dimensions as the + // current configuration, except every label is overridden to the special overflow value. 
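+			// Overwrite the caller's buffer in place so the lookup below (and the detached
+			// label copy, if this is the first overflow hit) uses the overflow values.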
+ for k := range buffer { + buffer[k] = overflowLabel + } + h = t.overflow + b = t.series[h] + } + } + + if b == nil { + // First encounter with this series. Initialize it. + // Detach a copy of the values + v := make([]string, len(buffer)) + copy(v, buffer) + b = &bucket{ + // Metric description - constant for this pass now that the dimensions are known + descr: prometheus.NewDesc("tempo_usage_tracker_bytes_received_total", "bytes total received with these attributes", t.sortedKeys, t.constLabels), + labels: v, + } + t.series[h] = b + } + return b +} + +type Tracker struct { + mtx sync.Mutex + name string + tenants map[string]*tenantUsage + labelsFn tenantLabelsFunc + maxFn tenantMaxFunc + reg *prometheus.Registry + cfg PerTrackerConfig +} + +func NewTracker(cfg PerTrackerConfig, name string, labelsFn tenantLabelsFunc, maxFn tenantMaxFunc) (*Tracker, error) { + u := &Tracker{ + cfg: cfg, + name: name, + tenants: make(map[string]*tenantUsage), + labelsFn: labelsFn, + maxFn: maxFn, + reg: prometheus.NewRegistry(), + } + + err := u.reg.Register(u) + if err != nil { + return nil, err + } + + go u.PurgeRoutine() + + return u, nil +} + +// getTenant must be called under lock. +func (u *Tracker) getTenant(tenant string) *tenantUsage { + data := u.tenants[tenant] + if data == nil { + data = &tenantUsage{ + series: make(map[uint64]*bucket), + constLabels: prometheus.Labels{ + tenantLabel: tenant, + trackerLabel: u.name, + }, + } + u.tenants[tenant] = data + } + return data +} + +func (u *Tracker) Observe(tenant string, batches []*v1.ResourceSpans) { + dimensions := u.labelsFn(tenant) + if len(dimensions) == 0 { + // Not configured + // TODO - Should we put it all in the unattributed bucket instead? + return + } + + max := u.maxFn(tenant) + if max == 0 { + max = u.cfg.MaxCardinality + } + + u.mtx.Lock() + defer u.mtx.Unlock() + + var ( + now = time.Now().Unix() + data = u.getTenant(tenant) + mapping, buffer1, buffer2, last = data.GetBuffersForDimensions(dimensions) + ) + + for _, batch := range batches { + unaccountedForBatchData, totalSpanCount := nonSpanDataLength(batch) + + if totalSpanCount == 0 { + // Mainly to prevent a panic below, but is this even possible? + continue + } + + // This is 1/Nth of the unaccounted for batch data that gets added to each span. + // Adding this incrementally as we go through the spans is the fastest method, but + // loses some precision. The other (original) implementation is to record span counts + // per series into a map and reconcile at the end. That method has more accurate data because + // it performs the floating point math once on the total, instead of accumulating 1/N + 1/N ... errors. + batchPortion := int(math.RoundToEven(float64(unaccountedForBatchData) / float64(totalSpanCount))) + + // To account for the accumulated error we dump the remaining delta onto the first span, which can be negative. + // The result ensures the total recorded bytes matches the input. + firstSpanPortion := unaccountedForBatchData - batchPortion*totalSpanCount + + // Reset value buffer for every batch. 
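+		// buffer1 carries the batch-level (resource) values; each span starts from a copy of
+		// these and overrides them with its own attributes below.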
+ for k := range buffer1 { + buffer1[k] = missingLabel + } + + if batch.Resource != nil { + for _, m := range mapping { + for _, a := range batch.Resource.Attributes { + v := a.Value.GetStringValue() + if v == "" { + continue + } + if a.Key == m.from { + buffer1[m.to] = v + break + } + } + } + } + + var bucket *bucket + + for i, ss := range batch.ScopeSpans { + for j, s := range ss.Spans { + sz := s.Size() + sz += protoLengthMath(sz) + sz += batchPortion // Incrementally add 1/Nth worth of the unaccounted for batch data + if i == 0 && j == 0 { + sz += firstSpanPortion + } + + // Reset to batch values to for some spans having missing values. + copy(buffer2, buffer1) + + for _, m := range mapping { + for _, a := range s.Attributes { + v := a.Value.GetStringValue() + if v == "" { + continue + } + if a.Key == m.from { + buffer2[m.to] = v + break + } + } + } + + // Every span can be a different series. + // If the values buffer hasn't changed then we + // know it's the same bucket and avoid hashing again. + // This shows up in 2 common cases: + // - Dimensions are only resource attributes + // - Runs of spans with the same attributes + // NOTE - Not happy about the slices.Equal to detect when + // to rehash, but couldn't figure out a better way for now. + // The difficulty is tracking bucket dirty status while + // resetting to batch values and recording the span values. + if bucket == nil || !slices.Equal(buffer2, last) { + bucket = data.getSeries(buffer2, max) + copy(last, buffer2) + } + bucket.Inc(uint64(sz), now) + } + } + } +} + +func (u *Tracker) PurgeRoutine() { + purge := time.NewTicker(defaultPurgePeriod) + for range purge.C { + u.purge() + } +} + +func (u *Tracker) purge() { + u.mtx.Lock() + defer u.mtx.Unlock() + + stale := time.Now().Add(-u.cfg.StaleDuration).Unix() + + for t, data := range u.tenants { + for h, s := range data.series { + if s.lastUpdated <= stale { + delete(data.series, h) + } + } + + if len(data.series) == 0 { + // Remove empty tenant + delete(u.tenants, t) + } + } +} + +func (u *Tracker) Handler() http.Handler { + return promhttp.HandlerFor(u.reg, promhttp.HandlerOpts{}) +} + +func (u *Tracker) Describe(chan<- *prometheus.Desc) { + // This runs on startup when registering the tracker. Therefore + // we will have nothing to describe, but it's also not required. +} + +func (u *Tracker) Collect(ch chan<- prometheus.Metric) { + u.mtx.Lock() + defer u.mtx.Unlock() + + for _, t := range u.tenants { + for _, b := range t.series { + ch <- prometheus.MustNewConstMetric(b.descr, prometheus.CounterValue, float64(b.bytes), b.labels...) + } + } +} + +var _ prometheus.Collector = (*Tracker)(nil) + +// hash the given key-value pairs buffers. Buffers must have the +// same lengths +func hash(keys []string, values []string) uint64 { + h := xxhash.New() + + for i := range keys { + _, _ = h.WriteString(keys[i]) + _, _ = h.Write([]byte{255}) + _, _ = h.WriteString(values[i]) + _, _ = h.Write([]byte{255}) + } + + return h.Sum64() +} + +// nonSpanDataLength returns the number of proto bytes in the batch +// that aren't attributable to specific spans. It's complicated but much faster +// to do this because it ensures we only measure each part of the proto once. +// The first (and simpler) approach was to call batch.Size() and then subtract +// each encountered span. But this measures spans twice, which is already the slowest +// part by far. Hopefully isn't too brittle. It must be updated for new fields above the +// span level. 
Also returns the count of spans while we're here so we don't have to loop again. +func nonSpanDataLength(batch *v1.ResourceSpans) (int, int) { + total := 0 + spans := 0 + + if batch.Resource != nil { + sz := batch.Resource.Size() + total += sz + protoLengthMath(sz) + } + + l := len(batch.SchemaUrl) + if l > 0 { + total += l + protoLengthMath(l) + } + + for _, ss := range batch.ScopeSpans { + // This is the data to store the presence of this ss + total += protoLengthMath(1) + + l = len(ss.SchemaUrl) + if l > 0 { + total += l + protoLengthMath(l) + } + + if ss.Scope != nil { + sz := ss.Scope.Size() + total += sz + protoLengthMath(sz) + } + + spans += len(ss.Spans) + } + + return total, spans +} + +// Bookkeeping data to encode a length in proto. +// Copied from sovTrace in .pb.go +func protoLengthMath(x int) (n int) { + return 1 + (bits.Len64(uint64(x)|1)+6)/7 +} diff --git a/modules/distributor/usage/tracker_test.go b/modules/distributor/usage/tracker_test.go new file mode 100644 index 00000000000..0d95c783f76 --- /dev/null +++ b/modules/distributor/usage/tracker_test.go @@ -0,0 +1,299 @@ +package usage + +import ( + "math" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + + v1common "github.com/grafana/tempo/pkg/tempopb/common/v1" + v1resource "github.com/grafana/tempo/pkg/tempopb/resource/v1" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" + "github.com/grafana/tempo/pkg/util/test" +) + +func testConfig() PerTrackerConfig { + return PerTrackerConfig{ + Enabled: true, + MaxCardinality: defaultMaxCardinality, + StaleDuration: defaultStaleDuration, + } +} + +func TestUsageTracker(t *testing.T) { + type testcase struct { + name string + max int + dimensions map[string]string + expected map[uint64]*bucket + } + + // Reused for all test cases + data := []*v1.ResourceSpans{ + { + Resource: &v1resource.Resource{ + Attributes: []*v1common.KeyValue{ + { + Key: "service.name", + Value: &v1common.AnyValue{Value: &v1common.AnyValue_StringValue{StringValue: "svc"}}, + }, + }, + }, + ScopeSpans: []*v1.ScopeSpans{ + { + Spans: []*v1.Span{ + { + Attributes: []*v1common.KeyValue{ + {Key: "attr", Value: &v1common.AnyValue{Value: &v1common.AnyValue_StringValue{StringValue: "1"}}}, + {Key: "attr2", Value: &v1common.AnyValue{Value: &v1common.AnyValue_StringValue{StringValue: "attr2Value"}}}, + }, + }, + { + Attributes: []*v1common.KeyValue{ + {Key: "attr", Value: &v1common.AnyValue{Value: &v1common.AnyValue_StringValue{StringValue: "1"}}}, + }, + }, + { + Attributes: []*v1common.KeyValue{ + {Key: "attr", Value: &v1common.AnyValue{Value: &v1common.AnyValue_StringValue{StringValue: "2"}}}, + }, + }, + { + Attributes: []*v1common.KeyValue{ + {Key: "attr", Value: &v1common.AnyValue{Value: &v1common.AnyValue_StringValue{StringValue: "1"}}}, + }, + }, + }, + SchemaUrl: "test", + }, + }, + SchemaUrl: "test", + }, + } + nonSpanSize, _ := nonSpanDataLength(data[0]) + + // Helper functions for dividing up data sizes + nonSpanRatio := func(r float64) uint64 { + return uint64(math.RoundToEven(float64(nonSpanSize) * r)) + } + + spanSize := func(i int) uint64 { + sz := data[0].ScopeSpans[0].Spans[i].Size() + sz += protoLengthMath(sz) + return uint64(sz) + } + + var ( + testCases []testcase + name string + dimensions map[string]string + expected map[uint64]*bucket + ) + + // ------------------------------------------------------------- + // Test case 1 - Group by service.name, entire batch is 1 series + // ------------------------------------------------------------- + name = 
"standard" + dimensions = map[string]string{"service.name": ""} + expected = make(map[uint64]*bucket) + expected[hash([]string{"service_name"}, []string{"svc"})] = &bucket{ + labels: []string{"svc"}, + bytes: uint64(data[0].Size()), // The entire batch is included, with the exact number of bytes + } + testCases = append(testCases, testcase{ + name: name, + dimensions: dimensions, + expected: expected, + }) + + // ------------------------------------------------------------- + // Test case 2 - Group by attr, batch is split 75%/25% + // ------------------------------------------------------------- + name = "splitbatch" + dimensions = map[string]string{"attr": ""} + expected = make(map[uint64]*bucket) + expected[hash([]string{"attr"}, []string{"1"})] = &bucket{ + labels: []string{"1"}, + bytes: nonSpanRatio(0.75) + spanSize(0) + spanSize(1) + spanSize(3), + } + expected[hash([]string{"attr"}, []string{"2"})] = &bucket{ + labels: []string{"2"}, + bytes: nonSpanRatio(0.25) + spanSize(2), + } + testCases = append(testCases, testcase{ + name: name, + dimensions: dimensions, + expected: expected, + }) + + // ------------------------------------------------------------- + // Test case 3 - Missing labels are set to __missing__ + // ------------------------------------------------------------- + name = "missing" + dimensions = map[string]string{"foo": ""} + expected = make(map[uint64]*bucket) + expected[hash([]string{"foo"}, []string{missingLabel})] = &bucket{ + labels: []string{missingLabel}, // No spans have "foo" so it is assigned to the missingvalue + bytes: uint64(data[0].Size()), + } + testCases = append(testCases, testcase{ + name: name, + dimensions: dimensions, + expected: expected, + }) + + // ------------------------------------------------------------- + // Test case 4 - Max cardinality + // ------------------------------------------------------------- + name = "maxcardinality" + dimensions = map[string]string{"attr": ""} + expected = make(map[uint64]*bucket) + expected[hash([]string{"attr"}, []string{"1"})] = &bucket{ + labels: []string{"1"}, + bytes: nonSpanRatio(0.75) + spanSize(0) + spanSize(1) + spanSize(3), // attr=1 is encountered first and recorded, with 75% of spans + } + expected[hash([]string{"attr"}, []string{overflowLabel})] = &bucket{ + labels: []string{overflowLabel}, + bytes: nonSpanRatio(0.25) + spanSize(2), // attr=2 doesn't fit within cardinality and those 25% of spans go into the overflow series. 
+ } + testCases = append(testCases, testcase{ + name: name, + max: 1, + dimensions: dimensions, + expected: expected, + }) + + // ------------------------------------------------------------- + // Test case 5 - Multiple labels with rename + // Multiple dimensions are renamed into the same output label + // ------------------------------------------------------------- + name = "rename" + dimensions = map[string]string{ + "service.name": "foo", + "attr": "foo", + } + expected = make(map[uint64]*bucket) + expected[hash([]string{"foo"}, []string{"1"})] = &bucket{ + labels: []string{"1"}, + bytes: nonSpanRatio(0.75) + spanSize(0) + spanSize(1) + spanSize(3), + } + expected[hash([]string{"foo"}, []string{"2"})] = &bucket{ + labels: []string{"2"}, + bytes: nonSpanRatio(0.25) + spanSize(2), + } + testCases = append(testCases, testcase{ + name: name, + dimensions: dimensions, + expected: expected, + }) + + // ------------------------------------------------------------- + // Test case 6 - Some spans missing value + // Some spans within the same batch are missing values and + // should continue to inherit the batch value + // ------------------------------------------------------------- + name = "partially_missing" + dimensions = map[string]string{ + "attr2": "", + } + expected = make(map[uint64]*bucket) + expected[hash([]string{"attr2"}, []string{"attr2Value"})] = &bucket{ + labels: []string{"attr2Value"}, + bytes: nonSpanRatio(0.25) + spanSize(0), + } + expected[hash([]string{"attr2"}, []string{missingLabel})] = &bucket{ + labels: []string{missingLabel}, + bytes: nonSpanRatio(0.75) + spanSize(1) + spanSize(2) + spanSize(3), + } + testCases = append(testCases, testcase{ + name: name, + dimensions: dimensions, + expected: expected, + }) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := testConfig() + if tc.max > 0 { + cfg.MaxCardinality = uint64(tc.max) + } + + u, err := NewTracker(cfg, "test", func(_ string) map[string]string { return tc.dimensions }, func(_ string) uint64 { return 0 }) + require.NoError(t, err) + + u.Observe("test", data) + actual := u.tenants["test"].series + + require.Equal(t, len(tc.expected), len(actual)) + + // Ensure total bytes recorded exactly matches the batch + total := 0 + for _, b := range actual { + total += int(b.bytes) + } + require.Equal(t, data[0].Size(), total, "total") + + for expectedHash, expectedBucket := range tc.expected { + require.Equal(t, expectedBucket.labels, actual[expectedHash].labels) + // To make testing less brittle from rounding, just ensure that each series + // is within 1 byte of expected. We already ensured the total is 100% accurate above. 
+ require.InDelta(t, expectedBucket.bytes, actual[expectedHash].bytes, 1.0) + } + }) + } +} + +func BenchmarkUsageTrackerObserve(b *testing.B) { + var ( + tr = test.MakeTrace(10, nil) + dims = map[string]string{"service.name": "service_name"} + // dims = map[string]string{"key": ""} // To benchmark span-level attribute + labelsFn = func(_ string) map[string]string { return dims } // Allocation outside the function to not influence benchmark + maxFn = func(_ string) uint64 { return 0 } + ) + + u, err := NewTracker(testConfig(), "test", labelsFn, maxFn) + require.NoError(b, err) + + for i := 0; i < b.N; i++ { + u.Observe("test", tr.ResourceSpans) + } +} + +func BenchmarkUsageTrackerCollect(b *testing.B) { + var ( + tr = test.MakeTrace(10, nil) + dims = map[string]string{"service.name": ""} + labelsFn = func(_ string) map[string]string { return dims } // Allocation outside the function to not influence benchmark + maxFn = func(_ string) uint64 { return 0 } + req = httptest.NewRequest("", "/", nil) + resp = &NoopHTTPResponseWriter{} + ) + + u, err := NewTracker(testConfig(), "test", labelsFn, maxFn) + require.NoError(b, err) + + u.Observe("test", tr.ResourceSpans) + + handler := u.Handler() + for i := 0; i < b.N; i++ { + handler.ServeHTTP(resp, req) + } +} + +type NoopHTTPResponseWriter struct { + headers map[string][]string +} + +var _ http.ResponseWriter = (*NoopHTTPResponseWriter)(nil) + +func (n *NoopHTTPResponseWriter) Header() http.Header { + if n.headers == nil { + n.headers = make(map[string][]string) + } + return n.headers +} +func (NoopHTTPResponseWriter) Write(buf []byte) (int, error) { return len(buf), nil } +func (NoopHTTPResponseWriter) WriteHeader(_ int) {} diff --git a/modules/overrides/config.go b/modules/overrides/config.go index 88c9cbe615a..860449fecdf 100644 --- a/modules/overrides/config.go +++ b/modules/overrides/config.go @@ -180,6 +180,11 @@ type StorageOverrides struct { DedicatedColumns backend.DedicatedColumns `yaml:"parquet_dedicated_columns" json:"parquet_dedicated_columns"` } +type CostAttributionOverrides struct { + MaxCardinality uint64 `yaml:"max_cardinality,omitempty" json:"max_cardinality,omitempty"` + Dimensions map[string]string `yaml:"dimensions,omitempty" json:"dimensions,omitempty"` +} + type Overrides struct { // Ingestion enforced overrides. Ingestion IngestionOverrides `yaml:"ingestion,omitempty" json:"ingestion,omitempty"` @@ -194,7 +199,8 @@ type Overrides struct { // Global enforced overrides. Global GlobalOverrides `yaml:"global,omitempty" json:"global,omitempty"` // Storage enforced overrides. - Storage StorageOverrides `yaml:"storage,omitempty" json:"storage,omitempty"` + Storage StorageOverrides `yaml:"storage,omitempty" json:"storage,omitempty"` + CostAttribution CostAttributionOverrides `yaml:"cost_attribution,omitempty" json:"cost_attribution,omitempty"` } type Config struct { diff --git a/modules/overrides/config_legacy.go b/modules/overrides/config_legacy.go index 65d0190cdef..526011f486d 100644 --- a/modules/overrides/config_legacy.go +++ b/modules/overrides/config_legacy.go @@ -134,6 +134,8 @@ type LegacyOverrides struct { // is not used when doing a trace by id lookup. 
MaxBytesPerTrace int `yaml:"max_bytes_per_trace" json:"max_bytes_per_trace"` + CostAttribution CostAttributionOverrides `yaml:"cost_attribution,omitempty" json:"cost_attribution,omitempty"` + // tempodb limits DedicatedColumns backend.DedicatedColumns `yaml:"parquet_dedicated_columns" json:"parquet_dedicated_columns"` } @@ -209,6 +211,9 @@ func (l *LegacyOverrides) toNewLimits() Overrides { Storage: StorageOverrides{ DedicatedColumns: l.DedicatedColumns, }, + CostAttribution: CostAttributionOverrides{ + Dimensions: l.CostAttribution.Dimensions, + }, } } diff --git a/modules/overrides/interface.go b/modules/overrides/interface.go index 9a07352a815..79797c42076 100644 --- a/modules/overrides/interface.go +++ b/modules/overrides/interface.go @@ -76,6 +76,8 @@ type Interface interface { MaxMetricsDuration(userID string) time.Duration DedicatedColumns(userID string) backend.DedicatedColumns UnsafeQueryHints(userID string) bool + CostAttributionMaxCardinality(userID string) uint64 + CostAttributionDimensions(userID string) map[string]string // Management API WriteStatusRuntimeConfig(w io.Writer, r *http.Request) error diff --git a/modules/overrides/runtime_config_overrides.go b/modules/overrides/runtime_config_overrides.go index dd1f4209559..b4fe43bcb9a 100644 --- a/modules/overrides/runtime_config_overrides.go +++ b/modules/overrides/runtime_config_overrides.go @@ -350,6 +350,14 @@ func (o *runtimeConfigOverridesManager) UnsafeQueryHints(userID string) bool { return o.getOverridesForUser(userID).Read.UnsafeQueryHints } +func (o *runtimeConfigOverridesManager) CostAttributionMaxCardinality(userID string) uint64 { + return o.getOverridesForUser(userID).CostAttribution.MaxCardinality +} + +func (o *runtimeConfigOverridesManager) CostAttributionDimensions(userID string) map[string]string { + return o.getOverridesForUser(userID).CostAttribution.Dimensions +} + // MaxSearchDuration is the duration of the max search duration for this tenant. 
func (o *runtimeConfigOverridesManager) MaxSearchDuration(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).Read.MaxSearchDuration) diff --git a/modules/overrides/user_configurable_overrides.go b/modules/overrides/user_configurable_overrides.go index 386dc45b74a..d3e42ac6a2d 100644 --- a/modules/overrides/user_configurable_overrides.go +++ b/modules/overrides/user_configurable_overrides.go @@ -223,6 +223,13 @@ func (o *userConfigurableOverridesManager) Forwarders(userID string) []string { return o.Interface.Forwarders(userID) } +func (o *userConfigurableOverridesManager) CostAttributionDimensions(userID string) map[string]string { + if dims, ok := o.getTenantLimits(userID).GetCostAttribution().GetDimensions(); ok { + return dims + } + return o.Interface.CostAttributionDimensions(userID) +} + func (o *userConfigurableOverridesManager) MetricsGeneratorProcessors(userID string) map[string]struct{} { // We merge settings from both layers meaning if a processor is enabled on any layer it will be always enabled (OR logic) processorsUserConfigurable, _ := o.getTenantLimits(userID).GetMetricsGenerator().GetProcessors() diff --git a/modules/overrides/userconfigurable/api/api_test.go b/modules/overrides/userconfigurable/api/api_test.go index 653ceb4b6bd..f312dfb4989 100644 --- a/modules/overrides/userconfigurable/api/api_test.go +++ b/modules/overrides/userconfigurable/api/api_test.go @@ -62,7 +62,7 @@ func Test_UserConfigOverridesAPI_overridesHandlers(t *testing.T) { name: "GET", handler: overridesAPI.GetHandler, req: prepareRequest(tenant, "GET", nil), - expResp: `{"forwarders":["my-other-forwarder"],"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, + expResp: `{"forwarders":["my-other-forwarder"],"cost_attribution":{},"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, expContentType: api.HeaderAcceptJSON, expStatusCode: 200, }, @@ -149,7 +149,7 @@ func Test_UserConfigOverridesAPI_patchOverridesHandlers(t *testing.T) { name: "PATCH - no values stored yet", patch: `{"forwarders":["my-other-forwarder"]}`, current: ``, - expResp: `{"forwarders":["my-other-forwarder"],"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, + expResp: `{"forwarders":["my-other-forwarder"],"cost_attribution":{},"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, expContentType: api.HeaderAcceptJSON, expStatusCode: 200, }, @@ -157,7 +157,7 @@ func Test_UserConfigOverridesAPI_patchOverridesHandlers(t *testing.T) { name: "PATCH - empty overrides are merged", patch: `{"forwarders":["my-other-forwarder"]}`, current: `{}`, - expResp: `{"forwarders":["my-other-forwarder"],"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, + expResp: `{"forwarders":["my-other-forwarder"],"cost_attribution":{},"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, expContentType: api.HeaderAcceptJSON, expStatusCode: 200, }, @@ -165,7 +165,7 @@ func Test_UserConfigOverridesAPI_patchOverridesHandlers(t *testing.T) { name: "PATCH - overwrite", patch: `{"forwarders":["my-other-forwarder"]}`, current: `{"forwarders":["previous-forwarder"]}`, - expResp: `{"forwarders":["my-other-forwarder"],"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, + expResp: `{"forwarders":["my-other-forwarder"],"cost_attribution":{},"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, expContentType: api.HeaderAcceptJSON, expStatusCode: 200, }, @@ 
-247,7 +247,7 @@ func TestUserConfigOverridesAPI_patchOverridesHandler_noVersionConflict(t *testi overridesAPI.PatchHandler(w, r) data := w.Body.String() - assert.Equal(t, `{"forwarders":["f"],"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, data) + assert.Equal(t, `{"forwarders":["f"],"cost_attribution":{},"metrics_generator":{"processor":{"service_graphs":{},"span_metrics":{}}}}`, data) res := w.Result() assert.Equal(t, "2", res.Header.Get(headerEtag)) diff --git a/modules/overrides/userconfigurable/api/limits_test.go b/modules/overrides/userconfigurable/api/limits_test.go index 60d19a082d7..ddba17403d9 100644 --- a/modules/overrides/userconfigurable/api/limits_test.go +++ b/modules/overrides/userconfigurable/api/limits_test.go @@ -63,6 +63,7 @@ func Test_limitsFromOverrides(t *testing.T) { "forwarders": [ "my-forwarder" ], + "cost_attribution": {}, "metrics_generator": { "processors": [ "service-graphs" diff --git a/modules/overrides/userconfigurable/client/limits.go b/modules/overrides/userconfigurable/client/limits.go index 62cb9afef99..1438b2a95c7 100644 --- a/modules/overrides/userconfigurable/client/limits.go +++ b/modules/overrides/userconfigurable/client/limits.go @@ -8,8 +8,8 @@ import ( ) type Limits struct { - Forwarders *[]string `yaml:"forwarders,omitempty" json:"forwarders,omitempty"` - + Forwarders *[]string `yaml:"forwarders,omitempty" json:"forwarders,omitempty"` + CostAttribution CostAttribution `yaml:"cost_attribution,omitempty" json:"cost_attribution,omitempty"` MetricsGenerator LimitsMetricsGenerator `yaml:"metrics_generator,omitempty" json:"metrics_generator,omitempty"` } @@ -27,6 +27,13 @@ func (l *Limits) GetMetricsGenerator() *LimitsMetricsGenerator { return nil } +func (l *Limits) GetCostAttribution() *CostAttribution { + if l != nil { + return &l.CostAttribution + } + return nil +} + type LimitsMetricsGenerator struct { Processors listtomap.ListToMap `yaml:"processors,omitempty" json:"processors,omitempty"` DisableCollection *bool `yaml:"disable_collection,omitempty" json:"disable_collection,omitempty"` @@ -168,3 +175,14 @@ func (l *LimitsMetricsGeneratorProcessorSpanMetrics) GetTargetInfoExcludedDimens } return nil, false } + +type CostAttribution struct { + Dimensions *map[string]string `yaml:"dimensions,omitempty" json:"dimensions,omitempty"` +} + +func (l *CostAttribution) GetDimensions() (map[string]string, bool) { + if l != nil && l.Dimensions != nil { + return *l.Dimensions, true + } + return nil, false +}
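A minimal sketch of how the pieces introduced above fit together when wired by hand, outside the distributor. The standalone `main`, the port, and the dimension mapping are assumptions for illustration; `usage.PerTrackerConfig`, `usage.NewTracker`, `Tracker.Observe`, and `Tracker.Handler` are the APIs added in this change.

```go
package main

import (
	"log"
	"net/http"
	"time"

	"github.com/grafana/tempo/modules/distributor/usage"
)

func main() {
	cfg := usage.PerTrackerConfig{
		Enabled:        true,
		MaxCardinality: 10000,
		StaleDuration:  15 * time.Minute,
	}

	// Group ingested bytes by the service.name attribute, renamed to the output label
	// "service" (mirroring the docs example); in Tempo this mapping comes from the
	// per-tenant overrides via CostAttributionDimensions.
	labelsFn := func(_ string) map[string]string {
		return map[string]string{"service.name": "service"}
	}
	// Returning 0 falls back to cfg.MaxCardinality, matching Observe's behavior.
	maxFn := func(_ string) uint64 { return 0 }

	tracker, err := usage.NewTracker(cfg, "cost-attribution", labelsFn, maxFn)
	if err != nil {
		log.Fatal(err)
	}

	// The distributor calls tracker.Observe(tenantID, resourceSpans) on every push;
	// the handler then serves the accumulated per-tenant counters.
	http.Handle("/usage_metrics", tracker.Handler())
	log.Fatal(http.ListenAndServe(":3200", nil))
}
```

In the actual wiring above, `modules.go` mounts `distributor.UsageTrackerHandler()` on `/usage_metrics`, and the dimensions and cardinality limit are resolved per tenant from the overrides (`CostAttributionDimensions`, `CostAttributionMaxCardinality`).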