diff --git a/lightstep/sdk/metric/instrument.go b/lightstep/sdk/metric/instrument.go deleted file mode 100644 index f66bf99f..00000000 --- a/lightstep/sdk/metric/instrument.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric" - -import ( - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric/instrument" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/asyncstate" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/syncstate" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" -) - -// instrumentConstructor refers to either the syncstate or asyncstate -// NewInstrument method. Although both receive an opaque interface{} -// to distinguish providers, only the asyncstate package needs to know -// this information. The unused parameter is passed to the syncstate -// package for the generalization used here to work. -type instrumentConstructor[T any] func( - instrument sdkinstrument.Descriptor, - opaque interface{}, - compiled pipeline.Register[viewstate.Instrument], -) T - -// configureInstrument applies the instrument configuration, checks -// for an existing definition for the same descriptor, and compiles -// and constructs the instrument if necessary. -func configureInstrument[T any]( - m *meter, - name string, - opts []instrument.Option, - nk number.Kind, - ik sdkinstrument.Kind, - listPtr *[]T, - ctor instrumentConstructor[T], -) (T, error) { - // Compute the instrument descriptor - cfg := instrument.NewConfig(opts...) - desc := sdkinstrument.NewDescriptor(name, ik, nk, cfg.Description(), cfg.Unit()) - - m.lock.Lock() - defer m.lock.Unlock() - - // Lookup a pre-existing instrument by descriptor. - if lookup, has := m.byDesc[desc]; has { - // Recompute conflicts since they may have changed. - var conflicts viewstate.ViewConflictsBuilder - - for _, compiler := range m.compilers { - _, err := compiler.Compile(desc) - conflicts.Combine(err) - } - - return lookup.(T), conflicts.AsError() - } - - // Compile the instrument for each pipeline. the first time. - var conflicts viewstate.ViewConflictsBuilder - compiled := pipeline.NewRegister[viewstate.Instrument](len(m.compilers)) - - for pipe, compiler := range m.compilers { - comp, err := compiler.Compile(desc) - compiled[pipe] = comp - conflicts.Combine(err) - } - - // Build the new instrument, cache it, append to the list. - inst := ctor(desc, m, compiled) - err := conflicts.AsError() - - m.byDesc[desc] = inst - *listPtr = append(*listPtr, inst) - if err != nil { - // Handle instrument creation errors when they're new, - // not for repeat entries above. - otel.Handle(err) - } - return inst, err -} - -// synchronousInstrument configures a synchronous instrument. -func (m *meter) synchronousInstrument(name string, opts []instrument.Option, nk number.Kind, ik sdkinstrument.Kind) (*syncstate.Instrument, error) { - return configureInstrument(m, name, opts, nk, ik, &m.syncInsts, syncstate.NewInstrument) -} - -// synchronousInstrument configures an asynchronous instrument. -func (m *meter) asynchronousInstrument(name string, opts []instrument.Option, nk number.Kind, ik sdkinstrument.Kind) (*asyncstate.Instrument, error) { - return configureInstrument(m, name, opts, nk, ik, &m.asyncInsts, asyncstate.NewInstrument) -} diff --git a/lightstep/sdk/metric/internal/asyncstate/async.go b/lightstep/sdk/metric/internal/asyncstate/async.go new file mode 100644 index 00000000..324c779f --- /dev/null +++ b/lightstep/sdk/metric/internal/asyncstate/async.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package asyncstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/asyncstate" + +import ( + "context" + "fmt" + "sync" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" +) + +type ( + // State is the object used to maintain independent collection + // state for each asynchronous meter. + State struct { + // pipe is the pipeline.Register number of this state. + pipe int + + // lock protects against errant use of the instrument + // w/ copied context after the callback returns. + lock sync.Mutex + + // store is a map from instrument to set of values + // observed during one collection. + store map[*Instrument]map[attribute.Set]viewstate.Accumulator + } + + // Instrument is the implementation object associated with one + // asynchronous instrument. + Instrument struct { + // opaque is used to ensure that callbacks are + // registered with instruments from the same provider. + opaque interface{} + + // compiled is the per-pipeline compiled instrument. + compiled pipeline.Register[viewstate.Instrument] + + // descriptor describes the API-level instrument. + // + // Note: Not clear why we need this. It's used for a + // range test, but shouldn't the range test be + // performed by the aggregator? If a View is allowed + // to reconfigure the aggregation in ways that change + // semantics, should the range test be based on the + // aggregation, not the original instrument? + descriptor sdkinstrument.Descriptor + } + + // contextKey is used with context.WithValue() to lookup + // per-reader state from within an executing callback + // function. + contextKey struct{} +) + +func NewState(pipe int) *State { + return &State{ + pipe: pipe, + store: map[*Instrument]map[attribute.Set]viewstate.Accumulator{}, + } +} + +// NewInstrument returns a new Instrument; this compiles individual +// instruments for each reader. +func NewInstrument(desc sdkinstrument.Descriptor, opaque interface{}, compiled pipeline.Register[viewstate.Instrument]) *Instrument { + // Note: we return a non-nil instrument even when all readers + // disabled the instrument. This ensures that certain error + // checks still work (wrong meter, wrong callback, etc). + return &Instrument{ + opaque: opaque, + descriptor: desc, + compiled: compiled, + } +} + +// SnapshotAndProcess calls SnapshotAndProcess() on each of the pending +// aggregations for a given reader. +func (inst *Instrument) SnapshotAndProcess(state *State) { + state.lock.Lock() + defer state.lock.Unlock() + + for _, acc := range state.store[inst] { + // SnapshotAndProcess is always final for asynchronous state, since + // the map is built anew for each collection. + acc.SnapshotAndProcess(true) + } +} + +func (inst *Instrument) getOrCreate(cs *callbackState, attrs []attribute.KeyValue) viewstate.Accumulator { + comp := inst.compiled[cs.state.pipe] + + if comp == nil { + // The view disabled the instrument. + return nil + } + + cs.state.lock.Lock() + defer cs.state.lock.Unlock() + + imap, has := cs.state.store[inst] + + if !has { + imap = map[attribute.Set]viewstate.Accumulator{} + cs.state.store[inst] = imap + } + + aset := attribute.NewSet(attrs...) + se, has := imap[aset] + if !has { + se = comp.NewAccumulator(aset) + imap[aset] = se + } + return se +} + +func capture[N number.Any, Traits number.Traits[N]](ctx context.Context, inst *Instrument, value N, attrs []attribute.KeyValue) { + lookup := ctx.Value(contextKey{}) + if lookup == nil { + otel.Handle(fmt.Errorf("async instrument used outside of callback")) + return + } + + cs := lookup.(*callbackState) + cb := cs.getCallback() + if cb == nil { + otel.Handle(fmt.Errorf("async instrument used after callback return")) + return + } + if _, ok := cb.instruments[inst]; !ok { + otel.Handle(fmt.Errorf("async instrument not declared for use in callback")) + return + } + + if !aggregator.RangeTest[N, Traits](value, inst.descriptor.Kind) { + return + } + + if acc := inst.getOrCreate(cs, attrs); acc != nil { + acc.(viewstate.Updater[N]).Update(value) + } +} diff --git a/lightstep/sdk/metric/internal/asyncstate/async_test.go b/lightstep/sdk/metric/internal/asyncstate/async_test.go new file mode 100644 index 00000000..086ae127 --- /dev/null +++ b/lightstep/sdk/metric/internal/asyncstate/async_test.go @@ -0,0 +1,408 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package asyncstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/asyncstate" + +import ( + "context" + "math" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/sum" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/test" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/sdk/instrumentation" +) + +var ( + testLibrary = instrumentation.Library{ + Name: "test", + } + + endTime = time.Unix(100, 0) + middleTime = endTime.Add(-time.Millisecond) + startTime = endTime.Add(-2 * time.Millisecond) + + testSequence = data.Sequence{ + Start: startTime, + Last: middleTime, + Now: endTime, + } +) + +type testSDK struct { + compilers []*viewstate.Compiler +} + +func (tsdk *testSDK) compile(desc sdkinstrument.Descriptor) pipeline.Register[viewstate.Instrument] { + reg := pipeline.NewRegister[viewstate.Instrument](len(tsdk.compilers)) + + for i, comp := range tsdk.compilers { + inst, err := comp.Compile(desc) + if err != nil { + panic(err) + } + reg[i] = inst + } + return reg +} + +func testAsync(name string, opts ...view.Option) *testSDK { + return &testSDK{ + compilers: []*viewstate.Compiler{ + viewstate.New(testLibrary, view.New(name, opts...)), + viewstate.New(testLibrary, view.New(name, opts...)), + }, + } +} + +func testAsync2(name string, opts1, opts2 []view.Option) *testSDK { + return &testSDK{ + compilers: []*viewstate.Compiler{ + viewstate.New(testLibrary, view.New(name, opts1...)), + viewstate.New(testLibrary, view.New(name, opts2...)), + }, + } +} + +func testState(num int) *State { + return NewState(num) +} + +func testObserver[N number.Any, Traits number.Traits[N]](tsdk *testSDK, name string, ik sdkinstrument.Kind, opts ...instrument.Option) Observer[N, Traits] { + var t Traits + desc := test.Descriptor(name, ik, t.Kind(), opts...) + impl := NewInstrument(desc, tsdk, tsdk.compile(desc)) + return NewObserver[N, Traits](impl) +} + +func TestNewCallbackError(t *testing.T) { + tsdk := testAsync("test") + + // no instruments error + cb, err := NewCallback(nil, tsdk, nil) + require.Error(t, err) + require.Nil(t, cb) + + // nil callback error + cntr := testObserver[int64, number.Int64Traits](tsdk, "counter", sdkinstrument.CounterObserverKind) + cb, err = NewCallback([]instrument.Asynchronous{cntr}, tsdk, nil) + require.Error(t, err) + require.Nil(t, cb) +} + +func TestNewCallbackProviderMismatch(t *testing.T) { + test0 := testAsync("test0") + test1 := testAsync("test1") + + instA0 := testObserver[int64, number.Int64Traits](test0, "A", sdkinstrument.CounterObserverKind) + instB1 := testObserver[float64, number.Float64Traits](test1, "A", sdkinstrument.CounterObserverKind) + + cb, err := NewCallback([]instrument.Asynchronous{instA0, instB1}, test0, func(context.Context) {}) + require.Error(t, err) + require.Contains(t, err.Error(), "asynchronous instrument belongs to a different meter") + require.Nil(t, cb) + + cb, err = NewCallback([]instrument.Asynchronous{instA0, instB1}, test1, func(context.Context) {}) + require.Error(t, err) + require.Contains(t, err.Error(), "asynchronous instrument belongs to a different meter") + require.Nil(t, cb) + + cb, err = NewCallback([]instrument.Asynchronous{instA0}, test0, func(context.Context) {}) + require.NoError(t, err) + require.NotNil(t, cb) + + cb, err = NewCallback([]instrument.Asynchronous{instB1}, test1, func(context.Context) {}) + require.NoError(t, err) + require.NotNil(t, cb) + + // nil value not of this SDK + var fake0 instrument.Asynchronous + cb, err = NewCallback([]instrument.Asynchronous{fake0}, test0, func(context.Context) {}) + require.Error(t, err) + require.Contains(t, err.Error(), "asynchronous instrument does not belong to this SDK") + require.Nil(t, cb) + + // non-nil value not of this SDK + var fake1 struct { + instrument.Asynchronous + } + cb, err = NewCallback([]instrument.Asynchronous{fake1}, test0, func(context.Context) {}) + require.Error(t, err) + require.Contains(t, err.Error(), "asynchronous instrument does not belong to this SDK") + require.Nil(t, cb) +} + +func TestCallbackInvalidation(t *testing.T) { + errors := test.OTelErrors() + + tsdk := testAsync("test") + + var called int64 + var saveCtx context.Context + + cntr := testObserver[int64, number.Int64Traits](tsdk, "counter", sdkinstrument.CounterObserverKind) + cb, err := NewCallback([]instrument.Asynchronous{cntr}, tsdk, func(ctx context.Context) { + cntr.Observe(ctx, called) + saveCtx = ctx + called++ + }) + require.NoError(t, err) + + state := testState(0) + + // run the callback once legitimately + cb.Run(context.Background(), state) + + // simulate use after callback return + cntr.Observe(saveCtx, 10000000) + + cntr.inst.SnapshotAndProcess(state) + + require.Equal(t, int64(1), called) + require.Equal(t, 1, len(*errors)) + require.Contains(t, (*errors)[0].Error(), "used after callback return") + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + tsdk.compilers[0].Collectors(), + testSequence, + ), + test.Instrument( + cntr.inst.descriptor, + test.Point(startTime, endTime, sum.NewMonotonicInt64(0), aggregation.CumulativeTemporality), + ), + ) +} + +func TestCallbackInstrumentUndeclaredForCalback(t *testing.T) { + errors := test.OTelErrors() + + tt := testAsync("test") + + var called int64 + + cntr1 := testObserver[int64, number.Int64Traits](tt, "counter1", sdkinstrument.CounterObserverKind) + cntr2 := testObserver[int64, number.Int64Traits](tt, "counter2", sdkinstrument.CounterObserverKind) + + cb, err := NewCallback([]instrument.Asynchronous{cntr1}, tt, func(ctx context.Context) { + cntr2.Observe(ctx, called) + called++ + }) + require.NoError(t, err) + + state := testState(0) + + // run the callback once legitimately + cb.Run(context.Background(), state) + + cntr1.inst.SnapshotAndProcess(state) + cntr2.inst.SnapshotAndProcess(state) + + require.Equal(t, int64(1), called) + require.Equal(t, 1, len(*errors)) + require.Contains(t, (*errors)[0].Error(), "instrument not declared for use in callback") + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + tt.compilers[0].Collectors(), + testSequence, + ), + test.Instrument( + cntr1.inst.descriptor, + ), + test.Instrument( + cntr2.inst.descriptor, + ), + ) +} + +func TestInstrumentUseOutsideCallback(t *testing.T) { + errors := test.OTelErrors() + + tt := testAsync("test") + + cntr := testObserver[float64, number.Float64Traits](tt, "cntr", sdkinstrument.CounterObserverKind) + + cntr.Observe(context.Background(), 1000) + + state := testState(0) + + cntr.inst.SnapshotAndProcess(state) + + require.Equal(t, 1, len(*errors)) + require.Contains(t, (*errors)[0].Error(), "async instrument used outside of callback") + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + tt.compilers[0].Collectors(), + testSequence, + ), + test.Instrument( + cntr.inst.descriptor, + ), + ) +} + +func TestCallbackDisabledInstrument(t *testing.T) { + tt := testAsync2( + "test", + []view.Option{ + view.WithClause( + view.MatchInstrumentName("drop1"), + view.WithAggregation(aggregation.DropKind), + ), + view.WithClause( + view.MatchInstrumentName("drop2"), + view.WithAggregation(aggregation.DropKind), + ), + }, + []view.Option{ + view.WithClause( + view.MatchInstrumentName("drop2"), + view.WithAggregation(aggregation.DropKind), + ), + }, + ) + + cntrDrop1 := testObserver[float64, number.Float64Traits](tt, "drop1", sdkinstrument.CounterObserverKind) + cntrDrop2 := testObserver[float64, number.Float64Traits](tt, "drop2", sdkinstrument.CounterObserverKind) + cntrKeep := testObserver[float64, number.Float64Traits](tt, "keep", sdkinstrument.CounterObserverKind) + + cb, _ := NewCallback([]instrument.Asynchronous{cntrDrop1, cntrDrop2, cntrKeep}, tt, func(ctx context.Context) { + cntrKeep.Observe(ctx, 1000) + cntrDrop1.Observe(ctx, 1001) + cntrDrop2.Observe(ctx, 1002) + }) + + runFor := func(num int) { + state := testState(num) + + cb.Run(context.Background(), state) + + cntrKeep.inst.SnapshotAndProcess(state) + cntrDrop1.inst.SnapshotAndProcess(state) + cntrDrop2.inst.SnapshotAndProcess(state) + } + + runFor(0) + runFor(1) + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + tt.compilers[0].Collectors(), + testSequence, + ), + test.Instrument( + cntrKeep.inst.descriptor, + test.Point(startTime, endTime, sum.NewMonotonicFloat64(1000), aggregation.CumulativeTemporality), + ), + ) + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + tt.compilers[1].Collectors(), + testSequence, + ), + test.Instrument( + cntrDrop1.inst.descriptor, + test.Point(startTime, endTime, sum.NewMonotonicFloat64(1001), aggregation.CumulativeTemporality), + ), + test.Instrument( + cntrKeep.inst.descriptor, + test.Point(startTime, endTime, sum.NewMonotonicFloat64(1000), aggregation.CumulativeTemporality), + ), + ) +} + +func TestOutOfRangeValues(t *testing.T) { + errors := test.OTelErrors() + + tt := testAsync("test") + + c := testObserver[float64, number.Float64Traits](tt, "c", sdkinstrument.CounterObserverKind) + u := testObserver[float64, number.Float64Traits](tt, "u", sdkinstrument.UpDownCounterObserverKind) + g := testObserver[float64, number.Float64Traits](tt, "g", sdkinstrument.GaugeObserverKind) + + cb, _ := NewCallback([]instrument.Asynchronous{ + c, u, g, + }, tt, func(ctx context.Context) { + c.Observe(ctx, math.NaN()) + c.Observe(ctx, math.Inf(+1)) + c.Observe(ctx, math.Inf(-1)) + u.Observe(ctx, math.NaN()) + u.Observe(ctx, math.Inf(+1)) + u.Observe(ctx, math.Inf(-1)) + g.Observe(ctx, math.NaN()) + g.Observe(ctx, math.Inf(+1)) + g.Observe(ctx, math.Inf(-1)) + }) + + runFor := func(num int) { + state := testState(num) + + cb.Run(context.Background(), state) + + c.inst.SnapshotAndProcess(state) + u.inst.SnapshotAndProcess(state) + g.inst.SnapshotAndProcess(state) + } + + for i := 0; i < 2; i++ { + runFor(i) + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + tt.compilers[i].Collectors(), + testSequence, + ), + test.Instrument( + c.inst.descriptor, + ), + test.Instrument( + u.inst.descriptor, + ), + test.Instrument( + g.inst.descriptor, + ), + ) + } + + // 2 readers x 3 error conditions x 3 instruments + require.Equal(t, 2*3*3, len(*errors)) + require.Contains(t, (*errors), aggregator.ErrNaNInput) + require.Contains(t, (*errors), aggregator.ErrInfInput) +} diff --git a/lightstep/sdk/metric/internal/asyncstate/callback.go b/lightstep/sdk/metric/internal/asyncstate/callback.go new file mode 100644 index 00000000..22fd7b79 --- /dev/null +++ b/lightstep/sdk/metric/internal/asyncstate/callback.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package asyncstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/asyncstate" + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/otel/metric/instrument" +) + +// Callback is the implementation object associated with one +// asynchronous callback. +type Callback struct { + // function is the user-provided callback function. + function func(context.Context) + + // instruments are the set of instruments permitted to be used + // inside this callback. + instruments map[*Instrument]struct{} +} + +// NewCallback returns a new Callback; this checks that each of the +// provided instruments belongs to the same meter provider. +func NewCallback(instruments []instrument.Asynchronous, opaque interface{}, function func(context.Context)) (*Callback, error) { + if len(instruments) == 0 { + return nil, fmt.Errorf("asynchronous callback without instruments") + } + if function == nil { + return nil, fmt.Errorf("asynchronous callback with nil function") + } + + cb := &Callback{ + function: function, + instruments: map[*Instrument]struct{}{}, + } + + for _, inst := range instruments { + ai, ok := inst.(memberInstrument) + if !ok { + return nil, fmt.Errorf("asynchronous instrument does not belong to this SDK: %T", inst) + } + thisInst := ai.instrument() + if thisInst.opaque != opaque { + return nil, fmt.Errorf("asynchronous instrument belongs to a different meter") + } + + cb.instruments[thisInst] = struct{}{} + } + + return cb, nil +} + +// Run executes the callback after setting up the appropriate context +// for a specific reader. +func (c *Callback) Run(ctx context.Context, state *State) { + cp := &callbackState{ + callback: c, + state: state, + } + c.function(context.WithValue(ctx, contextKey{}, cp)) + cp.invalidate() +} + +// callbackState is used to lookup the current callback and +// pipeline from within an executing callback function. +type callbackState struct { + // lock protects callback, see invalidate() and getCallback() + lock sync.Mutex + + // callback is the currently running callback; this is set to nil + // after the associated callback function returns. + callback *Callback + + // state is a single collection of data. + state *State +} + +func (cp *callbackState) invalidate() { + cp.lock.Lock() + defer cp.lock.Unlock() + cp.callback = nil +} + +func (cp *callbackState) getCallback() *Callback { + cp.lock.Lock() + defer cp.lock.Unlock() + return cp.callback +} diff --git a/lightstep/sdk/metric/internal/asyncstate/observer.go b/lightstep/sdk/metric/internal/asyncstate/observer.go new file mode 100644 index 00000000..a746a6df --- /dev/null +++ b/lightstep/sdk/metric/internal/asyncstate/observer.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package asyncstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/asyncstate" + +import ( + "context" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" + "go.opentelemetry.io/otel/metric/instrument/asyncint64" +) + +// Observer is a generic (int64 or float64) instrument which +// satisfies any of the asynchronous instrument API interfaces. +type Observer[N number.Any, Traits number.Traits[N]] struct { + instrument.Asynchronous // Note: wasted space + + inst *Instrument +} + +// Observer implements 6 instruments and memberInstrument. +var ( + _ asyncint64.Counter = Observer[int64, number.Int64Traits]{} + _ asyncint64.UpDownCounter = Observer[int64, number.Int64Traits]{} + _ asyncint64.Gauge = Observer[int64, number.Int64Traits]{} + _ memberInstrument = Observer[int64, number.Int64Traits]{} + + _ asyncfloat64.Counter = Observer[float64, number.Float64Traits]{} + _ asyncfloat64.UpDownCounter = Observer[float64, number.Float64Traits]{} + _ asyncfloat64.Gauge = Observer[float64, number.Float64Traits]{} + _ memberInstrument = Observer[float64, number.Float64Traits]{} +) + +// memberInstrument indicates whether a user-provided +// instrument was returned by this SDK. +type memberInstrument interface { + instrument() *Instrument +} + +// NewObserver returns an generic value suitable for use as any of the +// asynchronous instrument APIs. +func NewObserver[N number.Any, Traits number.Traits[N]](inst *Instrument) Observer[N, Traits] { + return Observer[N, Traits]{inst: inst} +} + +func (o Observer[N, Traits]) instrument() *Instrument { + return o.inst +} + +func (o Observer[N, Traits]) Observe(ctx context.Context, value N, attrs ...attribute.KeyValue) { + capture[N, Traits](ctx, o.inst, value, attrs) +} diff --git a/lightstep/sdk/metric/internal/syncstate/counter.go b/lightstep/sdk/metric/internal/syncstate/counter.go new file mode 100644 index 00000000..66fcfb84 --- /dev/null +++ b/lightstep/sdk/metric/internal/syncstate/counter.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/syncstate" + +import ( + "context" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" +) + +// Counter is a synchronous instrument having an Add() method. +type Counter[N number.Any, Traits number.Traits[N]] struct { + instrument.Synchronous // Note: wasted space + + inst *Instrument +} + +// Counter satisfies 4 instrument APIs. +var ( + _ syncint64.Counter = Counter[int64, number.Int64Traits]{} + _ syncint64.UpDownCounter = Counter[int64, number.Int64Traits]{} + _ syncfloat64.Counter = Counter[float64, number.Float64Traits]{} + _ syncfloat64.UpDownCounter = Counter[float64, number.Float64Traits]{} +) + +// NewCounter returns a value that implements the Counter and UpDownCounter APIs. +func NewCounter[N number.Any, Traits number.Traits[N]](inst *Instrument) Counter[N, Traits] { + return Counter[N, Traits]{inst: inst} +} + +// Add increments a Counter or UpDownCounter. +func (c Counter[N, Traits]) Add(ctx context.Context, incr N, attrs ...attribute.KeyValue) { + capture[N, Traits](ctx, c.inst, incr, attrs) +} diff --git a/lightstep/sdk/metric/internal/syncstate/histogram.go b/lightstep/sdk/metric/internal/syncstate/histogram.go new file mode 100644 index 00000000..a35b90e3 --- /dev/null +++ b/lightstep/sdk/metric/internal/syncstate/histogram.go @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/syncstate" + +import ( + "context" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" +) + +// Histogram is a synchronous instrument having a Record() method. +type Histogram[N number.Any, Traits number.Traits[N]] struct { + instrument.Synchronous // Note: wasted space + + inst *Instrument +} + +// Histogram satisfies 2 instrument APIs. +var ( + _ syncint64.Histogram = Histogram[int64, number.Int64Traits]{} + _ syncfloat64.Histogram = Histogram[float64, number.Float64Traits]{} +) + +// NewCounter returns a value that implements the Histogram API. +func NewHistogram[N number.Any, Traits number.Traits[N]](inst *Instrument) Histogram[N, Traits] { + return Histogram[N, Traits]{inst: inst} +} + +// Record records a Histogram observation. +func (h Histogram[N, Traits]) Record(ctx context.Context, incr N, attrs ...attribute.KeyValue) { + capture[N, Traits](ctx, h.inst, incr, attrs) +} diff --git a/lightstep/sdk/metric/internal/syncstate/refcount_mapped.go b/lightstep/sdk/metric/internal/syncstate/refcount_mapped.go new file mode 100644 index 00000000..cbf60a22 --- /dev/null +++ b/lightstep/sdk/metric/internal/syncstate/refcount_mapped.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncstate + +import ( + "sync/atomic" +) + +// refcountMapped atomically counts the number of references (usages) of an entry +// while also keeping a state of mapped/unmapped into a different data structure +// (an external map or list for example). +// +// refcountMapped uses an atomic value where the least significant bit is used to +// keep the state of mapping ('1' is used for unmapped and '0' is for mapped) and +// the rest of the bits are used for refcounting. +type refcountMapped struct { + // refcount has to be aligned for 64-bit atomic operations. + value int64 +} + +func newRefcountMapped() refcountMapped { + return refcountMapped{ + value: 2, + } +} + +// ref returns true if the entry is still mapped and increases the +// reference usages, if unmapped returns false. +func (rm *refcountMapped) ref() bool { + // Check if this entry was marked as unmapped between the moment + // we got a reference to it (or will be removed very soon) and here. + return atomic.AddInt64(&rm.value, 2)&1 == 0 +} + +func (rm *refcountMapped) unref() { + atomic.AddInt64(&rm.value, -2) +} + +// tryUnmap flips the mapped bit to "unmapped" state and returns true if both of the +// following conditions are true upon entry to this function: +// * There are no active references; +// * The mapped bit is in "mapped" state. +// Otherwise no changes are done to mapped bit and false is returned. +func (rm *refcountMapped) tryUnmap() bool { + if atomic.LoadInt64(&rm.value) != 0 { + return false + } + return atomic.CompareAndSwapInt64( + &rm.value, + 0, + 1, + ) +} diff --git a/lightstep/sdk/metric/internal/syncstate/sync.go b/lightstep/sdk/metric/internal/syncstate/sync.go new file mode 100644 index 00000000..b481197b --- /dev/null +++ b/lightstep/sdk/metric/internal/syncstate/sync.go @@ -0,0 +1,215 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncstate + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "go.opentelemetry.io/otel/attribute" +) + +// Instrument maintains a mapping from attribute.Set to an internal +// record type for a single API-level instrument. This type is +// organized so that a single attribute.Set lookup is performed +// regardless of the number of reader and instrument-view behaviors. +// Entries in the map have their accumulator's SnapshotAndProcess() +// method called whenever they are removed from the map, which can +// happen when any reader collects the instrument. +type Instrument struct { + // descriptor is the API-provided descriptor for the + // instrument, unmodified by views. + descriptor sdkinstrument.Descriptor + + // compiled will be a single compiled instrument or a + // multi-instrument in case of multiple view behaviors + // and/or readers; these distinctions do not matter + // for synchronous aggregation. + compiled viewstate.Instrument + + // current is a synchronous form of map[attribute.Set]*record. + current sync.Map +} + +// NewInstruments builds a new synchronous instrument given the +// per-pipeline instrument-views compiled. Note that the unused +// second parameter is an opaque value used in the asyncstate package, +// passed here to make these two packages generalize. +func NewInstrument(desc sdkinstrument.Descriptor, _ interface{}, compiled pipeline.Register[viewstate.Instrument]) *Instrument { + var nonnil []viewstate.Instrument + for _, comp := range compiled { + if comp != nil { + nonnil = append(nonnil, comp) + } + } + if nonnil == nil { + // When no readers enable the instrument, no need for an instrument. + return nil + } + return &Instrument{ + descriptor: desc, + + // Note that viewstate.Combine is used to eliminate + // the per-pipeline distinction that is useful in the + // asyncstate package. Here, in the common case there + // will be one pipeline and one view, such that + // viewstate.Combine produces a single concrete + // viewstate.Instrument. Only when there are multiple + // views or multiple pipelines will the combination + // produce a viewstate.multiInstrument here. + compiled: viewstate.Combine(desc, nonnil...), + } +} + +// SnapshotAndProcess calls SnapshotAndProcess() for all live +// accumulators of this instrument. Inactive accumulators will be +// subsequently removed from the map. +func (inst *Instrument) SnapshotAndProcess() { + inst.current.Range(func(key interface{}, value interface{}) bool { + rec := value.(*record) + if rec.snapshotAndProcess(false) { + return true + } + // Having no updates since last collection, try to unmap: + if unmapped := rec.refMapped.tryUnmap(); !unmapped { + // The record is still referenced, continue. + return true + } + + // If any other goroutines are now trying to re-insert this + // entry in the map, they are busy calling Gosched() awaiting + // this deletion: + inst.current.Delete(key) + + // We have an exclusive reference to this accumulator + // now, but there could have been an update between + // the snapshotAndProcess() and tryUnmap() above, so + // snapshotAndProcess one last time. + _ = rec.snapshotAndProcess(true) + return true + }) +} + +// record consists of an accumulator, a reference count, the number of +// updates, and the number of collected updates. +type record struct { + refMapped refcountMapped + + // updateCount is incremented on every Update. + updateCount int64 + + // collectedCount is set to updateCount on collection, + // supports checking for no updates during a round. + collectedCount int64 + + // accumulator can be a multi-accumulator if there + // are multiple behaviors or multiple readers, but + // these distinctions are not relevant for synchronous + // instruments. + accumulator viewstate.Accumulator +} + +// snapshotAndProcess checks whether the accumulator has been +// modified since the last collection (by any reader), returns a +// boolean indicating whether the record is active. If active, calls +// SnapshotAndProcess on the associated accumulator and returns true. +// If updates happened since the last collection (by any reader), +// returns false. +func (rec *record) snapshotAndProcess(final bool) bool { + mods := atomic.LoadInt64(&rec.updateCount) + coll := atomic.LoadInt64(&rec.collectedCount) + + if mods == coll { + return false + } + rec.accumulator.SnapshotAndProcess(final) + + // Updates happened in this interval, collect and continue. + atomic.StoreInt64(&rec.collectedCount, mods) + return true +} + +// capture performs a single update for any synchronous instrument. +func capture[N number.Any, Traits number.Traits[N]](_ context.Context, inst *Instrument, num N, attrs []attribute.KeyValue) { + if inst == nil { + // Instrument was completely disabled by the view. + return + } + + // Note: Here, this is the place to use context, e.g., extract baggage. + + if !aggregator.RangeTest[N, Traits](num, inst.descriptor.Kind) { + return + } + + rec, updater := acquireRecord[N](inst, attrs) + defer rec.refMapped.unref() + + updater.Update(num) + + // Record was modified. + atomic.AddInt64(&rec.updateCount, 1) +} + +// acquireRecord gets or creates a `*record` corresponding to `attrs`, +// the input attributes. +func acquireRecord[N number.Any](inst *Instrument, attrs []attribute.KeyValue) (*record, viewstate.Updater[N]) { + aset := attribute.NewSet(attrs...) + + if lookup, ok := inst.current.Load(aset); ok { + // Existing record case. + rec := lookup.(*record) + + if rec.refMapped.ref() { + // At this moment it is guaranteed that the + // record is in the map and will not be removed. + return rec, rec.accumulator.(viewstate.Updater[N]) + } + // This record is no longer mapped, try to add a new + // record below. + } + + // Note: the accumulator set below is created speculatively; + // if it is never returned, it will not be updated and can be + // safely discarded. + newRec := &record{ + refMapped: newRefcountMapped(), + accumulator: inst.compiled.NewAccumulator(aset), + } + + for { + if found, loaded := inst.current.LoadOrStore(aset, newRec); loaded { + oldRec := found.(*record) + if oldRec.refMapped.ref() { + return oldRec, oldRec.accumulator.(viewstate.Updater[N]) + } + // When this happens, we are waiting for the call to Delete() + // inside SnapshotAndProcess() to complete before inserting + // a new record. This avoids busy-waiting. + runtime.Gosched() + continue + } + break + } + + return newRec, newRec.accumulator.(viewstate.Updater[N]) +} diff --git a/lightstep/sdk/metric/internal/syncstate/sync_test.go b/lightstep/sdk/metric/internal/syncstate/sync_test.go new file mode 100644 index 00000000..11f92ff7 --- /dev/null +++ b/lightstep/sdk/metric/internal/syncstate/sync_test.go @@ -0,0 +1,386 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncstate + +import ( + "context" + "math" + "math/rand" + "sync" + "testing" + "time" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/sum" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/test" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/instrumentation" +) + +var ( + endTime = time.Unix(100, 0) + middleTime = endTime.Add(-time.Millisecond) + startTime = endTime.Add(-2 * time.Millisecond) + + testSequence = data.Sequence{ + Start: startTime, + Last: middleTime, + Now: endTime, + } +) + +func deltaUpdate[N number.Any](old, new N) N { + return old + new +} + +func cumulativeUpdate[N number.Any](_, new N) N { + return new +} + +const testAttr = attribute.Key("key") + +var ( + deltaSelector = view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality { + return aggregation.DeltaTemporality + }) + + cumulativeSelector = view.WithDefaultAggregationTemporalitySelector(func(_ sdkinstrument.Kind) aggregation.Temporality { + return aggregation.CumulativeTemporality + }) + + keyFilter = view.WithClause( + view.WithKeys([]attribute.Key{}), + ) +) + +func TestSyncStateDeltaConcurrencyInt(t *testing.T) { + testSyncStateConcurrency[int64, number.Int64Traits](t, deltaUpdate[int64], deltaSelector) +} + +func TestSyncStateCumulativeConcurrencyInt(t *testing.T) { + testSyncStateConcurrency[int64, number.Int64Traits](t, cumulativeUpdate[int64], cumulativeSelector) +} + +func TestSyncStateCumulativeConcurrencyIntFiltered(t *testing.T) { + testSyncStateConcurrency[int64, number.Int64Traits](t, cumulativeUpdate[int64], cumulativeSelector, keyFilter) +} + +func TestSyncStateDeltaConcurrencyFloat(t *testing.T) { + testSyncStateConcurrency[float64, number.Float64Traits](t, deltaUpdate[float64], deltaSelector) +} + +func TestSyncStateCumulativeConcurrencyFloat(t *testing.T) { + testSyncStateConcurrency[float64, number.Float64Traits](t, cumulativeUpdate[float64], cumulativeSelector) +} + +func TestSyncStateCumulativeConcurrencyFloatFiltered(t *testing.T) { + testSyncStateConcurrency[float64, number.Float64Traits](t, cumulativeUpdate[float64], cumulativeSelector, keyFilter) +} + +func testSyncStateConcurrency[N number.Any, Traits number.Traits[N]](t *testing.T, update func(old, new N) N, vopts ...view.Option) { + const ( + numReaders = 2 + numRoutines = 10 + numAttrs = 10 + numUpdates = 1e6 + ) + + var traits Traits + var writers sync.WaitGroup + var readers sync.WaitGroup + + readers.Add(numReaders) + writers.Add(numRoutines) + + lib := instrumentation.Library{ + Name: "testlib", + } + vcs := make([]*viewstate.Compiler, numReaders) + for vci := range vcs { + vcs[vci] = viewstate.New(lib, view.New("test", vopts...)) + } + attrs := make([]attribute.KeyValue, numAttrs) + for i := range attrs { + attrs[i] = testAttr.Int(i) + } + + desc := test.Descriptor("tester", sdkinstrument.CounterKind, traits.Kind()) + + pipes := make(pipeline.Register[viewstate.Instrument], numReaders) + for vci := range vcs { + pipes[vci], _ = vcs[vci].Compile(desc) + } + + inst := NewInstrument(desc, nil, pipes) + require.NotNil(t, inst) + + cntr := NewCounter[N, Traits](inst) + require.NotNil(t, cntr) + + ctx, cancel := context.WithCancel(context.Background()) + + partialCounts := make([]map[attribute.Set]N, numReaders) + + for vci := range vcs { + partialCounts[vci] = map[attribute.Set]N{} + } + + // Reader loops + for vci := range vcs { + go func(vci int, partial map[attribute.Set]N, vc *viewstate.Compiler) { + defer readers.Done() + + // scope will be reused by this reader + var scope data.Scope + seq := data.Sequence{ + Start: time.Now(), + } + seq.Now = seq.Start + + collect := func() { + seq.Last = seq.Now + seq.Now = time.Now() + + inst.SnapshotAndProcess() + + scope.Reset() + + vc.Collectors()[0].Collect(seq, &scope.Instruments) + + for _, pt := range scope.Instruments[0].Points { + partial[pt.Attributes] = update(partial[pt.Attributes], traits.FromNumber(pt.Aggregation.(*sum.State[N, Traits, sum.Monotonic]).Sum())) + } + } + + for { + select { + case <-ctx.Done(): + collect() + return + default: + collect() + } + } + }(vci, partialCounts[vci], vcs[vci]) + } + + // Writer loops + for i := 0; i < numRoutines; i++ { + go func() { + defer writers.Done() + rnd := rand.New(rand.NewSource(rand.Int63())) + + for j := 0; j < numUpdates/numRoutines; j++ { + cntr.Add(ctx, 1, attrs[rnd.Intn(len(attrs))]) + } + }() + } + + writers.Wait() + cancel() + readers.Wait() + + for vci := range vcs { + var sum N + for _, count := range partialCounts[vci] { + sum += count + } + require.Equal(t, N(numUpdates), sum, "vci==%d", vci) + } +} + +func TestSyncStatePartialNoopInstrument(t *testing.T) { + ctx := context.Background() + vopts := []view.Option{ + view.WithClause( + view.MatchInstrumentName("dropme"), + view.WithAggregation(aggregation.DropKind), + ), + } + lib := instrumentation.Library{ + Name: "testlib", + } + vcs := make([]*viewstate.Compiler, 2) + vcs[0] = viewstate.New(lib, view.New("dropper", vopts...)) + vcs[1] = viewstate.New(lib, view.New("keeper")) + + desc := test.Descriptor("dropme", sdkinstrument.HistogramKind, number.Float64Kind) + + pipes := make(pipeline.Register[viewstate.Instrument], 2) + pipes[0], _ = vcs[0].Compile(desc) + pipes[1], _ = vcs[1].Compile(desc) + + require.Nil(t, pipes[0]) + require.NotNil(t, pipes[1]) + + inst := NewInstrument(desc, nil, pipes) + require.NotNil(t, inst) + + hist := NewHistogram[float64, number.Float64Traits](inst) + require.NotNil(t, hist) + + hist.Record(ctx, 1) + hist.Record(ctx, 2) + hist.Record(ctx, 3) + + inst.SnapshotAndProcess() + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + ) + + // Note: Create a merged histogram that is exactly equal to + // the one we expect. Merging creates a slightly different + // struct, despite identical value, so we merge to create the + // expected value: + expectHist := histogram.NewFloat64(aggregator.HistogramConfig{}) + mergeIn := histogram.NewFloat64(aggregator.HistogramConfig{}, 1, 2, 3) + expectHist.Merge(mergeIn) + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[1].Collectors(), + testSequence, + ), + test.Instrument( + desc, + test.Point(startTime, endTime, + expectHist, + aggregation.CumulativeTemporality, + ), + ), + ) +} + +func TestSyncStateFullNoopInstrument(t *testing.T) { + ctx := context.Background() + vopts := []view.Option{ + view.WithClause( + view.MatchInstrumentName("dropme"), + view.WithAggregation(aggregation.DropKind), + ), + } + lib := instrumentation.Library{ + Name: "testlib", + } + vcs := make([]*viewstate.Compiler, 2) + vcs[0] = viewstate.New(lib, view.New("dropper", vopts...)) + vcs[1] = viewstate.New(lib, view.New("keeper", vopts...)) + + desc := test.Descriptor("dropme", sdkinstrument.HistogramKind, number.Float64Kind) + + pipes := make(pipeline.Register[viewstate.Instrument], 2) + pipes[0], _ = vcs[0].Compile(desc) + pipes[1], _ = vcs[1].Compile(desc) + + require.Nil(t, pipes[0]) + require.Nil(t, pipes[1]) + + inst := NewInstrument(desc, nil, pipes) + require.Nil(t, inst) + + hist := NewHistogram[float64, number.Float64Traits](inst) + require.NotNil(t, hist) + + hist.Record(ctx, 1) + hist.Record(ctx, 2) + hist.Record(ctx, 3) + + // There's no instrument, nothing to Snapshot + require.Equal(t, 0, len(vcs[0].Collectors())) + require.Equal(t, 0, len(vcs[1].Collectors())) +} + +func TestOutOfRangeValues(t *testing.T) { + for _, desc := range []sdkinstrument.Descriptor{ + test.Descriptor("cf", sdkinstrument.CounterKind, number.Float64Kind), + test.Descriptor("uf", sdkinstrument.UpDownCounterKind, number.Float64Kind), + test.Descriptor("hf", sdkinstrument.HistogramKind, number.Float64Kind), + test.Descriptor("ci", sdkinstrument.CounterKind, number.Int64Kind), + test.Descriptor("ui", sdkinstrument.UpDownCounterKind, number.Int64Kind), + test.Descriptor("hi", sdkinstrument.HistogramKind, number.Int64Kind), + } { + ctx := context.Background() + lib := instrumentation.Library{ + Name: "testlib", + } + vcs := make([]*viewstate.Compiler, 1) + vcs[0] = viewstate.New(lib, view.New("test")) + + pipes := make(pipeline.Register[viewstate.Instrument], 1) + pipes[0], _ = vcs[0].Compile(desc) + + inst := NewInstrument(desc, nil, pipes) + require.NotNil(t, inst) + + var negOne aggregation.Aggregation + + if desc.NumberKind == number.Float64Kind { + cntr := NewCounter[float64, number.Float64Traits](inst) + + cntr.Add(ctx, -1) + cntr.Add(ctx, math.NaN()) + cntr.Add(ctx, math.Inf(+1)) + cntr.Add(ctx, math.Inf(-1)) + negOne = sum.NewNonMonotonicFloat64(-1) + } else { + cntr := NewCounter[int64, number.Int64Traits](inst) + + cntr.Add(ctx, -1) + negOne = sum.NewNonMonotonicInt64(-1) + } + + inst.SnapshotAndProcess() + + var expectPoints []data.Point + + if desc.Kind == sdkinstrument.UpDownCounterKind { + expectPoints = append(expectPoints, test.Point( + startTime, endTime, + negOne, + aggregation.CumulativeTemporality, + )) + } + + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + expectPoints..., + ), + ) + } +} diff --git a/lightstep/sdk/metric/internal/test/test.go b/lightstep/sdk/metric/internal/test/test.go index 7fe01390..ebf13ee1 100644 --- a/lightstep/sdk/metric/internal/test/test.go +++ b/lightstep/sdk/metric/internal/test/test.go @@ -90,7 +90,7 @@ func CollectScopeReuse(t *testing.T, collectors []data.Collector, seq data.Seque func RequireEqualPoints(t *testing.T, output []data.Point, expected ...data.Point) { t.Helper() - require.Equal(t, len(output), len(expected)) + require.Equal(t, len(output), len(expected), "points have different length") cpy := make([]data.Point, len(expected)) copy(cpy, expected) diff --git a/lightstep/sdk/metric/internal/viewstate/accumulators.go b/lightstep/sdk/metric/internal/viewstate/accumulators.go index f6705612..e94a2289 100644 --- a/lightstep/sdk/metric/internal/viewstate/accumulators.go +++ b/lightstep/sdk/metric/internal/viewstate/accumulators.go @@ -16,17 +16,74 @@ package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk import ( "sync" + "sync/atomic" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "go.opentelemetry.io/otel/attribute" ) +// compiledSyncBase is any synchronous instrument view. +type compiledSyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + instrumentBase[N, Storage, int64, Methods] +} + +// NewAccumulator returns a Accumulator for a synchronous instrument view. +func (csv *compiledSyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator { + sc := &syncAccumulator[N, Storage, Methods]{} + csv.initStorage(&sc.current) + csv.initStorage(&sc.snapshot) + + sc.holder = csv.findStorage(kvs) + return sc +} + +// findStorage locates the output Storage and adds to the auxiliary +// reference count for synchronous instruments. +func (csv *compiledSyncBase[N, Storage, Methods]) findStorage( + kvs attribute.Set, +) *storageHolder[Storage, int64] { + kvs = csv.applyKeysFilter(kvs) + + csv.instLock.Lock() + defer csv.instLock.Unlock() + + entry := csv.getOrCreateEntry(kvs) + atomic.AddInt64(&entry.auxiliary, 1) + return entry +} + +// compiledAsyncBase is any asynchronous instrument view. +type compiledAsyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { + instrumentBase[N, Storage, notUsed, Methods] +} + +// NewAccumulator returns a Accumulator for an asynchronous instrument view. +func (cav *compiledAsyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator { + ac := &asyncAccumulator[N, Storage, Methods]{} + + ac.holder = cav.findStorage(kvs) + return ac +} + +// findStorage locates the output Storage for asynchronous instruments. +func (cav *compiledAsyncBase[N, Storage, Methods]) findStorage( + kvs attribute.Set, +) *storageHolder[Storage, notUsed] { + kvs = cav.applyKeysFilter(kvs) + + cav.instLock.Lock() + defer cav.instLock.Unlock() + + return cav.getOrCreateEntry(kvs) +} + // multiAccumulator type multiAccumulator[N number.Any] []Accumulator -func (acc multiAccumulator[N]) SnapshotAndProcess() { +func (acc multiAccumulator[N]) SnapshotAndProcess(final bool) { for _, coll := range acc { - coll.SnapshotAndProcess() + coll.SnapshotAndProcess(final) } } @@ -38,9 +95,12 @@ func (acc multiAccumulator[N]) Update(value N) { // syncAccumulator type syncAccumulator[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { - current Storage - snapshot Storage - findStorage func() *Storage + // syncLock prevents two readers from calling + // SnapshotAndProcess at the same moment. + syncLock sync.Mutex + current Storage + snapshot Storage + holder *storageHolder[Storage, int64] } func (acc *syncAccumulator[N, Storage, Methods]) Update(number N) { @@ -48,29 +108,35 @@ func (acc *syncAccumulator[N, Storage, Methods]) Update(number N) { methods.Update(&acc.current, number) } -func (acc *syncAccumulator[N, Storage, Methods]) SnapshotAndProcess() { +func (acc *syncAccumulator[N, Storage, Methods]) SnapshotAndProcess(final bool) { var methods Methods - methods.SynchronizedMove(&acc.current, &acc.snapshot) - methods.Merge(acc.findStorage(), &acc.snapshot) + acc.syncLock.Lock() + defer acc.syncLock.Unlock() + methods.Move(&acc.current, &acc.snapshot) + methods.Merge(&acc.snapshot, &acc.holder.storage) + if final { + // On the final snapshot-and-process, decrement the auxiliary reference count. + atomic.AddInt64(&acc.holder.auxiliary, -1) + } } // asyncAccumulator type asyncAccumulator[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { - lock sync.Mutex - current N - findStorage func() *Storage + asyncLock sync.Mutex + current N + holder *storageHolder[Storage, notUsed] } func (acc *asyncAccumulator[N, Storage, Methods]) Update(number N) { - acc.lock.Lock() - defer acc.lock.Unlock() + acc.asyncLock.Lock() + defer acc.asyncLock.Unlock() acc.current = number } -func (acc *asyncAccumulator[N, Storage, Methods]) SnapshotAndProcess() { - acc.lock.Lock() - defer acc.lock.Unlock() +func (acc *asyncAccumulator[N, Storage, Methods]) SnapshotAndProcess(_ bool) { + acc.asyncLock.Lock() + defer acc.asyncLock.Unlock() var methods Methods - methods.Update(acc.findStorage(), acc.current) + methods.Update(&acc.holder.storage, acc.current) } diff --git a/lightstep/sdk/metric/internal/viewstate/base_instrument.go b/lightstep/sdk/metric/internal/viewstate/base_instrument.go index 7ed0babc..47eec43d 100644 --- a/lightstep/sdk/metric/internal/viewstate/base_instrument.go +++ b/lightstep/sdk/metric/internal/viewstate/base_instrument.go @@ -26,79 +26,88 @@ import ( "go.opentelemetry.io/otel/attribute" ) +// storageHolder is a generic struct for holding one storage and one +// auxiliary field. Storage will be one of the aggregators. The +// auxiliary type depends on whether synchronous or asynchronous. +// +// Auxiliary is an int64 reference count for synchronous instruments +// and notUsed for asynchronous instruments. +type storageHolder[Storage, Auxiliary any] struct { + auxiliary Auxiliary + storage Storage +} + +// notUsed is the Auxiliary type for asynchronous instruments. +type notUsed struct{} + // instrumentBase is the common type embedded in any of the compiled instrument views. -type instrumentBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { - lock sync.Mutex +type instrumentBase[N number.Any, Storage, Auxiliary any, Methods aggregator.Methods[N, Storage]] struct { + instLock sync.Mutex fromName string desc sdkinstrument.Descriptor acfg aggregator.Config - data map[attribute.Set]*Storage + data map[attribute.Set]*storageHolder[Storage, Auxiliary] keysSet *attribute.Set keysFilter *attribute.Filter } -func (metric *instrumentBase[N, Storage, Methods]) Aggregation() aggregation.Kind { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Aggregation() aggregation.Kind { var methods Methods return methods.Kind() } -func (metric *instrumentBase[N, Storage, Methods]) OriginalName() string { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) OriginalName() string { return metric.fromName } -func (metric *instrumentBase[N, Storage, Methods]) Descriptor() sdkinstrument.Descriptor { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Descriptor() sdkinstrument.Descriptor { return metric.desc } -func (metric *instrumentBase[N, Storage, Methods]) Keys() *attribute.Set { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Keys() *attribute.Set { return metric.keysSet } -func (metric *instrumentBase[N, Storage, Methods]) Config() aggregator.Config { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Config() aggregator.Config { return metric.acfg } -func (metric *instrumentBase[N, Storage, Methods]) initStorage(s *Storage) { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) initStorage(s *Storage) { var methods Methods methods.Init(s, metric.acfg) } -func (metric *instrumentBase[N, Storage, Methods]) mergeDescription(d string) { - metric.lock.Lock() - defer metric.lock.Unlock() +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) mergeDescription(d string) { + metric.instLock.Lock() + defer metric.instLock.Unlock() if len(d) > len(metric.desc.Description) { metric.desc.Description = d } } -// storageFinder searches for and possibly allocates an output Storage -// for this metric. Filtered keys, if a filter is provided, will be -// computed once. -func (metric *instrumentBase[N, Storage, Methods]) storageFinder( - kvs attribute.Set, -) func() *Storage { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) applyKeysFilter(kvs attribute.Set) attribute.Set { if metric.keysFilter != nil { kvs, _ = attribute.NewSetWithFiltered(kvs.ToSlice(), *metric.keysFilter) } + return kvs +} - return func() *Storage { - metric.lock.Lock() - defer metric.lock.Unlock() - - storage, has := metric.data[kvs] - if has { - return storage - } - - ns := metric.newStorage() - metric.data[kvs] = ns - return ns +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) getOrCreateEntry(kvs attribute.Set) *storageHolder[Storage, Auxiliary] { + entry, has := metric.data[kvs] + if has { + return entry } + + var methods Methods + entry = &storageHolder[Storage, Auxiliary]{} + methods.Init(&entry.storage, metric.acfg) + metric.data[kvs] = entry + return entry } // newStorage allocates and initializes a new Storage. -func (metric *instrumentBase[N, Storage, Methods]) newStorage() *Storage { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) newStorage() *Storage { ns := new(Storage) metric.initStorage(ns) return ns @@ -110,22 +119,37 @@ func (metric *instrumentBase[N, Storage, Methods]) newStorage() *Storage { // be produced (in the same order); consumers of delta temporality // should expect to see empty instruments in the output for metric // data that is unchanged. -func (metric *instrumentBase[N, Storage, Methods]) appendInstrument(output *[]data.Instrument) *data.Instrument { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) appendInstrument(output *[]data.Instrument) *data.Instrument { inst := data.ReallocateFrom(output) inst.Descriptor = metric.desc return inst } -// appendPoint is used in cases where the output Aggregation is the -// stored object; use appendOrReusePoint in the case where the output -// Aggregation is a copy of the stored object (in case the stored -// object will be reset on collection, as opposed to a second pass to -// reset delta temporality outputs before the next accumulation. -func (metric *instrumentBase[N, Storage, Methods]) appendPoint(inst *data.Instrument, set attribute.Set, agg aggregation.Aggregation, tempo aggregation.Temporality, start, end time.Time) { - point := data.ReallocateFrom(&inst.Points) +// appendPoint adds a new point to the output. Note that the existing +// slice will be extended, if possible, and the existing Aggregation +// is potentially re-used. The variable `reset` determines whether +// Move() or Copy() is used. Note that both Move and Copy are +// synchronized with respect to Update() and Merge(), necesary for the +// synchronous code path which may see concurrent collection. +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) appendPoint(inst *data.Instrument, set attribute.Set, storage *Storage, tempo aggregation.Temporality, start, end time.Time, reset bool) { + var methods Methods + + // Possibly re-use the underlying storage. + point, out := metric.appendOrReusePoint(inst) + if out == nil { + out = metric.newStorage() + } + + if reset { + // Note: synchronized move uses swap for expensive + // copies, like histogram. + methods.Move(storage, out) + } else { + methods.Copy(storage, out) + } point.Attributes = set - point.Aggregation = agg + point.Aggregation = methods.ToAggregation(out) point.Temporality = tempo point.Start = start point.End = end @@ -133,7 +157,7 @@ func (metric *instrumentBase[N, Storage, Methods]) appendPoint(inst *data.Instru // appendOrReusePoint is an alternate to appendPoint; this form is used when // the storage will be reset on collection. -func (metric *instrumentBase[N, Storage, Methods]) appendOrReusePoint(inst *data.Instrument) (*data.Point, *Storage) { +func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) appendOrReusePoint(inst *data.Instrument) (*data.Point, *Storage) { point := data.ReallocateFrom(&inst.Points) var methods Methods diff --git a/lightstep/sdk/metric/internal/viewstate/collectors.go b/lightstep/sdk/metric/internal/viewstate/collectors.go index 3c026abe..54afbf06 100644 --- a/lightstep/sdk/metric/internal/viewstate/collectors.go +++ b/lightstep/sdk/metric/internal/viewstate/collectors.go @@ -15,6 +15,8 @@ package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" import ( + "sync/atomic" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" @@ -22,36 +24,6 @@ import ( "go.opentelemetry.io/otel/attribute" ) -// compiledSyncBase is any synchronous instrument view. -type compiledSyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { - instrumentBase[N, Storage, Methods] -} - -// NewAccumulator returns a Accumulator for a synchronous instrument view. -func (csv *compiledSyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator { - sc := &syncAccumulator[N, Storage, Methods]{} - csv.initStorage(&sc.current) - csv.initStorage(&sc.snapshot) - - sc.findStorage = csv.storageFinder(kvs) - - return sc -} - -// compiledSyncBase is any asynchronous instrument view. -type compiledAsyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { - instrumentBase[N, Storage, Methods] -} - -// NewAccumulator returns a Accumulator for an asynchronous instrument view. -func (cav *compiledAsyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator { - ac := &asyncAccumulator[N, Storage, Methods]{} - - ac.findStorage = cav.storageFinder(kvs) - - return ac -} - // statefulSyncInstrument is a synchronous instrument that maintains cumulative state. type statefulSyncInstrument[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { compiledSyncBase[N, Storage, Methods] @@ -59,15 +31,13 @@ type statefulSyncInstrument[N number.Any, Storage any, Methods aggregator.Method // Collect for synchronous cumulative temporality. func (p *statefulSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { - var methods Methods - - p.lock.Lock() - defer p.lock.Unlock() + p.instLock.Lock() + defer p.instLock.Unlock() ioutput := p.appendInstrument(output) - for set, storage := range p.data { - p.appendPoint(ioutput, set, methods.ToAggregation(storage), aggregation.CumulativeTemporality, seq.Start, seq.Now) + for set, entry := range p.data { + p.appendPoint(ioutput, set, &entry.storage, aggregation.CumulativeTemporality, seq.Start, seq.Now, false) } } @@ -80,41 +50,33 @@ type statelessSyncInstrument[N number.Any, Storage any, Methods aggregator.Metho func (p *statelessSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { var methods Methods - p.lock.Lock() - defer p.lock.Unlock() + p.instLock.Lock() + defer p.instLock.Unlock() ioutput := p.appendInstrument(output) - for set, storage := range p.data { - if !methods.HasChange(storage) { - delete(p.data, set) - continue - } - - // Possibly re-use the underlying storage. For - // synchronous instruments, where accumulation happens - // between collection events (e.g., due to other - // readers collecting), we must reset the storage now - // or completely clear the map. - point, exists := p.appendOrReusePoint(ioutput) - if exists == nil { - exists = p.newStorage() - } else { - methods.Reset(exists) + for set, entry := range p.data { + p.appendPoint(ioutput, set, &entry.storage, aggregation.DeltaTemporality, seq.Last, seq.Now, true) + + // By passing reset=true above, the aggregator data in + // entry.storage has been moved into the last index of + // ioutput.Points. + ptsArr := ioutput.Points + point := &ptsArr[len(ptsArr)-1] + + cpy, _ := methods.ToStorage(point.Aggregation) + if !methods.HasChange(cpy) { + // Now If the data is unchanged, truncate. + ioutput.Points = ptsArr[0 : len(ptsArr)-1 : cap(ptsArr)] + // If there are no more accumulators, remove from the map. + if atomic.LoadInt64(&entry.auxiliary) == 0 { + delete(p.data, set) + } } - - // Note: This can be improved with a Copy() or Swap() - // operation in the Methods, since Merge() may be - // relatively expensive by comparison. - methods.Merge(exists, storage) - - point.Attributes = set - point.Aggregation = methods.ToAggregation(exists) - point.Temporality = aggregation.DeltaTemporality - point.Start = seq.Last - point.End = seq.Now - - methods.Reset(storage) + // Another design here would avoid the point before + // appending/truncating. This choice uses the slice + // of points to store the extra allocator used, even + // until the next collection. } } @@ -126,57 +88,54 @@ type statelessAsyncInstrument[N number.Any, Storage any, Methods aggregator.Meth // Collect for asynchronous cumulative temporality. func (p *statelessAsyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { - var methods Methods - - p.lock.Lock() - defer p.lock.Unlock() + p.instLock.Lock() + defer p.instLock.Unlock() ioutput := p.appendInstrument(output) - for set, storage := range p.data { - // Copy the underlying storage. - p.appendPoint(ioutput, set, methods.ToAggregation(storage), aggregation.CumulativeTemporality, seq.Start, seq.Now) + for set, entry := range p.data { + p.appendPoint(ioutput, set, &entry.storage, aggregation.CumulativeTemporality, seq.Start, seq.Now, false) } // Reset the entire map. - p.data = map[attribute.Set]*Storage{} + p.data = map[attribute.Set]*storageHolder[Storage, notUsed]{} } // statefulAsyncInstrument is an instrument that keeps asynchronous instrument state // in order to perform cumulative to delta translation. type statefulAsyncInstrument[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct { compiledAsyncBase[N, Storage, Methods] - prior map[attribute.Set]*Storage + prior map[attribute.Set]*storageHolder[Storage, notUsed] } // Collect for asynchronous delta temporality. func (p *statefulAsyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence, output *[]data.Instrument) { var methods Methods - p.lock.Lock() - defer p.lock.Unlock() + p.instLock.Lock() + defer p.instLock.Unlock() ioutput := p.appendInstrument(output) - for set, storage := range p.data { + for set, entry := range p.data { pval, has := p.prior[set] if has { // This does `*pval := *storage - *pval` - methods.SubtractSwap(storage, pval) + methods.SubtractSwap(&entry.storage, &pval.storage) // Skip the series if it has not changed. - if !methods.HasChange(pval) { + if !methods.HasChange(&pval.storage) { continue } // Output the difference except for Gauge, in // which case output the new value. if p.desc.Kind.HasTemporality() { - storage = pval + entry = pval } } - p.appendPoint(ioutput, set, methods.ToAggregation(storage), aggregation.DeltaTemporality, seq.Last, seq.Now) + p.appendPoint(ioutput, set, &entry.storage, aggregation.DeltaTemporality, seq.Last, seq.Now, false) } // Copy the current to the prior and reset. p.prior = p.data - p.data = map[attribute.Set]*Storage{} + p.data = map[attribute.Set]*storageHolder[Storage, notUsed]{} } diff --git a/lightstep/sdk/metric/internal/viewstate/doc.go b/lightstep/sdk/metric/internal/viewstate/doc.go new file mode 100644 index 00000000..f4c3e156 --- /dev/null +++ b/lightstep/sdk/metric/internal/viewstate/doc.go @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// package viewstate implements a View compiler, which combines +// information about the instrument kind (especially synchronous +// vs. asynchronous), the configured view(s) and reader defaults, and +// conflicting instruments in the same namespace. +package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" diff --git a/lightstep/sdk/metric/internal/viewstate/viewstate.go b/lightstep/sdk/metric/internal/viewstate/viewstate.go index bea0843e..55be61d3 100644 --- a/lightstep/sdk/metric/internal/viewstate/viewstate.go +++ b/lightstep/sdk/metric/internal/viewstate/viewstate.go @@ -60,7 +60,7 @@ type Compiler struct { library instrumentation.Library // lock protects collectors and names. - lock sync.Mutex + compilerLock sync.Mutex // collectors is the de-duplicated list of metric outputs, which may // contain conflicting identities. @@ -110,7 +110,11 @@ type Accumulator interface { // There is no return value from this method; the caller can // safely forget an Accumulator after this method is called, // provided Update is not used again. - SnapshotAndProcess() + // + // When `final` is true, this is the last time the Accumulator + // will be snapshot and processed (according to the caller's + // reference counting). + SnapshotAndProcess(final bool) } // leafInstrument is one of the (synchronous or asynchronous), @@ -170,8 +174,8 @@ func New(library instrumentation.Library, views *view.Views) *Compiler { } func (v *Compiler) Collectors() []data.Collector { - v.lock.Lock() - defer v.lock.Unlock() + v.compilerLock.Lock() + defer v.compilerLock.Unlock() return v.collectors } @@ -228,8 +232,8 @@ func (v *Compiler) Compile(instrument sdkinstrument.Descriptor) (Instrument, Vie } } - v.lock.Lock() - defer v.lock.Unlock() + v.compilerLock.Lock() + defer v.compilerLock.Unlock() var conflicts ViewConflictsBuilder var compiled []Instrument @@ -294,7 +298,6 @@ func (v *Compiler) Compile(instrument sdkinstrument.Descriptor) (Instrument, Vie v.collectors = append(v.collectors, leaf) existingInsts = append(existingInsts, leaf) v.names[behavior.desc.Name] = existingInsts - } if len(existingInsts) > 1 || semanticErr != nil { c := Conflict{ @@ -333,11 +336,11 @@ func newSyncView[ // is being copied before the new object is returned to the // user, and the extra allocation cost here would be // noticeable. - metric := instrumentBase[N, Storage, Methods]{ + metric := instrumentBase[N, Storage, int64, Methods]{ fromName: behavior.fromName, desc: behavior.desc, acfg: behavior.acfg, - data: map[attribute.Set]*Storage{}, + data: map[attribute.Set]*storageHolder[Storage, int64]{}, keysSet: behavior.keysSet, keysFilter: behavior.keysFilter, } @@ -394,11 +397,11 @@ func newAsyncView[ // is being copied before the new object is returned to the // user, and the extra allocation cost here would be // noticeable. - metric := instrumentBase[N, Storage, Methods]{ + metric := instrumentBase[N, Storage, notUsed, Methods]{ fromName: behavior.fromName, desc: behavior.desc, acfg: behavior.acfg, - data: map[attribute.Set]*Storage{}, + data: map[attribute.Set]*storageHolder[Storage, notUsed]{}, keysSet: behavior.keysSet, keysFilter: behavior.keysFilter, } diff --git a/lightstep/sdk/metric/internal/viewstate/viewstate_test.go b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go index b9c43780..b4d10d15 100644 --- a/lightstep/sdk/metric/internal/viewstate/viewstate_test.go +++ b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go @@ -432,7 +432,7 @@ func TestDuplicatesMergeDescriptor(t *testing.T) { accUpp := inst1.NewAccumulator(attribute.NewSet()) accUpp.(Updater[int64]).Update(1) - accUpp.SnapshotAndProcess() + accUpp.SnapshotAndProcess(false) output := testCollect(t, vc) @@ -468,7 +468,7 @@ func TestViewDescription(t *testing.T) { accUpp := inst1.NewAccumulator(attribute.NewSet(attrs...)) accUpp.(Updater[int64]).Update(1) - accUpp.SnapshotAndProcess() + accUpp.SnapshotAndProcess(false) output := testCollect(t, vc) @@ -507,8 +507,8 @@ func TestKeyFilters(t *testing.T) { accUpp1.(Updater[int64]).Update(1) accUpp2.(Updater[int64]).Update(1) - accUpp1.SnapshotAndProcess() - accUpp2.SnapshotAndProcess() + accUpp1.SnapshotAndProcess(false) + accUpp2.SnapshotAndProcess(false) output := testCollect(t, vc) @@ -557,7 +557,7 @@ func TestTwoViewsOneInt64Instrument(t *testing.T) { inst.NewAccumulator(attribute.NewSet(attribute.String("a", "2"), attribute.String("b", "2"))), } { acc.(Updater[int64]).Update(1) - acc.SnapshotAndProcess() + acc.SnapshotAndProcess(false) } output := testCollect(t, vc) @@ -619,7 +619,7 @@ func TestHistogramTwoAggregations(t *testing.T) { acc.(Updater[float64]).Update(2) acc.(Updater[float64]).Update(3) acc.(Updater[float64]).Update(4) - acc.SnapshotAndProcess() + acc.SnapshotAndProcess(false) output := testCollect(t, vc) @@ -654,11 +654,11 @@ func TestAllKeysFilter(t *testing.T) { acc1 := inst.NewAccumulator(attribute.NewSet(attribute.String("a", "1"))) acc1.(Updater[float64]).Update(1) - acc1.SnapshotAndProcess() + acc1.SnapshotAndProcess(false) acc2 := inst.NewAccumulator(attribute.NewSet(attribute.String("b", "2"))) acc2.(Updater[float64]).Update(1) - acc2.SnapshotAndProcess() + acc2.SnapshotAndProcess(false) output := testCollect(t, vc) @@ -705,7 +705,7 @@ func TestAnySumAggregation(t *testing.T) { acc := inst.NewAccumulator(attribute.NewSet()) acc.(Updater[float64]).Update(1) - acc.SnapshotAndProcess() + acc.SnapshotAndProcess(false) } output := testCollect(t, vc) @@ -758,7 +758,7 @@ func TestDuplicateAsyncMeasurementsIngored(t *testing.T) { acc.(Updater[float64]).Update(1000) acc.(Updater[float64]).Update(10000) acc.(Updater[float64]).Update(100000) - acc.SnapshotAndProcess() + acc.SnapshotAndProcess(false) } output := testCollect(t, vc) @@ -810,7 +810,7 @@ func TestCumulativeTemporality(t *testing.T) { inst2.NewAccumulator(setB), } { acc.(Updater[float64]).Update(1) - acc.SnapshotAndProcess() + acc.SnapshotAndProcess(false) } test.RequireEqualMetrics(t, testCollect(t, vc), @@ -865,7 +865,7 @@ func TestDeltaTemporalityCounter(t *testing.T) { inst2.NewAccumulator(setB), } { acc.(Updater[float64]).Update(float64(rounds)) - acc.SnapshotAndProcess() + acc.SnapshotAndProcess(false) } test.RequireEqualMetrics(t, testCollectSequence(t, vc, seq), @@ -912,11 +912,11 @@ func TestDeltaTemporalityGauge(t *testing.T) { observe := func(x int) { accI := instI.NewAccumulator(set) accI.(Updater[int64]).Update(int64(x)) - accI.SnapshotAndProcess() + accI.SnapshotAndProcess(false) accF := instF.NewAccumulator(set) accF.(Updater[float64]).Update(float64(x)) - accF.SnapshotAndProcess() + accF.SnapshotAndProcess(false) } expectValues := func(x int, seq data.Sequence) { @@ -976,7 +976,7 @@ func TestDeltaTemporalityGauge(t *testing.T) { } // TestSyncDeltaTemporalityCounter ensures that counter and updowncounter -// are skip points with delta temporality and no change. +// skip points with delta temporality and no change. func TestSyncDeltaTemporalityCounter(t *testing.T) { views := view.New( "test", @@ -1007,19 +1007,19 @@ func TestSyncDeltaTemporalityCounter(t *testing.T) { observe := func(mono, nonMono int) { accCI := instCI.NewAccumulator(set) accCI.(Updater[int64]).Update(int64(mono)) - accCI.SnapshotAndProcess() + accCI.SnapshotAndProcess(false) accCF := instCF.NewAccumulator(set) accCF.(Updater[float64]).Update(float64(mono)) - accCF.SnapshotAndProcess() + accCF.SnapshotAndProcess(false) accUI := instUI.NewAccumulator(set) accUI.(Updater[int64]).Update(int64(nonMono)) - accUI.SnapshotAndProcess() + accUI.SnapshotAndProcess(false) accUF := instUF.NewAccumulator(set) accUF.(Updater[float64]).Update(float64(nonMono)) - accUF.SnapshotAndProcess() + accUF.SnapshotAndProcess(false) } expectValues := func(mono, nonMono int, seq data.Sequence) { @@ -1080,3 +1080,57 @@ func TestSyncDeltaTemporalityCounter(t *testing.T) { expectValues(100, 100, seq) tick() } + +func TestSyncDeltaTemporalityMapDeletion(t *testing.T) { + views := view.New( + "test", + view.WithDefaultAggregationTemporalitySelector( + func(ik sdkinstrument.Kind) aggregation.Temporality { + return aggregation.DeltaTemporality // Always delta + }), + ) + + vc := New(testLib, views) + + inst, err := testCompile(vc, "counter", sdkinstrument.CounterKind, number.Float64Kind) + require.NoError(t, err) + + attr := attribute.String("A", "1") + set := attribute.NewSet(attr) + + acc1 := inst.NewAccumulator(set) + acc2 := inst.NewAccumulator(set) + + acc1.(Updater[float64]).Update(1) + acc2.(Updater[float64]).Update(1) + + // There are two references to one entry in the map. + require.Equal(t, 1, len(inst.(*statelessSyncInstrument[float64, sum.MonotonicFloat64, sum.MonotonicFloat64Methods]).data)) + + acc1.SnapshotAndProcess(false) + acc2.SnapshotAndProcess(true) + + var output data.Scope + + test.RequireEqualMetrics(t, + testCollectSequenceReuse(t, vc, testSequence, &output), + test.Instrument( + test.Descriptor("counter", sdkinstrument.CounterKind, number.Float64Kind), + test.Point(middleTime, endTime, sum.NewMonotonicFloat64(2), delta, attr), + ), + ) + + require.Equal(t, 1, len(inst.(*statelessSyncInstrument[float64, sum.MonotonicFloat64, sum.MonotonicFloat64Methods]).data)) + + acc1.SnapshotAndProcess(true) + + test.RequireEqualMetrics(t, + testCollectSequenceReuse(t, vc, testSequence, &output), + test.Instrument( + test.Descriptor("counter", sdkinstrument.CounterKind, number.Float64Kind), + ), + ) + + require.Equal(t, 0, len(inst.(*statelessSyncInstrument[float64, sum.MonotonicFloat64, sum.MonotonicFloat64Methods]).data)) + +} diff --git a/lightstep/sdk/metric/meter.go b/lightstep/sdk/metric/meter.go index 6831ef44..bdc9d91d 100644 --- a/lightstep/sdk/metric/meter.go +++ b/lightstep/sdk/metric/meter.go @@ -18,6 +18,13 @@ import ( "context" "sync" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/asyncstate" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/syncstate" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" @@ -25,11 +32,6 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/sdk/instrumentation" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/asyncstate" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/syncstate" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/viewstate" - "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" ) // meter handles the creation and coordination of all metric instruments. A @@ -83,3 +85,82 @@ func (m *meter) SyncInt64() syncint64.InstrumentProvider { func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { return syncfloat64Instruments{m} } + +// instrumentConstructor refers to either the syncstate or asyncstate +// NewInstrument method. Although both receive an opaque interface{} +// to distinguish providers, only the asyncstate package needs to know +// this information. The unused parameter is passed to the syncstate +// package for the generalization used here to work. +type instrumentConstructor[T any] func( + instrument sdkinstrument.Descriptor, + opaque interface{}, + compiled pipeline.Register[viewstate.Instrument], +) *T + +// configureInstrument applies the instrument configuration, checks +// for an existing definition for the same descriptor, and compiles +// and constructs the instrument if necessary. +func configureInstrument[T any]( + m *meter, + name string, + opts []instrument.Option, + nk number.Kind, + ik sdkinstrument.Kind, + listPtr *[]*T, + ctor instrumentConstructor[T], +) (*T, error) { + // Compute the instrument descriptor + cfg := instrument.NewConfig(opts...) + desc := sdkinstrument.NewDescriptor(name, ik, nk, cfg.Description(), cfg.Unit()) + + m.lock.Lock() + defer m.lock.Unlock() + + // Lookup a pre-existing instrument by descriptor. + if lookup, has := m.byDesc[desc]; has { + // Recompute conflicts since they may have changed. + var conflicts viewstate.ViewConflictsBuilder + + for _, compiler := range m.compilers { + _, err := compiler.Compile(desc) + conflicts.Combine(err) + } + + return lookup.(*T), conflicts.AsError() + } + + // Compile the instrument for each pipeline. the first time. + var conflicts viewstate.ViewConflictsBuilder + compiled := pipeline.NewRegister[viewstate.Instrument](len(m.compilers)) + + for pipe, compiler := range m.compilers { + comp, err := compiler.Compile(desc) + compiled[pipe] = comp + conflicts.Combine(err) + } + + // Build the new instrument, cache it, append to the list. + inst := ctor(desc, m, compiled) + err := conflicts.AsError() + + if inst != nil { + m.byDesc[desc] = inst + } + *listPtr = append(*listPtr, inst) + if err != nil { + // Handle instrument creation errors when they're new, + // not for repeat entries above. + otel.Handle(err) + } + return inst, err +} + +// synchronousInstrument configures a synchronous instrument. +func (m *meter) synchronousInstrument(name string, opts []instrument.Option, nk number.Kind, ik sdkinstrument.Kind) (*syncstate.Instrument, error) { + return configureInstrument(m, name, opts, nk, ik, &m.syncInsts, syncstate.NewInstrument) +} + +// synchronousInstrument configures an asynchronous instrument. +func (m *meter) asynchronousInstrument(name string, opts []instrument.Option, nk number.Kind, ik sdkinstrument.Kind) (*asyncstate.Instrument, error) { + return configureInstrument(m, name, opts, nk, ik, &m.asyncInsts, asyncstate.NewInstrument) +} diff --git a/lightstep/sdk/metric/number/number.go b/lightstep/sdk/metric/number/number.go index 4d8ad69e..01540436 100644 --- a/lightstep/sdk/metric/number/number.go +++ b/lightstep/sdk/metric/number/number.go @@ -22,14 +22,14 @@ import "math" type Kind int8 const ( - // Int64Kind means that the Number stores int64. + // Int64Kind indicates int64. Int64Kind Kind = iota - // Float64Kind means that the Number stores float64. + // Float64Kind indicates float64. Float64Kind ) -// Number is a generic 64bit numeric value. +// Number is a 64bit numeric value, one of the Any interface types. type Number uint64 // Any is any of the supported generic Number types. diff --git a/lightstep/sdk/metric/number/traits.go b/lightstep/sdk/metric/number/traits.go index 3921c54b..e23b708a 100644 --- a/lightstep/sdk/metric/number/traits.go +++ b/lightstep/sdk/metric/number/traits.go @@ -21,7 +21,7 @@ import ( ) // Traits is the generic traits interface for numbers used in the SDK. -type Traits[N int64 | float64] interface { +type Traits[N Any] interface { // FromNumber turns a generic 64bits into the correct machine type. FromNumber(Number) N @@ -31,10 +31,13 @@ type Traits[N int64 | float64] interface { // SetAtomic sets `ptr` to `value`. SetAtomic(ptr *N, value N) + // GetAtomic reads `ptr`. + GetAtomic(ptr *N) N + // AddAtomic sets `ptr` to `value+*ptr`. AddAtomic(ptr *N, value N) - // AddAtomic sets `ptr` to `value` and returns the former value. + // SwapAtomic sets `ptr` to `value` and returns the former value. SwapAtomic(ptr *N, value N) N // IsNaN indicates whether `math.IsNaN()` is true (impossible for int64). @@ -43,13 +46,15 @@ type Traits[N int64 | float64] interface { // IsInf indicates whether `math.IsInf()` is true (impossible for int64). IsInf(value N) bool - // Kind of + // Kind returns the number kind of these Traits. Kind() Kind } // Int64Traits implements Traits[int64]. type Int64Traits struct{} +var _ Traits[int64] = Int64Traits{} + func (Int64Traits) ToNumber(x int64) Number { return Number(x) } @@ -58,6 +63,10 @@ func (Int64Traits) FromNumber(n Number) int64 { return int64(n) } +func (Int64Traits) GetAtomic(ptr *int64) int64 { + return atomic.LoadInt64(ptr) +} + func (Int64Traits) SetAtomic(ptr *int64, value int64) { atomic.StoreInt64(ptr, value) } @@ -85,6 +94,8 @@ func (Int64Traits) Kind() Kind { // Float64Traits implements Traits[float64]. type Float64Traits struct{} +var _ Traits[float64] = Float64Traits{} + func (Float64Traits) ToNumber(x float64) Number { return Number(math.Float64bits(x)) } @@ -93,6 +104,10 @@ func (Float64Traits) FromNumber(n Number) float64 { return math.Float64frombits(uint64(n)) } +func (Float64Traits) GetAtomic(ptr *float64) float64 { + return math.Float64frombits(atomic.LoadUint64((*uint64)(unsafe.Pointer(ptr)))) +} + func (Float64Traits) SetAtomic(ptr *float64, value float64) { atomic.StoreUint64((*uint64)(unsafe.Pointer(ptr)), math.Float64bits(value)) } diff --git a/lightstep/sdk/metric/periodic.go b/lightstep/sdk/metric/periodic.go new file mode 100644 index 00000000..6ef7fda2 --- /dev/null +++ b/lightstep/sdk/metric/periodic.go @@ -0,0 +1,121 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric" + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "go.opentelemetry.io/otel" +) + +// PushExporter is an interface for push-based exporters. +type PushExporter interface { + String() string + ExportMetrics(context.Context, data.Metrics) error + ShutdownMetrics(context.Context, data.Metrics) error + ForceFlushMetrics(context.Context, data.Metrics) error +} + +// PeriodicReader is an implementation of Reader that manages periodic +// exporter, flush, and shutdown. This implementation re-uses data +// from one collection to the next, to lower memory costs. +type PeriodicReader struct { + lock sync.Mutex + data data.Metrics + interval time.Duration + exporter PushExporter + producer Producer + stop context.CancelFunc + wait sync.WaitGroup +} + +// NewPeriodicReader constructs a PeriodicReader from a push-based +// exporter given an interval (TODO: and options). +func NewPeriodicReader(exporter PushExporter, interval time.Duration /* opts ...Option*/) Reader { + return &PeriodicReader{ + interval: interval, + exporter: exporter, + } +} + +// String returns the exporter name and the configured interval. +func (pr *PeriodicReader) String() string { + return fmt.Sprintf("%v interval %v", pr.exporter.String(), pr.interval) +} + +// Register starts the periodic export loop. +func (pr *PeriodicReader) Register(producer Producer) { + ctx, cancel := context.WithCancel(context.Background()) + + pr.producer = producer + pr.stop = cancel + pr.wait.Add(1) + + go pr.start(ctx) +} + +// start runs the export loop. +func (pr *PeriodicReader) start(ctx context.Context) { + defer pr.wait.Done() + ticker := time.NewTicker(pr.interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := pr.collect(ctx, pr.exporter.ExportMetrics); err != nil { + otel.Handle(err) + } + } + } +} + +// Shutdown stops the export loop, canceling its Context, and waits +// for it to return. Then it issues a ShutdownMetrics with final +// data. +func (pr *PeriodicReader) Shutdown(ctx context.Context) error { + pr.stop() + pr.wait.Wait() + + return pr.collect(ctx, pr.exporter.ShutdownMetrics) +} + +// ForceFlush immediately waits for an existing collection, otherwise +// immediately begins collection without regards to timing and calls +// ForceFlush with current data. +func (pr *PeriodicReader) ForceFlush(ctx context.Context) error { + return pr.collect(ctx, pr.exporter.ForceFlushMetrics) +} + +// collect serializes access to re-usable metrics data, in each case +// calling through to an underlying PushExporter method with current +// data. +func (pr *PeriodicReader) collect(ctx context.Context, method func(context.Context, data.Metrics) error) error { + pr.lock.Lock() + defer pr.lock.Unlock() + + // The lock ensures that re-use of `pr.data` is successful, it + // means that shutdown, flush, and ordinary collection are + // exclusive. Note that shutdown will cancel a concurrent + // (ordinary) export, while flush will wait for a concurrent + // export. + pr.data = pr.producer.Produce(&pr.data) + + return method(ctx, pr.data) +} diff --git a/lightstep/sdk/metric/producer.go b/lightstep/sdk/metric/producer.go index 9334afa7..5cc00841 100644 --- a/lightstep/sdk/metric/producer.go +++ b/lightstep/sdk/metric/producer.go @@ -90,7 +90,15 @@ func (pp *providerProducer) Produce(inout *data.Metrics) data.Metrics { // collectFor collects from a single meter. func (m *meter) collectFor(ctx context.Context, pipe int, seq data.Sequence, output *data.Metrics) { - // Use m.lock to briefly access the current lists: syncInsts, asyncInsts, callbacks + // Use m.lock to briefly access the current lists: syncInsts, + // asyncInsts, callbacks. By releasing these locks, we allow + // new instruments and callbacks to be registered while + // collection happens. The items themselves are synchronized, + // and the slices are only appended to, so shallow copies are + // safe. If new instruments and callbacks are registered while + // this collection happens, they simply will not collect and + // any activity they experience concurrently will be + // registered on the next collection by this reader. m.lock.Lock() syncInsts := m.syncInsts asyncInsts := m.asyncInsts diff --git a/lightstep/sdk/metric/provider_test.go b/lightstep/sdk/metric/provider_test.go index b6e0f329..17f937c7 100644 --- a/lightstep/sdk/metric/provider_test.go +++ b/lightstep/sdk/metric/provider_test.go @@ -299,6 +299,5 @@ func TestDuplicateInstrumentConflict(t *testing.T) { // there is only one conflict passed to otel.Handle()--only // the first error is handled, even though it happened twice. require.Equal(t, 1, len(*errs)) - fmt.Println((*errs)[0]) require.True(t, errors.Is((*errs)[0], viewstate.ViewConflictsError{})) } diff --git a/lightstep/sdk/metric/reader.go b/lightstep/sdk/metric/reader.go index b801de60..ac45f6a4 100644 --- a/lightstep/sdk/metric/reader.go +++ b/lightstep/sdk/metric/reader.go @@ -23,9 +23,9 @@ import ( // Reader is the interface used between the SDK and an // exporter. Control flow is bi-directional through the // Reader, since the SDK initiates ForceFlush and Shutdown -// while the initiates collection. The Register() method here -// informs the Reader that it can begin reading, signaling the -// start of bi-directional control flow. +// while the Reader initiates collection. The Register() +// method here informs the Reader that it can begin reading, +// signaling the start of bi-directional control flow. // // Typically, push-based exporters that are periodic will // implement PeroidicExporter themselves and construct a @@ -54,7 +54,7 @@ type Reader interface { type Producer interface { // Produce returns metrics from a single collection. // - // Produce may be called concurrently, + // Produce may be called concurrently. // // The `in` parameter supports re-use of memory from // one collection to the next. Callers that pass `in` diff --git a/lightstep/sdk/metric/sdkinstrument/kind.go b/lightstep/sdk/metric/sdkinstrument/kind.go index dfe761d8..b59d45b5 100644 --- a/lightstep/sdk/metric/sdkinstrument/kind.go +++ b/lightstep/sdk/metric/sdkinstrument/kind.go @@ -27,6 +27,9 @@ const ( // HistogramKind indicates a Histogram instrument. HistogramKind + // TODO: replace "Observer" with "Asynchronous" or "Observable", + // both of which are recommended in the API spec. + // CounterObserverKind indicates a CounterObserver instrument. CounterObserverKind // UpDownCounterObserverKind indicates a UpDownCounterObserver diff --git a/lightstep/sdk/metric/view/standard.go b/lightstep/sdk/metric/view/standard.go index 48df963a..ce8ffc5d 100644 --- a/lightstep/sdk/metric/view/standard.go +++ b/lightstep/sdk/metric/view/standard.go @@ -20,8 +20,8 @@ import ( "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" ) -// StandardAggregation is the specified default aggregation Kind for -// each instrument Kind. +// StandardAggregationKind returns a function that configures the +// specified default aggregation Kind for each instrument Kind. func StandardAggregationKind(ik sdkinstrument.Kind) aggregation.Kind { switch ik { case sdkinstrument.HistogramKind: @@ -35,14 +35,15 @@ func StandardAggregationKind(ik sdkinstrument.Kind) aggregation.Kind { } } -// StandardTemporality returns the specified default Cumulative -// temporality for all instrument kinds. +// StandardTemporality returns a function that conifigures the +// specified default Cumulative temporality for all instrument kinds. func StandardTemporality(ik sdkinstrument.Kind) aggregation.Temporality { return aggregation.CumulativeTemporality } -// DeltaPreferredTemporality returns the specified Delta temporality -// for all instrument kinds except UpDownCounter, which remain Cumulative. +// DeltaPreferredTemporality returns a function that configures a +// preference for Delta temporality for all instrument kinds except +// UpDownCounter, which remain Cumulative. func DeltaPreferredTemporality(ik sdkinstrument.Kind) aggregation.Temporality { switch ik { case sdkinstrument.UpDownCounterKind, sdkinstrument.UpDownCounterObserverKind: @@ -52,7 +53,7 @@ func DeltaPreferredTemporality(ik sdkinstrument.Kind) aggregation.Temporality { } } -// StandardConfig returns two default-initialized aggregator.Configs. +// StandardConfig returns a function that configures two default aggregator.Configs. func StandardConfig(ik sdkinstrument.Kind) (ints, floats aggregator.Config) { return aggregator.Config{}, aggregator.Config{} }