Skip to content

Commit

Permalink
Merge pull request #604 from MadVikingGod/mvg/new_sdk/impl-sum
Browse files Browse the repository at this point in the history
Changed to a simpler sum.
  • Loading branch information
MrAlias authored Jul 7, 2022
2 parents 704b166 + 411ce67 commit c7f9013
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 28 deletions.
45 changes: 20 additions & 25 deletions sdk/metric/internal/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,46 +26,41 @@ import (

// sumAgg summarizes a set of measurements as their arithmetic sum.
type sumAgg[N int64 | float64] struct {
// zero value used for the base of all new sums.
newFunc NewAtomicFunc[N]

// map[attribute.Set]Atomic[N]
current sync.Map
mu sync.Mutex
values map[attribute.Set]N
}

// NewSum returns an Aggregator that summarizes a set of measurements as their
// arithmetic sum. The zero value will be used as the start value for all new
// Aggregations.
func NewSum[N int64 | float64](f NewAtomicFunc[N]) Aggregator[N] {
return &sumAgg[N]{newFunc: f}
func NewSum[N int64 | float64]() Aggregator[N] {
return &sumAgg[N]{
values: map[attribute.Set]N{},
}
}

func (s *sumAgg[N]) Aggregate(value N, attr *attribute.Set) {
if v, ok := s.current.Load(*attr); ok {
v.(Atomic[N]).Add(value)
return
}
s.mu.Lock()
defer s.mu.Unlock()

v, _ := s.current.LoadOrStore(*attr, s.newFunc())
v.(Atomic[N]).Add(value)
s.values[*attr] += value
}

func (s *sumAgg[N]) flush() []Aggregation {
s.mu.Lock()
defer s.mu.Unlock()

now := time.Now().UnixNano()
var aggs []Aggregation
s.current.Range(func(key, val any) bool {
attrs := key.(attribute.Set)
aggs := make([]Aggregation, 0, len(s.values))

for attr, value := range s.values {
attr := attr
aggs = append(aggs, Aggregation{
Timestamp: now,
Attributes: &attrs,
Value: SingleValue[N]{Value: val.(Atomic[N]).Load()},
Attributes: &attr,
Value: SingleValue[N]{Value: value},
})

// Reset.
s.current.Delete(key)

return true
})

delete(s.values, attr)
}
return aggs
}
83 changes: 80 additions & 3 deletions sdk/metric/internal/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -58,7 +61,8 @@ func testSumAggregation[N int64 | float64](t *testing.T, agg Aggregator[N]) {

extra := make(map[attribute.Set]struct{})
got := make(map[attribute.Set]N)
for _, a := range agg.flush() {
flush := agg.flush()
for _, a := range flush {
got[*a.Attributes] = a.Value.(SingleValue[N]).Value
extra[*a.Attributes] = struct{}{}
}
Expand All @@ -75,5 +79,78 @@ func testSumAggregation[N int64 | float64](t *testing.T, agg Aggregator[N]) {
assert.Lenf(t, extra, 0, "unknown values added: %v", extra)
}

func TestInt64Sum(t *testing.T) { testSumAggregation(t, NewSum[int64](NewInt64)) }
func TestFloat64Sum(t *testing.T) { testSumAggregation(t, NewSum[float64](NewFloat64)) }
func TestInt64Sum(t *testing.T) { testSumAggregation(t, NewSum[int64]()) }
func TestFloat64Sum(t *testing.T) { testSumAggregation(t, NewSum[float64]()) }

func benchmarkSumAggregation[N int64 | float64](b *testing.B, agg Aggregator[N], count int) {
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}

b.ResetTimer()

for n := 0; n < b.N; n++ {
for _, attr := range attrs {
agg.Aggregate(1, &attr)
}
agg.flush()
}
}

func BenchmarkInt64Sum(b *testing.B) {
for _, n := range []int{10, 50, 100} {
b.Run(fmt.Sprintf("count-%d", n), func(b *testing.B) {
benchmarkSumAggregation(b, NewSum(NewInt64), n)
})
}
}
func BenchmarkFloat64Sum(b *testing.B) {
for _, n := range []int{10, 50, 100} {
b.Run(fmt.Sprintf("count-%d", n), func(b *testing.B) {
benchmarkSumAggregation(b, NewSum(NewFloat64), n)
})
}
}

var aggsStore []Aggregation

// This isn't a perfect benchmark, because we don't get consistant writes. I would probably remove it for production.
func benchmarkSumAggregationParallel[N int64 | float64](b *testing.B, agg Aggregator[N]) {
attrs := make([]attribute.Set, 100)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

for i := 0; i < 4; i++ {
go func(i int) {
for {
if ctx.Err() != nil {
return
}
for j := 0; j < 25; j++ {
agg.Aggregate(1, &attrs[i*25+j])
}
}
}(i)
}

agg.flush()
b.ResetTimer()

for n := 0; n < b.N; n++ {
aggsStore = agg.flush()
time.Sleep(time.Microsecond)

}
}

func BenchmarkInt64SumParallel(b *testing.B) {
benchmarkSumAggregationParallel(b, NewSum(NewInt64))
}
func BenchmarkFloat64SumParallel(b *testing.B) {
benchmarkSumAggregationParallel(b, NewSum(NewFloat64))
}

0 comments on commit c7f9013

Please sign in to comment.