From 07444287fb4f5f716bc0b3e3e4f0c667a49a86fd Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 1 Jun 2021 10:45:21 +0800 Subject: [PATCH] Add support for histograms to metrics intake (#5360) * model/modeldecoder: add metric type and unit * systemtest: test histogram metrics * Update changelog * systemtest: fix min docs expectation in test --- ...PublishIntegrationMetricsets.approved.json | 66 +++++++++++ changelogs/head.asciidoc | 1 + docs/spec/v2/metricset.json | 111 +++++++++++++++++- model/modeldecoder/generator/code.go | 27 +++-- model/modeldecoder/generator/jsonnumber.go | 16 +++ model/modeldecoder/generator/jsonschema.go | 1 + model/modeldecoder/generator/nstring.go | 2 +- model/modeldecoder/generator/slice.go | 24 ++++ model/modeldecoder/generator/validation.go | 96 +++++++-------- .../modeldecodertest/populator.go | 5 +- model/modeldecoder/rumv3/model_generated.go | 36 +++--- model/modeldecoder/v2/decoder.go | 19 ++- model/modeldecoder/v2/metricset_test.go | 103 +++++++++++----- model/modeldecoder/v2/model.go | 35 +++++- model/modeldecoder/v2/model_generated.go | 61 ++++++---- ...tIntakeIntegrationMetricsets.approved.json | 56 +++++++++ ...tIntegrationResultMetricsets.approved.json | 2 +- .../TestApprovedMetrics.approved.json | 63 ++++++++++ systemtest/metrics_test.go | 61 +++++++--- systemtest/template_test.go | 1 + testdata/intake-v2/metricsets.ndjson | 1 + 21 files changed, 634 insertions(+), 153 deletions(-) diff --git a/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json b/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json index d677703c641..2946f563e00 100644 --- a/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json +++ b/beater/test_approved_es_documents/TestPublishIntegrationMetricsets.approved.json @@ -298,6 +298,72 @@ "id": "axb123hg", "name": "logged-in-user" } + }, + { + "@timestamp": "2017-05-30T18:53:41.366Z", + "_doc_count": 6, + "_metric_descriptions": { + "latency_distribution": { + "type": "histogram", + "unit": "s" + } + }, + "agent": { + "name": "elastic-node", + "version": "3.14.0" + }, + "ecs": { + "version": "1.8.0" + }, + "host": { + "ip": "127.0.0.1" + }, + "labels": { + "tag1": "one", + "tag2": 2 + }, + "latency_distribution": { + "counts": [ + 1, + 2, + 3 + ], + "values": [ + 1.1, + 2.2, + 3.3 + ] + }, + "metricset.name": "app", + "observer": { + "ephemeral_id": "00000000-0000-0000-0000-000000000000", + "hostname": "", + "id": "fbba762a-14dd-412c-b7e9-b79f903eb492", + "type": "test-apm-server", + "version": "1.2.3", + "version_major": 1 + }, + "process": { + "pid": 1234 + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "language": { + "name": "ecmascript" + }, + "name": "1234_service-12a3", + "node": { + "name": "node-1" + } + }, + "user": { + "email": "user@mail.com", + "id": "axb123hg", + "name": "logged-in-user" + } } ] } diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 1f5f22a72ce..53d4e429b6c 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -26,6 +26,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] * Translate otel messaging.* semantic conventions to ECS {pull}5334[5334] * Add support for dynamic histogram metrics {pull}5239[5239] * Tail-sampling processor now resumes subscription from previous position after restart {pull}5350[5350] +* Add support for histograms to metrics intake {pull}5360[5360] [float] ==== Deprecated diff --git a/docs/spec/v2/metricset.json b/docs/spec/v2/metricset.json index 13126665c97..391ae34809e 100644 --- a/docs/spec/v2/metricset.json +++ b/docs/spec/v2/metricset.json @@ -13,13 +13,118 @@ "object" ], "properties": { + "counts": { + "description": "Counts holds the bucket counts for histogram metrics. These numbers must be positive or zero. If Counts is specified, then Values is expected to be specified with the same number of elements, and with the same order.", + "type": [ + "null", + "array" + ], + "items": { + "type": "integer", + "minimum": 0 + }, + "minItems": 0 + }, + "type": { + "description": "Type holds an optional metric type: gauge, counter, or histogram. If Type is unknown, it will be ignored.", + "type": [ + "null", + "string" + ] + }, + "unit": { + "description": "Unit holds an optional unit for the metric. - \"percent\" (value is in the range [0,1]) - \"byte\" - a time unit: \"nanos\", \"micros\", \"ms\", \"s\", \"m\", \"h\", \"d\" If Unit is unknown, it will be ignored.", + "type": [ + "null", + "string" + ] + }, "value": { "description": "Value holds the value of a single metric sample.", - "type": "number" + "type": [ + "null", + "number" + ] + }, + "values": { + "description": "Values holds the bucket values for histogram metrics. Values must be provided in ascending order; failure to do so will result in the metric being discarded.", + "type": [ + "null", + "array" + ], + "items": { + "type": "number" + }, + "minItems": 0 } }, - "required": [ - "value" + "allOf": [ + { + "if": { + "properties": { + "counts": { + "type": "array" + } + }, + "required": [ + "counts" + ] + }, + "then": { + "properties": { + "values": { + "type": "array" + } + }, + "required": [ + "values" + ] + } + }, + { + "if": { + "properties": { + "values": { + "type": "array" + } + }, + "required": [ + "values" + ] + }, + "then": { + "properties": { + "counts": { + "type": "array" + } + }, + "required": [ + "counts" + ] + } + } + ], + "anyOf": [ + { + "properties": { + "value": { + "type": "number" + } + }, + "required": [ + "value" + ] + }, + { + "properties": { + "values": { + "type": "array" + } + }, + "required": [ + "values" + ] + } ] } } diff --git a/model/modeldecoder/generator/code.go b/model/modeldecoder/generator/code.go index a010956c65d..a33763e3f7a 100644 --- a/model/modeldecoder/generator/code.go +++ b/model/modeldecoder/generator/code.go @@ -161,21 +161,17 @@ func (val *%s) IsSet() bool { if key != "" { key += "." } - prefix := `` + prefix := ` ` for i := 0; i < len(structTyp.fields); i++ { f := structTyp.fields[i] if !f.Exported() { continue } - switch t := f.Type().Underlying().(type) { - case *types.Slice, *types.Map: - fmt.Fprintf(&g.buf, `%s len(val.%s) > 0`, prefix, f.Name()) - case *types.Struct: - fmt.Fprintf(&g.buf, `%s val.%s.IsSet()`, prefix, f.Name()) - default: - return fmt.Errorf("unhandled type %T for IsSet() for '%s%s'", t, key, jsonName(f)) + g.buf.WriteString(prefix) + if err := generateIsSet(&g.buf, f, "val."); err != nil { + return errors.Wrapf(err, "error generating IsSet() for '%s%s'", key, jsonName(f)) } - prefix = ` ||` + prefix = ` || ` } fmt.Fprint(&g.buf, ` } @@ -183,6 +179,19 @@ func (val *%s) IsSet() bool { return nil } +func generateIsSet(w io.Writer, field structField, fieldSelectorPrefix string) error { + switch typ := field.Type().Underlying(); typ.(type) { + case *types.Slice, *types.Map: + fmt.Fprintf(w, "(len(%s%s) > 0)", fieldSelectorPrefix, field.Name()) + return nil + case *types.Struct: + fmt.Fprintf(w, "%s%s.IsSet()", fieldSelectorPrefix, field.Name()) + return nil + default: + return fmt.Errorf("unhandled type %T generating IsSet() for '%s'", typ, jsonName(field)) + } +} + // generateReset creates `Reset` methods for struct fields setting them to // their zero values or calling their `Reset` methods // it only considers exported fields diff --git a/model/modeldecoder/generator/jsonnumber.go b/model/modeldecoder/generator/jsonnumber.go index b4b401e164b..ddc883dd572 100644 --- a/model/modeldecoder/generator/jsonnumber.go +++ b/model/modeldecoder/generator/jsonnumber.go @@ -17,8 +17,24 @@ package generator +import "encoding/json" + func generateJSONPropertyJSONNumber(info *fieldInfo, parent *property, child *property) error { child.Type.add(TypeNameNumber) parent.Properties[jsonSchemaName(info.field)] = child return setPropertyRulesInteger(info, child) } + +func setPropertyRulesNumber(info *fieldInfo, p *property) error { + for tagName, tagValue := range info.tags { + switch tagName { + case tagMax: + p.Max = json.Number(tagValue) + delete(info.tags, tagName) + case tagMin: + p.Min = json.Number(tagValue) + delete(info.tags, tagName) + } + } + return nil +} diff --git a/model/modeldecoder/generator/jsonschema.go b/model/modeldecoder/generator/jsonschema.go index a1aca2619f2..8b3b722f54b 100644 --- a/model/modeldecoder/generator/jsonschema.go +++ b/model/modeldecoder/generator/jsonschema.go @@ -215,6 +215,7 @@ var ( "float64": TypeNameNumber, nullableTypeInt: TypeNameInteger, "int": TypeNameInteger, + "int64": TypeNameInteger, nullableTypeTimeMicrosUnix: TypeNameInteger, nullableTypeString: TypeNameString, "string": TypeNameString, diff --git a/model/modeldecoder/generator/nstring.go b/model/modeldecoder/generator/nstring.go index 4576e5027d6..2b2d7dcb8e6 100644 --- a/model/modeldecoder/generator/nstring.go +++ b/model/modeldecoder/generator/nstring.go @@ -40,7 +40,7 @@ func generateNullableStringValidation(w io.Writer, fields []structField, f struc case tagRequired: ruleNullableRequired(w, f) case tagRequiredIfAny: - if err = ruleRequiredIfAny(w, fields, f, rule.value); err != nil { + if err := ruleRequiredIfAny(w, fields, f, rule.value); err != nil { return errors.Wrap(err, "nullableString") } default: diff --git a/model/modeldecoder/generator/slice.go b/model/modeldecoder/generator/slice.go index 00f46371ae0..65659ac51f3 100644 --- a/model/modeldecoder/generator/slice.go +++ b/model/modeldecoder/generator/slice.go @@ -18,6 +18,7 @@ package generator import ( + "encoding/json" "fmt" "go/types" "io" @@ -45,10 +46,14 @@ for _, elem := range val.%s{ switch rule.name { case tagMinLength, tagMaxLength: err = sliceRuleMinMaxLength(w, f, rule) + case tagMinVals: + err = sliceRuleMinVals(w, f, rule) case tagRequired: sliceRuleRequired(w, f, rule) case tagRequiredAnyOf: err = ruleRequiredOneOf(w, fields, rule.value) + case tagRequiredIfAny: + err = ruleRequiredIfAny(w, fields, f, rule.value) default: return errors.Wrap(errUnhandledTagRule(rule), "slice") } @@ -79,6 +84,17 @@ for _, elem := range val.%s{ return fmt.Errorf("unhandled tag rule max for type %s", f.Type().Underlying()) } +func sliceRuleMinVals(w io.Writer, f structField, rule validationRule) error { + fmt.Fprintf(w, ` +for _, elem := range val.%s{ + if elem %s %s{ + return fmt.Errorf("'%s': validation rule '%s(%s)' violated") + } +} +`[1:], f.Name(), ruleMinMaxOperator(rule.name), rule.value, jsonName(f), rule.name, rule.value) + return nil +} + func sliceRuleRequired(w io.Writer, f structField, rule validationRule) { fmt.Fprintf(w, ` if len(val.%s) == 0{ @@ -110,11 +126,19 @@ func generateJSONPropertySlice(info *fieldInfo, parent *property, child *propert // NOTE(simi): set required=true to be aligned with previous JSON schema definitions items := property{Type: &propertyType{names: []propertyTypeName{itemsType}, required: true}} switch itemsType { + case TypeNameInteger: + setPropertyRulesInteger(info, &items) + case TypeNameNumber: + setPropertyRulesNumber(info, &items) case TypeNameString: setPropertyRulesString(info, &items) default: return fmt.Errorf("unhandled slice item type %s", itemsType) } + if minVals, ok := info.tags[tagMinVals]; ok { + items.Min = json.Number(minVals) + delete(info.tags, tagMinVals) + } child.Items = &items return nil } diff --git a/model/modeldecoder/generator/validation.go b/model/modeldecoder/generator/validation.go index 5482d970464..9a31104549f 100644 --- a/model/modeldecoder/generator/validation.go +++ b/model/modeldecoder/generator/validation.go @@ -34,6 +34,7 @@ const ( tagMaxLengthVals = "maxLengthVals" tagMin = "min" tagMinLength = "minLength" + tagMinVals = "minVals" tagPattern = "pattern" tagPatternKeys = "patternKeys" tagRequired = "required" @@ -96,7 +97,7 @@ func validationRules(structTag reflect.StructTag) ([]validationRule, error) { func ruleMinMaxOperator(ruleName string) string { switch ruleName { - case tagMin, tagMinLength: + case tagMin, tagMinLength, tagMinVals: return "<" case tagMax, tagMaxLength: return ">" @@ -118,26 +119,22 @@ if !val.%s.IsSet() { } func ruleRequiredOneOf(w io.Writer, fields []structField, tagValue string) error { - oneOf := strings.Split(tagValue, ";") + oneOf, err := filteredFields(fields, strings.Split(tagValue, ";")) + if err != nil { + return err + } if len(oneOf) <= 1 { return fmt.Errorf("invalid usage of rule 'requiredOneOf' - try 'required' instead") } + fmt.Fprintf(w, `if `) - var matched bool - for i := 0; i < len(fields); i++ { - f := fields[i] - jName := jsonName(f) - if j := indexOf(oneOf, jName); j != -1 { - if matched { - fmt.Fprintf(w, ` && `) - } - fmt.Fprintf(w, ` !val.%s.IsSet()`[1:], f.Name()) - matched = true - // remove from ifAny names and check if we can return early - oneOf = append(oneOf[:j], oneOf[j+1:]...) - if len(oneOf) == 0 { - break - } + for i, oneOfField := range oneOf { + if i > 0 { + fmt.Fprintf(w, " && ") + } + fmt.Fprint(w, "!") + if err := generateIsSet(w, oneOfField, "val."); err != nil { + return err } } fmt.Fprintf(w, ` { @@ -151,44 +148,47 @@ func ruleRequiredOneOf(w io.Writer, fields []structField, tagValue string) error } func ruleRequiredIfAny(w io.Writer, fields []structField, field structField, tagValue string) error { - ifAny := make(map[string]struct{}) - for _, n := range strings.Split(tagValue, ";") { - ifAny[n] = struct{}{} + ifAny, err := filteredFields(fields, strings.Split(tagValue, ";")) + if err != nil { + return err } - // only check ifAny fields if the field itself is not set - fmt.Fprintf(w, ` -if !val.%s.IsSet() { -`[1:], field.Name()) - for i := 0; i < len(fields); i++ { - f := fields[i] - jName := jsonName(f) - if _, ok := ifAny[jName]; ok { - fmt.Fprintf(w, ` -if val.%s.IsSet() { + + // Only check ifAny fields if the field itself is not set + fmt.Fprint(w, "if !") + if err := generateIsSet(w, field, "val."); err != nil { + return err + } + fmt.Fprintln(w, " {") + + // Check if any of the fields is set. We create a separate "if" block + // for each field so we can include its name in the error. + for _, ifAnyField := range ifAny { + fmt.Fprint(w, "if ") + if err := generateIsSet(w, ifAnyField, "val."); err != nil { + return err + } + fmt.Fprintf(w, ` { return fmt.Errorf("'%s' required when '%s' is set") } -`[1:], f.Name(), jsonName(field), jsonName(f)) - // remove from ifAny names and check if we can return early - delete(ifAny, jName) - if len(ifAny) == 0 { - break - } - } +`, jsonName(field), jsonName(ifAnyField)) } - if len(ifAny) != 0 { - return fmt.Errorf("unhandled 'requiredIfAny' field name(s) for %s", field.Name()) - } - fmt.Fprintf(w, ` -} -`[1:]) + + fmt.Fprintln(w, "}") return nil } -func indexOf(s []string, key string) int { - for i := 0; i < len(s); i++ { - if s[i] == key { - return i +func filteredFields(fields []structField, jsonNames []string) ([]structField, error) { + mapped := make(map[string]structField) + for _, field := range fields { + mapped[jsonName(field)] = field + } + filtered := make([]structField, len(jsonNames)) + for i, jsonName := range jsonNames { + field, ok := mapped[jsonName] + if !ok { + return nil, fmt.Errorf("unknown field name %q", jsonName) } + filtered[i] = field } - return -1 + return filtered, nil } diff --git a/model/modeldecoder/modeldecodertest/populator.go b/model/modeldecoder/modeldecodertest/populator.go index ec7f3f3a193..6faa6d34199 100644 --- a/model/modeldecoder/modeldecodertest/populator.go +++ b/model/modeldecoder/modeldecodertest/populator.go @@ -130,6 +130,10 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti elemVal = reflect.ValueOf(values.Str) case []int: elemVal = reflect.ValueOf(values.Int) + case []int64: + elemVal = reflect.ValueOf(int64(values.Int)) + case []float64: + elemVal = reflect.ValueOf(values.Float) case net.IP: fieldVal = reflect.ValueOf(values.IP) default: @@ -139,7 +143,6 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti elemVal = reflect.Zero(f.Type().Elem()) } if elemVal.IsValid() { - fieldVal = reflect.MakeSlice(f.Type(), 0, values.N) for i := 0; i < values.N; i++ { fieldVal = reflect.Append(fieldVal, elemVal) } diff --git a/model/modeldecoder/rumv3/model_generated.go b/model/modeldecoder/rumv3/model_generated.go index bbe7b214170..6ec815ded8d 100644 --- a/model/modeldecoder/rumv3/model_generated.go +++ b/model/modeldecoder/rumv3/model_generated.go @@ -51,7 +51,7 @@ func (val *metadataRoot) validate() error { } func (val *metadata) IsSet() bool { - return len(val.Labels) > 0 || val.Service.IsSet() || val.User.IsSet() + return (len(val.Labels) > 0) || val.Service.IsSet() || val.User.IsSet() } func (val *metadata) Reset() { @@ -353,23 +353,23 @@ func (val *errorEvent) validate() error { return fmt.Errorf("'pid': validation rule 'maxLength(1024)' violated") } if !val.ParentID.IsSet() { - if val.TraceID.IsSet() { - return fmt.Errorf("'pid' required when 'tid' is set") - } if val.TransactionID.IsSet() { return fmt.Errorf("'pid' required when 'xid' is set") } + if val.TraceID.IsSet() { + return fmt.Errorf("'pid' required when 'tid' is set") + } } if val.TraceID.IsSet() && utf8.RuneCountInString(val.TraceID.Val) > 1024 { return fmt.Errorf("'tid': validation rule 'maxLength(1024)' violated") } if !val.TraceID.IsSet() { - if val.ParentID.IsSet() { - return fmt.Errorf("'tid' required when 'pid' is set") - } if val.TransactionID.IsSet() { return fmt.Errorf("'tid' required when 'xid' is set") } + if val.ParentID.IsSet() { + return fmt.Errorf("'tid' required when 'pid' is set") + } } if err := val.Transaction.validate(); err != nil { return errors.Wrapf(err, "x") @@ -384,7 +384,7 @@ func (val *errorEvent) validate() error { } func (val *context) IsSet() bool { - return len(val.Custom) > 0 || val.Page.IsSet() || val.Response.IsSet() || val.Request.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 || val.User.IsSet() + return (len(val.Custom) > 0) || val.Page.IsSet() || val.Response.IsSet() || val.Request.IsSet() || val.Service.IsSet() || (len(val.Tags) > 0) || val.User.IsSet() } func (val *context) Reset() { @@ -472,7 +472,7 @@ func (val *contextResponse) validate() error { } func (val *contextRequest) IsSet() bool { - return len(val.Env) > 0 || val.Headers.IsSet() || val.HTTPVersion.IsSet() || val.Method.IsSet() + return (len(val.Env) > 0) || val.Headers.IsSet() || val.HTTPVersion.IsSet() || val.Method.IsSet() } func (val *contextRequest) Reset() { @@ -634,7 +634,7 @@ func (val *contextServiceRuntime) validate() error { } func (val *errorException) IsSet() bool { - return len(val.Attributes) > 0 || val.Code.IsSet() || len(val.Cause) > 0 || val.Handled.IsSet() || val.Message.IsSet() || val.Module.IsSet() || len(val.Stacktrace) > 0 || val.Type.IsSet() + return (len(val.Attributes) > 0) || val.Code.IsSet() || (len(val.Cause) > 0) || val.Handled.IsSet() || val.Message.IsSet() || val.Module.IsSet() || (len(val.Stacktrace) > 0) || val.Type.IsSet() } func (val *errorException) Reset() { @@ -697,7 +697,7 @@ func (val *errorException) validate() error { } func (val *stacktraceFrame) IsSet() bool { - return val.AbsPath.IsSet() || val.Classname.IsSet() || val.ColumnNumber.IsSet() || val.ContextLine.IsSet() || val.Filename.IsSet() || val.Function.IsSet() || val.LineNumber.IsSet() || val.Module.IsSet() || len(val.PostContext) > 0 || len(val.PreContext) > 0 + return val.AbsPath.IsSet() || val.Classname.IsSet() || val.ColumnNumber.IsSet() || val.ContextLine.IsSet() || val.Filename.IsSet() || val.Function.IsSet() || val.LineNumber.IsSet() || val.Module.IsSet() || (len(val.PostContext) > 0) || (len(val.PreContext) > 0) } func (val *stacktraceFrame) Reset() { @@ -724,7 +724,7 @@ func (val *stacktraceFrame) validate() error { } func (val *errorLog) IsSet() bool { - return val.Level.IsSet() || val.LoggerName.IsSet() || val.Message.IsSet() || val.ParamMessage.IsSet() || len(val.Stacktrace) > 0 + return val.Level.IsSet() || val.LoggerName.IsSet() || val.Message.IsSet() || val.ParamMessage.IsSet() || (len(val.Stacktrace) > 0) } func (val *errorLog) Reset() { @@ -800,7 +800,7 @@ func (val *metricsetRoot) validate() error { } func (val *metricset) IsSet() bool { - return val.Samples.IsSet() || val.Span.IsSet() || len(val.Tags) > 0 + return val.Samples.IsSet() || val.Span.IsSet() || (len(val.Tags) > 0) } func (val *metricset) Reset() { @@ -933,7 +933,7 @@ func (val *transactionRoot) validate() error { } func (val *transaction) IsSet() bool { - return val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Marks.IsSet() || len(val.Metricsets) > 0 || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.Result.IsSet() || val.Sampled.IsSet() || val.SampleRate.IsSet() || val.Session.IsSet() || val.SpanCount.IsSet() || len(val.Spans) > 0 || val.TraceID.IsSet() || val.Type.IsSet() || val.UserExperience.IsSet() + return val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Marks.IsSet() || (len(val.Metricsets) > 0) || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.Result.IsSet() || val.Sampled.IsSet() || val.SampleRate.IsSet() || val.Session.IsSet() || val.SpanCount.IsSet() || (len(val.Spans) > 0) || val.TraceID.IsSet() || val.Type.IsSet() || val.UserExperience.IsSet() } func (val *transaction) Reset() { @@ -1043,7 +1043,7 @@ func (val *transaction) validate() error { } func (val *transactionMarks) IsSet() bool { - return len(val.Events) > 0 + return (len(val.Events) > 0) } func (val *transactionMarks) Reset() { @@ -1060,7 +1060,7 @@ func (val *transactionMarks) validate() error { } func (val *transactionMarkEvents) IsSet() bool { - return len(val.Measurements) > 0 + return (len(val.Measurements) > 0) } func (val *transactionMarkEvents) Reset() { @@ -1118,7 +1118,7 @@ func (val *transactionSpanCount) validate() error { } func (val *span) IsSet() bool { - return val.Action.IsSet() || val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentIndex.IsSet() || val.SampleRate.IsSet() || len(val.Stacktrace) > 0 || val.Start.IsSet() || val.Subtype.IsSet() || val.Sync.IsSet() || val.Type.IsSet() + return val.Action.IsSet() || val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentIndex.IsSet() || val.SampleRate.IsSet() || (len(val.Stacktrace) > 0) || val.Start.IsSet() || val.Subtype.IsSet() || val.Sync.IsSet() || val.Type.IsSet() } func (val *span) Reset() { @@ -1201,7 +1201,7 @@ func (val *span) validate() error { } func (val *spanContext) IsSet() bool { - return val.Destination.IsSet() || val.HTTP.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 + return val.Destination.IsSet() || val.HTTP.IsSet() || val.Service.IsSet() || (len(val.Tags) > 0) } func (val *spanContext) Reset() { diff --git a/model/modeldecoder/v2/decoder.go b/model/modeldecoder/v2/decoder.go index 566d08562a1..f8b9b50c198 100644 --- a/model/modeldecoder/v2/decoder.go +++ b/model/modeldecoder/v2/decoder.go @@ -562,7 +562,24 @@ func mapToMetricsetModel(from *metricset, metadata *model.Metadata, reqTime time out.Samples = make([]model.Sample, len(from.Samples)) i := 0 for name, sample := range from.Samples { - out.Samples[i] = model.Sample{Name: name, Value: sample.Value.Val} + var counts []int64 + var values []float64 + if n := len(sample.Values); n > 0 { + values = make([]float64, n) + copy(values, sample.Values) + } + if n := len(sample.Counts); n > 0 { + counts = make([]int64, n) + copy(counts, sample.Counts) + } + out.Samples[i] = model.Sample{ + Name: name, + Type: model.MetricType(sample.Type.Val), + Unit: sample.Unit.Val, + Value: sample.Value.Val, + Values: values, + Counts: counts, + } i++ } } diff --git a/model/modeldecoder/v2/metricset_test.go b/model/modeldecoder/v2/metricset_test.go index c2f76441430..f393a42029e 100644 --- a/model/modeldecoder/v2/metricset_test.go +++ b/model/modeldecoder/v2/metricset_test.go @@ -79,37 +79,55 @@ func TestDecodeMapToMetricsetModel(t *testing.T) { modeldecodertest.AssertStructValues(t, &out.Metadata, exceptions, modeldecodertest.DefaultValues()) }) - t.Run("metricset-values", func(t *testing.T) { - exceptions := func(key string) bool { - // metadata are tested separately - if strings.HasPrefix(key, "Metadata") || - // only set by aggregator - strings.HasPrefix(key, "Event") || - key == "Name" || - key == "TimeseriesInstanceID" || - key == "Transaction.Result" || - key == "Transaction.Root" || - strings.HasPrefix(key, "Span.DestinationService") || - // test Samples separately - strings.HasPrefix(key, "Samples") { - return true - } - return false + metadataExceptions := func(key string) bool { + // metadata are tested separately + if strings.HasPrefix(key, "Metadata") || + // only set by aggregator + strings.HasPrefix(key, "Event") || + key == "Name" || + key == "TimeseriesInstanceID" || + key == "Transaction.Result" || + key == "Transaction.Root" || + strings.HasPrefix(key, "Span.DestinationService") || + // test Samples separately + strings.HasPrefix(key, "Samples") { + return true } + return false + } + t.Run("metricset-values", func(t *testing.T) { var input metricset var out1, out2 model.Metricset reqTime := time.Now().Add(time.Second) defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) + mapToMetricsetModel(&input, initializedMetadata(), reqTime, modeldecoder.Config{}, &out1) input.Reset() - modeldecodertest.AssertStructValues(t, &out1, exceptions, defaultVal) - defaultSamples := []model.Sample{ - {Name: defaultVal.Str + "0", Value: defaultVal.Float}, - {Name: defaultVal.Str + "1", Value: defaultVal.Float}, - {Name: defaultVal.Str + "2", Value: defaultVal.Float}, - } + modeldecodertest.AssertStructValues(t, &out1, metadataExceptions, defaultVal) + defaultSamples := []model.Sample{{ + Name: defaultVal.Str + "0", + Type: model.MetricType(defaultVal.Str), + Unit: defaultVal.Str, + Value: defaultVal.Float, + Counts: repeatInt64(int64(defaultVal.Int), defaultVal.N), + Values: repeatFloat64(defaultVal.Float, defaultVal.N), + }, { + Name: defaultVal.Str + "1", + Type: model.MetricType(defaultVal.Str), + Unit: defaultVal.Str, + Value: defaultVal.Float, + Counts: repeatInt64(int64(defaultVal.Int), defaultVal.N), + Values: repeatFloat64(defaultVal.Float, defaultVal.N), + }, { + Name: defaultVal.Str + "2", + Type: model.MetricType(defaultVal.Str), + Unit: defaultVal.Str, + Value: defaultVal.Float, + Counts: repeatInt64(int64(defaultVal.Int), defaultVal.N), + Values: repeatFloat64(defaultVal.Float, defaultVal.N), + }} assert.ElementsMatch(t, defaultSamples, out1.Samples) // set Timestamp to requestTime if eventTime is zero @@ -118,19 +136,46 @@ func TestDecodeMapToMetricsetModel(t *testing.T) { mapToMetricsetModel(&input, initializedMetadata(), reqTime, modeldecoder.Config{}, &out1) defaultVal.Update(reqTime) input.Reset() - modeldecodertest.AssertStructValues(t, &out1, exceptions, defaultVal) + modeldecodertest.AssertStructValues(t, &out1, metadataExceptions, defaultVal) // ensure memory is not shared by reusing input model otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) mapToMetricsetModel(&input, initializedMetadata(), reqTime, modeldecoder.Config{}, &out2) - modeldecodertest.AssertStructValues(t, &out2, exceptions, otherVal) - otherSamples := []model.Sample{ - {Name: otherVal.Str + "0", Value: otherVal.Float}, - {Name: otherVal.Str + "1", Value: otherVal.Float}, - } + modeldecodertest.AssertStructValues(t, &out2, metadataExceptions, otherVal) + otherSamples := []model.Sample{{ + Name: otherVal.Str + "0", + Type: model.MetricType(otherVal.Str), + Unit: otherVal.Str, + Value: otherVal.Float, + Counts: repeatInt64(int64(otherVal.Int), otherVal.N), + Values: repeatFloat64(otherVal.Float, otherVal.N), + }, { + Name: otherVal.Str + "1", + Type: model.MetricType(otherVal.Str), + Unit: otherVal.Str, + Value: otherVal.Float, + Counts: repeatInt64(int64(otherVal.Int), otherVal.N), + Values: repeatFloat64(otherVal.Float, otherVal.N), + }} assert.ElementsMatch(t, otherSamples, out2.Samples) - modeldecodertest.AssertStructValues(t, &out1, exceptions, defaultVal) + modeldecodertest.AssertStructValues(t, &out1, metadataExceptions, defaultVal) assert.ElementsMatch(t, defaultSamples, out1.Samples) }) } + +func repeatInt64(v int64, n int) []int64 { + vs := make([]int64, n) + for i := range vs { + vs[i] = v + } + return vs +} + +func repeatFloat64(v float64, n int) []float64 { + vs := make([]float64, n) + for i := range vs { + vs[i] = v + } + return vs +} diff --git a/model/modeldecoder/v2/model.go b/model/modeldecoder/v2/model.go index 8074f75a93e..d81905b32b5 100644 --- a/model/modeldecoder/v2/model.go +++ b/model/modeldecoder/v2/model.go @@ -546,10 +546,41 @@ type metricset struct { Transaction metricsetTransactionRef `json:"transaction"` } -// TODO(axw/simitt): add support for ingesting counts/values (histogram metrics) type metricsetSampleValue struct { + // Type holds an optional metric type: gauge, counter, or histogram. + // + // If Type is unknown, it will be ignored. + Type nullable.String `json:"type"` + + // Unit holds an optional unit for the metric. + // + // - "percent" (value is in the range [0,1]) + // - "byte" + // - a time unit: "nanos", "micros", "ms", "s", "m", "h", "d" + // + // If Unit is unknown, it will be ignored. + Unit nullable.String `json:"unit"` + // Value holds the value of a single metric sample. - Value nullable.Float64 `json:"value" validate:"required"` + Value nullable.Float64 `json:"value"` + + // Values holds the bucket values for histogram metrics. + // + // Values must be provided in ascending order; failure to do + // so will result in the metric being discarded. + Values []float64 `json:"values" validate:"requiredIfAny=counts"` + + // Counts holds the bucket counts for histogram metrics. + // + // These numbers must be positive or zero. + // + // If Counts is specified, then Values is expected to be + // specified with the same number of elements, and with the + // same order. + Counts []int64 `json:"counts" validate:"requiredIfAny=values,minVals=0"` + + // At least one of value or values must be specified. + _ struct{} `validate:"requiredAnyOf=value;values"` } type metricsetSpanRef struct { diff --git a/model/modeldecoder/v2/model_generated.go b/model/modeldecoder/v2/model_generated.go index 525badbf3cd..50efcbd5b09 100644 --- a/model/modeldecoder/v2/model_generated.go +++ b/model/modeldecoder/v2/model_generated.go @@ -53,7 +53,7 @@ func (val *metadataRoot) validate() error { } func (val *metadata) IsSet() bool { - return val.Cloud.IsSet() || len(val.Labels) > 0 || val.Process.IsSet() || val.Service.IsSet() || val.System.IsSet() || val.User.IsSet() + return val.Cloud.IsSet() || (len(val.Labels) > 0) || val.Process.IsSet() || val.Service.IsSet() || val.System.IsSet() || val.User.IsSet() } func (val *metadata) Reset() { @@ -257,7 +257,7 @@ func (val *metadataCloudService) validate() error { } func (val *metadataProcess) IsSet() bool { - return len(val.Argv) > 0 || val.Pid.IsSet() || val.Ppid.IsSet() || val.Title.IsSet() + return (len(val.Argv) > 0) || val.Pid.IsSet() || val.Ppid.IsSet() || val.Title.IsSet() } func (val *metadataProcess) Reset() { @@ -694,23 +694,23 @@ func (val *errorEvent) validate() error { return fmt.Errorf("'parent_id': validation rule 'maxLength(1024)' violated") } if !val.ParentID.IsSet() { - if val.TraceID.IsSet() { - return fmt.Errorf("'parent_id' required when 'trace_id' is set") - } if val.TransactionID.IsSet() { return fmt.Errorf("'parent_id' required when 'transaction_id' is set") } + if val.TraceID.IsSet() { + return fmt.Errorf("'parent_id' required when 'trace_id' is set") + } } if val.TraceID.IsSet() && utf8.RuneCountInString(val.TraceID.Val) > 1024 { return fmt.Errorf("'trace_id': validation rule 'maxLength(1024)' violated") } if !val.TraceID.IsSet() { - if val.ParentID.IsSet() { - return fmt.Errorf("'trace_id' required when 'parent_id' is set") - } if val.TransactionID.IsSet() { return fmt.Errorf("'trace_id' required when 'transaction_id' is set") } + if val.ParentID.IsSet() { + return fmt.Errorf("'trace_id' required when 'parent_id' is set") + } } if err := val.Transaction.validate(); err != nil { return errors.Wrapf(err, "transaction") @@ -725,7 +725,7 @@ func (val *errorEvent) validate() error { } func (val *context) IsSet() bool { - return len(val.Custom) > 0 || val.Experimental.IsSet() || val.Message.IsSet() || val.Page.IsSet() || val.Response.IsSet() || val.Request.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 || val.User.IsSet() + return (len(val.Custom) > 0) || val.Experimental.IsSet() || val.Message.IsSet() || val.Page.IsSet() || val.Response.IsSet() || val.Request.IsSet() || val.Service.IsSet() || (len(val.Tags) > 0) || val.User.IsSet() } func (val *context) Reset() { @@ -877,7 +877,7 @@ func (val *contextResponse) validate() error { } func (val *contextRequest) IsSet() bool { - return val.Body.IsSet() || len(val.Cookies) > 0 || len(val.Env) > 0 || val.Headers.IsSet() || val.HTTPVersion.IsSet() || val.Method.IsSet() || val.Socket.IsSet() || val.URL.IsSet() + return val.Body.IsSet() || (len(val.Cookies) > 0) || (len(val.Env) > 0) || val.Headers.IsSet() || val.HTTPVersion.IsSet() || val.Method.IsSet() || val.Socket.IsSet() || val.URL.IsSet() } func (val *contextRequest) Reset() { @@ -1160,7 +1160,7 @@ func (val *contextServiceRuntime) validate() error { } func (val *errorException) IsSet() bool { - return len(val.Attributes) > 0 || val.Code.IsSet() || len(val.Cause) > 0 || val.Handled.IsSet() || val.Message.IsSet() || val.Module.IsSet() || len(val.Stacktrace) > 0 || val.Type.IsSet() + return (len(val.Attributes) > 0) || val.Code.IsSet() || (len(val.Cause) > 0) || val.Handled.IsSet() || val.Message.IsSet() || val.Module.IsSet() || (len(val.Stacktrace) > 0) || val.Type.IsSet() } func (val *errorException) Reset() { @@ -1223,7 +1223,7 @@ func (val *errorException) validate() error { } func (val *stacktraceFrame) IsSet() bool { - return val.AbsPath.IsSet() || val.Classname.IsSet() || val.ColumnNumber.IsSet() || val.ContextLine.IsSet() || val.Filename.IsSet() || val.Function.IsSet() || val.LibraryFrame.IsSet() || val.LineNumber.IsSet() || val.Module.IsSet() || len(val.PostContext) > 0 || len(val.PreContext) > 0 || len(val.Vars) > 0 + return val.AbsPath.IsSet() || val.Classname.IsSet() || val.ColumnNumber.IsSet() || val.ContextLine.IsSet() || val.Filename.IsSet() || val.Function.IsSet() || val.LibraryFrame.IsSet() || val.LineNumber.IsSet() || val.Module.IsSet() || (len(val.PostContext) > 0) || (len(val.PreContext) > 0) || (len(val.Vars) > 0) } func (val *stacktraceFrame) Reset() { @@ -1254,7 +1254,7 @@ func (val *stacktraceFrame) validate() error { } func (val *errorLog) IsSet() bool { - return val.Level.IsSet() || val.LoggerName.IsSet() || val.Message.IsSet() || val.ParamMessage.IsSet() || len(val.Stacktrace) > 0 + return val.Level.IsSet() || val.LoggerName.IsSet() || val.Message.IsSet() || val.ParamMessage.IsSet() || (len(val.Stacktrace) > 0) } func (val *errorLog) Reset() { @@ -1330,7 +1330,7 @@ func (val *metricsetRoot) validate() error { } func (val *metricset) IsSet() bool { - return val.Timestamp.IsSet() || len(val.Samples) > 0 || val.Span.IsSet() || len(val.Tags) > 0 || val.Transaction.IsSet() + return val.Timestamp.IsSet() || (len(val.Samples) > 0) || val.Span.IsSet() || (len(val.Tags) > 0) || val.Transaction.IsSet() } func (val *metricset) Reset() { @@ -1383,19 +1383,38 @@ func (val *metricset) validate() error { } func (val *metricsetSampleValue) IsSet() bool { - return val.Value.IsSet() + return val.Type.IsSet() || val.Unit.IsSet() || val.Value.IsSet() || (len(val.Values) > 0) || (len(val.Counts) > 0) } func (val *metricsetSampleValue) Reset() { + val.Type.Reset() + val.Unit.Reset() val.Value.Reset() + val.Values = val.Values[:0] + val.Counts = val.Counts[:0] } func (val *metricsetSampleValue) validate() error { if !val.IsSet() { return nil } - if !val.Value.IsSet() { - return fmt.Errorf("'value' required") + if !(len(val.Values) > 0) { + if len(val.Counts) > 0 { + return fmt.Errorf("'values' required when 'counts' is set") + } + } + for _, elem := range val.Counts { + if elem < 0 { + return fmt.Errorf("'counts': validation rule 'minVals(0)' violated") + } + } + if !(len(val.Counts) > 0) { + if len(val.Values) > 0 { + return fmt.Errorf("'counts' required when 'values' is set") + } + } + if !val.Value.IsSet() && !(len(val.Values) > 0) { + return fmt.Errorf("requires at least one of the fields 'value;values'") } return nil } @@ -1463,7 +1482,7 @@ func (val *spanRoot) validate() error { } func (val *span) IsSet() bool { - return val.Action.IsSet() || len(val.ChildIDs) > 0 || val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.SampleRate.IsSet() || len(val.Stacktrace) > 0 || val.Start.IsSet() || val.Subtype.IsSet() || val.Sync.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.TransactionID.IsSet() || val.Type.IsSet() + return val.Action.IsSet() || (len(val.ChildIDs) > 0) || val.Context.IsSet() || val.Duration.IsSet() || val.ID.IsSet() || val.Name.IsSet() || val.Outcome.IsSet() || val.ParentID.IsSet() || val.SampleRate.IsSet() || (len(val.Stacktrace) > 0) || val.Start.IsSet() || val.Subtype.IsSet() || val.Sync.IsSet() || val.Timestamp.IsSet() || val.TraceID.IsSet() || val.TransactionID.IsSet() || val.Type.IsSet() } func (val *span) Reset() { @@ -1570,7 +1589,7 @@ func (val *span) validate() error { } func (val *spanContext) IsSet() bool { - return val.Database.IsSet() || val.Destination.IsSet() || val.Experimental.IsSet() || val.HTTP.IsSet() || val.Message.IsSet() || val.Service.IsSet() || len(val.Tags) > 0 + return val.Database.IsSet() || val.Destination.IsSet() || val.Experimental.IsSet() || val.HTTP.IsSet() || val.Message.IsSet() || val.Service.IsSet() || (len(val.Tags) > 0) } func (val *spanContext) Reset() { @@ -1856,7 +1875,7 @@ func (val *transaction) validate() error { } func (val *transactionMarks) IsSet() bool { - return len(val.Events) > 0 + return (len(val.Events) > 0) } func (val *transactionMarks) Reset() { @@ -1873,7 +1892,7 @@ func (val *transactionMarks) validate() error { } func (val *transactionMarkEvents) IsSet() bool { - return len(val.Measurements) > 0 + return (len(val.Measurements) > 0) } func (val *transactionMarkEvents) Reset() { diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json index 7744fe896b0..61ebf079843 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json @@ -258,6 +258,62 @@ "id": "axb123hg", "name": "logged-in-user" } + }, + { + "@timestamp": "2017-05-30T18:53:41.366Z", + "_doc_count": 6, + "_metric_descriptions": { + "latency_distribution": { + "type": "histogram", + "unit": "s" + } + }, + "agent": { + "name": "elastic-node", + "version": "3.14.0" + }, + "data_stream.dataset": "apm.app.1234_service_12a3", + "data_stream.type": "metrics", + "host": { + "ip": "192.0.0.1" + }, + "labels": { + "tag1": "one", + "tag2": 2 + }, + "latency_distribution": { + "counts": [ + 1, + 2, + 3 + ], + "values": [ + 1.1, + 2.2, + 3.3 + ] + }, + "process": { + "pid": 1234 + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "language": { + "name": "ecmascript" + }, + "name": "1234_service-12a3", + "node": { + "name": "node-1" + } + }, + "user": { + "email": "user@mail.com", + "id": "axb123hg", + "name": "logged-in-user" + } } ] } diff --git a/processor/stream/test_approved_stream_result/testIntegrationResultMetricsets.approved.json b/processor/stream/test_approved_stream_result/testIntegrationResultMetricsets.approved.json index 6d6a93fd6b1..93ed508ba6f 100644 --- a/processor/stream/test_approved_stream_result/testIntegrationResultMetricsets.approved.json +++ b/processor/stream/test_approved_stream_result/testIntegrationResultMetricsets.approved.json @@ -1,3 +1,3 @@ { - "accepted": 4 + "accepted": 5 } diff --git a/systemtest/approvals/TestApprovedMetrics.approved.json b/systemtest/approvals/TestApprovedMetrics.approved.json index 4597d7ffd74..213e4bd859e 100644 --- a/systemtest/approvals/TestApprovedMetrics.approved.json +++ b/systemtest/approvals/TestApprovedMetrics.approved.json @@ -205,6 +205,69 @@ "name": "logged-in-user" } }, + { + "@timestamp": "2017-05-30T18:53:41.366Z", + "_doc_count": 6, + "agent": { + "name": "elastic-node", + "version": "3.14.0" + }, + "ecs": { + "version": "dynamic" + }, + "event": { + "ingested": "dynamic" + }, + "host": { + "ip": "127.0.0.1" + }, + "labels": { + "tag1": "one", + "tag2": 2 + }, + "latency_distribution": { + "counts": [ + 1, + 2, + 3 + ], + "values": [ + 1.1, + 2.2, + 3.3 + ] + }, + "metricset.name": "app", + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "process": { + "pid": 1234 + }, + "processor": { + "event": "metric", + "name": "metric" + }, + "service": { + "language": { + "name": "ecmascript" + }, + "name": "1234_service-12a3", + "node": { + "name": "node-1" + } + }, + "user": { + "email": "user@mail.com", + "id": "axb123hg", + "name": "logged-in-user" + } + }, { "@timestamp": "2017-05-30T18:53:42.281Z", "agent": { diff --git a/systemtest/metrics_test.go b/systemtest/metrics_test.go index c6098d698be..e19fa9d17bc 100644 --- a/systemtest/metrics_test.go +++ b/systemtest/metrics_test.go @@ -20,6 +20,7 @@ package systemtest_test import ( "bytes" "context" + "encoding/json" "io/ioutil" "net/http" "testing" @@ -43,19 +44,37 @@ func TestApprovedMetrics(t *testing.T) { eventsPayload, err := ioutil.ReadFile("../testdata/intake-v2/metricsets.ndjson") require.NoError(t, err) - req, _ := http.NewRequest("POST", srv.URL+"/intake/v2/events", bytes.NewReader(eventsPayload)) + req, _ := http.NewRequest("POST", srv.URL+"/intake/v2/events?verbose=true", bytes.NewReader(eventsPayload)) req.Header.Set("Content-Type", "application/x-ndjson") resp, err := http.DefaultClient.Do(req) require.NoError(t, err) defer resp.Body.Close() assert.Equal(t, http.StatusAccepted, resp.StatusCode) + var ingestResult struct { + Accepted int + } + err = json.NewDecoder(resp.Body).Decode(&ingestResult) + assert.NoError(t, err) // Check the metrics documents are exactly as we expect. - result := systemtest.Elasticsearch.ExpectMinDocs(t, 3, "apm-*", estest.TermQuery{ + result := systemtest.Elasticsearch.ExpectMinDocs(t, ingestResult.Accepted, "apm-*", estest.TermQuery{ Field: "processor.event", Value: "metric", }) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) + + // Check dynamic mapping of histograms. + mappings := getFieldMappings(t, []string{"apm-*"}, []string{"latency_distribution"}) + assert.Equal(t, map[string]interface{}{ + "latency_distribution": map[string]interface{}{ + "full_name": "latency_distribution", + "mapping": map[string]interface{}{ + "latency_distribution": map[string]interface{}{ + "type": "histogram", + }, + }, + }, + }, mappings) } func TestBreakdownMetrics(t *testing.T) { @@ -139,23 +158,7 @@ func TestApplicationMetrics(t *testing.T) { // Check that the index mapping has been updated for the custom // metrics, with the expected dynamically mapped field types. - var allMappings map[string]struct { - Mappings map[string]interface{} - } - _, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.IndicesGetFieldMappingRequest{ - Index: []string{"apm-*"}, - Fields: []string{"a.b.c", "x.y.z"}, - }, &allMappings) - require.NoError(t, err) - - var mappings map[string]interface{} - for _, index := range allMappings { - if len(index.Mappings) != 0 { - mappings = index.Mappings - break - } - } - require.NotEmpty(t, mappings) + mappings := getFieldMappings(t, []string{"apm-*"}, []string{"a.b.c", "x.y.z"}) assert.Equal(t, map[string]interface{}{ "a.b.c": map[string]interface{}{ "full_name": "a.b.c", @@ -176,6 +179,26 @@ func TestApplicationMetrics(t *testing.T) { }, mappings) } +func getFieldMappings(t testing.TB, index []string, fields []string) map[string]interface{} { + var allMappings map[string]struct { + Mappings map[string]interface{} + } + _, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.IndicesGetFieldMappingRequest{ + Index: index, + Fields: fields, + }, &allMappings) + require.NoError(t, err) + + mappings := make(map[string]interface{}) + for _, index := range allMappings { + for k, v := range index.Mappings { + assert.NotContains(t, mappings, k, "field %q exists in multiple indices", k) + mappings[k] = v + } + } + return mappings +} + type metricsetTransaction struct { Type string `json:"type"` } diff --git a/systemtest/template_test.go b/systemtest/template_test.go index b93fcef3505..27d6ee524ed 100644 --- a/systemtest/template_test.go +++ b/systemtest/template_test.go @@ -98,6 +98,7 @@ func TestIndexTemplateCoverage(t *testing.T) { "double_gauge", "byte_counter", "short_counter", + "latency_distribution", } for index, indexMappings := range indexMappings { diff --git a/testdata/intake-v2/metricsets.ndjson b/testdata/intake-v2/metricsets.ndjson index 3e765a529de..c91affe6bf5 100644 --- a/testdata/intake-v2/metricsets.ndjson +++ b/testdata/intake-v2/metricsets.ndjson @@ -3,3 +3,4 @@ {"metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 6.520832e+06 }}, "timestamp": 1496170421364000}} {"metricset": { "samples": { "system.process.cgroup.memory.mem.limit.bytes":{"value":2048},"system.process.cgroup.memory.mem.usage.bytes":{"value":1024}},"timestamp": 1496170421366000}} {"metricset": { "samples": { "system.process.cgroup.cpu.id": { "value": 2048 }, "system.process.cgroup.cpu.cfs.quota.us": { "value": 2048 }, "system.process.cgroup.cpu.stats.periods": { "value": 2048 }, "system.process.cgroup.cpu.stats.throttled.periods": { "value": 2048 }, "system.process.cgroup.cpu.stats.throttled.ns": { "value": 2048 }, "system.process.cgroup.cpuacct.id": { "value": 2048 }, "system.process.cgroup.cpuacct.total.ns": { "value": 2048 }, "system.process.cgroup.cpu.cfs.period.us": { "value": 1024 } }, "timestamp": 1496170421366000 }} +{"metricset": { "samples": { "latency_distribution": { "type": "histogram", "unit": "s", "counts": [1,2,3], "values": [1.1,2.2,3.3] } }, "timestamp": 1496170421366000 }}