Skip to content

Commit

Permalink
synchronize the sync-delta collector
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua MacDonald committed Jun 6, 2022
1 parent dfefd1e commit 54ac1b8
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 129 deletions.
6 changes: 3 additions & 3 deletions lightstep/sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type instrumentConstructor[T any] func(
instrument sdkinstrument.Descriptor,
opaque interface{},
compiled pipeline.Register[viewstate.Instrument],
) T
) *T

// configureInstrument applies the instrument configuration, checks
// for an existing definition for the same descriptor, and compiles
Expand All @@ -46,7 +46,7 @@ func configureInstrument[T any](
nk number.Kind,
ik sdkinstrument.Kind,
listPtr *[]*T,
ctor instrumentConstructor[*T],
ctor instrumentConstructor[T],
) (*T, error) {
// Compute the instrument descriptor
cfg := instrument.NewConfig(opts...)
Expand All @@ -65,7 +65,7 @@ func configureInstrument[T any](
conflicts.Combine(err)
}

return lookup.(T), conflicts.AsError()
return lookup.(*T), conflicts.AsError()
}

// Compile the instrument for each pipeline. the first time.
Expand Down
2 changes: 1 addition & 1 deletion lightstep/sdk/metric/internal/asyncstate/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (inst *Instrument) SnapshotAndProcess(state *State) {
defer state.lock.Unlock()

for _, acc := range state.store[inst] {
acc.SnapshotAndProcess()
acc.SnapshotAndProcess(false)
}
}

Expand Down
6 changes: 6 additions & 0 deletions lightstep/sdk/metric/internal/syncstate/refcount_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type refcountMapped struct {
value int64
}

func newRefcountMapped() refcountMapped {
return refcountMapped{
value: 2,
}
}

// ref returns true if the entry is still mapped and increases the
// reference usages, if unmapped returns false.
func (rm *refcountMapped) ref() bool {
Expand Down
10 changes: 5 additions & 5 deletions lightstep/sdk/metric/internal/syncstate/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewInstrument(desc sdkinstrument.Descriptor, _ interface{}, compiled pipeli
func (inst *Instrument) SnapshotAndProcess() {
inst.current.Range(func(key interface{}, value interface{}) bool {
rec := value.(*record)
if rec.snapshotAndProcess() {
if rec.snapshotAndProcess(false) {
return true
}
// Having no updates since last collection, try to unmap:
Expand All @@ -103,7 +103,7 @@ func (inst *Instrument) SnapshotAndProcess() {
// now, but there could have been an update between
// the snapshotAndProcess() and tryUnmap() above, so
// snapshotAndProcess one last time.
_ = rec.snapshotAndProcess()
_ = rec.snapshotAndProcess(true)
return true
})
}
Expand Down Expand Up @@ -133,14 +133,14 @@ type record struct {
// SnapshotAndProcess on the associated accumulator and returns true.
// If updates happened since the last collection (by any reader),
// returns false.
func (rec *record) snapshotAndProcess() bool {
func (rec *record) snapshotAndProcess(final bool) bool {
mods := atomic.LoadInt64(&rec.updateCount)
coll := atomic.LoadInt64(&rec.collectedCount)

if mods == coll {
return false
}
rec.accumulator.SnapshotAndProcess()
rec.accumulator.SnapshotAndProcess(final)

// Updates happened in this interval, collect and continue.
atomic.StoreInt64(&rec.collectedCount, mods)
Expand Down Expand Up @@ -187,7 +187,7 @@ func acquireRecord[N number.Any](inst *Instrument, attrs []attribute.KeyValue) (
// if it is never returned, it will not be updated and can be
// safely discarded.
newRec := &record{
refMapped: refcountMapped{value: 2},
refMapped: newRefcountMapped(),
accumulator: inst.compiled.NewAccumulator(aset),
}

Expand Down
90 changes: 77 additions & 13 deletions lightstep/sdk/metric/internal/viewstate/accumulators.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,76 @@ package viewstate // import "github.com/lightstep/otel-launcher-go/lightstep/sdk

import (
"sync"
"sync/atomic"

"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number"
"go.opentelemetry.io/otel/attribute"
)

// compiledSyncBase is any synchronous instrument view.
type compiledSyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct {
instrumentBase[N, Storage, int64, Methods]
}

// NewAccumulator returns a Accumulator for a synchronous instrument view.
func (csv *compiledSyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator {
sc := &syncAccumulator[N, Storage, Methods]{}
csv.initStorage(&sc.current)
csv.initStorage(&sc.snapshot)

holder := csv.findStorage(kvs)

sc.holder = holder

return sc
}

func (csv *compiledSyncBase[N, Storage, Methods]) findStorage(
kvs attribute.Set,
) *storageHolder[Storage, int64] {
kvs = csv.applyKeysFilter(kvs)

csv.instLock.Lock()
defer csv.instLock.Unlock()

entry := csv.getOrCreateEntry(kvs)
atomic.AddInt64(&entry.auxiliary, 1)
return entry
}

// compiledAsyncBase is any asynchronous instrument view.
type compiledAsyncBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct {
instrumentBase[N, Storage, notUsed, Methods]
}

// NewAccumulator returns a Accumulator for an asynchronous instrument view.
func (cav *compiledAsyncBase[N, Storage, Methods]) NewAccumulator(kvs attribute.Set) Accumulator {
ac := &asyncAccumulator[N, Storage, Methods]{}

ac.holder = cav.findStorage(kvs)

return ac
}

func (cav *compiledAsyncBase[N, Storage, Methods]) findStorage(
kvs attribute.Set,
) *storageHolder[Storage, notUsed] {
kvs = cav.applyKeysFilter(kvs)

cav.instLock.Lock()
defer cav.instLock.Unlock()

entry := cav.getOrCreateEntry(kvs)
return entry
}

// multiAccumulator
type multiAccumulator[N number.Any] []Accumulator

func (acc multiAccumulator[N]) SnapshotAndProcess() {
func (acc multiAccumulator[N]) SnapshotAndProcess(final bool) {
for _, coll := range acc {
coll.SnapshotAndProcess()
coll.SnapshotAndProcess(final)
}
}

Expand All @@ -40,30 +99,35 @@ func (acc multiAccumulator[N]) Update(value N) {
type syncAccumulator[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct {
// syncLock prevents two readers from calling
// SnapshotAndProcess at the same moment.
syncLock sync.Mutex
current Storage
snapshot Storage
findStorage func() *Storage
syncLock sync.Mutex
current Storage
snapshot Storage
holder *storageHolder[Storage, int64]
}

func (acc *syncAccumulator[N, Storage, Methods]) Update(number N) {
var methods Methods
methods.Update(&acc.current, number)
}

func (acc *syncAccumulator[N, Storage, Methods]) SnapshotAndProcess() {
func (acc *syncAccumulator[N, Storage, Methods]) SnapshotAndProcess(final bool) {
var methods Methods
acc.syncLock.Lock()
defer acc.syncLock.Unlock()
methods.Move(&acc.current, &acc.snapshot)
methods.Merge(&acc.snapshot, acc.findStorage())
methods.Merge(&acc.snapshot, &acc.holder.storage)
if final {
atomic.AddInt64(&acc.holder.auxiliary, -1)
}
}

type notUsed struct{}

// asyncAccumulator
type asyncAccumulator[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct {
asyncLock sync.Mutex
current N
findStorage func() *Storage
asyncLock sync.Mutex
current N
holder *storageHolder[Storage, notUsed]
}

func (acc *asyncAccumulator[N, Storage, Methods]) Update(number N) {
Expand All @@ -72,10 +136,10 @@ func (acc *asyncAccumulator[N, Storage, Methods]) Update(number N) {
acc.current = number
}

func (acc *asyncAccumulator[N, Storage, Methods]) SnapshotAndProcess() {
func (acc *asyncAccumulator[N, Storage, Methods]) SnapshotAndProcess(_ bool) {
acc.asyncLock.Lock()
defer acc.asyncLock.Unlock()

var methods Methods
methods.Update(acc.findStorage(), acc.current)
methods.Update(&acc.holder.storage, acc.current)
}
66 changes: 35 additions & 31 deletions lightstep/sdk/metric/internal/viewstate/base_instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,79 +26,83 @@ import (
"go.opentelemetry.io/otel/attribute"
)

type syncAuxiliary int64

type asyncAuxiliary struct{}

type storageHolder[Storage, Auxiliary any] struct {
auxiliary Auxiliary
storage Storage
}

// instrumentBase is the common type embedded in any of the compiled instrument views.
type instrumentBase[N number.Any, Storage any, Methods aggregator.Methods[N, Storage]] struct {
type instrumentBase[N number.Any, Storage, Auxiliary any, Methods aggregator.Methods[N, Storage]] struct {
instLock sync.Mutex
fromName string
desc sdkinstrument.Descriptor
acfg aggregator.Config
data map[attribute.Set]*Storage
data map[attribute.Set]*storageHolder[Storage, Auxiliary]

keysSet *attribute.Set
keysFilter *attribute.Filter
}

func (metric *instrumentBase[N, Storage, Methods]) Aggregation() aggregation.Kind {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Aggregation() aggregation.Kind {
var methods Methods
return methods.Kind()
}

func (metric *instrumentBase[N, Storage, Methods]) OriginalName() string {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) OriginalName() string {
return metric.fromName
}

func (metric *instrumentBase[N, Storage, Methods]) Descriptor() sdkinstrument.Descriptor {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Descriptor() sdkinstrument.Descriptor {
return metric.desc
}

func (metric *instrumentBase[N, Storage, Methods]) Keys() *attribute.Set {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Keys() *attribute.Set {
return metric.keysSet
}

func (metric *instrumentBase[N, Storage, Methods]) Config() aggregator.Config {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Config() aggregator.Config {
return metric.acfg
}

func (metric *instrumentBase[N, Storage, Methods]) initStorage(s *Storage) {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) initStorage(s *Storage) {
var methods Methods
methods.Init(s, metric.acfg)
}

func (metric *instrumentBase[N, Storage, Methods]) mergeDescription(d string) {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) mergeDescription(d string) {
metric.instLock.Lock()
defer metric.instLock.Unlock()
if len(d) > len(metric.desc.Description) {
metric.desc.Description = d
}
}

// storageFinder searches for and possibly allocates an output Storage
// for this metric. Filtered keys, if a filter is provided, will be
// computed once.
func (metric *instrumentBase[N, Storage, Methods]) storageFinder(
kvs attribute.Set,
) func() *Storage {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) applyKeysFilter(kvs attribute.Set) attribute.Set {
if metric.keysFilter != nil {
kvs, _ = attribute.NewSetWithFiltered(kvs.ToSlice(), *metric.keysFilter)
}
return kvs
}

return func() *Storage {
metric.instLock.Lock()
defer metric.instLock.Unlock()

storage, has := metric.data[kvs]
if has {
return storage
}

ns := metric.newStorage()
metric.data[kvs] = ns
return ns
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) getOrCreateEntry(kvs attribute.Set) *storageHolder[Storage, Auxiliary] {
entry, has := metric.data[kvs]
if has {
return entry
}

var methods Methods
entry = &storageHolder[Storage, Auxiliary]{}
methods.Init(&entry.storage, metric.acfg)
metric.data[kvs] = entry
return entry
}

// newStorage allocates and initializes a new Storage.
func (metric *instrumentBase[N, Storage, Methods]) newStorage() *Storage {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) newStorage() *Storage {
ns := new(Storage)
metric.initStorage(ns)
return ns
Expand All @@ -110,15 +114,15 @@ func (metric *instrumentBase[N, Storage, Methods]) newStorage() *Storage {
// be produced (in the same order); consumers of delta temporality
// should expect to see empty instruments in the output for metric
// data that is unchanged.
func (metric *instrumentBase[N, Storage, Methods]) appendInstrument(output *[]data.Instrument) *data.Instrument {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) appendInstrument(output *[]data.Instrument) *data.Instrument {
inst := data.ReallocateFrom(output)
inst.Descriptor = metric.desc
return inst
}

// copyPoint is used in cases where the output Aggregation is a copy
// of the stored object.
func (metric *instrumentBase[N, Storage, Methods]) appendPoint(inst *data.Instrument, set attribute.Set, storage *Storage, tempo aggregation.Temporality, start, end time.Time, reset bool) {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) appendPoint(inst *data.Instrument, set attribute.Set, storage *Storage, tempo aggregation.Temporality, start, end time.Time, reset bool) {
var methods Methods

// Possibly re-use the underlying storage.
Expand All @@ -144,7 +148,7 @@ func (metric *instrumentBase[N, Storage, Methods]) appendPoint(inst *data.Instru

// appendOrReusePoint is an alternate to appendPoint; this form is used when
// the storage will be reset on collection.
func (metric *instrumentBase[N, Storage, Methods]) appendOrReusePoint(inst *data.Instrument) (*data.Point, *Storage) {
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) appendOrReusePoint(inst *data.Instrument) (*data.Point, *Storage) {
point := data.ReallocateFrom(&inst.Points)

var methods Methods
Expand Down
Loading

0 comments on commit 54ac1b8

Please sign in to comment.