From 411ce670ae646f34e68ec68b89120a1383a48c0d Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Mon, 27 Jun 2022 21:24:23 +0000 Subject: [PATCH] Changed to a simpler sum. --- sdk/metric/internal/sum.go | 45 ++++++++---------- sdk/metric/internal/sum_test.go | 83 +++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 28 deletions(-) diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index e6ff7bd24c6..37dc79650c8 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -26,46 +26,41 @@ import ( // sumAgg summarizes a set of measurements as their arithmetic sum. type sumAgg[N int64 | float64] struct { - // zero value used for the base of all new sums. - newFunc NewAtomicFunc[N] - - // map[attribute.Set]Atomic[N] - current sync.Map + mu sync.Mutex + values map[attribute.Set]N } // NewSum returns an Aggregator that summarizes a set of measurements as their // arithmetic sum. The zero value will be used as the start value for all new // Aggregations. -func NewSum[N int64 | float64](f NewAtomicFunc[N]) Aggregator[N] { - return &sumAgg[N]{newFunc: f} +func NewSum[N int64 | float64]() Aggregator[N] { + return &sumAgg[N]{ + values: map[attribute.Set]N{}, + } } func (s *sumAgg[N]) Aggregate(value N, attr *attribute.Set) { - if v, ok := s.current.Load(*attr); ok { - v.(Atomic[N]).Add(value) - return - } + s.mu.Lock() + defer s.mu.Unlock() - v, _ := s.current.LoadOrStore(*attr, s.newFunc()) - v.(Atomic[N]).Add(value) + s.values[*attr] += value } func (s *sumAgg[N]) flush() []Aggregation { + s.mu.Lock() + defer s.mu.Unlock() + now := time.Now().UnixNano() - var aggs []Aggregation - s.current.Range(func(key, val any) bool { - attrs := key.(attribute.Set) + aggs := make([]Aggregation, 0, len(s.values)) + + for attr, value := range s.values { + attr := attr aggs = append(aggs, Aggregation{ Timestamp: now, - Attributes: &attrs, - Value: SingleValue[N]{Value: val.(Atomic[N]).Load()}, + Attributes: &attr, + Value: SingleValue[N]{Value: value}, }) - - // Reset. 
-		s.current.Delete(key)
-
-		return true
-	})
-
+		delete(s.values, attr)
+	}
 	return aggs
 }
diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go
index e6973d5bb26..2304285f373 100644
--- a/sdk/metric/internal/sum_test.go
+++ b/sdk/metric/internal/sum_test.go
@@ -18,8 +18,11 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
 
 import (
+	"context"
+	"fmt"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -58,7 +61,8 @@ func testSumAggregation[N int64 | float64](t *testing.T, agg Aggregator[N]) {
 
 	extra := make(map[attribute.Set]struct{})
 	got := make(map[attribute.Set]N)
-	for _, a := range agg.flush() {
+	flush := agg.flush()
+	for _, a := range flush {
 		got[*a.Attributes] = a.Value.(SingleValue[N]).Value
 		extra[*a.Attributes] = struct{}{}
 	}
@@ -75,5 +79,78 @@ func testSumAggregation[N int64 | float64](t *testing.T, agg Aggregator[N]) {
 	assert.Lenf(t, extra, 0, "unknown values added: %v", extra)
 }
 
-func TestInt64Sum(t *testing.T)   { testSumAggregation(t, NewSum[int64](NewInt64)) }
-func TestFloat64Sum(t *testing.T) { testSumAggregation(t, NewSum[float64](NewFloat64)) }
+func TestInt64Sum(t *testing.T)   { testSumAggregation(t, NewSum[int64]()) }
+func TestFloat64Sum(t *testing.T) { testSumAggregation(t, NewSum[float64]()) }
+
+func benchmarkSumAggregation[N int64 | float64](b *testing.B, agg Aggregator[N], count int) {
+	attrs := make([]attribute.Set, count)
+	for i := range attrs {
+		attrs[i] = attribute.NewSet(attribute.Int("value", i))
+	}
+
+	b.ResetTimer()
+
+	for n := 0; n < b.N; n++ {
+		for _, attr := range attrs {
+			agg.Aggregate(1, &attr)
+		}
+		agg.flush()
+	}
+}
+
+func BenchmarkInt64Sum(b *testing.B) {
+	for _, n := range []int{10, 50, 100} {
+		b.Run(fmt.Sprintf("count-%d", n), func(b *testing.B) {
+			benchmarkSumAggregation(b, NewSum[int64](), n)
+		})
+	}
+}
+func BenchmarkFloat64Sum(b *testing.B) {
+	for _, n := range []int{10, 50, 100} {
+		b.Run(fmt.Sprintf("count-%d", n), func(b *testing.B) {
+			benchmarkSumAggregation(b, NewSum[float64](), n)
+		})
+	}
+}
+
+var aggsStore []Aggregation
+
+// This isn't a perfect benchmark because the writes aren't consistent; it would probably be removed for production.
+func benchmarkSumAggregationParallel[N int64 | float64](b *testing.B, agg Aggregator[N]) {
+	attrs := make([]attribute.Set, 100)
+	for i := range attrs {
+		attrs[i] = attribute.NewSet(attribute.Int("value", i))
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	b.Cleanup(cancel)
+
+	for i := 0; i < 4; i++ {
+		go func(i int) {
+			for {
+				if ctx.Err() != nil {
+					return
+				}
+				for j := 0; j < 25; j++ {
+					agg.Aggregate(1, &attrs[i*25+j])
+				}
+			}
+		}(i)
+	}
+
+	agg.flush()
+	b.ResetTimer()
+
+	for n := 0; n < b.N; n++ {
+		aggsStore = agg.flush()
+		time.Sleep(time.Microsecond)
+	}
+}
+
+func BenchmarkInt64SumParallel(b *testing.B) {
+	benchmarkSumAggregationParallel(b, NewSum[int64]())
+}
+func BenchmarkFloat64SumParallel(b *testing.B) {
+	benchmarkSumAggregationParallel(b, NewSum[float64]())
+}
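
A minimal usage sketch of the reworked aggregator, assuming it sits beside sum.go in package internal (flush is unexported and, going by the tests above, is reachable through the Aggregator interface). The function name exampleSumUsage and the "host" attribute key are illustrative only, not part of this patch.

package internal

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// exampleSumUsage is a hypothetical helper, not part of the patch; it only
// illustrates the zero-argument constructor and the drain-on-flush behavior.
func exampleSumUsage() {
	agg := NewSum[int64]() // every attribute set starts summing from the zero value

	hostA := attribute.NewSet(attribute.String("host", "a"))
	hostB := attribute.NewSet(attribute.String("host", "b"))

	// Aggregate is guarded by the mutex, so concurrent calls are safe.
	agg.Aggregate(2, &hostA)
	agg.Aggregate(3, &hostA)
	agg.Aggregate(5, &hostB)

	// flush drains the map: host=a reports 5, host=b reports 5, and a second
	// flush would start again from empty sums.
	for _, a := range agg.flush() {
		fmt.Println(a.Attributes.Encoded(attribute.DefaultEncoder()), a.Value.(SingleValue[int64]).Value)
	}
}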