End-to-end demonstration of metric dimensionality reduction #1023

Closed
wants to merge 8 commits into from
87 changes: 75 additions & 12 deletions api/label/set.go
@@ -43,6 +43,8 @@ type (
encoded [maxConcurrentEncoders]string
}

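// Filter selects which label keys to keep: it is called with a key
// and returns true to keep the label or false to exclude it from
// the filtered Set.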
Filter func(string) bool

// Distinct wraps a variable-size array of `kv.KeyValue`,
// constructed with keys in sorted order. This can be used as
// a map key or for equality checking between Sets.
@@ -100,6 +102,7 @@ func (l *Set) Get(idx int) (kv.KeyValue, bool) {
if l == nil {
return kv.KeyValue{}, false
}

value := l.equivalent.reflect()

if idx >= 0 && idx < value.Len() {
@@ -116,8 +119,10 @@ func (l *Set) Value(k kv.Key) (kv.Value, bool) {
if l == nil {
return kv.Value{}, false
}
rValue := l.equivalent.reflect()
vlen := rValue.Len()
var vlen int
var rValue reflect.Value
rValue = l.equivalent.reflect()
vlen = rValue.Len()

idx := sort.Search(vlen, func(idx int) bool {
return rValue.Index(idx).Interface().(kv.KeyValue).Key >= k
@@ -221,6 +226,12 @@ func (l *Set) Encoded(encoder Encoder) string {
return r
}

func empty() Set {
return Set{
equivalent: emptySet.equivalent,
}
}

// NewSet returns a new `*Set`. See the documentation for
// `NewSetWithSortable` for more details.
//
@@ -229,12 +240,10 @@ func (l *Set) Encoded(encoder Encoder) string {
func NewSet(kvs ...kv.KeyValue) Set {
// Check for empty set.
if len(kvs) == 0 {
return Set{
equivalent: emptySet.equivalent,
}
return empty()
}

return NewSetWithSortable(kvs, new(Sortable))
s, _ := NewSetWithSortableFiltered(kvs, new(Sortable), nil)
return s //nolint
}

// NewSetWithSortable returns a new `*Set`.
@@ -261,9 +270,24 @@ func NewSet(kvs ...kv.KeyValue) Set {
func NewSetWithSortable(kvs []kv.KeyValue, tmp *Sortable) Set {
// Check for empty set.
if len(kvs) == 0 {
return Set{
equivalent: emptySet.equivalent,
}
return empty()
}
s, _ := NewSetWithSortableFiltered(kvs, tmp, nil)
return s //nolint
}

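// NewSetWithFiltered returns a new `Set` together with the labels
// excluded by the filter. See the documentation for
// `NewSetWithSortableFiltered` for more details.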
func NewSetWithFiltered(kvs []kv.KeyValue, filter Filter) (Set, []kv.KeyValue) {
// Check for empty set.
if len(kvs) == 0 {
return empty(), nil
}
return NewSetWithSortableFiltered(kvs, new(Sortable), filter)
}

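// NewSetWithSortableFiltered returns a new `Set`, using the passed
// `Sortable` as scratch space while sorting the input (see
// `NewSetWithSortable`). If a non-nil filter is supplied, labels
// whose keys the filter rejects are left out of the Set and
// returned as the second result.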
func NewSetWithSortableFiltered(kvs []kv.KeyValue, tmp *Sortable, filter Filter) (Set, []kv.KeyValue) {
// Check for empty set.
if len(kvs) == 0 {
return empty(), nil
}

*tmp = kvs
@@ -287,13 +311,52 @@ func NewSetWithSortable(kvs []kv.KeyValue, tmp *Sortable) Set {
if kvs[offset].Key == kvs[position].Key {
continue
}
kvs[offset], kvs[position-1] = kvs[position-1], kvs[offset]
position--
kvs[offset], kvs[position] = kvs[position], kvs[offset]
}
if filter != nil {
return filterSet(kvs[position:], filter)
}

return Set{
equivalent: computeDistinct(kvs[position:]),
}, nil
}

func filterSet(kvs []kv.KeyValue, filter Filter) (Set, []kv.KeyValue) {
var excluded []kv.KeyValue

// Move labels that are excluded by the filter so they are
// adjacent before calling computeDistinct().
distinctPosition := len(kvs)

// Similar to the de-duplication logic above: swap excluded keys
// toward the front and retained keys toward the end of the
// slice.
offset := len(kvs) - 1
for ; offset >= 0; offset-- {
if filter(string(kvs[offset].Key)) {
distinctPosition--
kvs[offset], kvs[distinctPosition] = kvs[distinctPosition], kvs[offset]
continue
}
}
excluded = kvs[:distinctPosition]

return Set{
equivalent: computeDistinct(kvs[distinctPosition:]),
}, excluded
}

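// Filter returns a filtered copy of this Set along with the excluded
// labels. Keys for which the filter returns false are removed from
// the returned Set; a nil filter removes nothing.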
func (l *Set) Filter(re Filter) (Set, []kv.KeyValue) {
if re == nil {
return Set{
equivalent: l.equivalent,
}, nil
}

// Note: This could be refactored to avoid the temporary slice
// allocation, if it proves to be expensive.
return filterSet(l.ToSlice(), re)
}

// computeDistinct returns a `Distinct` using either the fixed- or
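As an aside for reviewers, the sketch below (not part of this change set; the key names, values, and import paths are assumed from the surrounding code) shows how the new filtering entry points fit together: the filter keeps keys for which it returns true, and everything else comes back as the excluded labels.

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/api/kv"
	"go.opentelemetry.io/otel/api/label"
)

func main() {
	// Keep only the "service" key; all other keys are excluded.
	keep := label.Filter(func(key string) bool {
		return key == "service"
	})

	set, excluded := label.NewSetWithFiltered([]kv.KeyValue{
		kv.String("service", "checkout"),
		kv.String("host", "h-1234"),
	}, keep)

	enc := label.DefaultEncoder()
	fmt.Println(set.Encoded(enc)) // service=checkout
	fmt.Println(len(excluded))    // 1 (the "host" label)
}

The same semantics apply to the new Filter method on an existing Set, which returns the filtered Set plus the excluded labels.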
56 changes: 55 additions & 1 deletion api/label/set_test.go
@@ -15,6 +15,7 @@
package label_test

import (
"regexp"
"testing"

"go.opentelemetry.io/otel/api/kv"
@@ -24,8 +25,12 @@ import (
)

type testCase struct {
kvs []kv.KeyValue
kvs []kv.KeyValue

keyRe *regexp.Regexp

encoding string
fullEnc string
}

func expect(enc string, kvs ...kv.KeyValue) testCase {
@@ -35,6 +40,15 @@ func expect(enc string, kvs ...kv.KeyValue) testCase {
}
}

func expectFiltered(enc, filter, fullEnc string, kvs ...kv.KeyValue) testCase {
return testCase{
kvs: kvs,
keyRe: regexp.MustCompile(filter),
encoding: enc,
fullEnc: fullEnc,
}
}

func TestSetDedup(t *testing.T) {
cases := []testCase{
expect("A=B", kv.String("A", "2"), kv.String("A", "B")),
@@ -114,3 +128,43 @@
}
}
}

func TestUniqueness(t *testing.T) {
short := []kv.KeyValue{
kv.String("A", "0"),
kv.String("B", "2"),
kv.String("A", "1"),
}
long := []kv.KeyValue{
kv.String("B", "2"),
kv.String("C", "5"),
kv.String("B", "2"),
kv.String("C", "1"),
kv.String("A", "4"),
kv.String("C", "3"),
kv.String("A", "1"),
}
cases := []testCase{
expectFiltered("A=1", "^A$", "B=2", short...),
expectFiltered("B=2", "^B$", "A=1", short...),
expectFiltered("A=1,B=2", "^A|B$", "", short...),
expectFiltered("", "^C", "A=1,B=2", short...),

expectFiltered("A=1,C=3", "A|C", "B=2", long...),
expectFiltered("B=2,C=3", "C|B", "A=1", long...),
expectFiltered("C=3", "C", "A=1,B=2", long...),
expectFiltered("", "D", "A=1,B=2,C=3", long...),
}
enc := label.DefaultEncoder()

for _, tc := range cases {
cpy := make([]kv.KeyValue, len(tc.kvs))
copy(cpy, tc.kvs)
distinct, uniq := label.NewSetWithFiltered(cpy, tc.keyRe.MatchString)

full := label.NewSet(uniq...)

require.Equal(t, tc.encoding, distinct.Encoded(enc))
require.Equal(t, tc.fullEnc, full.Encoded(enc))
}
}
21 changes: 21 additions & 0 deletions sdk/export/metric/metric.go
@@ -64,6 +64,13 @@ type Processor interface {
// disable metrics with active records.
AggregatorSelector

// LabelFilterSelector allows the Processor to reduce the
// dimensionality of input data before it reaches the
// Processor. The label.Set associated with Accumulation
// records will not include any of the keys that are removed
// by the filter.
LabelFilterSelector

// Process is called by the SDK once per internal record,
// passing the export Accumulation (a Descriptor, the corresponding
// Labels, and the checkpointed Aggregator). This call has no
@@ -167,6 +174,20 @@ type Subtractor interface {
Subtract(operand, result Aggregator, descriptor *metric.Descriptor) error
}

// LabelFilterSelector supports limiting the keys used to form unique label
// sets in the Metric Accumulator. Filtered label sets are associated
// with aggregated data entering and leaving the Processor. Label
// values that are excluded from a label set may be included in raw
// exemplars.
type LabelFilterSelector interface {
// LabelFilterFor returns the label filter to use for a given
// metric descriptor. Labels excluded by the filter may
// appear in raw exemplars, but they will not appear in the
// labels of exported records passed from the Accumulator to
// the Processor.
LabelFilterFor(*metric.Descriptor) label.Filter
}

// Exporter handles presentation of the checkpoint of aggregate
// metrics. This is the final stage of a metrics export pipeline,
// where metric data are formatted for a specific system.
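A Processor embedding this interface might satisfy it with something like the following sketch; the type and field names are invented for illustration, and the import paths are assumed from the repository layout. Returning a nil filter, as the benchmark fixture below does, means no keys are removed.

package sketch

import (
	"go.opentelemetry.io/otel/api/label"
	"go.opentelemetry.io/otel/api/metric"
)

// allowListSelector keeps a fixed allow-list of label keys for every
// instrument and drops the rest before aggregation.
type allowListSelector struct {
	allowed map[string]bool
}

// LabelFilterFor satisfies the LabelFilterSelector interface above.
func (s allowListSelector) LabelFilterFor(_ *metric.Descriptor) label.Filter {
	return func(key string) bool {
		// true => the key stays in the exported label set;
		// false => it is excluded and only available to raw exemplars.
		return s.allowed[key]
	}
}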
4 changes: 4 additions & 0 deletions sdk/metric/benchmark_test.go
@@ -48,6 +48,10 @@ func newFixture(b *testing.B) *benchFixture {
return bf
}

func (f *benchFixture) LabelFilterFor(*metric.Descriptor) label.Filter {
return nil
}

func (f *benchFixture) Process(export.Accumulation) error {
return nil
}
24 changes: 12 additions & 12 deletions sdk/metric/controller/pull/pull_test.go
@@ -41,7 +41,7 @@ func TestPullNoCache(t *testing.T) {

ctx := context.Background()
meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter")
counter := metric.Must(meter).NewInt64Counter("counter.sum")

counter.Add(ctx, 10, kv.String("A", "B"))

@@ -50,8 +50,8 @@
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
"counter.sum/A=B/": 10,
}, records.Map())

counter.Add(ctx, 10, kv.String("A", "B"))

@@ -60,8 +60,8 @@
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 20,
}, records.Map)
"counter.sum/A=B/": 20,
}, records.Map())
}

func TestPullWithCache(t *testing.T) {
@@ -75,7 +75,7 @@ func TestPullWithCache(t *testing.T) {

ctx := context.Background()
meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter")
counter := metric.Must(meter).NewInt64Counter("counter.sum")

counter.Add(ctx, 10, kv.String("A", "B"))

@@ -84,8 +84,8 @@
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
"counter.sum/A=B/": 10,
}, records.Map())

counter.Add(ctx, 10, kv.String("A", "B"))

@@ -95,8 +95,8 @@
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
"counter.sum/A=B/": 10,
}, records.Map())

mock.Add(time.Second)
runtime.Gosched()
@@ -107,7 +107,7 @@
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 20,
}, records.Map)
"counter.sum/A=B/": 20,
}, records.Map())

}