diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da3fcf1b71f..aa2bd8d3fac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ env: # Path to where test results will be saved. TEST_RESULTS: /tmp/test-results # Default minimum version of Go to support. - DEFAULT_GO_VERSION: 1.17 + DEFAULT_GO_VERSION: 1.18 jobs: lint: runs-on: ubuntu-latest diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 258526dbe3a..fcf7024ab95 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index c04679981ac..12355db63dd 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric diff --git a/sdk/metric/export/data.go b/sdk/metric/export/data.go index 750294b2218..9ac934d5390 100644 --- a/sdk/metric/export/data.go +++ b/sdk/metric/export/data.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 // TODO: NOTE this is a temporary space, it may be moved following the // discussion of #2813, or #2841 diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go index 52f39292555..d62838c9b15 100644 --- a/sdk/metric/exporter.go +++ b/sdk/metric/exporter.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index 07226ebfd9e..77e8d8111b9 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -1,6 +1,6 @@ module go.opentelemetry.io/otel/sdk/metric -go 1.17 +go 1.18 require ( github.com/go-logr/logr v1.2.3 diff --git a/sdk/metric/go.sum b/sdk/metric/go.sum index ac3360c6fee..2e2aed63d24 100644 --- a/sdk/metric/go.sum +++ b/sdk/metric/go.sum @@ -6,7 +6,6 @@ github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/sdk/metric/instrumentkind.go b/sdk/metric/instrumentkind.go index 8174eee5ef3..20d94fcc43a 100644 --- a/sdk/metric/instrumentkind.go +++ b/sdk/metric/instrumentkind.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/internal/aggregation.go b/sdk/metric/internal/aggregation.go new file mode 100644 index 00000000000..23f0743920b --- /dev/null +++ b/sdk/metric/internal/aggregation.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "go.opentelemetry.io/otel/attribute" +) + +// Aggregation is a single data point in a timeseries that summarizes +// measurements made during a time span. +type Aggregation struct { + // TODO(#2968): Replace this with the export.Aggregation type once #2961 + // is merged. + + // Timestamp defines the time the last measurement was made. If zero, no + // measurements were made for this time span. The time is represented as a + // unix timestamp with nanosecond precision. + Timestamp int64 + + // Attributes are the unique dimensions Value describes. + Attributes *attribute.Set + + // Value is the summarization of the measurements made. + Value value +} + +type value interface { + private() +} + +// SingleValue summarizes a set of measurements as a single value. +type SingleValue[N int64 | float64] struct { + Value N +} + +func (SingleValue[N]) private() {} + +// HistogramValue summarizes a set of measurements as a histogram. +type HistogramValue struct { + Bounds []float64 + Counts []uint64 + Sum float64 + Min, Max float64 +} + +func (HistogramValue) private() {} diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go new file mode 100644 index 00000000000..2f2b11dc83a --- /dev/null +++ b/sdk/metric/internal/aggregator.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import "go.opentelemetry.io/otel/attribute" + +// Aggregator forms an aggregation from a collection of recorded measurements. +// Aggregators are use with Cyclers to collect and produce metrics from +// instrument measurements. Aggregators handle the collection (and +// aggregation) of measurements, while Cyclers handle how those aggregated +// measurements are combined and then produced to the telemetry pipeline. +type Aggregator[N int64 | float64] interface { + // Aggregate records the measurement, scoped by attr, and aggregates it + // into an aggregation. + Aggregate(measurement N, attr *attribute.Set) + + // flush clears aggregations that have been recorded. The Aggregator + // resets itself for a new aggregation period when called, it does not + // carry forward any state. If aggregation periods need to be combined it + // is the callers responsibility to achieve this. + flush() []Aggregation +} diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go new file mode 100644 index 00000000000..99e640815d6 --- /dev/null +++ b/sdk/metric/internal/aggregator_example_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/sdk/metric/aggregation" +) + +type meter struct { + // When a reader initiates a collection, the meter would collect + // aggregations from each of these cyclers. In this process they will + // progress the aggregation period of each instrument's aggregator. + cyclers []Cycler +} + +func (m *meter) SyncInt64() syncint64.InstrumentProvider { + // The same would be done for all the other instrument providers. + return (*syncInt64Provider)(m) +} + +type syncInt64Provider meter + +func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Counter, error) { + // This is an example of how a synchronous int64 provider would create an + // aggregator and cycler for a new counter. At this point the provider + // would determine the aggregation and temporality to used based on the + // Reader and View configuration. Assume here these are determined to be a + // cumulative sum. + + aggregator := NewSum[int64](NewInt64) + count := inst{agg: aggregator} + + cycler := NewCumulativeCylcer(aggregator) + p.cyclers = append(p.cyclers, cycler) + + return count, nil +} + +func (p *syncInt64Provider) UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) { + // This is an example of how a synchronous int64 provider would create an + // aggregator and cycler for a new up-down counter. At this point the + // provider would determine the aggregation and temporality to used based + // on the Reader and View configuration. Assume here these are determined + // to be a delta last-value. + + aggregator := NewLastValue[int64]() + upDownCount := inst{agg: aggregator} + + cycler := NewDeltaCylcer(aggregator) + p.cyclers = append(p.cyclers, cycler) + + return upDownCount, nil +} + +func (p *syncInt64Provider) Histogram(string, ...instrument.Option) (syncint64.Histogram, error) { + // This is an example of how a synchronous int64 provider would create an + // aggregator and cycler for a new histogram. At this point the provider + // would determine the aggregation and temporality to used based on the + // Reader and View configuration. Assume here these are determined to be a + // delta explicit-bucket histogram. + + aggregator := NewHistogram[int64](aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, + NoMinMax: false, + }) + hist := inst{agg: aggregator} + + cycler := NewDeltaCylcer(aggregator) + p.cyclers = append(p.cyclers, cycler) + + return hist, nil +} + +// inst is a generalized int64 synchronous counter, up-down counter, and +// histogram used for demonstration purposes only. +type inst struct { + instrument.Synchronous + + agg Aggregator[int64] +} + +func (inst) Add(context.Context, int64, ...attribute.KeyValue) {} +func (inst) Record(context.Context, int64, ...attribute.KeyValue) {} + +func Example() { + m := meter{} + provider := m.SyncInt64() + + count, _ := provider.Counter("counter example") + fmt.Printf("counter aggregator: %T\n", count.(inst).agg) + + upDownCount, _ := provider.UpDownCounter("up-down counter example") + fmt.Printf("up-down counter aggregator: %T\n", upDownCount.(inst).agg) + + hist, _ := provider.Histogram("histogram example") + fmt.Printf("histogram aggregator: %T\n", hist.(inst).agg) + + fmt.Printf("meter cyclers: %T{%T, %T, %T}\n", m.cyclers, m.cyclers[0], m.cyclers[1], m.cyclers[2]) + + // Output: + // counter aggregator: *internal.sumAgg[int64] + // up-down counter aggregator: *internal.lastValueAgg[int64] + // histogram aggregator: *internal.histogramAgg[int64] + // meter cyclers: []internal.Cycler{internal.cumulativeCylcer[int64], internal.deltaCylcer[int64], internal.deltaCylcer[int64]} +} diff --git a/sdk/metric/internal/atomic.go b/sdk/metric/internal/atomic.go new file mode 100644 index 00000000000..f3ce67bd236 --- /dev/null +++ b/sdk/metric/internal/atomic.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "math" + "sync/atomic" +) + +// Atomic provides atomic access to a generic value type. +type Atomic[N int64 | float64] interface { + // Store value atomically. + Store(value N) + + // Add value atomically. + Add(value N) + + // Load returns the stored value. + Load() N +} + +type NewAtomicFunc[N int64 | float64] func() Atomic[N] + +// Int64 is an int64 implementation of an Atomic. +type Int64 struct { + value *int64 +} + +var _ Atomic[int64] = Int64{} + +func NewInt64() Atomic[int64] { + var v int64 + return Int64{value: &v} +} + +func (v Int64) Store(value int64) { atomic.StoreInt64(v.value, value) } +func (v Int64) Add(value int64) { atomic.AddInt64(v.value, value) } +func (v Int64) Load() int64 { return atomic.LoadInt64(v.value) } + +// Float64 is a float64 implementation of an Atomic. +type Float64 struct { + value *uint64 +} + +var _ Atomic[float64] = Float64{} + +func NewFloat64() Atomic[float64] { + var v float64 + u := math.Float64bits(v) + return Float64{value: &u} +} + +func (v Float64) Store(value float64) { + atomic.StoreUint64(v.value, math.Float64bits(value)) +} + +func (v Float64) Add(value float64) { + for { + old := atomic.LoadUint64(v.value) + sum := math.Float64bits(math.Float64frombits(old) + value) + if atomic.CompareAndSwapUint64(v.value, old, sum) { + return + } + } +} + +func (v Float64) Load() float64 { + return math.Float64frombits(atomic.LoadUint64(v.value)) +} diff --git a/sdk/metric/internal/atomic_test.go b/sdk/metric/internal/atomic_test.go new file mode 100644 index 00000000000..3975b7a9fb9 --- /dev/null +++ b/sdk/metric/internal/atomic_test.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +const routines = 5 + +func testAtomic[N int64 | float64](t *testing.T, a Atomic[N]) { + n := a.Load() + a.Add(1) + assert.Equal(t, n+1, a.Load()) + + a.Store(n) + assert.Equal(t, n, a.Load()) +} + +func TestInt64(t *testing.T) { + var wg sync.WaitGroup + wg.Add(routines) + for i := int64(0); i < routines; i++ { + go func(n int64) { + defer wg.Done() + a := NewInt64() + a.Store(n) + testAtomic[int64](t, a) + }(i) + } + wg.Wait() +} + +func TestFloat64(t *testing.T) { + var wg sync.WaitGroup + wg.Add(routines) + for i := 0; i < routines; i++ { + go func(n float64) { + defer wg.Done() + a := NewFloat64() + a.Store(n) + testAtomic[float64](t, a) + }(float64(i)) + } + wg.Wait() +} diff --git a/sdk/metric/internal/cycler.go b/sdk/metric/internal/cycler.go new file mode 100644 index 00000000000..e37a855b24c --- /dev/null +++ b/sdk/metric/internal/cycler.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +// Cycler cycles aggregation periods. It will handle any state progression +// from one period to the next based on the temporality of the cycling. +type Cycler interface { + // Cycle returns an []Aggregation for the current period. If the cycler + // merges state from previous periods into the current, the []Aggregation + // returned reflects this. + Cycle() []Aggregation + + // TODO(#2968): Replace the return type with []export.Aggregation once + // #2961 is merged. +} + +// deltaCylcer cycles aggregation periods by returning the aggregation +// produces from that period only. No state is maintained from one period to +// the next. +type deltaCylcer[N int64 | float64] struct { + aggregator Aggregator[N] +} + +func NewDeltaCylcer[N int64 | float64](a Aggregator[N]) Cycler { + return deltaCylcer[N]{aggregator: a} +} + +func (c deltaCylcer[N]) Cycle() []Aggregation { + return c.aggregator.flush() +} + +// cumulativeCylcer cycles aggregation periods by returning the cumulative +// aggregation from its start time until the current period. +type cumulativeCylcer[N int64 | float64] struct { + // TODO(#2969): implement a cumulative storing field. + aggregator Aggregator[N] +} + +func NewCumulativeCylcer[N int64 | float64](a Aggregator[N]) Cycler { + c := cumulativeCylcer[N]{aggregator: a} + + // TODO(#2969): Initialize a new cumulative storage. + + return c +} + +func (c cumulativeCylcer[N]) Cycle() []Aggregation { + // TODO(#2969): Update cumulative storage of aggregations and return them. + + // FIXME(#2969): currently this returns a delta representation of the + // aggregation. When the cumulative storage is complete it should return a + // cumulative representation. + return c.aggregator.flush() +} diff --git a/sdk/metric/internal/doc.go b/sdk/metric/internal/doc.go new file mode 100644 index 00000000000..e1aa11ab2e1 --- /dev/null +++ b/sdk/metric/internal/doc.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package internal provides types and functionality used to aggregate and +// cycle the state of metric measurements made by the SDK. These types and +// functionality are meant only for internal SDK use. +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" diff --git a/sdk/metric/internal/drop.go b/sdk/metric/internal/drop.go new file mode 100644 index 00000000000..23d32676fba --- /dev/null +++ b/sdk/metric/internal/drop.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import "go.opentelemetry.io/otel/attribute" + +// dropAgg drops all recorded data and returns an empty Aggregation. +type dropAgg[N int64 | float64] struct{} + +// NewDrop returns an Aggregator that drops all recorded data and returns an +// empty Aggregation. +func NewDrop[N int64 | float64]() Aggregator[N] { return &dropAgg[N]{} } + +func (s *dropAgg[N]) Aggregate(N, *attribute.Set) {} + +func (s *dropAgg[N]) flush() []Aggregation { return nil } diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go new file mode 100644 index 00000000000..90d442c8d84 --- /dev/null +++ b/sdk/metric/internal/histogram.go @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/aggregation" +) + +// histogramAgg summarizes a set of measurements as an histogram with +// explicitly defined buckets. +type histogramAgg[N int64 | float64] struct { + // TODO(#2970): implement. +} + +// NewHistogram returns an Aggregator that summarizes a set of measurements as +// an histogram. +func NewHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { + return &histogramAgg[N]{} +} + +func (s *histogramAgg[N]) Aggregate(value N, attr *attribute.Set) { + // TODO(#2970): implement. +} + +func (s *histogramAgg[N]) flush() []Aggregation { + // TODO(#2970): implement. + return []Aggregation{ + { + Value: HistogramValue{ /* TODO(#2970): calculate. */ }, + }, + } +} diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go new file mode 100644 index 00000000000..467eeae4c18 --- /dev/null +++ b/sdk/metric/internal/lastvalue.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import "go.opentelemetry.io/otel/attribute" + +// lastValueAgg summarizes a set of measurements as the last one made. +type lastValueAgg[N int64 | float64] struct { + // TODO(#2971): implement. +} + +// NewLastValue returns an Aggregator that summarizes a set of measurements as +// the last one made. +func NewLastValue[N int64 | float64]() Aggregator[N] { + return &lastValueAgg[N]{} +} + +func (s *lastValueAgg[N]) Aggregate(value N, attr *attribute.Set) { + // TODO(#2971): implement. +} + +func (s *lastValueAgg[N]) flush() []Aggregation { + // TODO(#2971): implement. + return []Aggregation{ + { + Value: SingleValue[N]{ /* TODO(#2971): calculate */ }, + }, + } +} diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go new file mode 100644 index 00000000000..37dc79650c8 --- /dev/null +++ b/sdk/metric/internal/sum.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" +) + +// sumAgg summarizes a set of measurements as their arithmetic sum. +type sumAgg[N int64 | float64] struct { + mu sync.Mutex + values map[attribute.Set]N +} + +// NewSum returns an Aggregator that summarizes a set of measurements as their +// arithmetic sum. The zero value will be used as the start value for all new +// Aggregations. +func NewSum[N int64 | float64]() Aggregator[N] { + return &sumAgg[N]{ + values: map[attribute.Set]N{}, + } +} + +func (s *sumAgg[N]) Aggregate(value N, attr *attribute.Set) { + s.mu.Lock() + defer s.mu.Unlock() + + s.values[*attr] += value +} + +func (s *sumAgg[N]) flush() []Aggregation { + s.mu.Lock() + defer s.mu.Unlock() + + now := time.Now().UnixNano() + aggs := make([]Aggregation, 0, len(s.values)) + + for attr, value := range s.values { + attr := attr + aggs = append(aggs, Aggregation{ + Timestamp: now, + Attributes: &attr, + Value: SingleValue[N]{Value: value}, + }) + delete(s.values, attr) + } + return aggs +} diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go new file mode 100644 index 00000000000..2304285f373 --- /dev/null +++ b/sdk/metric/internal/sum_test.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" +) + +const goroutines = 5 + +func testSumAggregation[N int64 | float64](t *testing.T, agg Aggregator[N]) { + const increments = 30 + additions := map[attribute.Set]N{ + attribute.NewSet( + attribute.String("user", "alice"), attribute.Bool("admin", true), + ): 1, + attribute.NewSet( + attribute.String("user", "bob"), attribute.Bool("admin", false), + ): -1, + attribute.NewSet( + attribute.String("user", "carol"), attribute.Bool("admin", false), + ): 2, + } + + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < increments; j++ { + for attrs, n := range additions { + agg.Aggregate(n, &attrs) + } + } + }() + } + wg.Wait() + + extra := make(map[attribute.Set]struct{}) + got := make(map[attribute.Set]N) + flush := agg.flush() + for _, a := range flush { + got[*a.Attributes] = a.Value.(SingleValue[N]).Value + extra[*a.Attributes] = struct{}{} + } + + for attr, v := range additions { + name := attr.Encoded(attribute.DefaultEncoder()) + t.Run(name, func(t *testing.T) { + require.Contains(t, got, attr) + delete(extra, attr) + assert.Equal(t, v*increments*goroutines, got[attr]) + }) + } + + assert.Lenf(t, extra, 0, "unknown values added: %v", extra) +} + +func TestInt64Sum(t *testing.T) { testSumAggregation(t, NewSum[int64]()) } +func TestFloat64Sum(t *testing.T) { testSumAggregation(t, NewSum[float64]()) } + +func benchmarkSumAggregation[N int64 | float64](b *testing.B, agg Aggregator[N], count int) { + attrs := make([]attribute.Set, count) + for i := range attrs { + attrs[i] = attribute.NewSet(attribute.Int("value", i)) + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + for _, attr := range attrs { + agg.Aggregate(1, &attr) + } + agg.flush() + } +} + +func BenchmarkInt64Sum(b *testing.B) { + for _, n := range []int{10, 50, 100} { + b.Run(fmt.Sprintf("count-%d", n), func(b *testing.B) { + benchmarkSumAggregation(b, NewSum(NewInt64), n) + }) + } +} +func BenchmarkFloat64Sum(b *testing.B) { + for _, n := range []int{10, 50, 100} { + b.Run(fmt.Sprintf("count-%d", n), func(b *testing.B) { + benchmarkSumAggregation(b, NewSum(NewFloat64), n) + }) + } +} + +var aggsStore []Aggregation + +// This isn't a perfect benchmark, because we don't get consistant writes. I would probably remove it for production. +func benchmarkSumAggregationParallel[N int64 | float64](b *testing.B, agg Aggregator[N]) { + attrs := make([]attribute.Set, 100) + for i := range attrs { + attrs[i] = attribute.NewSet(attribute.Int("value", i)) + } + + ctx, cancel := context.WithCancel(context.Background()) + b.Cleanup(cancel) + + for i := 0; i < 4; i++ { + go func(i int) { + for { + if ctx.Err() != nil { + return + } + for j := 0; j < 25; j++ { + agg.Aggregate(1, &attrs[i*25+j]) + } + } + }(i) + } + + agg.flush() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + aggsStore = agg.flush() + time.Sleep(time.Microsecond) + + } +} + +func BenchmarkInt64SumParallel(b *testing.B) { + benchmarkSumAggregationParallel(b, NewSum(NewInt64)) +} +func BenchmarkFloat64SumParallel(b *testing.B) { + benchmarkSumAggregationParallel(b, NewSum(NewFloat64)) +} diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 83f0328604b..f7e5d863208 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 61b9ec74291..14c5d0765f3 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric/reader" diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 3b052ae864e..daf892dd1dc 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 3fdfbdcb344..38aa7ae28d9 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 8bc27e0abb9..c0a403c5e90 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index ae5e40f2a20..e33e28a26e7 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 8276d278f26..98d0faffc22 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/provider_test.go b/sdk/metric/provider_test.go index dca401fe3b8..20eb60590a9 100644 --- a/sdk/metric/provider_test.go +++ b/sdk/metric/provider_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 1e5888ee4c7..76c7e7b6daa 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 374dc09e909..3a0abbec4db 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric/reader" diff --git a/sdk/metric/temporality.go b/sdk/metric/temporality.go index 289b151606e..fa896748287 100644 --- a/sdk/metric/temporality.go +++ b/sdk/metric/temporality.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/view/instrument.go b/sdk/metric/view/instrument.go index 12e74b05c83..10d30243fa8 100644 --- a/sdk/metric/view/instrument.go +++ b/sdk/metric/view/instrument.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package view // import "go.opentelemetry.io/otel/sdk/metric/view" diff --git a/sdk/metric/view/view.go b/sdk/metric/view/view.go index 98f2e24ca76..35483f59ab1 100644 --- a/sdk/metric/view/view.go +++ b/sdk/metric/view/view.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package view // import "go.opentelemetry.io/otel/sdk/metric/view" diff --git a/sdk/metric/view/view_test.go b/sdk/metric/view/view_test.go index ffd4095fea1..10696ff00a6 100644 --- a/sdk/metric/view/view_test.go +++ b/sdk/metric/view/view_test.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.18 +// +build go1.18 + package view import (