Skip to content

Commit

Permalink
A proper gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
olegbespalov committed Jun 10, 2024
1 parent 02e1c0d commit d0f4424
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 34 deletions.
9 changes: 5 additions & 4 deletions pkg/opentelemetry/tags.go → pkg/opentelemetry/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// MapTagSet converts a k6 tag set into
// newAttributeSet converts a k6 tag set into
// the equivalent set of opentelemetry attributes
func MapTagSet(t *metrics.TagSet) []attribute.KeyValue {
func newAttributeSet(t *metrics.TagSet) attribute.Set {
n := (*atlas.Node)(t)
if n.Len() < 1 {
return nil
return *attribute.EmptySet()
}
labels := make([]attribute.KeyValue, 0, n.Len())
for !n.IsRoot() {
Expand All @@ -22,5 +22,6 @@ func MapTagSet(t *metrics.TagSet) []attribute.KeyValue {
}
labels = append(labels, attribute.String(key, value))
}
return labels

return attribute.NewSet(labels...)
}
67 changes: 67 additions & 0 deletions pkg/opentelemetry/gauge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package opentelemetry

import (
"context"
"errors"
"fmt"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// NewFloat64Gauge returns a new Float64Gauge.
func NewFloat64Gauge() *Float64Gauge {
return &Float64Gauge{}
}

// Float64Gauge is a temporary implementation of the OpenTelemetry sync gauge
// it will be replaced by the official implementation once it's available.
//
// https://github.com/open-telemetry/opentelemetry-go/issues/3984
type Float64Gauge struct {
observations sync.Map
}

// Callback implements the callback function for the underlying asynchronous gauge
// it observes the current state of all previous Set() calls.
func (f *Float64Gauge) Callback(_ context.Context, o metric.Float64Observer) error {
var err error

f.observations.Range(func(key, value interface{}) bool {
var v float64

// TODO: improve type assertion
switch val := value.(type) {
case float64:
v = val
case int64:
v = float64(val)
default:
err = errors.New("unexpected type for value " + fmt.Sprintf("%T", val))
return false
}

attrs, ok := key.(attribute.Set)
if !ok {
err = errors.New("unexpected type for key")
return false
}

o.Observe(v, metric.WithAttributeSet(attrs))

return true
})

return err
}

// Set sets the value of the gauge.
func (f *Float64Gauge) Set(val float64, attrs attribute.Set) {
f.observations.Store(attrs, val)
}

// Delete deletes the gauge.
func (f *Float64Gauge) Delete(attrs attribute.Set) {
f.observations.Delete(attrs)
}
15 changes: 9 additions & 6 deletions pkg/opentelemetry/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,38 +145,41 @@ func (o *Output) dispatch(entry metrics.Sample) error {
ctx := context.Background()
name := normalizeMetricName(o.config, entry.Metric.Name)

attributeSet := newAttributeSet(entry.Tags)
attributeSetOpt := otelMetric.WithAttributeSet(attributeSet)

switch entry.Metric.Type {
case metrics.Counter:
counter, err := o.metricsRegistry.getOrCreateCounter(name)
if err != nil {
return err
}

counter.Add(ctx, entry.Value, otelMetric.WithAttributes(MapTagSet(entry.Tags)...))
counter.Add(ctx, entry.Value, attributeSetOpt)
case metrics.Gauge:
gauge, err := o.metricsRegistry.getOrCreateUpDownCounter(name)
gauge, err := o.metricsRegistry.getOrCreateGauge(name)
if err != nil {
return err
}

gauge.Add(ctx, entry.Value, otelMetric.WithAttributes(MapTagSet(entry.Tags)...))
gauge.Set(entry.Value, attributeSet)
case metrics.Trend:
trend, err := o.metricsRegistry.getOrCreateHistogram(name)
if err != nil {
return err
}

trend.Record(ctx, entry.Value, otelMetric.WithAttributes(MapTagSet(entry.Tags)...))
trend.Record(ctx, entry.Value, attributeSetOpt)
case metrics.Rate:
nonZero, total, err := o.metricsRegistry.getOrCreateCountersForRate(name)
if err != nil {
return err
}

if entry.Value != 0 {
nonZero.Add(ctx, 1, otelMetric.WithAttributes(MapTagSet(entry.Tags)...))
nonZero.Add(ctx, 1, attributeSetOpt)
}
total.Add(ctx, 1, otelMetric.WithAttributes(MapTagSet(entry.Tags)...))
total.Add(ctx, 1, attributeSetOpt)
default:
o.logger.Warnf("metric %q has unsupported metric type", entry.Metric.Name)
}
Expand Down
49 changes: 25 additions & 24 deletions pkg/opentelemetry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ type registry struct {
meter otelMetric.Meter
logger logrus.FieldLogger

counters sync.Map
upDownCounters sync.Map
histograms sync.Map
rateCounters sync.Map
counters sync.Map
gauges sync.Map
histograms sync.Map
rateCounters sync.Map
}

// newRegistry creates a new registry.
Expand Down Expand Up @@ -67,26 +67,6 @@ func (r *registry) getOrCreateHistogram(name string) (otelMetric.Float64Histogra
return h, nil
}

func (r *registry) getOrCreateUpDownCounter(name string) (otelMetric.Float64UpDownCounter, error) {
if counter, ok := r.upDownCounters.Load(name); ok {
if v, ok := counter.(otelMetric.Float64UpDownCounter); ok {
return v, nil
}

return nil, fmt.Errorf("metric %q is not an up/down counter", name)
}

c, err := r.meter.Float64UpDownCounter(name)
if err != nil {
return nil, fmt.Errorf("failed to create up/down counter for %q: %w", name, err)
}

r.logger.Debugf("registered up/down counter (gauge) metric %q ", name)

r.upDownCounters.Store(name, c)
return c, nil
}

func (r *registry) getOrCreateCountersForRate(name string) (otelMetric.Int64Counter, otelMetric.Int64Counter, error) {
// k6's rate metric tracks how frequently a non-zero value occurs.
// so to correctly calculate the rate in a metrics backend
Expand Down Expand Up @@ -134,3 +114,24 @@ func (r *registry) getOrCreateCountersForRate(name string) (otelMetric.Int64Coun

return nonZeroCounter, totalCounter, nil
}

func (r *registry) getOrCreateGauge(name string) (*Float64Gauge, error) {
if gauge, ok := r.gauges.Load(name); ok {
if v, ok := gauge.(*Float64Gauge); ok {
return v, nil
}

return nil, fmt.Errorf("metric %q is not a gauge", name)
}

g := NewFloat64Gauge()
_, err := r.meter.Float64ObservableGauge(name, otelMetric.WithFloat64Callback(g.Callback))
if err != nil {
return nil, fmt.Errorf("failed to create gauge for %q: %w", name, err)
}

r.logger.Debugf("registered gauge metric %q ", name)

r.gauges.Store(name, g)
return g, nil
}

0 comments on commit d0f4424

Please sign in to comment.