Skip to content

Commit

Permalink
decoder v2: add metricset (elastic#4333)
Browse files Browse the repository at this point in the history
Add support for metricset to model decoder/v2

part of elastic#3551
  • Loading branch information
simitt authored Oct 21, 2020
1 parent db60306 commit 7cf56b5
Show file tree
Hide file tree
Showing 12 changed files with 783 additions and 360 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
},
{
"@timestamp": "2017-05-30T18:53:42.281Z",
"a": 3.2,
"agent": {
"name": "elastic-node",
"version": "3.14.0"
Expand Down
2 changes: 1 addition & 1 deletion model/modeldecoder/generator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
}

func genV2() {
rootObjs := []string{"metadataRoot", "transactionRoot", "errorRoot", "spanRoot"}
rootObjs := []string{"metadataRoot", "errorRoot", "metricsetRoot", "spanRoot", "transactionRoot"}
out := filepath.Join(filepath.FromSlash(modeldecoderPath), pkgV2, "model_generated.go")
gen, err := generator.NewGenerator(importPath, pkgV2, typPath, rootObjs)
if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions model/modeldecoder/generator/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"strings"
)

var mapSupportedTags = []string{tagMaxVals, tagPatternKeys, tagTypesVals, tagRequired}
var mapSupportedTags = []string{tagMaxVals, tagPatternKeys, tagRequired, tagTypesVals}

func generateMapValidation(w io.Writer, fields []structField, f structField, isCustomStruct bool) error {
typ := f.Type().Underlying().(*types.Map)
Expand Down Expand Up @@ -93,14 +93,13 @@ if err := v.validate(); err != nil{
if typesValsValue, ok := vTag[tagTypesVals]; ok {
mapRuleTypesVals(w, f, vTag, validationRule{name: tagTypesVals, value: typesValsValue})
}
// close iteration over map
fmt.Fprintf(w, `
}
`[1:])
return nil
}

func mapRuleTypesVals(w io.Writer, f structField, rules map[string]string, rule validationRule) error {
func mapRuleTypesVals(w io.Writer, f structField, rules map[string]string, rule validationRule) {
fmt.Fprintf(w, `
switch t := v.(type){
`[1:])
Expand Down Expand Up @@ -128,32 +127,28 @@ default:
return fmt.Errorf("'%s': validation rule '%s(%s)' violated for key %%s",k)
}
`[1:], jsonName(f), rule.name, rule.value)
return nil
}

func mapRuleRequired(w io.Writer, f structField, rule validationRule) error {
func mapRuleRequired(w io.Writer, f structField, rule validationRule) {
fmt.Fprintf(w, `
if len(val.%s) == 0{
return fmt.Errorf("'%s' required")
}
`[1:], f.Name(), jsonName(f))
return nil
}

func mapRulePatternKeys(w io.Writer, f structField, rule validationRule) error {
func mapRulePatternKeys(w io.Writer, f structField, rule validationRule) {
fmt.Fprintf(w, `
if k != "" && !%s.MatchString(k){
return fmt.Errorf("'%s': validation rule '%s(%s)' violated")
}
`[1:], rule.value, jsonName(f), rule.name, rule.value)
return nil
}

func mapRuleMaxVals(w io.Writer, f structField, rule validationRule) error {
func mapRuleMaxVals(w io.Writer, f structField, rule validationRule) {
fmt.Fprintf(w, `
if utf8.RuneCountInString(t) > %s{
return fmt.Errorf("'%s': validation rule '%s(%s)' violated")
}
`[1:], rule.value, jsonName(f), rule.name, rule.value)
return nil
}
1 change: 0 additions & 1 deletion model/modeldecoder/modeldecodertest/populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ func iterateStruct(v reflect.Value, key string, fn func(f reflect.Value, fKey st
ptr.Elem().Set(mVal)
iterateStruct(ptr.Elem(), fmt.Sprintf("%s.[%s]", fKey, mKey), fn)
f.SetMapIndex(mKey, ptr.Elem())
i++
}
case reflect.Slice, reflect.Array:
if v.Type() == f.Type().Elem() {
Expand Down
9 changes: 6 additions & 3 deletions model/modeldecoder/modeldecodertest/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ func DecodeData(t *testing.T, r io.Reader, eventType string, out interface{}) {
var et string
var err error
for et != eventType {
et, err = readEventType(dec)
require.NoError(t, err)
if et, err = readEventType(dec); err != nil {
require.Equal(t, io.EOF, err)
}
}
// decode data
require.NoError(t, dec.Decode(&out))
if err = dec.Decode(&out); err != nil {
require.Equal(t, io.EOF, err)
}
}

// DecodeDataWithReplacement decodes input from the io.Reader and replaces data for the
Expand Down
91 changes: 87 additions & 4 deletions model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ var (
return &metadataRoot{}
},
}
metricsetRootPool = sync.Pool{
New: func() interface{} {
return &metricsetRoot{}
},
}
spanRootPool = sync.Pool{
New: func() interface{} {
return &spanRoot{}
Expand Down Expand Up @@ -113,6 +118,15 @@ func releaseMetadataRoot(root *metadataRoot) {
metadataRootPool.Put(root)
}

func fetchMetricsetRoot() *metricsetRoot {
return metricsetRootPool.Get().(*metricsetRoot)
}

func releaseMetricsetRoot(root *metricsetRoot) {
root.Reset()
metricsetRootPool.Put(root)
}

func fetchSpanRoot() *spanRoot {
return spanRootPool.Get().(*spanRoot)
}
Expand Down Expand Up @@ -169,6 +183,25 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, out *model.
return err
}

// DecodeNestedMetricset uses the given decoder to create the input model,
// then runs the defined validations on the input model
// and finally maps the values fom the input model to the given *model.Metricset instance
//
// DecodeNestedMetricset should be used when the stream in the decoder contains the `metricset` key
func DecodeNestedMetricset(d decoder.Decoder, input *modeldecoder.Input, out *model.Metricset) error {
root := fetchMetricsetRoot()
defer releaseMetricsetRoot(root)
var err error
if err = d.Decode(&root); err != nil && err != io.EOF {
return newDecodeErrFromJSONIter(err)
}
if err := root.validate(); err != nil {
return ValidationError{err}
}
mapToMetricsetModel(&root.Metricset, &input.Metadata, input.RequestTime, input.Config, out)
return err
}

// DecodeNestedSpan uses the given decoder to create the input model,
// then runs the defined validations on the input model
// and finally maps the values fom the input model to the given *model.Span instance
Expand Down Expand Up @@ -247,7 +280,9 @@ func mapToClientModel(from contextRequest, out *model.Metadata) {

func mapToErrorModel(from *errorEvent, metadata *model.Metadata, reqTime time.Time, config modeldecoder.Config, out *model.Error) {
// set metadata information
out.Metadata = *metadata
if metadata != nil {
out.Metadata = *metadata
}
if from == nil {
return
}
Expand Down Expand Up @@ -393,7 +428,7 @@ func mapToExceptionModel(from errorException, out *model.Exception) {

func mapToMetadataModel(from *metadata, out *model.Metadata) {
// Cloud
if from == nil {
if from == nil || out == nil {
return
}
if from.Cloud.Account.ID.IsSet() {
Expand Down Expand Up @@ -534,6 +569,50 @@ func mapToMetadataModel(from *metadata, out *model.Metadata) {
}
}

func mapToMetricsetModel(from *metricset, metadata *model.Metadata, reqTime time.Time, config modeldecoder.Config, out *model.Metricset) {
// set metadata as they are - no values are overwritten by the event
if metadata != nil {
out.Metadata = *metadata
}
if from == nil {
return
}
// set timestamp from input or requst time
if from.Timestamp.Val.IsZero() {
out.Timestamp = reqTime
} else {
out.Timestamp = from.Timestamp.Val
}

// map samples information
if len(from.Samples) > 0 {
out.Samples = make([]model.Sample, len(from.Samples))
i := 0
for name, sample := range from.Samples {
out.Samples[i] = model.Sample{Name: name, Value: sample.Value.Val}
i++
}
}

if len(from.Tags) > 0 {
out.Labels = from.Tags.Clone()
}
// map span information
if from.Span.Subtype.IsSet() {
out.Span.Subtype = from.Span.Subtype.Val
}
if from.Span.Type.IsSet() {
out.Span.Type = from.Span.Type.Val
}
// map transaction information
if from.Transaction.Name.IsSet() {
out.Transaction.Name = from.Transaction.Name.Val
}
if from.Transaction.Type.IsSet() {
out.Transaction.Type = from.Transaction.Type.Val
}
}

func mapToPageModel(from contextPage, out *model.Page) {
if from.URL.IsSet() {
out.URL = model.ParseURL(from.URL.Val, "")
Expand Down Expand Up @@ -684,7 +763,9 @@ func mapToServiceModel(from contextService, out *model.Service) {

func mapToSpanModel(from *span, metadata *model.Metadata, reqTime time.Time, config modeldecoder.Config, out *model.Span) {
// set metadata information for span
out.Metadata = *metadata
if metadata != nil {
out.Metadata = *metadata
}
if from == nil {
return
}
Expand Down Expand Up @@ -956,7 +1037,9 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {

func mapToTransactionModel(from *transaction, metadata *model.Metadata, reqTime time.Time, config modeldecoder.Config, out *model.Transaction) {
// set metadata information
out.Metadata = *metadata
if metadata != nil {
out.Metadata = *metadata
}
if from == nil {
return
}
Expand Down
137 changes: 137 additions & 0 deletions model/modeldecoder/v2/metricset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package v2

import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modeldecoder"
"github.com/elastic/apm-server/model/modeldecoder/modeldecodertest"
)

func TestResetMetricsetOnRelease(t *testing.T) {
inp := `{"metricset":{"samples":{"a.b.":{"value":2048}}}}`
root := fetchMetricsetRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root))
require.True(t, root.IsSet())
releaseMetricsetRoot(root)
assert.False(t, root.IsSet())
}

func TestDecodeNestedMetricset(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now()
input := modeldecoder.Input{Metadata: model.Metadata{}, RequestTime: now, Config: modeldecoder.Config{}}
str := `{"metricset":{"timestamp":1599996822281000,"samples":{"a.b":{"value":2048}}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var out model.Metricset
require.NoError(t, DecodeNestedMetricset(dec, &input, &out))
assert.Equal(t, []model.Sample{{Name: "a.b", Value: 2048}}, out.Samples)
assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", out.Timestamp.String())

// invalid type
err := DecodeNestedMetricset(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &out)
require.Error(t, err)
assert.Contains(t, err.Error(), "decode")
})

t.Run("validate", func(t *testing.T) {
var out model.Metricset
err := DecodeNestedMetricset(decoder.NewJSONDecoder(strings.NewReader(`{}`)), &modeldecoder.Input{}, &out)
require.Error(t, err)
assert.Contains(t, err.Error(), "validation")
})
}

func TestDecodeMapToMetricsetModel(t *testing.T) {
// gatewayIP := net.ParseIP("192.168.0.1")
// randomIP := net.ParseIP("71.0.54.1")
exceptions := func(key string) bool { return false }

t.Run("metadata-set", func(t *testing.T) {
// set metadata - metricsets do not hold metadata themselves
var input metricset
var out model.Metricset
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToMetricsetModel(&input, initializedMetadata(), time.Now(), modeldecoder.Config{}, &out)
// iterate through metadata model and assert values are set to default values
modeldecodertest.AssertStructValues(t, &out.Metadata, exceptions, modeldecodertest.DefaultValues())
})

t.Run("metricset-values", func(t *testing.T) {
exceptions := func(key string) bool {
// metadata are tested separately
if strings.HasPrefix(key, "Metadata") ||
// only set by aggregator
strings.HasPrefix(key, "Event") ||
key == "TimeseriesInstanceID" ||
key == "Transaction.Result" ||
key == "Transaction.Root" ||
strings.HasPrefix(key, "Span.DestinationService") ||
// test Samples separately
strings.HasPrefix(key, "Samples") {
return true
}
return false
}

var input metricset
var out1, out2 model.Metricset
reqTime := time.Now().Add(time.Second)
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToMetricsetModel(&input, initializedMetadata(), reqTime, modeldecoder.Config{}, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, &out1, exceptions, defaultVal)
defaultSamples := []model.Sample{
{Name: defaultVal.Str + "0", Value: defaultVal.Float},
{Name: defaultVal.Str + "1", Value: defaultVal.Float},
{Name: defaultVal.Str + "2", Value: defaultVal.Float},
}
assert.ElementsMatch(t, defaultSamples, out1.Samples)

// set Timestamp to requestTime if eventTime is zero
defaultVal.Update(time.Time{})
modeldecodertest.SetStructValues(&input, defaultVal)
mapToMetricsetModel(&input, initializedMetadata(), reqTime, modeldecoder.Config{}, &out1)
defaultVal.Update(reqTime)
input.Reset()
modeldecodertest.AssertStructValues(t, &out1, exceptions, defaultVal)

// ensure memory is not shared by reusing input model
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToMetricsetModel(&input, initializedMetadata(), reqTime, modeldecoder.Config{}, &out2)
modeldecodertest.AssertStructValues(t, &out2, exceptions, otherVal)
otherSamples := []model.Sample{
{Name: otherVal.Str + "0", Value: otherVal.Float},
{Name: otherVal.Str + "1", Value: otherVal.Float},
}
assert.ElementsMatch(t, otherSamples, out2.Samples)
modeldecodertest.AssertStructValues(t, &out1, exceptions, defaultVal)
assert.ElementsMatch(t, defaultSamples, out1.Samples)
})
}
Loading

0 comments on commit 7cf56b5

Please sign in to comment.