From dfefd1e9f0e84955c185ca671e4741bb57403a30 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 3 Jun 2022 23:27:10 -0700 Subject: [PATCH] fix race condition; add test --- .../sdk/metric/internal/syncstate/sync.go | 4 +- .../metric/internal/syncstate/sync_test.go | 74 ++++++++++++------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/lightstep/sdk/metric/internal/syncstate/sync.go b/lightstep/sdk/metric/internal/syncstate/sync.go index 78777fb7..24be7fe1 100644 --- a/lightstep/sdk/metric/internal/syncstate/sync.go +++ b/lightstep/sdk/metric/internal/syncstate/sync.go @@ -140,10 +140,10 @@ func (rec *record) snapshotAndProcess() bool { if mods == coll { return false } + rec.accumulator.SnapshotAndProcess() + // Updates happened in this interval, collect and continue. atomic.StoreInt64(&rec.collectedCount, mods) - - rec.accumulator.SnapshotAndProcess() return true } diff --git a/lightstep/sdk/metric/internal/syncstate/sync_test.go b/lightstep/sdk/metric/internal/syncstate/sync_test.go index 1c53060c..e6672021 100644 --- a/lightstep/sdk/metric/internal/syncstate/sync_test.go +++ b/lightstep/sdk/metric/internal/syncstate/sync_test.go @@ -49,61 +49,82 @@ var ( } ) -func deltaUpdate(old, new int64) int64 { +func deltaUpdate[N number.Any](old, new N) N { return old + new } -func cumulativeUpdate(_, new int64) int64 { +func cumulativeUpdate[N number.Any](_, new N) N { return new } -func TestSyncStateDeltaConcurrency1(t *testing.T) { - testSyncStateConcurrency(t, 1, aggregation.DeltaTemporality, deltaUpdate) +const testAttr = attribute.Key("key") + +var ( + deltaSelector = view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality { + return aggregation.DeltaTemporality + }) + + cumulativeSelector = view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality { + return aggregation.CumulativeTemporality + }) + + keyFilter = view.WithClause( + view.WithKeys([]attribute.Key{}), + ) +) + +func TestSyncStateDeltaConcurrencyInt(t *testing.T) { + testSyncStateConcurrency[int64, number.Int64Traits](t, deltaUpdate[int64], deltaSelector) } -func TestSyncStateDeltaConcurrency2(t *testing.T) { - testSyncStateConcurrency(t, 2, aggregation.DeltaTemporality, deltaUpdate) +func TestSyncStateCumulativeConcurrencyInt(t *testing.T) { + testSyncStateConcurrency[int64, number.Int64Traits](t, cumulativeUpdate[int64], cumulativeSelector) } -func TestSyncStateCumulativeConcurrency1(t *testing.T) { - testSyncStateConcurrency(t, 1, aggregation.CumulativeTemporality, cumulativeUpdate) +func TestSyncStateCumulativeConcurrencyIntFiltered(t *testing.T) { + testSyncStateConcurrency[int64, number.Int64Traits](t, cumulativeUpdate[int64], cumulativeSelector, keyFilter) } -func TestSyncStateCumulativeConcurrency2(t *testing.T) { - testSyncStateConcurrency(t, 2, aggregation.CumulativeTemporality, cumulativeUpdate) +func TestSyncStateDeltaConcurrencyFloat(t *testing.T) { + testSyncStateConcurrency[float64, number.Float64Traits](t, deltaUpdate[float64], deltaSelector) } -// TODO: Add float64, Histogram tests. -func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Temporality, update func(old, new int64) int64) { +func TestSyncStateCumulativeConcurrencyFloat(t *testing.T) { + testSyncStateConcurrency[float64, number.Float64Traits](t, cumulativeUpdate[float64], cumulativeSelector) +} + +func TestSyncStateCumulativeConcurrencyFloatFiltered(t *testing.T) { + testSyncStateConcurrency[float64, number.Float64Traits](t, cumulativeUpdate[float64], cumulativeSelector, keyFilter) +} + +func testSyncStateConcurrency[N number.Any, Traits number.Traits[N]](t *testing.T, update func(old, new N) N, vopts ...view.Option) { const ( + numReaders = 2 numRoutines = 10 numAttrs = 10 numUpdates = 1e6 ) + var traits Traits var writers sync.WaitGroup var readers sync.WaitGroup + readers.Add(numReaders) writers.Add(numRoutines) lib := instrumentation.Library{ Name: "testlib", } - vopts := []view.Option{ - view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality { - return tempo - }), - } vcs := make([]*viewstate.Compiler, numReaders) for vci := range vcs { vcs[vci] = viewstate.New(lib, view.New("test", vopts...)) } attrs := make([]attribute.KeyValue, numAttrs) for i := range attrs { - attrs[i] = attribute.Int("i", i) + attrs[i] = testAttr.Int(i) } - desc := test.Descriptor("tester", sdkinstrument.CounterKind, number.Int64Kind) + desc := test.Descriptor("tester", sdkinstrument.CounterKind, traits.Kind()) pipes := make(pipeline.Register[viewstate.Instrument], numReaders) for vci := range vcs { @@ -113,19 +134,20 @@ func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Te inst := NewInstrument(desc, nil, pipes) require.NotNil(t, inst) - cntr := NewCounter[int64, number.Int64Traits](inst) + cntr := NewCounter[N, Traits](inst) require.NotNil(t, cntr) ctx, cancel := context.WithCancel(context.Background()) - partialCounts := make([]map[attribute.Set]int64, numReaders) + partialCounts := make([]map[attribute.Set]N, numReaders) + for vci := range vcs { - partialCounts[vci] = map[attribute.Set]int64{} + partialCounts[vci] = map[attribute.Set]N{} } // Reader loops for vci := range vcs { - go func(vci int, partial map[attribute.Set]int64, vc *viewstate.Compiler) { + go func(vci int, partial map[attribute.Set]N, vc *viewstate.Compiler) { defer readers.Done() // scope will be reused by this reader @@ -146,7 +168,7 @@ func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Te vc.Collectors()[0].Collect(seq, &scope.Instruments) for _, pt := range scope.Instruments[0].Points { - partial[pt.Attributes] = update(partial[pt.Attributes], pt.Aggregation.(*sum.MonotonicInt64).Sum().AsInt64()) + partial[pt.Attributes] = update(partial[pt.Attributes], traits.FromNumber(pt.Aggregation.(*sum.State[N, Traits, sum.Monotonic]).Sum())) } } @@ -179,11 +201,11 @@ func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Te readers.Wait() for vci := range vcs { - var sum int64 + var sum N for _, count := range partialCounts[vci] { sum += count } - require.Equal(t, int64(numUpdates), sum, "vci==%d", vci) + require.Equal(t, N(numUpdates), sum, "vci==%d", vci) } }