Skip to content

Commit

Permalink
fix race condition; add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua MacDonald committed Jun 4, 2022
1 parent 9c70ced commit dfefd1e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 28 deletions.
4 changes: 2 additions & 2 deletions lightstep/sdk/metric/internal/syncstate/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ func (rec *record) snapshotAndProcess() bool {
if mods == coll {
return false
}
rec.accumulator.SnapshotAndProcess()

// Updates happened in this interval, collect and continue.
atomic.StoreInt64(&rec.collectedCount, mods)

rec.accumulator.SnapshotAndProcess()
return true
}

Expand Down
74 changes: 48 additions & 26 deletions lightstep/sdk/metric/internal/syncstate/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,61 +49,82 @@ var (
}
)

func deltaUpdate(old, new int64) int64 {
func deltaUpdate[N number.Any](old, new N) N {
return old + new
}

func cumulativeUpdate(_, new int64) int64 {
func cumulativeUpdate[N number.Any](_, new N) N {
return new
}

func TestSyncStateDeltaConcurrency1(t *testing.T) {
testSyncStateConcurrency(t, 1, aggregation.DeltaTemporality, deltaUpdate)
const testAttr = attribute.Key("key")

var (
deltaSelector = view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality {
return aggregation.DeltaTemporality
})

cumulativeSelector = view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality {
return aggregation.CumulativeTemporality
})

keyFilter = view.WithClause(
view.WithKeys([]attribute.Key{}),
)
)

func TestSyncStateDeltaConcurrencyInt(t *testing.T) {
testSyncStateConcurrency[int64, number.Int64Traits](t, deltaUpdate[int64], deltaSelector)
}

func TestSyncStateDeltaConcurrency2(t *testing.T) {
testSyncStateConcurrency(t, 2, aggregation.DeltaTemporality, deltaUpdate)
func TestSyncStateCumulativeConcurrencyInt(t *testing.T) {
testSyncStateConcurrency[int64, number.Int64Traits](t, cumulativeUpdate[int64], cumulativeSelector)
}

func TestSyncStateCumulativeConcurrency1(t *testing.T) {
testSyncStateConcurrency(t, 1, aggregation.CumulativeTemporality, cumulativeUpdate)
func TestSyncStateCumulativeConcurrencyIntFiltered(t *testing.T) {
testSyncStateConcurrency[int64, number.Int64Traits](t, cumulativeUpdate[int64], cumulativeSelector, keyFilter)
}

func TestSyncStateCumulativeConcurrency2(t *testing.T) {
testSyncStateConcurrency(t, 2, aggregation.CumulativeTemporality, cumulativeUpdate)
func TestSyncStateDeltaConcurrencyFloat(t *testing.T) {
testSyncStateConcurrency[float64, number.Float64Traits](t, deltaUpdate[float64], deltaSelector)
}

// TODO: Add float64, Histogram tests.
func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Temporality, update func(old, new int64) int64) {
func TestSyncStateCumulativeConcurrencyFloat(t *testing.T) {
testSyncStateConcurrency[float64, number.Float64Traits](t, cumulativeUpdate[float64], cumulativeSelector)
}

func TestSyncStateCumulativeConcurrencyFloatFiltered(t *testing.T) {
testSyncStateConcurrency[float64, number.Float64Traits](t, cumulativeUpdate[float64], cumulativeSelector, keyFilter)
}

func testSyncStateConcurrency[N number.Any, Traits number.Traits[N]](t *testing.T, update func(old, new N) N, vopts ...view.Option) {
const (
numReaders = 2
numRoutines = 10
numAttrs = 10
numUpdates = 1e6
)

var traits Traits
var writers sync.WaitGroup
var readers sync.WaitGroup

readers.Add(numReaders)
writers.Add(numRoutines)

lib := instrumentation.Library{
Name: "testlib",
}
vopts := []view.Option{
view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality {
return tempo
}),
}
vcs := make([]*viewstate.Compiler, numReaders)
for vci := range vcs {
vcs[vci] = viewstate.New(lib, view.New("test", vopts...))
}
attrs := make([]attribute.KeyValue, numAttrs)
for i := range attrs {
attrs[i] = attribute.Int("i", i)
attrs[i] = testAttr.Int(i)
}

desc := test.Descriptor("tester", sdkinstrument.CounterKind, number.Int64Kind)
desc := test.Descriptor("tester", sdkinstrument.CounterKind, traits.Kind())

pipes := make(pipeline.Register[viewstate.Instrument], numReaders)
for vci := range vcs {
Expand All @@ -113,19 +134,20 @@ func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Te
inst := NewInstrument(desc, nil, pipes)
require.NotNil(t, inst)

cntr := NewCounter[int64, number.Int64Traits](inst)
cntr := NewCounter[N, Traits](inst)
require.NotNil(t, cntr)

ctx, cancel := context.WithCancel(context.Background())

partialCounts := make([]map[attribute.Set]int64, numReaders)
partialCounts := make([]map[attribute.Set]N, numReaders)

for vci := range vcs {
partialCounts[vci] = map[attribute.Set]int64{}
partialCounts[vci] = map[attribute.Set]N{}
}

// Reader loops
for vci := range vcs {
go func(vci int, partial map[attribute.Set]int64, vc *viewstate.Compiler) {
go func(vci int, partial map[attribute.Set]N, vc *viewstate.Compiler) {
defer readers.Done()

// scope will be reused by this reader
Expand All @@ -146,7 +168,7 @@ func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Te
vc.Collectors()[0].Collect(seq, &scope.Instruments)

for _, pt := range scope.Instruments[0].Points {
partial[pt.Attributes] = update(partial[pt.Attributes], pt.Aggregation.(*sum.MonotonicInt64).Sum().AsInt64())
partial[pt.Attributes] = update(partial[pt.Attributes], traits.FromNumber(pt.Aggregation.(*sum.State[N, Traits, sum.Monotonic]).Sum()))
}
}

Expand Down Expand Up @@ -179,11 +201,11 @@ func testSyncStateConcurrency(t *testing.T, numReaders int, tempo aggregation.Te
readers.Wait()

for vci := range vcs {
var sum int64
var sum N
for _, count := range partialCounts[vci] {
sum += count
}
require.Equal(t, int64(numUpdates), sum, "vci==%d", vci)
require.Equal(t, N(numUpdates), sum, "vci==%d", vci)
}
}

Expand Down

0 comments on commit dfefd1e

Please sign in to comment.