Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PoC implementation of Sum aggregator #2973

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ env:
# Path to where test results will be saved.
TEST_RESULTS: /tmp/test-results
# Default minimum version of Go to support.
DEFAULT_GO_VERSION: 1.17
DEFAULT_GO_VERSION: 1.18
jobs:
lint:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric

Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/export/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

// TODO: NOTE this is a temporary space, it may be moved following the
// discussion of #2813, or #2841
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module go.opentelemetry.io/otel/sdk/metric

go 1.17
go 1.18

require (
github.com/go-logr/logr v1.2.3
Expand Down
1 change: 0 additions & 1 deletion sdk/metric/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/instrumentkind.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/internal/aggregation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"go.opentelemetry.io/otel/attribute"
)

// Aggregation is a single data point in a timeseries that summarizes
// measurements made during a time span.
type Aggregation struct {
// TODO(#2968): Replace this with the export.Aggregation type once #2961
// is merged.

// Timestamp defines the time the last measurement was made. If zero, no
// measurements were made for this time span. The time is represented as a
// unix timestamp with nanosecond precision.
Timestamp int64

// Attributes are the unique dimensions Value describes.
Attributes *attribute.Set

// Value is the summarization of the measurements made.
Value value
}

type value interface {
private()
}

// SingleValue summarizes a set of measurements as a single value.
type SingleValue[N int64 | float64] struct {
Value N
}

func (SingleValue[N]) private() {}

// HistogramValue summarizes a set of measurements as a histogram.
type HistogramValue struct {
Bounds []float64
Counts []uint64
Sum float64
Min, Max float64
}

func (HistogramValue) private() {}
37 changes: 37 additions & 0 deletions sdk/metric/internal/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import "go.opentelemetry.io/otel/attribute"

// Aggregator forms an aggregation from a collection of recorded measurements.
// Aggregators are use with Cyclers to collect and produce metrics from
// instrument measurements. Aggregators handle the collection (and
// aggregation) of measurements, while Cyclers handle how those aggregated
// measurements are combined and then produced to the telemetry pipeline.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Aggregate(measurement N, attr *attribute.Set)

// flush clears aggregations that have been recorded. The Aggregator
// resets itself for a new aggregation period when called, it does not
// carry forward any state. If aggregation periods need to be combined it
// is the callers responsibility to achieve this.
flush() []Aggregation
}
126 changes: 126 additions & 0 deletions sdk/metric/internal/aggregator_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

type meter struct {
// When a reader initiates a collection, the meter would collect
// aggregations from each of these cyclers. In this process they will
// progress the aggregation period of each instrument's aggregator.
cyclers []Cycler
}

func (m *meter) SyncInt64() syncint64.InstrumentProvider {
// The same would be done for all the other instrument providers.
return (*syncInt64Provider)(m)
}

type syncInt64Provider meter

func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Counter, error) {
// This is an example of how a synchronous int64 provider would create an
// aggregator and cycler for a new counter. At this point the provider
// would determine the aggregation and temporality to used based on the
// Reader and View configuration. Assume here these are determined to be a
// cumulative sum.

aggregator := NewSum[int64](NewInt64)
count := inst{agg: aggregator}

cycler := NewCumulativeCylcer(aggregator)
p.cyclers = append(p.cyclers, cycler)

return count, nil
}

func (p *syncInt64Provider) UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) {
// This is an example of how a synchronous int64 provider would create an
// aggregator and cycler for a new up-down counter. At this point the
// provider would determine the aggregation and temporality to used based
// on the Reader and View configuration. Assume here these are determined
// to be a delta last-value.

aggregator := NewLastValue[int64]()
upDownCount := inst{agg: aggregator}

cycler := NewDeltaCylcer(aggregator)
p.cyclers = append(p.cyclers, cycler)

return upDownCount, nil
}

func (p *syncInt64Provider) Histogram(string, ...instrument.Option) (syncint64.Histogram, error) {
// This is an example of how a synchronous int64 provider would create an
// aggregator and cycler for a new histogram. At this point the provider
// would determine the aggregation and temporality to used based on the
// Reader and View configuration. Assume here these are determined to be a
// delta explicit-bucket histogram.

aggregator := NewHistogram[int64](aggregation.ExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000},
NoMinMax: false,
})
hist := inst{agg: aggregator}

cycler := NewDeltaCylcer(aggregator)
p.cyclers = append(p.cyclers, cycler)

return hist, nil
}

// inst is a generalized int64 synchronous counter, up-down counter, and
// histogram used for demonstration purposes only.
type inst struct {
instrument.Synchronous

agg Aggregator[int64]
}

func (inst) Add(context.Context, int64, ...attribute.KeyValue) {}
func (inst) Record(context.Context, int64, ...attribute.KeyValue) {}

func Example() {
m := meter{}
provider := m.SyncInt64()

count, _ := provider.Counter("counter example")
fmt.Printf("counter aggregator: %T\n", count.(inst).agg)

upDownCount, _ := provider.UpDownCounter("up-down counter example")
fmt.Printf("up-down counter aggregator: %T\n", upDownCount.(inst).agg)

hist, _ := provider.Histogram("histogram example")
fmt.Printf("histogram aggregator: %T\n", hist.(inst).agg)

fmt.Printf("meter cyclers: %T{%T, %T, %T}\n", m.cyclers, m.cyclers[0], m.cyclers[1], m.cyclers[2])

// Output:
// counter aggregator: *internal.sumAgg[int64]
// up-down counter aggregator: *internal.lastValueAgg[int64]
// histogram aggregator: *internal.histogramAgg[int64]
// meter cyclers: []internal.Cycler{internal.cumulativeCylcer[int64], internal.deltaCylcer[int64], internal.deltaCylcer[int64]}
}
84 changes: 84 additions & 0 deletions sdk/metric/internal/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"math"
"sync/atomic"
)

// Atomic provides atomic access to a generic value type.
type Atomic[N int64 | float64] interface {
// Store value atomically.
Store(value N)

// Add value atomically.
Add(value N)

// Load returns the stored value.
Load() N
}

type NewAtomicFunc[N int64 | float64] func() Atomic[N]

// Int64 is an int64 implementation of an Atomic.
type Int64 struct {
value *int64
}

var _ Atomic[int64] = Int64{}

func NewInt64() Atomic[int64] {
var v int64
return Int64{value: &v}
}

func (v Int64) Store(value int64) { atomic.StoreInt64(v.value, value) }
func (v Int64) Add(value int64) { atomic.AddInt64(v.value, value) }
func (v Int64) Load() int64 { return atomic.LoadInt64(v.value) }

// Float64 is a float64 implementation of an Atomic.
type Float64 struct {
value *uint64
}

var _ Atomic[float64] = Float64{}

func NewFloat64() Atomic[float64] {
var v float64
u := math.Float64bits(v)
return Float64{value: &u}
}

func (v Float64) Store(value float64) {
atomic.StoreUint64(v.value, math.Float64bits(value))
}

func (v Float64) Add(value float64) {
for {
old := atomic.LoadUint64(v.value)
sum := math.Float64bits(math.Float64frombits(old) + value)
if atomic.CompareAndSwapUint64(v.value, old, sum) {
return
}
}
}

func (v Float64) Load() float64 {
return math.Float64frombits(atomic.LoadUint64(v.value))
}
Loading