diff --git a/agentcfg/reporter.go b/agentcfg/reporter.go index 32e46f691dd..97afbb3bcdc 100644 --- a/agentcfg/reporter.go +++ b/agentcfg/reporter.go @@ -85,13 +85,15 @@ func (r Reporter) Run(ctx context.Context) error { } batch := make(model.Batch, 0, len(applied)) for etag := range applied { - batch = append(batch, model.APMEvent{Metricset: &model.Metricset{ - Name: "agent_config", + batch = append(batch, model.APMEvent{ Labels: common.MapStr{"etag": etag}, - Samples: map[string]model.MetricsetSample{ - "agent_config_applied": {Value: 1}, + Metricset: &model.Metricset{ + Name: "agent_config", + Samples: map[string]model.MetricsetSample{ + "agent_config_applied": {Value: 1}, + }, }, - }}) + }) } // Reset applied map, so that we report only configs applied // during a given iteration. diff --git a/agentcfg/reporter_test.go b/agentcfg/reporter_test.go index 71613a37a71..af9bd45d0de 100644 --- a/agentcfg/reporter_test.go +++ b/agentcfg/reporter_test.go @@ -90,7 +90,7 @@ func (f fauxFetcher) Fetch(_ context.Context, q Query) (Result, error) { type batchProcessor struct { receivedc chan struct{} - received []*model.Metricset + received []model.APMEvent mu sync.Mutex } @@ -98,7 +98,7 @@ func (p *batchProcessor) ProcessBatch(_ context.Context, b *model.Batch) error { p.mu.Lock() defer p.mu.Unlock() for _, event := range *b { - p.received = append(p.received, event.Metricset) + p.received = append(p.received, event) } p.receivedc <- struct{}{} return nil diff --git a/beater/api/profile/convert.go b/beater/api/profile/convert.go index 8d97a228404..3c0248a5c39 100644 --- a/beater/api/profile/convert.go +++ b/beater/api/profile/convert.go @@ -25,8 +25,6 @@ import ( "github.com/gofrs/uuid" "github.com/google/pprof/profile" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/apm-server/model" ) @@ -36,7 +34,7 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out // Precompute value field names for use in each event. // TODO(axw) limit to well-known value names? - profileTimestamp := time.Unix(0, pp.TimeNanos) + baseEvent.Timestamp = time.Unix(0, pp.TimeNanos) valueFieldNames := make([]string, len(pp.SampleType)) for i, sampleType := range pp.SampleType { sampleUnit := normalizeUnit(sampleType.Unit) @@ -88,11 +86,11 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out } } - var labels common.MapStr + event := baseEvent + event.Labels = event.Labels.Clone() if n := len(sample.Label); n > 0 { - labels = make(common.MapStr, n) for k, v := range sample.Label { - labels[k] = v + event.Labels[k] = v } } @@ -101,13 +99,10 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out values[valueFieldNames[i]] = value } - event := baseEvent event.ProfileSample = &model.ProfileSample{ - Timestamp: profileTimestamp, Duration: time.Duration(pp.DurationNanos), ProfileID: profileID, Stack: stack, - Labels: labels, Values: values, } out = append(out, event) diff --git a/beater/beater_test.go b/beater/beater_test.go index 41cb32f238d..a0e08238da6 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -155,16 +155,17 @@ func newTestBeater( Logger: logger, WrapRunServer: func(runServer RunServerFunc) RunServerFunc { var processor model.ProcessBatchFunc = func(ctx context.Context, batch *model.Batch) error { - for _, event := range *batch { + for i := range *batch { + event := &(*batch)[i] if event.Transaction == nil { continue } // Add a label to test that everything // goes through the wrapped reporter. - if event.Transaction.Labels == nil { - event.Transaction.Labels = common.MapStr{} + if event.Labels == nil { + event.Labels = common.MapStr{} } - event.Transaction.Labels["wrapped_reporter"] = true + event.Labels["wrapped_reporter"] = true } return nil } diff --git a/model/apmevent.go b/model/apmevent.go index 4f21f2ffec5..40ffb9c514c 100644 --- a/model/apmevent.go +++ b/model/apmevent.go @@ -19,7 +19,9 @@ package model import ( "context" + "time" + "github.com/elastic/apm-server/utility" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) @@ -46,11 +48,10 @@ type APMEvent struct { Cloud Cloud Network Network + // Timestamp holds the event timestamp. + Timestamp time.Time + // Labels holds labels to apply to the event. - // - // TODO(axw) remove Transaction.Labels, Span.Labels, etc., - // and merge into these labels at decoding time. There can - // be only one. Labels common.MapStr Transaction *Transaction @@ -61,30 +62,32 @@ type APMEvent struct { } func (e *APMEvent) appendBeatEvent(ctx context.Context, out []beat.Event) []beat.Event { - var event beat.Event - var eventLabels common.MapStr + event := beat.Event{Timestamp: e.Timestamp} switch { case e.Transaction != nil: - event = e.Transaction.toBeatEvent() - eventLabels = e.Transaction.Labels + event.Fields = e.Transaction.fields() case e.Span != nil: - event = e.Span.toBeatEvent(ctx) - eventLabels = e.Span.Labels + event.Fields = e.Span.fields() case e.Metricset != nil: - event = e.Metricset.toBeatEvent() - eventLabels = e.Metricset.Labels + event.Fields = e.Metricset.fields() case e.Error != nil: - event = e.Error.toBeatEvent(ctx) - eventLabels = e.Error.Labels + event.Fields = e.Error.fields() case e.ProfileSample != nil: - event = e.ProfileSample.toBeatEvent() - eventLabels = e.ProfileSample.Labels + event.Fields = e.ProfileSample.fields() default: return out } + // Set high resolution timestamp. + // + // TODO(axw) change @timestamp to use date_nanos, and remove this field. + if !e.Timestamp.IsZero() && (e.Transaction != nil || e.Span != nil || e.Error != nil) { + event.Fields["timestamp"] = utility.TimeAsMicros(e.Timestamp) + } + // Set fields common to all events. fields := (*mapStr)(&event.Fields) + event.Timestamp = e.Timestamp e.DataStream.setFields(fields) fields.maybeSetMapStr("service", e.Service.Fields()) fields.maybeSetMapStr("agent", e.Agent.fields()) @@ -104,6 +107,6 @@ func (e *APMEvent) appendBeatEvent(ctx context.Context, out []beat.Event) []beat fields.maybeSetMapStr("kubernetes", e.Kubernetes.fields()) fields.maybeSetMapStr("cloud", e.Cloud.fields()) fields.maybeSetMapStr("network", e.Network.fields()) - maybeSetLabels(fields, e.Labels, eventLabels) + fields.maybeSetMapStr("labels", sanitizeLabels(e.Labels)) return append(out, event) } diff --git a/model/apmevent_test.go b/model/apmevent_test.go index 8a3b920f6a9..ad7628b8139 100644 --- a/model/apmevent_test.go +++ b/model/apmevent_test.go @@ -20,6 +20,7 @@ package model import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -57,13 +58,12 @@ func TestAPMEventFields(t *testing.T) { Hostname: hostname, Name: host, }, - Client: Client{Domain: "client.domain"}, - Process: Process{Pid: pid}, - User: User{ID: uid, Email: mail}, - Labels: common.MapStr{"a": "a1", "b": "b1"}, - Transaction: &Transaction{ - Labels: common.MapStr{"b": "b2", "c": "c2"}, - }, + Client: Client{Domain: "client.domain"}, + Process: Process{Pid: pid}, + User: User{ID: uid, Email: mail}, + Labels: common.MapStr{"a": "b", "c": 123}, + Transaction: &Transaction{}, + Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)), }, output: common.MapStr{ // common fields @@ -79,9 +79,8 @@ func TestAPMEventFields(t *testing.T) { "client": common.MapStr{"domain": "client.domain"}, "source": common.MapStr{"domain": "client.domain"}, "labels": common.MapStr{ - "a": "a1", - "b": "b2", - "c": "c2", + "a": "b", + "c": 123, }, // fields related to APMEvent.Transaction @@ -90,6 +89,7 @@ func TestAPMEventFields(t *testing.T) { "name": "transaction", "event": "transaction", }, + "timestamp": common.MapStr{"us": int64(1546525024908596)}, "transaction": common.MapStr{ "duration": common.MapStr{"us": 0}, "sampled": false, diff --git a/model/error.go b/model/error.go index 572dc220592..ea807f51c95 100644 --- a/model/error.go +++ b/model/error.go @@ -18,14 +18,8 @@ package model import ( - "context" - "time" - - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" - - "github.com/elastic/apm-server/utility" ) var ( @@ -48,11 +42,8 @@ type Error struct { TraceID string ParentID string - Timestamp time.Time - GroupingKey string Culprit string - Labels common.MapStr Page *Page HTTP *HTTP URL *URL @@ -87,7 +78,7 @@ type Log struct { Stacktrace Stacktrace } -func (e *Error) toBeatEvent(ctx context.Context) beat.Event { +func (e *Error) fields() common.MapStr { errorTransformations.Inc() if e.Exception != nil { @@ -97,11 +88,7 @@ func (e *Error) toBeatEvent(ctx context.Context) beat.Event { addStacktraceCounter(e.Log.Stacktrace) } - fields := mapStr{ - "error": e.fields(), - "processor": errorProcessorEntry, - } - + fields := mapStr{"processor": errorProcessorEntry} if e.HTTP != nil { fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields()) } @@ -123,28 +110,19 @@ func (e *Error) toBeatEvent(ctx context.Context) beat.Event { trace.maybeSetString("id", e.TraceID) fields.maybeSetMapStr("parent", common.MapStr(parent)) fields.maybeSetMapStr("trace", common.MapStr(trace)) - fields.maybeSetMapStr("timestamp", utility.TimeAsMicros(e.Timestamp)) - - return beat.Event{ - Fields: common.MapStr(fields), - Timestamp: e.Timestamp, - } -} - -func (e *Error) fields() common.MapStr { - var fields mapStr - fields.maybeSetString("id", e.ID) - fields.maybeSetMapStr("page", e.Page.Fields()) + var errorFields mapStr + errorFields.maybeSetString("id", e.ID) + errorFields.maybeSetMapStr("page", e.Page.Fields()) exceptionChain := flattenExceptionTree(e.Exception) if exception := e.exceptionFields(exceptionChain); len(exception) > 0 { - fields.set("exception", exception) + errorFields.set("exception", exception) } - fields.maybeSetMapStr("log", e.logFields()) - - fields.maybeSetString("culprit", e.Culprit) - fields.maybeSetMapStr("custom", customFields(e.Custom)) - fields.maybeSetString("grouping_key", e.GroupingKey) + errorFields.maybeSetMapStr("log", e.logFields()) + errorFields.maybeSetString("culprit", e.Culprit) + errorFields.maybeSetMapStr("custom", customFields(e.Custom)) + errorFields.maybeSetString("grouping_key", e.GroupingKey) + fields.set("error", common.MapStr(errorFields)) return common.MapStr(fields) } diff --git a/model/error_test.go b/model/error_test.go index a01e4c4dfd5..7bc21b2bd80 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -18,10 +18,8 @@ package model import ( - "context" "fmt" "testing" - "time" "github.com/stretchr/testify/assert" @@ -158,7 +156,6 @@ func TestEventFields(t *testing.T) { "withFrames": { Error: Error{ ID: id, - Timestamp: time.Now(), Culprit: culprit, Exception: &exception, Log: &log, @@ -191,9 +188,8 @@ func TestEventFields(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - output := tc.Error.toBeatEvent(context.Background()) - fields := output.Fields["error"] - assert.Equal(t, tc.Output, fields) + fields := tc.Error.fields() + assert.Equal(t, tc.Output, fields["error"]) }) } } @@ -209,9 +205,8 @@ func TestErrorTransformPage(t *testing.T) { }{ { Error: Error{ - ID: id, - Timestamp: time.Now(), - URL: ParseURL("https://localhost:8200/", "", ""), + ID: id, + URL: ParseURL("https://localhost:8200/", "", ""), Page: &Page{ URL: ParseURL(urlExample, "", ""), }, @@ -229,7 +224,7 @@ func TestErrorTransformPage(t *testing.T) { } for idx, test := range tests { - output := test.Error.toBeatEvent(context.Background()) - assert.Equal(t, test.Output, output.Fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) + fields := test.Error.fields() + assert.Equal(t, test.Output, fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } diff --git a/model/labels.go b/model/labels.go index 66c1feca074..85951e3d8ed 100644 --- a/model/labels.go +++ b/model/labels.go @@ -23,31 +23,20 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -// If either or both globalLabels/eventLabels is non-empty, set a "labels" -// field in out to the combination of the labels. -// // Label keys are sanitized, replacing the reserved characters '.', '*' and '"' -// with '_'. Event-specific labels take precedence over global labels. -// Null-valued labels are omitted. -func maybeSetLabels(out *mapStr, globalLabels, eventLabels common.MapStr) { - n := len(globalLabels) + len(eventLabels) - if n == 0 { - return - } - combined := make(common.MapStr, n) - for k, v := range globalLabels { +// with '_'. Null-valued labels are omitted. +func sanitizeLabels(labels common.MapStr) common.MapStr { + for k, v := range labels { if v == nil { + delete(labels, k) continue } - combined[sanitizeLabelKey(k)] = v - } - for k, v := range eventLabels { - if v == nil { - continue + if k2 := sanitizeLabelKey(k); k != k2 { + delete(labels, k) + labels[k2] = v } - combined[sanitizeLabelKey(k)] = v } - out.set("labels", combined) + return labels } func sanitizeLabelKey(k string) string { diff --git a/model/metricset.go b/model/metricset.go index c65322c816f..c0421f1106f 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -18,9 +18,6 @@ package model import ( - "time" - - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" ) @@ -53,9 +50,6 @@ const ( // Metricset describes a set of metrics and associated metadata. type Metricset struct { - // Timestamp holds the time at which the metrics were published. - Timestamp time.Time - // Event holds information about the event category with which the // metrics are associated. Event MetricsetEventCategorization @@ -68,11 +62,6 @@ type Metricset struct { // metrics are associated. Span MetricsetSpan - // Labels holds arbitrary labels to apply to the metrics. - // - // These labels override any with the same names in Metadata.Labels. - Labels common.MapStr - // Samples holds the metrics in the set. Samples map[string]MetricsetSample @@ -165,7 +154,7 @@ type MetricsetSpan struct { DestinationService DestinationService } -func (me *Metricset) toBeatEvent() beat.Event { +func (me *Metricset) fields() common.MapStr { metricsetTransformations.Inc() var fields mapStr @@ -192,11 +181,7 @@ func (me *Metricset) toBeatEvent() beat.Event { metricDescriptions.maybeSetMapStr(name, common.MapStr(md)) } fields.maybeSetMapStr("_metric_descriptions", common.MapStr(metricDescriptions)) - - return beat.Event{ - Fields: common.MapStr(fields), - Timestamp: me.Timestamp, - } + return common.MapStr(fields) } func (e *MetricsetEventCategorization) fields() common.MapStr { diff --git a/model/metricset_test.go b/model/metricset_test.go index b65ceb5602d..987954010f4 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -30,7 +29,6 @@ import ( ) func TestMetricset(t *testing.T) { - timestamp := time.Now() resource := "external-service" const ( @@ -50,14 +48,14 @@ func TestMetricset(t *testing.T) { Msg string }{ { - Metricset: &Metricset{Timestamp: timestamp}, + Metricset: &Metricset{}, Output: common.MapStr{ "processor": common.MapStr{"event": "metric", "name": "metric"}, }, Msg: "Payload with empty metric.", }, { - Metricset: &Metricset{Timestamp: timestamp, Name: "raj"}, + Metricset: &Metricset{Name: "raj"}, Output: common.MapStr{ "processor": common.MapStr{"event": "metric", "name": "metric"}, "metricset.name": "raj", @@ -66,8 +64,6 @@ func TestMetricset(t *testing.T) { }, { Metricset: &Metricset{ - Labels: common.MapStr{"a_b": "a.b.value"}, - Timestamp: timestamp, Samples: map[string]MetricsetSample{ "a.counter": {Value: 612}, "some.gauge": {Value: 9.16}, @@ -75,7 +71,6 @@ func TestMetricset(t *testing.T) { }, Output: common.MapStr{ "processor": common.MapStr{"event": "metric", "name": "metric"}, - "labels": common.MapStr{"a_b": "a.b.value"}, "a.counter": 612.0, "some.gauge": 9.16, }, @@ -83,7 +78,6 @@ func TestMetricset(t *testing.T) { }, { Metricset: &Metricset{ - Timestamp: timestamp, Span: MetricsetSpan{Type: spType, Subtype: spSubtype}, Transaction: MetricsetTransaction{Type: trType, Name: trName}, Samples: map[string]MetricsetSample{ @@ -102,8 +96,7 @@ func TestMetricset(t *testing.T) { }, { Metricset: &Metricset{ - Timestamp: timestamp, - Event: MetricsetEventCategorization{Outcome: eventOutcome}, + Event: MetricsetEventCategorization{Outcome: eventOutcome}, Transaction: MetricsetTransaction{ Type: trType, Name: trName, @@ -146,7 +139,6 @@ func TestMetricset(t *testing.T) { }, { Metricset: &Metricset{ - Timestamp: timestamp, Span: MetricsetSpan{Type: spType, Subtype: spSubtype, DestinationService: DestinationService{ Resource: resource, }}, @@ -168,7 +160,6 @@ func TestMetricset(t *testing.T) { }, { Metricset: &Metricset{ - Timestamp: timestamp, Samples: map[string]MetricsetSample{ "latency_histogram": { Type: "histogram", @@ -212,12 +203,9 @@ func TestMetricset(t *testing.T) { } for idx, test := range tests { - event := APMEvent{ - Metricset: test.Metricset, - } + event := APMEvent{Metricset: test.Metricset} outputEvents := event.appendBeatEvent(context.Background(), nil) require.Len(t, outputEvents, 1) assert.Equal(t, test.Output, outputEvents[0].Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) - assert.Equal(t, timestamp, outputEvents[0].Timestamp, fmt.Sprintf("Bad timestamp at idx %v; %s", idx, test.Msg)) } } diff --git a/model/modeldecoder/input.go b/model/modeldecoder/input.go index 537de5c9616..bef890d6041 100644 --- a/model/modeldecoder/input.go +++ b/model/modeldecoder/input.go @@ -18,18 +18,11 @@ package modeldecoder import ( - "time" - "github.com/elastic/apm-server/model" ) // Input holds the input required for decoding an event. type Input struct { - // RequestTime is the time at which the event was received - // by the server. This is used to set the timestamp for - // events sent by RUM. - RequestTime time.Time - // Base holds the base for decoding events. Base model.APMEvent diff --git a/model/modeldecoder/modeldecoderutil/labels.go b/model/modeldecoder/modeldecoderutil/labels.go index 308ab24ef5f..b24efa45094 100644 --- a/model/modeldecoder/modeldecoderutil/labels.go +++ b/model/modeldecoder/modeldecoderutil/labels.go @@ -23,6 +23,22 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) +// MergeLabels merges eventLabels onto commonLabels. This is used for +// combining event-specific labels onto (metadata) global labels. +// +// If commonLabels is non-nil, it is first cloned. If commonLabels +// is nil, then eventLabels is cloned. +func MergeLabels(commonLabels, eventLabels common.MapStr) common.MapStr { + if commonLabels == nil { + return eventLabels.Clone() + } + combinedLabels := commonLabels.Clone() + for k, v := range eventLabels { + combinedLabels[k] = v + } + return combinedLabels +} + // NormalizeLabelValues transforms the values in labels, replacing any // instance of json.Number with libbeat/common.Float, and returning // labels. diff --git a/model/modeldecoder/rumv3/decoder.go b/model/modeldecoder/rumv3/decoder.go index d6d8aaf025a..99cbca50e4d 100644 --- a/model/modeldecoder/rumv3/decoder.go +++ b/model/modeldecoder/rumv3/decoder.go @@ -119,7 +119,7 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode return modeldecoder.NewValidationErr(err) } event := input.Base - mapToErrorModel(&root.Error, input.RequestTime, &event) + mapToErrorModel(&root.Error, &event) *batch = append(*batch, event) return nil } @@ -137,7 +137,7 @@ func DecodeNestedMetricset(d decoder.Decoder, input *modeldecoder.Input, batch * return modeldecoder.NewValidationErr(err) } event := input.Base - mapToMetricsetModel(&root.Metricset, input.RequestTime, &event) + mapToMetricsetModel(&root.Metricset, &event) *batch = append(*batch, event) return nil } @@ -157,12 +157,12 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch } transaction := input.Base - mapToTransactionModel(&root.Transaction, input.RequestTime, &transaction) + mapToTransactionModel(&root.Transaction, &transaction) *batch = append(*batch, transaction) for _, m := range root.Transaction.Metricsets { metricset := input.Base - mapToMetricsetModel(&m, input.RequestTime, &metricset) + mapToMetricsetModel(&m, &metricset) metricset.Metricset.Transaction.Name = transaction.Transaction.Name metricset.Metricset.Transaction.Type = transaction.Transaction.Type *batch = append(*batch, metricset) @@ -171,7 +171,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch offset := len(*batch) for _, s := range root.Transaction.Spans { span := input.Base - mapToSpanModel(&s, input.RequestTime, &span) + mapToSpanModel(&s, &span) span.Span.TransactionID = transaction.Transaction.ID span.Span.TraceID = transaction.Transaction.TraceID *batch = append(*batch, span) @@ -187,7 +187,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch return nil } -func mapToErrorModel(from *errorEvent, reqTime time.Time, event *model.APMEvent) { +func mapToErrorModel(from *errorEvent, event *model.APMEvent) { out := &model.Error{} event.Error = out @@ -199,9 +199,11 @@ func mapToErrorModel(from *errorEvent, reqTime time.Time, event *model.APMEvent) // map errorEvent specific data if from.Context.IsSet() { - // metadata labels and context labels are merged only in the output model if len(from.Context.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Context.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Context.Tags), + ) } if from.Context.Request.IsSet() { out.HTTP = &model.HTTP{Request: &model.HTTPRequest{}} @@ -271,10 +273,8 @@ func mapToErrorModel(from *errorEvent, reqTime time.Time, event *model.APMEvent) if from.ParentID.IsSet() { out.ParentID = from.ParentID.Val } - if from.Timestamp.Val.IsZero() { - out.Timestamp = reqTime - } else { - out.Timestamp = from.Timestamp.Val + if !from.Timestamp.Val.IsZero() { + event.Timestamp = from.Timestamp.Val } if from.TraceID.IsSet() { out.TraceID = from.TraceID.Val @@ -383,13 +383,10 @@ func mapToMetadataModel(m *metadata, out *model.APMEvent) { } } -func mapToMetricsetModel(from *metricset, reqTime time.Time, event *model.APMEvent) { +func mapToMetricsetModel(from *metricset, event *model.APMEvent) { out := &model.Metricset{} event.Metricset = out - // set timestamp from requst time - out.Timestamp = reqTime - // map samples information if from.Samples.IsSet() { out.Samples = make(map[string]model.MetricsetSample) @@ -421,7 +418,10 @@ func mapToMetricsetModel(from *metricset, reqTime time.Time, event *model.APMEve } if len(from.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Tags), + ) } // map span information if from.Span.Subtype.IsSet() { @@ -513,7 +513,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) { } } -func mapToSpanModel(from *span, reqTime time.Time, event *model.APMEvent) { +func mapToSpanModel(from *span, event *model.APMEvent) { out := &model.Span{} event.Span = out @@ -599,7 +599,10 @@ func mapToSpanModel(from *span, reqTime time.Time, event *model.APMEvent) { } } if len(from.Context.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Context.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Context.Tags), + ) } if from.Duration.IsSet() { out.Duration = from.Duration.Val @@ -640,10 +643,13 @@ func mapToSpanModel(from *span, reqTime time.Time, event *model.APMEvent) { out.Sync = &val } if from.Start.IsSet() { - // adjust timestamp to be reqTime + start - reqTime = reqTime.Add(time.Duration(float64(time.Millisecond) * from.Start.Val)) + // event.Timestamp is initialized to the time the payload was + // received by apm-server; offset that by "start" milliseconds + // for RUM. + event.Timestamp = event.Timestamp.Add( + time.Duration(float64(time.Millisecond) * from.Start.Val), + ) } - out.Timestamp = reqTime } func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { @@ -687,7 +693,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { } } -func mapToTransactionModel(from *transaction, reqTime time.Time, event *model.APMEvent) { +func mapToTransactionModel(from *transaction, event *model.APMEvent) { out := &model.Transaction{} event.Transaction = out @@ -702,9 +708,11 @@ func mapToTransactionModel(from *transaction, reqTime time.Time, event *model.AP if len(from.Context.Custom) > 0 { out.Custom = modeldecoderutil.NormalizeLabelValues(from.Context.Custom.Clone()) } - // metadata labels and context labels are merged when transforming the output model if len(from.Context.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Context.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Context.Tags), + ) } if from.Context.Request.IsSet() { out.HTTP = &model.HTTP{Request: &model.HTTPRequest{}} @@ -797,7 +805,6 @@ func mapToTransactionModel(from *transaction, reqTime time.Time, event *model.AP started := from.SpanCount.Started.Val out.SpanCount.Started = &started } - out.Timestamp = reqTime if from.TraceID.IsSet() { out.TraceID = from.TraceID.Val } diff --git a/model/modeldecoder/rumv3/error_test.go b/model/modeldecoder/rumv3/error_test.go index f4c39f4912a..0c6ff01625b 100644 --- a/model/modeldecoder/rumv3/error_test.go +++ b/model/modeldecoder/rumv3/error_test.go @@ -46,23 +46,25 @@ func TestDecodeNestedError(t *testing.T) { t.Run("decode", func(t *testing.T) { now := time.Now() eventBase := initializedMetadata() - input := modeldecoder.Input{RequestTime: now, Base: eventBase} + eventBase.Timestamp = now + input := modeldecoder.Input{Base: eventBase} str := `{"e":{"id":"a-b-c","timestamp":1599996822281000,"log":{"mg":"abc"}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch require.NoError(t, DecodeNestedError(dec, &input, &batch)) require.Len(t, batch, 1) require.NotNil(t, batch[0].Error) - assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", batch[0].Error.Timestamp.String()) - modeldecodertest.AssertStructValues(t, &batch[0], metadataExceptions(), modeldecodertest.DefaultValues()) + defaultValues := modeldecodertest.DefaultValues() + defaultValues.Update(time.Unix(1599996822, 281000000).UTC()) + modeldecodertest.AssertStructValues(t, &batch[0], metadataExceptions(), defaultValues) - // if no timestamp is provided, fall back to request time - input = modeldecoder.Input{RequestTime: now} + // if no timestamp is provided, leave base event timestamp unmodified + input = modeldecoder.Input{Base: eventBase} str = `{"e":{"id":"a-b-c","log":{"mg":"abc"}}}` dec = decoder.NewJSONDecoder(strings.NewReader(str)) batch = model.Batch{} require.NoError(t, DecodeNestedError(dec, &input, &batch)) - assert.Equal(t, now, batch[0].Error.Timestamp) + assert.Equal(t, now, batch[0].Timestamp) // test decode err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) @@ -85,7 +87,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { out := initializedMetadata() otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToErrorModel(&input, time.Now(), &out) + mapToErrorModel(&input, &out) input.Reset() // ensure event Metadata are updated where expected @@ -95,11 +97,10 @@ func TestDecodeMapToErrorModel(t *testing.T) { // do not overwrite client.ip if already set in metadata ip := modeldecodertest.DefaultValues().IP assert.Equal(t, ip, out.Client.IP, out.Client.IP.String()) - // metadata labels and event labels should not be merged - mLabels := common.MapStr{"init0": "init", "init1": "init", "init2": "init"} - tLabels := common.MapStr{"overwritten0": "overwritten", "overwritten1": "overwritten"} - assert.Equal(t, mLabels, out.Labels) - assert.Equal(t, tLabels, out.Error.Labels) + assert.Equal(t, common.MapStr{ + "init0": "init", "init1": "init", "init2": "init", + "overwritten0": "overwritten", "overwritten1": "overwritten", + }, out.Labels) // service and user values should be set modeldecodertest.AssertStructValues(t, &out.Service, metadataExceptions("Node", "Agent.EphemeralID"), otherVal) modeldecodertest.AssertStructValues(t, &out.User, metadataExceptions(), otherVal) @@ -145,25 +146,28 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent var out1, out2 model.APMEvent reqTime := time.Now().Add(time.Second) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToErrorModel(&input, reqTime, &out1) + mapToErrorModel(&input, &out1) input.Reset() modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal) - // set Timestamp to requestTime if eventTime is zero + // leave event timestamp unmodified if eventTime is zero defaultVal.Update(time.Time{}) + out1.Timestamp = reqTime modeldecodertest.SetStructValues(&input, defaultVal) - mapToErrorModel(&input, reqTime, &out1) + mapToErrorModel(&input, &out1) defaultVal.Update(reqTime) input.Reset() modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal) // reuse input model for different event // ensure memory is not shared by reusing input model + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToErrorModel(&input, reqTime, &out2) + mapToErrorModel(&input, &out2) modeldecodertest.AssertStructValues(t, out2.Error, exceptions, otherVal) modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal) }) @@ -172,7 +176,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent input.Context.Page.URL.Set("https://my.site.test:9201") var out model.APMEvent - mapToErrorModel(&input, time.Now(), &out) + mapToErrorModel(&input, &out) assert.Equal(t, "https://my.site.test:9201", out.Error.Page.URL.Full) assert.Equal(t, "https://my.site.test:9201", out.Error.URL.Full) assert.Equal(t, 9201, out.Error.Page.URL.Port) @@ -183,7 +187,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent input.Context.Page.Referer.Set("https://my.site.test:9201") var out model.APMEvent - mapToErrorModel(&input, time.Now(), &out) + mapToErrorModel(&input, &out) assert.Equal(t, "https://my.site.test:9201", out.Error.Page.Referer) assert.Equal(t, "https://my.site.test:9201", out.Error.HTTP.Request.Referrer) }) @@ -192,7 +196,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent input.Log.Message.Set("log message") var out model.APMEvent - mapToErrorModel(&input, time.Now(), &out) + mapToErrorModel(&input, &out) require.NotNil(t, out.Error.Log.LoggerName) assert.Equal(t, "default", out.Error.Log.LoggerName) }) @@ -202,7 +206,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{"a": []string{"b"}, "c": []string{"d", "e"}}) input.Context.Response.Headers.Set(http.Header{"f": []string{"g"}}) var out model.APMEvent - mapToErrorModel(&input, time.Now(), &out) + mapToErrorModel(&input, &out) assert.Equal(t, common.MapStr{"a": []string{"b"}, "c": []string{"d", "e"}}, out.Error.HTTP.Request.Headers) assert.Equal(t, common.MapStr{"f": []string{"g"}}, out.Error.HTTP.Response.Headers) }) @@ -211,7 +215,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent var out model.APMEvent input.Exception.Code.Set(123.456) - mapToErrorModel(&input, time.Now(), &out) + mapToErrorModel(&input, &out) assert.Equal(t, "123", out.Error.Exception.Code) }) } diff --git a/model/modeldecoder/rumv3/metricset_test.go b/model/modeldecoder/rumv3/metricset_test.go index dd2ed4d948a..fb4cc0198fd 100644 --- a/model/modeldecoder/rumv3/metricset_test.go +++ b/model/modeldecoder/rumv3/metricset_test.go @@ -44,7 +44,8 @@ func TestDecodeNestedMetricset(t *testing.T) { t.Run("decode", func(t *testing.T) { now := time.Now() eventBase := initializedMetadata() - input := modeldecoder.Input{RequestTime: now, Base: eventBase} + eventBase.Timestamp = now + input := modeldecoder.Input{Base: eventBase} str := `{"me":{"sa":{"xds":{"v":2048}}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch @@ -52,8 +53,9 @@ func TestDecodeNestedMetricset(t *testing.T) { require.Len(t, batch, 1) require.NotNil(t, batch[0].Metricset) assert.Equal(t, map[string]model.MetricsetSample{"transaction.duration.sum.us": {Value: 2048}}, batch[0].Metricset.Samples) - assert.Equal(t, now, batch[0].Metricset.Timestamp) - modeldecodertest.AssertStructValues(t, &batch[0], metadataExceptions(), modeldecodertest.DefaultValues()) + defaultValues := modeldecodertest.DefaultValues() + defaultValues.Update(now) + modeldecodertest.AssertStructValues(t, &batch[0], metadataExceptions(), defaultValues) // invalid type err := DecodeNestedMetricset(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) @@ -102,19 +104,21 @@ func TestDecodeMapToMetricsetModel(t *testing.T) { var input metricset var out1, out2 model.APMEvent reqTime := time.Now().Add(time.Second) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToMetricsetModel(&input, reqTime, &out1) + mapToMetricsetModel(&input, &out1) input.Reset() - // metricset timestamp is always set to request time + // metricset timestamp is always set to base event timestamp defaultVal.Update(reqTime) modeldecodertest.AssertStructValues(t, out1.Metricset, exceptions, defaultVal) assert.Equal(t, samples(defaultVal.Float), out1.Metricset.Samples) // ensure memory is not shared by reusing input model + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToMetricsetModel(&input, reqTime, &out2) + mapToMetricsetModel(&input, &out2) otherVal.Update(reqTime) modeldecodertest.AssertStructValues(t, out2.Metricset, exceptions, otherVal) assert.Equal(t, samples(otherVal.Float), out2.Metricset.Samples) diff --git a/model/modeldecoder/rumv3/transaction_test.go b/model/modeldecoder/rumv3/transaction_test.go index 8adbffaaf51..0a5fd728ad7 100644 --- a/model/modeldecoder/rumv3/transaction_test.go +++ b/model/modeldecoder/rumv3/transaction_test.go @@ -47,7 +47,8 @@ func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode", func(t *testing.T) { now := time.Now() eventBase := initializedMetadata() - input := modeldecoder.Input{RequestTime: now, Base: eventBase} + eventBase.Timestamp = now + input := modeldecoder.Input{Base: eventBase} str := `{"x":{"n":"tr-a","d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"y":[{"n":"a","d":10,"t":"http","id":"123","s":20}],"me":[{"sa":{"xds":{"v":2048}}},{"sa":{"ysc":{"v":5}}}]}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch @@ -57,29 +58,32 @@ func TestDecodeNestedTransaction(t *testing.T) { require.NotNil(t, batch[1].Metricset) require.NotNil(t, batch[2].Metricset) require.NotNil(t, batch[3].Span) - for _, event := range batch { - modeldecodertest.AssertStructValues(t, &event, metadataExceptions(), modeldecodertest.DefaultValues()) - } assert.Equal(t, "request", batch[0].Transaction.Type) // fall back to request time - assert.Equal(t, now, batch[0].Transaction.Timestamp) + assert.Equal(t, now, batch[0].Timestamp) // ensure nested metricsets are decoded assert.Equal(t, map[string]model.MetricsetSample{"transaction.duration.sum.us": {Value: 2048}}, batch[1].Metricset.Samples) - m := batch[2].Metricset - assert.Equal(t, map[string]model.MetricsetSample{"span.self_time.count": {Value: 5}}, m.Samples) - assert.Equal(t, "tr-a", m.Transaction.Name) - assert.Equal(t, "request", m.Transaction.Type) - assert.Equal(t, now, m.Timestamp) + assert.Equal(t, map[string]model.MetricsetSample{"span.self_time.count": {Value: 5}}, batch[2].Metricset.Samples) + assert.Equal(t, "tr-a", batch[2].Metricset.Transaction.Name) + assert.Equal(t, "request", batch[2].Metricset.Transaction.Type) + assert.Equal(t, now, batch[2].Timestamp) // ensure nested spans are decoded - sp := batch[3].Span start := time.Duration(20 * 1000 * 1000) - assert.Equal(t, now.Add(start), sp.Timestamp) //add start to timestamp - assert.Equal(t, "100", sp.TransactionID) - assert.Equal(t, "1", sp.TraceID) - assert.Equal(t, "100", sp.ParentID) + assert.Equal(t, now.Add(start), batch[3].Timestamp) //add start to timestamp + assert.Equal(t, "100", batch[3].Span.TransactionID) + assert.Equal(t, "1", batch[3].Span.TraceID) + assert.Equal(t, "100", batch[3].Span.ParentID) + + for _, event := range batch { + modeldecodertest.AssertStructValues( + t, &event, + metadataExceptions("Timestamp"), // timestamp checked above + modeldecodertest.DefaultValues(), + ) + } err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) require.Error(t, err) @@ -88,7 +92,8 @@ func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode-marks", func(t *testing.T) { now := time.Now() - input := modeldecoder.Input{RequestTime: now} + eventBase := model.APMEvent{Timestamp: now} + input := modeldecoder.Input{Base: eventBase} str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"k":{"a":{"dc":0.1,"di":0.2,"ds":0.3,"de":0.4,"fb":0.5,"fp":0.6,"lp":0.7,"long":0.8},"nt":{"fs":0.1,"ls":0.2,"le":0.3,"cs":0.4,"ce":0.5,"qs":0.6,"rs":0.7,"re":0.8,"dl":0.9,"di":0.11,"ds":0.21,"de":0.31,"dc":0.41,"es":0.51,"ee":6,"long":0.99},"long":{"long":0.1}}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch @@ -146,17 +151,16 @@ func TestDecodeMapToTransactionModel(t *testing.T) { out := initializedMetadata() otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) // user-agent should be set to context request header values assert.Equal(t, "d, e", out.UserAgent.Original) // do not overwrite client.ip if already set in metadata assert.Equal(t, localhostIP, out.Client.IP, out.Client.IP.String()) - // metadata labels and transaction labels should not be merged - mLabels := common.MapStr{"init0": "init", "init1": "init", "init2": "init"} - assert.Equal(t, mLabels, out.Labels) - tLabels := common.MapStr{"overwritten0": "overwritten", "overwritten1": "overwritten"} - assert.Equal(t, tLabels, out.Transaction.Labels) + assert.Equal(t, common.MapStr{ + "init0": "init", "init1": "init", "init2": "init", + "overwritten0": "overwritten", "overwritten1": "overwritten", + }, out.Labels) // service values should be set modeldecodertest.AssertStructValues(t, &out.Service, metadataExceptions("Node", "Agent.EphemeralID"), otherVal) // user values should be set @@ -168,7 +172,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out model.APMEvent input.Context.User.Email.Set("test@user.com") - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, "test@user.com", out.User.Email) assert.Zero(t, out.User.ID) assert.Zero(t, out.User.Name) @@ -199,18 +203,20 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out1, out2 model.APMEvent reqTime := time.Now().Add(time.Second) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToTransactionModel(&input, reqTime, &out1) + mapToTransactionModel(&input, &out1) input.Reset() - defaultVal.Update(reqTime) //for rumv3 the timestamp is always set from the request time + defaultVal.Update(reqTime) //for rumv3 the timestamp is always set from the base event modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal) // ensure memory is not shared by reusing input model + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() - otherVal.Update(reqTime) //for rumv3 the timestamp is always set from the request time + otherVal.Update(reqTime) //for rumv3 the timestamp is always set from the base event modeldecodertest.SetStructValues(&input, otherVal) - mapToTransactionModel(&input, reqTime, &out2) + mapToTransactionModel(&input, &out2) modeldecodertest.AssertStructValues(t, out2.Transaction, exceptions, otherVal) modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal) }) @@ -261,20 +267,22 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input span var out1, out2 model.APMEvent reqTime := time.Now().Add(time.Second) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToSpanModel(&input, reqTime, &out1) + mapToSpanModel(&input, &out1) input.Reset() defaultStart := time.Duration(defaultVal.Float * 1000 * 1000) - defaultVal.Update(reqTime.Add(defaultStart)) //for rumv3 the timestamp is always set from the request time + defaultVal.Update(reqTime.Add(defaultStart)) //for rumv3 the timestamp is always set from the base event modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal) // ensure memory is not shared by reusing input model + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToSpanModel(&input, reqTime, &out2) + mapToSpanModel(&input, &out2) otherStart := time.Duration(otherVal.Float * 1000 * 1000) - otherVal.Update(reqTime.Add(otherStart)) //for rumv3 the timestamp is always set from the request time + otherVal.Update(reqTime.Add(otherStart)) //for rumv3 the timestamp is always set from the base event modeldecodertest.AssertStructValues(t, out2.Span, exceptions, otherVal) modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal) }) @@ -286,22 +294,22 @@ func TestDecodeMapToTransactionModel(t *testing.T) { // set from input, ignore status code input.Outcome.Set("failure") input.Context.HTTP.StatusCode.Set(http.StatusPermanentRedirect) - mapToSpanModel(&input, time.Now(), &out) + mapToSpanModel(&input, &out) assert.Equal(t, "failure", out.Span.Outcome) // derive from span fields - success input.Outcome.Reset() input.Context.HTTP.StatusCode.Set(http.StatusPermanentRedirect) - mapToSpanModel(&input, time.Now(), &out) + mapToSpanModel(&input, &out) assert.Equal(t, "success", out.Span.Outcome) // derive from span fields - failure input.Outcome.Reset() input.Context.HTTP.StatusCode.Set(http.StatusBadRequest) - mapToSpanModel(&input, time.Now(), &out) + mapToSpanModel(&input, &out) assert.Equal(t, "failure", out.Span.Outcome) // derive from span fields - unknown input.Outcome.Reset() input.Context.HTTP.StatusCode.Reset() - mapToSpanModel(&input, time.Now(), &out) + mapToSpanModel(&input, &out) assert.Equal(t, "unknown", out.Span.Outcome) }) @@ -312,22 +320,22 @@ func TestDecodeMapToTransactionModel(t *testing.T) { // set from input, ignore status code input.Outcome.Set("failure") input.Context.Response.StatusCode.Set(http.StatusBadRequest) - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, "failure", out.Transaction.Outcome) // derive from span fields - success input.Outcome.Reset() input.Context.Response.StatusCode.Set(http.StatusBadRequest) - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, "success", out.Transaction.Outcome) // derive from span fields - failure input.Outcome.Reset() input.Context.Response.StatusCode.Set(http.StatusInternalServerError) - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, "failure", out.Transaction.Outcome) // derive from span fields - unknown input.Outcome.Reset() input.Context.Response.StatusCode.Reset() - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, "unknown", out.Transaction.Outcome) }) @@ -335,7 +343,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction input.Context.Page.URL.Set("https://my.site.test:9201") var out model.APMEvent - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, "https://my.site.test:9201", out.Transaction.Page.URL.Full) assert.Equal(t, "https://my.site.test:9201", out.Transaction.URL.Full) assert.Equal(t, 9201, out.Transaction.Page.URL.Port) @@ -346,7 +354,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction input.Context.Page.Referer.Set("https://my.site.test:9201") var out model.APMEvent - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, "https://my.site.test:9201", out.Transaction.Page.Referer) assert.Equal(t, "https://my.site.test:9201", out.Transaction.HTTP.Request.Referrer) }) @@ -356,7 +364,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{"a": []string{"b"}, "c": []string{"d", "e"}}) input.Context.Response.Headers.Set(http.Header{"f": []string{"g"}}) var out model.APMEvent - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, common.MapStr{"a": []string{"b"}, "c": []string{"d", "e"}}, out.Transaction.HTTP.Request.Headers) assert.Equal(t, common.MapStr{"f": []string{"g"}}, out.Transaction.HTTP.Response.Headers) }) @@ -366,12 +374,12 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var out model.APMEvent modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues()) input.Session.ID.Reset() - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, model.TransactionSession{}, out.Transaction.Session) input.Session.ID.Set("session_id") input.Session.Sequence.Set(123) - mapToTransactionModel(&input, time.Now(), &out) + mapToTransactionModel(&input, &out) assert.Equal(t, model.TransactionSession{ ID: "session_id", Sequence: 123, diff --git a/model/modeldecoder/v2/decoder.go b/model/modeldecoder/v2/decoder.go index beaa227d453..960a0135524 100644 --- a/model/modeldecoder/v2/decoder.go +++ b/model/modeldecoder/v2/decoder.go @@ -137,7 +137,7 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode return modeldecoder.NewValidationErr(err) } event := input.Base - mapToErrorModel(&root.Error, input.RequestTime, input.Config, &event) + mapToErrorModel(&root.Error, input.Config, &event) *batch = append(*batch, event) return err } @@ -156,7 +156,7 @@ func DecodeNestedMetricset(d decoder.Decoder, input *modeldecoder.Input, batch * return modeldecoder.NewValidationErr(err) } event := input.Base - mapToMetricsetModel(&root.Metricset, input.RequestTime, input.Config, &event) + mapToMetricsetModel(&root.Metricset, input.Config, &event) *batch = append(*batch, event) return err } @@ -175,7 +175,7 @@ func DecodeNestedSpan(d decoder.Decoder, input *modeldecoder.Input, batch *model return modeldecoder.NewValidationErr(err) } event := input.Base - mapToSpanModel(&root.Span, input.RequestTime, input.Config, &event) + mapToSpanModel(&root.Span, input.Config, &event) *batch = append(*batch, event) return err } @@ -194,7 +194,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch return modeldecoder.NewValidationErr(err) } event := input.Base - mapToTransactionModel(&root.Transaction, input.RequestTime, input.Config, &event) + mapToTransactionModel(&root.Transaction, input.Config, &event) *batch = append(*batch, event) return err } @@ -237,7 +237,7 @@ func mapToClientModel(from contextRequest, out *model.Client) { } } -func mapToErrorModel(from *errorEvent, reqTime time.Time, config modeldecoder.Config, event *model.APMEvent) { +func mapToErrorModel(from *errorEvent, config modeldecoder.Config, event *model.APMEvent) { out := &model.Error{} event.Error = out @@ -254,9 +254,11 @@ func mapToErrorModel(from *errorEvent, reqTime time.Time, config modeldecoder.Co if config.Experimental && from.Context.Experimental.IsSet() { out.Experimental = from.Context.Experimental.Val } - // metadata labels and context labels are merged only in the output model if len(from.Context.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Context.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Context.Tags), + ) } if from.Context.Request.IsSet() { out.HTTP = &model.HTTP{Request: &model.HTTPRequest{}} @@ -331,10 +333,8 @@ func mapToErrorModel(from *errorEvent, reqTime time.Time, config modeldecoder.Co if from.ParentID.IsSet() { out.ParentID = from.ParentID.Val } - if from.Timestamp.Val.IsZero() { - out.Timestamp = reqTime - } else { - out.Timestamp = from.Timestamp.Val + if !from.Timestamp.Val.IsZero() { + event.Timestamp = from.Timestamp.Val } if from.TraceID.IsSet() { out.TraceID = from.TraceID.Val @@ -533,15 +533,12 @@ func mapToMetadataModel(from *metadata, out *model.APMEvent) { } } -func mapToMetricsetModel(from *metricset, reqTime time.Time, config modeldecoder.Config, event *model.APMEvent) { +func mapToMetricsetModel(from *metricset, config modeldecoder.Config, event *model.APMEvent) { out := &model.Metricset{} event.Metricset = out - // set timestamp from input or requst time - if from.Timestamp.Val.IsZero() { - out.Timestamp = reqTime - } else { - out.Timestamp = from.Timestamp.Val + if !from.Timestamp.Val.IsZero() { + event.Timestamp = from.Timestamp.Val } // map samples information @@ -569,7 +566,10 @@ func mapToMetricsetModel(from *metricset, reqTime time.Time, config modeldecoder } if len(from.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Tags), + ) } // map span information if from.Span.Subtype.IsSet() { @@ -729,7 +729,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) { } } -func mapToSpanModel(from *span, reqTime time.Time, config modeldecoder.Config, event *model.APMEvent) { +func mapToSpanModel(from *span, config modeldecoder.Config, event *model.APMEvent) { out := &model.Span{} event.Span = out @@ -882,7 +882,10 @@ func mapToSpanModel(from *span, reqTime time.Time, config modeldecoder.Config, e mapToAgentModel(from.Context.Service.Agent, &event.Agent) } if len(from.Context.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Context.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Context.Tags), + ) } if from.Duration.IsSet() { out.Duration = from.Duration.Val @@ -925,15 +928,15 @@ func mapToSpanModel(from *span, reqTime time.Time, config modeldecoder.Config, e val := from.Sync.Val out.Sync = &val } - if from.Timestamp.IsSet() && !from.Timestamp.Val.IsZero() { - out.Timestamp = from.Timestamp.Val - } else { - timestamp := reqTime - if from.Start.IsSet() { - // adjust timestamp to be reqTime + start - timestamp = timestamp.Add(time.Duration(float64(time.Millisecond) * from.Start.Val)) - } - out.Timestamp = timestamp + if !from.Timestamp.Val.IsZero() { + event.Timestamp = from.Timestamp.Val + } else if from.Start.IsSet() { + // event.Timestamp is initialized to the time the payload was + // received by apm-server; offset that by "start" milliseconds + // for RUM. + event.Timestamp = event.Timestamp.Add( + time.Duration(float64(time.Millisecond) * from.Start.Val), + ) } if from.TraceID.IsSet() { out.TraceID = from.TraceID.Val @@ -991,7 +994,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { } } -func mapToTransactionModel(from *transaction, reqTime time.Time, config modeldecoder.Config, event *model.APMEvent) { +func mapToTransactionModel(from *transaction, config modeldecoder.Config, event *model.APMEvent) { out := &model.Transaction{} event.Transaction = out @@ -1011,9 +1014,11 @@ func mapToTransactionModel(from *transaction, reqTime time.Time, config modeldec if config.Experimental && from.Context.Experimental.IsSet() { out.Experimental = from.Context.Experimental.Val } - // metadata labels and context labels are merged when transforming the output model if len(from.Context.Tags) > 0 { - out.Labels = modeldecoderutil.NormalizeLabelValues(from.Context.Tags.Clone()) + event.Labels = modeldecoderutil.MergeLabels( + event.Labels, + modeldecoderutil.NormalizeLabelValues(from.Context.Tags), + ) } if from.Context.Message.IsSet() { out.Message = &model.Message{} @@ -1129,10 +1134,8 @@ func mapToTransactionModel(from *transaction, reqTime time.Time, config modeldec started := from.SpanCount.Started.Val out.SpanCount.Started = &started } - if from.Timestamp.Val.IsZero() { - out.Timestamp = reqTime - } else { - out.Timestamp = from.Timestamp.Val + if !from.Timestamp.Val.IsZero() { + event.Timestamp = from.Timestamp.Val } if from.TraceID.IsSet() { out.TraceID = from.TraceID.Val diff --git a/model/modeldecoder/v2/error_test.go b/model/modeldecoder/v2/error_test.go index 59130e5bcc7..b516b55f8f8 100644 --- a/model/modeldecoder/v2/error_test.go +++ b/model/modeldecoder/v2/error_test.go @@ -48,7 +48,8 @@ func TestDecodeNestedError(t *testing.T) { now := time.Now() defaultVal := modeldecodertest.DefaultValues() _, eventBase := initializedInputMetadata(defaultVal) - input := modeldecoder.Input{RequestTime: now, Base: eventBase, Config: modeldecoder.Config{Experimental: true}} + eventBase.Timestamp = now + input := modeldecoder.Input{Base: eventBase, Config: modeldecoder.Config{Experimental: true}} str := `{"error":{"id":"a-b-c","timestamp":1599996822281000,"log":{"message":"abc"},"context":{"experimental":"exp"}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch @@ -56,18 +57,18 @@ func TestDecodeNestedError(t *testing.T) { require.Len(t, batch, 1) require.NotNil(t, batch[0].Error) assert.Equal(t, "exp", batch[0].Error.Experimental) - assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", batch[0].Error.Timestamp.String()) + defaultVal.Update(time.Unix(1599996822, 281000000).UTC()) modeldecodertest.AssertStructValues(t, &batch[0], isMetadataException, defaultVal) - input = modeldecoder.Input{RequestTime: now, Config: modeldecoder.Config{Experimental: false}} + input = modeldecoder.Input{Base: eventBase, Config: modeldecoder.Config{Experimental: false}} str = `{"error":{"id":"a-b-c","log":{"message":"abc"},"context":{"experimental":"exp"}}}` dec = decoder.NewJSONDecoder(strings.NewReader(str)) batch = model.Batch{} require.NoError(t, DecodeNestedError(dec, &input, &batch)) // experimental should only be set if allowed by configuration assert.Nil(t, batch[0].Error.Experimental) - // if no timestamp is provided, fall back to request time - assert.Equal(t, now, batch[0].Error.Timestamp) + // if no timestamp is provided, leave base event time unmodified + assert.Equal(t, now, batch[0].Timestamp) err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) require.Error(t, err) @@ -93,7 +94,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { _, out := initializedInputMetadata(modeldecodertest.DefaultValues()) otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToErrorModel(&input, time.Now(), modeldecoder.Config{Experimental: true}, &out) + mapToErrorModel(&input, modeldecoder.Config{Experimental: true}, &out) input.Reset() // ensure event Metadata are updated where expected @@ -102,11 +103,10 @@ func TestDecodeMapToErrorModel(t *testing.T) { // do not overwrite client.ip if already set in metadata ip := modeldecodertest.DefaultValues().IP assert.Equal(t, ip, out.Client.IP, out.Client.IP.String()) - // metadata labels and event labels should not be merged - mLabels := common.MapStr{"init0": "init", "init1": "init", "init2": "init"} - tLabels := common.MapStr{"overwritten0": "overwritten", "overwritten1": "overwritten"} - assert.Equal(t, mLabels, out.Labels) - assert.Equal(t, tLabels, out.Error.Labels) + assert.Equal(t, common.MapStr{ + "init0": "init", "init1": "init", "init2": "init", + "overwritten0": "overwritten", "overwritten1": "overwritten", + }, out.Labels) // service and user values should be set modeldecodertest.AssertStructValues(t, &out.Service, exceptions, otherVal) modeldecodertest.AssertStructValues(t, &out.User, exceptions, otherVal) @@ -118,7 +118,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{}) input.Context.Request.Headers.Val.Add("x-real-ip", gatewayIP.String()) input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) - mapToErrorModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToErrorModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, gatewayIP, out.Client.IP, out.Client.IP.String()) }) @@ -126,7 +126,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent var out model.APMEvent input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) - mapToErrorModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToErrorModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, randomIP, out.Client.IP, out.Client.IP.String()) }) @@ -160,18 +160,9 @@ func TestDecodeMapToErrorModel(t *testing.T) { } var input errorEvent var out1, out2 model.APMEvent - reqTime := time.Now().Add(time.Second) defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToErrorModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out1) - input.Reset() - modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal) - - // set Timestamp to requestTime if eventTime is zero - defaultVal.Update(time.Time{}) - modeldecodertest.SetStructValues(&input, defaultVal) - mapToErrorModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out1) - defaultVal.Update(reqTime) + mapToErrorModel(&input, modeldecoder.Config{Experimental: true}, &out1) input.Reset() modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal) @@ -179,7 +170,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { // ensure memory is not shared by reusing input model otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToErrorModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out2) + mapToErrorModel(&input, modeldecoder.Config{Experimental: true}, &out2) modeldecodertest.AssertStructValues(t, out2.Error, exceptions, otherVal) modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal) }) @@ -189,7 +180,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{"a": []string{"b"}, "c": []string{"d", "e"}}) input.Context.Response.Headers.Set(http.Header{"f": []string{"g"}}) var out model.APMEvent - mapToErrorModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToErrorModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, common.MapStr{"a": []string{"b"}, "c": []string{"d", "e"}}, out.Error.HTTP.Request.Headers) assert.Equal(t, common.MapStr{"f": []string{"g"}}, out.Error.HTTP.Response.Headers) }) @@ -198,7 +189,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent input.Context.Page.URL.Set("https://my.site.test:9201") var out model.APMEvent - mapToErrorModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToErrorModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "https://my.site.test:9201", out.Error.Page.URL.Full) assert.Equal(t, "https://my.site.test:9201", out.Error.URL.Full) assert.Equal(t, 9201, out.Error.Page.URL.Port) @@ -209,7 +200,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent input.Context.Page.Referer.Set("https://my.site.test:9201") var out model.APMEvent - mapToErrorModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToErrorModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "https://my.site.test:9201", out.Error.Page.Referer) assert.Equal(t, "https://my.site.test:9201", out.Error.HTTP.Request.Referrer) }) @@ -218,7 +209,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { var input errorEvent var out model.APMEvent input.Exception.Code.Set(123.456) - mapToErrorModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToErrorModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "123", out.Error.Exception.Code) }) } diff --git a/model/modeldecoder/v2/metadata_test.go b/model/modeldecoder/v2/metadata_test.go index 31c06678eaa..da87e233ef6 100644 --- a/model/modeldecoder/v2/metadata_test.go +++ b/model/modeldecoder/v2/metadata_test.go @@ -145,6 +145,7 @@ func TestDecodeMapToMetadataModel(t *testing.T) { // enhanced data that are never set by the modeldecoder defaultVal := modeldecodertest.DefaultValues() input, out := initializedInputMetadata(defaultVal) + out.Timestamp = defaultVal.Time // iterate through model and assert values are set modeldecodertest.AssertStructValues(t, &out, isMetadataException, defaultVal) @@ -157,6 +158,7 @@ func TestDecodeMapToMetadataModel(t *testing.T) { otherVal.Update(defaultVal.IP) input.Reset() modeldecodertest.SetStructValues(&input, otherVal) + out.Timestamp = otherVal.Time mapToMetadataModel(&input, &out) modeldecodertest.AssertStructValues(t, &out, isMetadataException, otherVal) @@ -172,6 +174,7 @@ func TestDecodeMapToMetadataModel(t *testing.T) { var out2 model.APMEvent defaultVal := modeldecodertest.DefaultValues() input, out1 := initializedInputMetadata(defaultVal) + out1.Timestamp = defaultVal.Time // iterate through model and assert values are set modeldecodertest.AssertStructValues(t, &out1, isMetadataException, defaultVal) @@ -185,6 +188,7 @@ func TestDecodeMapToMetadataModel(t *testing.T) { input.Reset() modeldecodertest.SetStructValues(&input, otherVal) mapToMetadataModel(&input, &out2) + out2.Timestamp = otherVal.Time out2.Host.IP, out2.Client.IP = defaultVal.IP, defaultVal.IP modeldecodertest.AssertStructValues(t, &out2, isMetadataException, otherVal) modeldecodertest.AssertStructValues(t, &out1, isMetadataException, defaultVal) diff --git a/model/modeldecoder/v2/metricset_test.go b/model/modeldecoder/v2/metricset_test.go index afdf58a0ca5..d67d01814c7 100644 --- a/model/modeldecoder/v2/metricset_test.go +++ b/model/modeldecoder/v2/metricset_test.go @@ -45,7 +45,8 @@ func TestDecodeNestedMetricset(t *testing.T) { now := time.Now() defaultVal := modeldecodertest.DefaultValues() _, eventBase := initializedInputMetadata(defaultVal) - input := modeldecoder.Input{RequestTime: now, Base: eventBase, Config: modeldecoder.Config{}} + eventBase.Timestamp = now + input := modeldecoder.Input{Base: eventBase, Config: modeldecoder.Config{}} str := `{"metricset":{"timestamp":1599996822281000,"samples":{"a.b":{"value":2048}}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch @@ -53,7 +54,7 @@ func TestDecodeNestedMetricset(t *testing.T) { require.Len(t, batch, 1) require.NotNil(t, batch[0].Metricset) assert.Equal(t, map[string]model.MetricsetSample{"a.b": {Value: 2048}}, batch[0].Metricset.Samples) - assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", batch[0].Metricset.Timestamp.String()) + defaultVal.Update(time.Unix(1599996822, 281000000).UTC()) modeldecodertest.AssertStructValues(t, &batch[0], isMetadataException, defaultVal) // invalid type @@ -92,11 +93,11 @@ func TestDecodeMapToMetricsetModel(t *testing.T) { t.Run("metricset-values", func(t *testing.T) { var input metricset var out1, out2 model.APMEvent - reqTime := time.Now().Add(time.Second) + now := time.Now().Add(time.Second) defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToMetricsetModel(&input, reqTime, modeldecoder.Config{}, &out1) + mapToMetricsetModel(&input, modeldecoder.Config{}, &out1) input.Reset() modeldecodertest.AssertStructValues(t, out1.Metricset, metadataExceptions, defaultVal) defaultSamples := map[string]model.MetricsetSample{ @@ -124,18 +125,19 @@ func TestDecodeMapToMetricsetModel(t *testing.T) { } assert.Equal(t, defaultSamples, out1.Metricset.Samples) - // set Timestamp to requestTime if eventTime is zero + // leave Timestamp unmodified if eventTime is zero + out1.Timestamp = now defaultVal.Update(time.Time{}) modeldecodertest.SetStructValues(&input, defaultVal) - mapToMetricsetModel(&input, reqTime, modeldecoder.Config{}, &out1) - defaultVal.Update(reqTime) + mapToMetricsetModel(&input, modeldecoder.Config{}, &out1) + defaultVal.Update(now) input.Reset() modeldecodertest.AssertStructValues(t, out1.Metricset, metadataExceptions, defaultVal) // ensure memory is not shared by reusing input model otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToMetricsetModel(&input, reqTime, modeldecoder.Config{}, &out2) + mapToMetricsetModel(&input, modeldecoder.Config{}, &out2) modeldecodertest.AssertStructValues(t, out2.Metricset, metadataExceptions, otherVal) otherSamples := map[string]model.MetricsetSample{ otherVal.Str + "0": { diff --git a/model/modeldecoder/v2/span_test.go b/model/modeldecoder/v2/span_test.go index 54079637407..ea506eab9fa 100644 --- a/model/modeldecoder/v2/span_test.go +++ b/model/modeldecoder/v2/span_test.go @@ -47,13 +47,14 @@ func TestDecodeNestedSpan(t *testing.T) { t.Run("decode", func(t *testing.T) { defaultVal := modeldecodertest.DefaultValues() _, eventBase := initializedInputMetadata(defaultVal) - input := modeldecoder.Input{RequestTime: time.Now(), Base: eventBase, Config: modeldecoder.Config{}} + input := modeldecoder.Input{Base: eventBase, Config: modeldecoder.Config{}} str := `{"span":{"duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch require.NoError(t, DecodeNestedSpan(dec, &input, &batch)) require.Len(t, batch, 1) require.NotNil(t, batch[0].Span) + defaultVal.Update(time.Time{}.Add(143 * time.Millisecond)) modeldecodertest.AssertStructValues(t, &batch[0], isMetadataException, defaultVal) err := DecodeNestedSpan(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) @@ -76,13 +77,13 @@ func TestDecodeMapToSpanModel(t *testing.T) { defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) _, out := initializedInputMetadata(defaultVal) - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) modeldecodertest.AssertStructValues(t, &out.Service, exceptions, defaultVal) }) t.Run("experimental", func(t *testing.T) { // experimental enabled - input := modeldecoder.Input{RequestTime: time.Now(), Config: modeldecoder.Config{Experimental: true}} + input := modeldecoder.Input{Config: modeldecoder.Config{Experimental: true}} str := `{"span":{"context":{"experimental":"exp"},"duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch model.Batch @@ -90,7 +91,7 @@ func TestDecodeMapToSpanModel(t *testing.T) { assert.Equal(t, "exp", batch[0].Span.Experimental) // experimental disabled - input = modeldecoder.Input{RequestTime: time.Now(), Config: modeldecoder.Config{Experimental: false}} + input = modeldecoder.Input{Config: modeldecoder.Config{Experimental: false}} str = `{"span":{"context":{"experimental":"exp"},"duration":100,"id":"a-b-c","name":"s","parent_id":"parent-123","trace_id":"trace-ab","type":"db","start":143}}` dec = decoder.NewJSONDecoder(strings.NewReader(str)) batch = model.Batch{} @@ -136,10 +137,9 @@ func TestDecodeMapToSpanModel(t *testing.T) { var input span var out1, out2 model.APMEvent - reqTime := time.Now().Add(time.Second) defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToSpanModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out1) + mapToSpanModel(&input, modeldecoder.Config{Experimental: true}, &out1) input.Reset() modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal) @@ -147,7 +147,7 @@ func TestDecodeMapToSpanModel(t *testing.T) { // ensure memory is not shared by reusing input model otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToSpanModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out2) + mapToSpanModel(&input, modeldecoder.Config{Experimental: true}, &out2) input.Reset() modeldecodertest.AssertStructValues(t, out2.Span, exceptions, otherVal) modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal) @@ -160,22 +160,22 @@ func TestDecodeMapToSpanModel(t *testing.T) { // set from input, ignore status code input.Outcome.Set("failure") input.Context.HTTP.StatusCode.Set(http.StatusPermanentRedirect) - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "failure", out.Span.Outcome) // derive from other fields - success input.Outcome.Reset() input.Context.HTTP.StatusCode.Set(http.StatusPermanentRedirect) - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "success", out.Span.Outcome) // derive from other fields - failure input.Outcome.Reset() input.Context.HTTP.StatusCode.Set(http.StatusBadRequest) - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "failure", out.Span.Outcome) // derive from other fields - unknown input.Outcome.Reset() input.Context.HTTP.StatusCode.Reset() - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "unknown", out.Span.Outcome) }) @@ -187,16 +187,17 @@ func TestDecodeMapToSpanModel(t *testing.T) { defaultVal := modeldecodertest.DefaultValues() defaultVal.Update(20.5, time.Time{}) modeldecodertest.SetStructValues(&input, defaultVal) - mapToSpanModel(&input, reqTime, modeldecoder.Config{}, &out) + out.Timestamp = reqTime + mapToSpanModel(&input, modeldecoder.Config{}, &out) timestamp := reqTime.Add(time.Duration((20.5) * float64(time.Millisecond))) - assert.Equal(t, timestamp, out.Span.Timestamp) - // set requestTime if eventTime is zero and start is not set - out = model.APMEvent{} + assert.Equal(t, timestamp, out.Timestamp) + // leave event timestamp unmodified if eventTime is zero and start is not set + out = model.APMEvent{Timestamp: reqTime} modeldecodertest.SetStructValues(&input, defaultVal) input.Start.Reset() - mapToSpanModel(&input, reqTime, modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) require.Nil(t, out.Span.Start) - assert.Equal(t, reqTime, out.Span.Timestamp) + assert.Equal(t, reqTime, out.Timestamp) }) t.Run("sample-rate", func(t *testing.T) { @@ -205,16 +206,16 @@ func TestDecodeMapToSpanModel(t *testing.T) { modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues()) // sample rate is set to > 0 input.SampleRate.Set(0.25) - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, 4.0, out.Span.RepresentativeCount) // sample rate is not set out.Span.RepresentativeCount = 0.0 input.SampleRate.Reset() - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, 0.0, out.Span.RepresentativeCount) // sample rate is set to 0 input.SampleRate.Set(0) - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, 0.0, out.Span.RepresentativeCount) }) @@ -253,7 +254,7 @@ func TestDecodeMapToSpanModel(t *testing.T) { input.Action.Reset() } var out model.APMEvent - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, tc.typ, out.Span.Type) assert.Equal(t, tc.subtype, out.Span.Subtype) assert.Equal(t, tc.action, out.Span.Action) @@ -265,7 +266,7 @@ func TestDecodeMapToSpanModel(t *testing.T) { var input span input.Context.HTTP.Response.Headers.Set(http.Header{"a": []string{"b", "c"}}) var out model.APMEvent - mapToSpanModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToSpanModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, common.MapStr{"a": []string{"b", "c"}}, out.Span.HTTP.Response.Headers) }) } diff --git a/model/modeldecoder/v2/transaction_test.go b/model/modeldecoder/v2/transaction_test.go index 646e782e391..cd4f85d2f99 100644 --- a/model/modeldecoder/v2/transaction_test.go +++ b/model/modeldecoder/v2/transaction_test.go @@ -47,7 +47,7 @@ func TestResetTransactionOnRelease(t *testing.T) { func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode", func(t *testing.T) { now := time.Now() - input := modeldecoder.Input{RequestTime: now, Config: modeldecoder.Config{Experimental: true}} + input := modeldecoder.Input{Config: modeldecoder.Config{Experimental: true}} str := `{"transaction":{"duration":100,"timestamp":1599996822281000,"id":"100","trace_id":"1","type":"request","span_count":{"started":2},"context":{"experimental":"exp"}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) @@ -57,17 +57,17 @@ func TestDecodeNestedTransaction(t *testing.T) { require.NotNil(t, batch[0].Transaction) assert.Equal(t, "request", batch[0].Transaction.Type) assert.Equal(t, "exp", batch[0].Transaction.Experimental) - assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", batch[0].Transaction.Timestamp.String()) + assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", batch[0].Timestamp.String()) - input = modeldecoder.Input{RequestTime: now, Config: modeldecoder.Config{Experimental: false}} + input = modeldecoder.Input{Base: model.APMEvent{Timestamp: now}, Config: modeldecoder.Config{Experimental: false}} str = `{"transaction":{"duration":100,"id":"100","trace_id":"1","type":"request","span_count":{"started":2},"context":{"experimental":"exp"}}}` dec = decoder.NewJSONDecoder(strings.NewReader(str)) batch = model.Batch{} require.NoError(t, DecodeNestedTransaction(dec, &input, &batch)) // experimental should only be set if allowed by configuration assert.Nil(t, batch[0].Transaction.Experimental) - // if no timestamp is provided, fall back to request time - assert.Equal(t, now, batch[0].Transaction.Timestamp) + // if no timestamp is provided, fall back to base event timestamp + assert.Equal(t, now, batch[0].Timestamp) err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) require.Error(t, err) @@ -92,7 +92,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { _, out := initializedInputMetadata(modeldecodertest.DefaultValues()) otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: true}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: true}, &out) input.Reset() // ensure event Metadata are updated where expected @@ -102,11 +102,11 @@ func TestDecodeMapToTransactionModel(t *testing.T) { // do not overwrite client.ip if already set in metadata ip := modeldecodertest.DefaultValues().IP assert.Equal(t, ip, out.Client.IP, out.Client.IP.String()) - // metadata labels and event labels should not be merged - mLabels := common.MapStr{"init0": "init", "init1": "init", "init2": "init"} - tLabels := common.MapStr{"overwritten0": "overwritten", "overwritten1": "overwritten"} - assert.Equal(t, mLabels, out.Labels) - assert.Equal(t, tLabels, out.Transaction.Labels) + assert.Equal(t, common.MapStr{ + "init0": "init", "init1": "init", "init2": "init", + "overwritten0": "overwritten", "overwritten1": "overwritten", + }, out.Labels) + //assert.Equal(t, tLabels, out.Transaction.Labels) exceptions := func(key string) bool { return false } modeldecodertest.AssertStructValues(t, &out.Service, exceptions, otherVal) modeldecodertest.AssertStructValues(t, &out.User, exceptions, otherVal) @@ -119,13 +119,13 @@ func TestDecodeMapToTransactionModel(t *testing.T) { input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) // from headers (case insensitive) input.Context.Request.Headers.Val.Add("x-Real-ip", gatewayIP.String()) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, gatewayIP.String(), out.Client.IP.String()) // ignore if set in event already out = model.APMEvent{ Client: model.Client{IP: net.ParseIP("192.17.1.1")}, } - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, "192.17.1.1", out.Client.IP.String()) }) @@ -136,7 +136,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{}) input.Context.Request.Headers.Val.Add("x-Real-ip", "192.13.14:8097") input.Context.Request.Socket.RemoteAddress.Set(randomIP.String()) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) // ensure client ip is populated from socket assert.Equal(t, randomIP.String(), out.Client.IP.String()) }) @@ -146,7 +146,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction _, out := initializedInputMetadata(modeldecodertest.DefaultValues()) input.Context.User.Email.Set("test@user.com") - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, "test@user.com", out.User.Email) assert.Zero(t, out.User.ID) assert.Zero(t, out.User.Name) @@ -168,24 +168,27 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out1, out2 model.APMEvent reqTime := time.Now().Add(time.Second) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) - mapToTransactionModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out1) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: true}, &out1) input.Reset() modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal) - // set Timestamp to requestTime if eventTime is zero + // leave event timestamp unmodified if eventTime is zero + out1.Timestamp = reqTime defaultVal.Update(time.Time{}) modeldecodertest.SetStructValues(&input, defaultVal) - mapToTransactionModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out1) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: true}, &out1) defaultVal.Update(reqTime) input.Reset() modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal) // ensure memory is not shared by reusing input model + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) - mapToTransactionModel(&input, reqTime, modeldecoder.Config{Experimental: true}, &out2) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: true}, &out2) modeldecodertest.AssertStructValues(t, out2.Transaction, exceptions, otherVal) modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal) }) @@ -195,7 +198,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { input.Context.Request.Headers.Set(http.Header{"a": []string{"b"}, "c": []string{"d", "e"}}) input.Context.Response.Headers.Set(http.Header{"f": []string{"g"}}) var out model.APMEvent - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, common.MapStr{"a": []string{"b"}, "c": []string{"d", "e"}}, out.Transaction.HTTP.Request.Headers) assert.Equal(t, common.MapStr{"f": []string{"g"}}, out.Transaction.HTTP.Response.Headers) }) @@ -208,7 +211,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { "c": "d", }) var out model.APMEvent - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, map[string]interface{}{"a": common.Float(123.456), "c": "d"}, out.Transaction.HTTP.Request.Body) }) @@ -216,7 +219,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out model.APMEvent input.Context.Page.URL.Set("https://my.site.test:9201") - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, "https://my.site.test:9201", out.Transaction.Page.URL.Full) assert.Equal(t, "https://my.site.test:9201", out.Transaction.URL.Full) assert.Equal(t, 9201, out.Transaction.Page.URL.Port) @@ -227,7 +230,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out model.APMEvent input.Context.Page.Referer.Set("https://my.site.test:9201") - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{Experimental: false}, &out) + mapToTransactionModel(&input, modeldecoder.Config{Experimental: false}, &out) assert.Equal(t, "https://my.site.test:9201", out.Transaction.Page.Referer) assert.Equal(t, "https://my.site.test:9201", out.Transaction.HTTP.Request.Referrer) }) @@ -238,17 +241,17 @@ func TestDecodeMapToTransactionModel(t *testing.T) { modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues()) // sample rate is set to > 0 input.SampleRate.Set(0.25) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, 4.0, out.Transaction.RepresentativeCount) // sample rate is not set -> Representative Count should be 1 by default out.Transaction.RepresentativeCount = 0.0 //reset to zero value input.SampleRate.Reset() - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, 1.0, out.Transaction.RepresentativeCount) // sample rate is set to 0 out.Transaction.RepresentativeCount = 0.0 //reset to zero value input.SampleRate.Set(0) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, 0.0, out.Transaction.RepresentativeCount) }) @@ -259,22 +262,22 @@ func TestDecodeMapToTransactionModel(t *testing.T) { // set from input, ignore status code input.Outcome.Set("failure") input.Context.Response.StatusCode.Set(http.StatusBadRequest) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "failure", out.Transaction.Outcome) // derive from other fields - success input.Outcome.Reset() input.Context.Response.StatusCode.Set(http.StatusBadRequest) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "success", out.Transaction.Outcome) // derive from other fields - failure input.Outcome.Reset() input.Context.Response.StatusCode.Set(http.StatusInternalServerError) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "failure", out.Transaction.Outcome) // derive from other fields - unknown input.Outcome.Reset() input.Context.Response.StatusCode.Reset() - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, "unknown", out.Transaction.Outcome) }) @@ -283,12 +286,12 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var out model.APMEvent modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues()) input.Session.ID.Reset() - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, model.TransactionSession{}, out.Transaction.Session) input.Session.ID.Set("session_id") input.Session.Sequence.Set(123) - mapToTransactionModel(&input, time.Now(), modeldecoder.Config{}, &out) + mapToTransactionModel(&input, modeldecoder.Config{}, &out) assert.Equal(t, model.TransactionSession{ ID: "session_id", Sequence: 123, diff --git a/model/profile.go b/model/profile.go index ff1cf4278f7..bb2d4a6fbc4 100644 --- a/model/profile.go +++ b/model/profile.go @@ -20,7 +20,6 @@ package model import ( "time" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) @@ -37,11 +36,9 @@ var profileProcessorEntry = common.MapStr{ // ProfileSample holds a profiling sample. type ProfileSample struct { - Timestamp time.Time Duration time.Duration ProfileID string Stack []ProfileSampleStackframe - Labels common.MapStr Values map[string]int64 } @@ -53,7 +50,7 @@ type ProfileSampleStackframe struct { Line int64 } -func (p *ProfileSample) toBeatEvent() beat.Event { +func (p *ProfileSample) fields() common.MapStr { var profileFields mapStr profileFields.maybeSetString("id", p.ProfileID) if p.Duration > 0 { @@ -81,13 +78,8 @@ func (p *ProfileSample) toBeatEvent() beat.Event { profileFields.set(k, v) } - fields := mapStr{ + return common.MapStr{ "processor": profileProcessorEntry, profileDocType: common.MapStr(profileFields), } - - return beat.Event{ - Timestamp: p.Timestamp, - Fields: common.MapStr(fields), - } } diff --git a/model/profile_test.go b/model/profile_test.go index b5d572966fc..dda914e53dc 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -33,7 +33,6 @@ import ( func TestProfileSampleTransform(t *testing.T) { timestamp := time.Unix(123, 456) sample := model.ProfileSample{ - Timestamp: timestamp, Duration: 10 * time.Second, ProfileID: "profile_id", Stack: []model.ProfileSampleStackframe{{ @@ -46,10 +45,6 @@ func TestProfileSampleTransform(t *testing.T) { Function: "bar", Filename: "bar.go", }}, - Labels: common.MapStr{ - "key1": []string{"abc", "def"}, - "key2": []string{"ghi"}, - }, Values: map[string]int64{ "samples.count": 1, "cpu.ns": 123, @@ -58,7 +53,13 @@ func TestProfileSampleTransform(t *testing.T) { }, } - batch := &model.Batch{{ProfileSample: &sample}, {ProfileSample: &sample}} + batch := &model.Batch{{ + Timestamp: timestamp, + ProfileSample: &sample, + }, { + Timestamp: timestamp, + ProfileSample: &sample, + }} output := batch.Transform(context.Background()) require.Len(t, output, 2) assert.Equal(t, output[0], output[1]) @@ -67,10 +68,6 @@ func TestProfileSampleTransform(t *testing.T) { Timestamp: timestamp, Fields: common.MapStr{ "processor": common.MapStr{"event": "profile", "name": "profile"}, - "labels": common.MapStr{ - "key1": []string{"abc", "def"}, - "key2": []string{"ghi"}, - }, "profile": common.MapStr{ "id": "profile_id", "duration": int64(10 * time.Second), diff --git a/model/span.go b/model/span.go index e5d685b9306..c2ab0380c1f 100644 --- a/model/span.go +++ b/model/span.go @@ -18,10 +18,6 @@ package model import ( - "context" - "time" - - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" @@ -47,8 +43,6 @@ type Span struct { ChildIDs []string TraceID string - Timestamp time.Time - Message *Message Name string Outcome string @@ -56,7 +50,6 @@ type Span struct { Duration float64 Stacktrace Stacktrace Sync *bool - Labels common.MapStr Type string Subtype string @@ -162,17 +155,14 @@ func (c *Composite) fields() common.MapStr { return common.MapStr(fields) } -func (e *Span) toBeatEvent(ctx context.Context) beat.Event { +func (e *Span) fields() common.MapStr { spanTransformations.Inc() if frames := len(e.Stacktrace); frames > 0 { spanStacktraceCounter.Inc() spanFrameCounter.Add(int64(frames)) } - fields := mapStr{ - "processor": spanProcessorEntry, - spanDocType: e.fields(ctx), - } + fields := mapStr{"processor": spanProcessorEntry} var trace, transaction, parent mapStr if trace.maybeSetString("id", e.TraceID) { @@ -189,7 +179,6 @@ func (e *Span) toBeatEvent(ctx context.Context) beat.Event { child.set("id", e.ChildIDs) fields.set("child", common.MapStr(child)) } - fields.maybeSetMapStr("timestamp", utility.TimeAsMicros(e.Timestamp)) if e.Experimental != nil { fields.set("experimental", e.Experimental) } @@ -201,43 +190,34 @@ func (e *Span) toBeatEvent(ctx context.Context) beat.Event { common.MapStr(fields).Put("event.outcome", e.Outcome) - return beat.Event{ - Fields: common.MapStr(fields), - Timestamp: e.Timestamp, - } -} - -func (e *Span) fields(ctx context.Context) common.MapStr { - if e == nil { - return nil - } - var fields mapStr - fields.set("name", e.Name) - fields.set("type", e.Type) - fields.maybeSetString("id", e.ID) - fields.maybeSetString("subtype", e.Subtype) - fields.maybeSetString("action", e.Action) - fields.maybeSetBool("sync", e.Sync) + var span mapStr + span.set("name", e.Name) + span.set("type", e.Type) + span.maybeSetString("id", e.ID) + span.maybeSetString("subtype", e.Subtype) + span.maybeSetString("action", e.Action) + span.maybeSetBool("sync", e.Sync) if e.Start != nil { - fields.set("start", utility.MillisAsMicros(*e.Start)) + span.set("start", utility.MillisAsMicros(*e.Start)) } - fields.set("duration", utility.MillisAsMicros(e.Duration)) + span.set("duration", utility.MillisAsMicros(e.Duration)) if e.HTTP != nil { - fields.maybeSetMapStr("http", e.HTTP.spanFields()) - fields.maybeSetString("http.url.original", e.URL) + span.maybeSetMapStr("http", e.HTTP.spanFields()) + span.maybeSetString("http.url.original", e.URL) } - fields.maybeSetMapStr("db", e.DB.fields()) - fields.maybeSetMapStr("message", e.Message.Fields()) - fields.maybeSetMapStr("composite", e.Composite.fields()) + span.maybeSetMapStr("db", e.DB.fields()) + span.maybeSetMapStr("message", e.Message.Fields()) + span.maybeSetMapStr("composite", e.Composite.fields()) if destinationServiceFields := e.DestinationService.fields(); len(destinationServiceFields) > 0 { - common.MapStr(fields).Put("destination.service", destinationServiceFields) + common.MapStr(span).Put("destination.service", destinationServiceFields) } - // TODO(axw) we should be using a merged service object, combining // the stream metadata and event-specific service info. if st := e.Stacktrace.transform(); len(st) > 0 { - fields.set("stacktrace", st) + span.set("stacktrace", st) } + fields.set("span", common.MapStr(span)) + return common.MapStr(fields) } diff --git a/model/span_test.go b/model/span_test.go index ca968be0300..132ea3cdbee 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -49,7 +49,7 @@ func TestSpanTransform(t *testing.T) { }{ { Msg: "Span without a Stacktrace", - Span: Span{Timestamp: timestamp}, + Span: Span{}, Output: common.MapStr{ "processor": common.MapStr{"event": "span", "name": "transaction"}, "span": common.MapStr{ @@ -63,7 +63,7 @@ func TestSpanTransform(t *testing.T) { }, { Msg: "Span with outcome", - Span: Span{Timestamp: timestamp, Outcome: "success"}, + Span: Span{Outcome: "success"}, Output: common.MapStr{ "processor": common.MapStr{"event": "span", "name": "transaction"}, "span": common.MapStr{ @@ -85,13 +85,11 @@ func TestSpanTransform(t *testing.T) { Type: "myspantype", Subtype: subtype, Action: action, - Timestamp: timestamp, Start: &start, Outcome: "unknown", RepresentativeCount: 5, Duration: 1.20, Stacktrace: Stacktrace{{AbsPath: path}}, - Labels: common.MapStr{"label_a": 12}, HTTP: &HTTP{ Request: &HTTPRequest{Method: method}, Response: &HTTPResponse{StatusCode: statusCode}, @@ -152,7 +150,6 @@ func TestSpanTransform(t *testing.T) { "compression_strategy": "exact_match", }, }, - "labels": common.MapStr{"label_a": 12}, "processor": common.MapStr{"event": "span", "name": "transaction"}, "timestamp": common.MapStr{"us": timestampUs}, "trace": common.MapStr{"id": traceID}, @@ -169,7 +166,7 @@ func TestSpanTransform(t *testing.T) { } for _, test := range tests { - event := APMEvent{Span: &test.Span} + event := APMEvent{Span: &test.Span, Timestamp: timestamp} output := event.appendBeatEvent(context.Background(), nil) require.Len(t, output, 1) assert.Equal(t, test.Output, output[0].Fields, test.Msg) diff --git a/model/transaction.go b/model/transaction.go index f9794c6f6d4..0609ca43490 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -18,9 +18,6 @@ package model import ( - "time" - - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" @@ -44,8 +41,6 @@ type Transaction struct { ParentID string TraceID string - Timestamp time.Time - Type string Name string Result string @@ -58,7 +53,6 @@ type Transaction struct { Page *Page HTTP *HTTP URL *URL - Labels common.MapStr Custom common.MapStr UserExperience *UserExperience Session TransactionSession @@ -87,39 +81,11 @@ type SpanCount struct { Started *int } -// fields creates the fields to populate in the top-level "transaction" object field. func (e *Transaction) fields() common.MapStr { - var fields mapStr - fields.set("id", e.ID) - fields.set("type", e.Type) - fields.set("duration", utility.MillisAsMicros(e.Duration)) - fields.maybeSetString("name", e.Name) - fields.maybeSetString("result", e.Result) - fields.maybeSetMapStr("marks", e.Marks.fields()) - fields.maybeSetMapStr("page", e.Page.Fields()) - fields.maybeSetMapStr("custom", customFields(e.Custom)) - fields.maybeSetMapStr("message", e.Message.Fields()) - fields.maybeSetMapStr("experience", e.UserExperience.Fields()) - if e.SpanCount.Dropped != nil || e.SpanCount.Started != nil { - spanCount := common.MapStr{} - if e.SpanCount.Dropped != nil { - spanCount["dropped"] = *e.SpanCount.Dropped - } - if e.SpanCount.Started != nil { - spanCount["started"] = *e.SpanCount.Started - } - fields.set("span_count", spanCount) - } - fields.set("sampled", e.Sampled) - return common.MapStr(fields) -} - -func (e *Transaction) toBeatEvent() beat.Event { transactionTransformations.Inc() fields := mapStr{ - "processor": transactionProcessorEntry, - transactionDocType: e.fields(), + "processor": transactionProcessorEntry, } var parent, trace mapStr @@ -127,7 +93,6 @@ func (e *Transaction) toBeatEvent() beat.Event { trace.maybeSetString("id", e.TraceID) fields.maybeSetMapStr("parent", common.MapStr(parent)) fields.maybeSetMapStr("trace", common.MapStr(trace)) - fields.maybeSetMapStr("timestamp", utility.TimeAsMicros(e.Timestamp)) if e.HTTP != nil { fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields()) } @@ -138,10 +103,31 @@ func (e *Transaction) toBeatEvent() beat.Event { } common.MapStr(fields).Put("event.outcome", e.Outcome) - return beat.Event{ - Timestamp: e.Timestamp, - Fields: common.MapStr(fields), + var transaction mapStr + transaction.set("id", e.ID) + transaction.set("type", e.Type) + transaction.set("duration", utility.MillisAsMicros(e.Duration)) + transaction.maybeSetString("name", e.Name) + transaction.maybeSetString("result", e.Result) + transaction.maybeSetMapStr("marks", e.Marks.fields()) + transaction.maybeSetMapStr("page", e.Page.Fields()) + transaction.maybeSetMapStr("custom", customFields(e.Custom)) + transaction.maybeSetMapStr("message", e.Message.Fields()) + transaction.maybeSetMapStr("experience", e.UserExperience.Fields()) + if e.SpanCount.Dropped != nil || e.SpanCount.Started != nil { + spanCount := common.MapStr{} + if e.SpanCount.Dropped != nil { + spanCount["dropped"] = *e.SpanCount.Dropped + } + if e.SpanCount.Started != nil { + spanCount["started"] = *e.SpanCount.Started + } + transaction.set("span_count", spanCount) } + transaction.set("sampled", e.Sampled) + fields.set("transaction", common.MapStr(transaction)) + + return common.MapStr(fields) } type TransactionMarks map[string]TransactionMark diff --git a/model/transaction_test.go b/model/transaction_test.go index 1d7c6e4e3d0..459d2808c12 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -22,7 +22,6 @@ import ( "fmt" "net" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -103,7 +102,6 @@ func TestTransactionTransform(t *testing.T) { Name: name, Type: "tx", Result: result, - Timestamp: time.Now(), Duration: 65.98, Sampled: true, SpanCount: SpanCount{Started: &startedSpans, Dropped: &dropped}, @@ -122,23 +120,21 @@ func TestTransactionTransform(t *testing.T) { } for idx, test := range tests { - output := test.Transaction.toBeatEvent() - assert.Equal(t, test.Output, output.Fields["transaction"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) + fields := test.Transaction.fields() + assert.Equal(t, test.Output, fields["transaction"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } func TestTransactionTransformOutcome(t *testing.T) { tx := Transaction{Outcome: "success"} - event := tx.toBeatEvent() - assert.Equal(t, common.MapStr{"outcome": "success"}, event.Fields["event"]) + fields := tx.fields() + assert.Equal(t, common.MapStr{"outcome": "success"}, fields["event"]) } func TestEventsTransformWithMetadata(t *testing.T) { hostname := "a.b.c" architecture := "darwin" platform := "x64" - timestamp := time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)) - timestampUs := timestamp.UnixNano() / 1000 id, name, ip, userAgent := "123", "jane", "63.23.123.4", "node-js-2.3" url, referer := "https://localhost", "http://localhost" serviceName, serviceNodeName, serviceVersion := "myservice", "service-123", "2.1.3" @@ -160,16 +156,13 @@ func TestEventsTransformWithMetadata(t *testing.T) { User: User{ID: id, Name: name}, UserAgent: UserAgent{Original: userAgent}, Client: Client{IP: net.ParseIP(ip)}, - Labels: common.MapStr{"a": true}, Transaction: &Transaction{ - Timestamp: timestamp, - Labels: common.MapStr{"a": "b"}, - Page: &Page{URL: &URL{Original: url}, Referer: referer}, - HTTP: &HTTP{Request: &request, Response: &response}, - URL: &URL{Original: url}, - Custom: common.MapStr{"foo.bar": "baz"}, - Message: &Message{QueueName: "routeUser"}, - Sampled: true, + Page: &Page{URL: &URL{Original: url}, Referer: referer}, + HTTP: &HTTP{Request: &request, Response: &response}, + URL: &URL{Original: url}, + Custom: common.MapStr{"foo.bar": "baz"}, + Message: &Message{QueueName: "routeUser"}, + Sampled: true, }, } @@ -199,7 +192,6 @@ func TestEventsTransformWithMetadata(t *testing.T) { "version": serviceVersion, "node": common.MapStr{"name": serviceNodeName}, }, - "timestamp": common.MapStr{"us": timestampUs}, "transaction": common.MapStr{ "duration": common.MapStr{"us": 0}, "id": "", @@ -211,9 +203,8 @@ func TestEventsTransformWithMetadata(t *testing.T) { }, "message": common.MapStr{"queue": common.MapStr{"name": "routeUser"}}, }, - "event": common.MapStr{"outcome": ""}, - "labels": common.MapStr{"a": "b"}, - "url": common.MapStr{"original": url}, + "event": common.MapStr{"outcome": ""}, + "url": common.MapStr{"original": url}, "http": common.MapStr{ "request": common.MapStr{"method": "post", "referrer": referer}, "response": common.MapStr{"finished": false, "headers": common.MapStr{"content-type": []string{"text/html"}}}, @@ -226,13 +217,13 @@ func TestTransformTransactionHTTP(t *testing.T) { tx := Transaction{ HTTP: &HTTP{Request: &request}, } - event := tx.toBeatEvent() + fields := tx.fields() assert.Equal(t, common.MapStr{ "request": common.MapStr{ "method": request.Method, "body.original": request.Body, }, - }, event.Fields["http"]) + }, fields["http"]) } func TestTransactionTransformPage(t *testing.T) { @@ -246,11 +237,10 @@ func TestTransactionTransformPage(t *testing.T) { }{ { Transaction: Transaction{ - ID: id, - Type: "tx", - Timestamp: time.Now(), - Duration: 65.98, - URL: ParseURL("https://localhost:8200/", "", ""), + ID: id, + Type: "tx", + Duration: 65.98, + URL: ParseURL("https://localhost:8200/", "", ""), Page: &Page{ URL: ParseURL(urlExample, "", ""), }, @@ -268,8 +258,8 @@ func TestTransactionTransformPage(t *testing.T) { } for idx, test := range tests { - output := test.Transaction.toBeatEvent() - assert.Equal(t, test.Output, output.Fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) + fields := test.Transaction.fields() + assert.Equal(t, test.Output, fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } @@ -297,8 +287,8 @@ func TestTransactionTransformMarks(t *testing.T) { } for idx, test := range tests { - output := test.Transaction.toBeatEvent() - marks, _ := output.Fields.GetValue("transaction.marks") + fields := test.Transaction.fields() + marks, _ := fields.GetValue("transaction.marks") assert.Equal(t, test.Output, marks, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } @@ -338,8 +328,8 @@ func TestTransactionSession(t *testing.T) { }} for _, test := range tests { - output := test.Transaction.toBeatEvent() - session, err := output.Fields.GetValue("session") + fields := test.Transaction.fields() + session, err := fields.GetValue("session") if test.Output == nil { assert.Equal(t, common.ErrKeyNotFound, err) } else { diff --git a/processor/otel/exceptions.go b/processor/otel/exceptions.go index 27db21b3ee7..81b4fe17761 100644 --- a/processor/otel/exceptions.go +++ b/processor/otel/exceptions.go @@ -40,7 +40,6 @@ import ( "regexp" "strconv" "strings" - "time" "github.com/elastic/apm-server/model" ) @@ -51,7 +50,6 @@ var ( ) func convertOpenTelemetryExceptionSpanEvent( - timestamp time.Time, exceptionType, exceptionMessage, exceptionStacktrace string, exceptionEscaped bool, language string, @@ -61,7 +59,6 @@ func convertOpenTelemetryExceptionSpanEvent( } exceptionHandled := !exceptionEscaped exceptionError := model.Error{ - Timestamp: timestamp, Exception: &model.Exception{ Message: exceptionMessage, Type: exceptionType, diff --git a/processor/otel/exceptions_test.go b/processor/otel/exceptions_test.go index 40ec86e08ba..93f2f515510 100644 --- a/processor/otel/exceptions_test.go +++ b/processor/otel/exceptions_test.go @@ -111,15 +111,15 @@ Caused by: LowLevelException service, agent := languageOnlyMetadata("java") transactionEvent, errorEvents := transformTransactionSpanEvents(t, "java", exceptionEvent1, exceptionEvent2) assert.Equal(t, []model.APMEvent{{ - Service: service, - Agent: agent, + Service: service, + Agent: agent, + Timestamp: timestamp, Error: &model.Error{ TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, TransactionID: transactionEvent.Transaction.ID, TransactionType: transactionEvent.Transaction.Type, TransactionSampled: newBool(true), - Timestamp: timestamp, Exception: &model.Exception{ Type: "java.net.ConnectException.OSError", Message: "Division by zero", @@ -158,15 +158,15 @@ Caused by: LowLevelException }, }, }, { - Service: service, - Agent: agent, + Service: service, + Agent: agent, + Timestamp: timestamp, Error: &model.Error{ TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, TransactionID: transactionEvent.Transaction.ID, TransactionType: transactionEvent.Transaction.Type, TransactionSampled: newBool(true), - Timestamp: timestamp, Exception: &model.Exception{ Type: "HighLevelException", Message: "MidLevelException: LowLevelException", @@ -314,15 +314,15 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { service, agent := languageOnlyMetadata("COBOL") assert.Equal(t, model.APMEvent{ - Service: service, - Agent: agent, + Service: service, + Agent: agent, + Timestamp: timestamp, Error: &model.Error{ TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, TransactionID: transactionEvent.Transaction.ID, TransactionType: transactionEvent.Transaction.Type, TransactionSampled: newBool(true), - Timestamp: timestamp, Exception: &model.Exception{ Type: "the_type", Message: "the_message", diff --git a/processor/otel/metadata.go b/processor/otel/metadata.go index 09c19d46b2d..279ff85c4cd 100644 --- a/processor/otel/metadata.go +++ b/processor/otel/metadata.go @@ -219,3 +219,9 @@ func ifaceAnyValueArray(array pdata.AnyValueArray) []interface{} { } return values } + +// initEventLabels initializes an event-specific label map, either making a copy +// of commonLabels if it is non-nil, or otherwise creating a new map. +func initEventLabels(commonLabels common.MapStr) common.MapStr { + return commonLabels.Clone() +} diff --git a/processor/otel/metadata_test.go b/processor/otel/metadata_test.go index 3ae71f7698b..260adddfffe 100644 --- a/processor/otel/metadata_test.go +++ b/processor/otel/metadata_test.go @@ -19,6 +19,7 @@ package otel_test import ( "testing" + "time" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/model/pdata" @@ -233,5 +234,6 @@ func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.Attr otelSpan.SetSpanID(pdata.NewSpanID([8]byte{2})) events := transformTraces(t, traces) events[0].Transaction = nil + events[0].Timestamp = time.Time{} return events[0] } diff --git a/processor/otel/metrics.go b/processor/otel/metrics.go index 45bc57ae742..d8d58f1d903 100644 --- a/processor/otel/metrics.go +++ b/processor/otel/metrics.go @@ -46,7 +46,6 @@ import ( logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/model" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -105,9 +104,15 @@ func (c *Consumer) convertInstrumentationLibraryMetrics( } } for _, m := range ms { - m.Timestamp = m.Timestamp.Add(timeDelta) event := baseEvent event.Metricset = m.Metricset + event.Timestamp = m.timestamp.Add(timeDelta) + if n := len(m.labels); n > 0 { + event.Labels = initEventLabels(event.Labels) + for _, label := range m.labels { + event.Labels[label.key] = label.value + } + } *out = append(*out, event) } if unsupported > 0 { @@ -278,7 +283,8 @@ type metricsets []metricset type metricset struct { *model.Metricset - labels []stringMapItem // sorted by key + timestamp time.Time + labels []stringMapItem // sorted by key } type stringMapItem struct { @@ -350,15 +356,13 @@ func (ms *metricsets) upsertOne(timestamp time.Time, name string, labels []strin if i < len(*ms) && compareMetricsets((*ms)[i], timestamp, labels) == 0 { m = (*ms)[i].Metricset } else { - m = &model.Metricset{Timestamp: timestamp, Samples: make(map[string]model.MetricsetSample)} - if len(labels) > 0 { - m.Labels = make(common.MapStr, len(labels)) - for _, label := range labels { - m.Labels[label.key] = label.value - } - } + m = &model.Metricset{Samples: make(map[string]model.MetricsetSample)} head := (*ms)[:i] - tail := append([]metricset{{Metricset: m, labels: labels}}, (*ms)[i:]...) + tail := append([]metricset{{ + Metricset: m, + timestamp: timestamp, + labels: labels, + }}, (*ms)[i:]...) *ms = append(head, tail...) } m.Samples[name] = sample @@ -371,7 +375,7 @@ func (ms *metricsets) search(timestamp time.Time, labels []stringMapItem) int { } func compareMetricsets(ms metricset, timestamp time.Time, labels []stringMapItem) int { - if d := ms.Timestamp.Sub(timestamp); d < 0 { + if d := ms.timestamp.Sub(timestamp); d < 0 { return -1 } else if d > 0 { return 1 diff --git a/processor/otel/metrics_test.go b/processor/otel/metrics_test.go index bc1a843c8a2..453760a1e47 100644 --- a/processor/otel/metrics_test.go +++ b/processor/otel/metrics_test.go @@ -155,57 +155,79 @@ func TestConsumeMetrics(t *testing.T) { metric.Summary().DataPoints().AppendEmpty() expectDropped++ - metricsets, stats := transformMetrics(t, metrics) + events, stats := transformMetrics(t, metrics) assert.Equal(t, expectDropped, stats.UnsupportedMetricsDropped) - assert.Equal(t, []*model.Metricset{{ + service := model.Service{Name: "unknown", Language: model.Language{Name: "unknown"}} + agent := model.Agent{Name: "otlp", Version: "unknown"} + assert.Equal(t, []model.APMEvent{{ + Agent: agent, + Service: service, Timestamp: timestamp0, - Samples: map[string]model.MetricsetSample{ - "int_gauge_metric": {Value: 1, Type: "gauge"}, - "gauge_metric": {Value: 5, Type: "gauge"}, - "int_sum_metric": {Value: 9, Type: "counter"}, - "sum_metric": {Value: 12, Type: "counter"}, - "histogram_metric": { - Type: "histogram", - Counts: []int64{1, 1, 2, 3}, - Values: []float64{-1, 0.5, 2.75, 3.5}, - }, - "int_histogram_metric": { - Type: "histogram", - Counts: []int64{1, 2, 3}, - Values: []float64{1.5, 2.5, 3}, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "int_gauge_metric": {Value: 1, Type: "gauge"}, + "gauge_metric": {Value: 5, Type: "gauge"}, + "int_sum_metric": {Value: 9, Type: "counter"}, + "sum_metric": {Value: 12, Type: "counter"}, + "histogram_metric": { + Type: "histogram", + Counts: []int64{1, 1, 2, 3}, + Values: []float64{-1, 0.5, 2.75, 3.5}, + }, + "int_histogram_metric": { + Type: "histogram", + Counts: []int64{1, 2, 3}, + Values: []float64{1.5, 2.5, 3}, + }, }, }, }, { + Agent: agent, + Service: service, Timestamp: timestamp1, - Samples: map[string]model.MetricsetSample{ - "int_gauge_metric": {Value: 3, Type: "gauge"}, - "gauge_metric": {Value: 7, Type: "gauge"}, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "int_gauge_metric": {Value: 3, Type: "gauge"}, + "gauge_metric": {Value: 7, Type: "gauge"}, + }, }, }, { - Timestamp: timestamp1, + Agent: agent, + Service: service, Labels: common.MapStr{"k": "v"}, - Samples: map[string]model.MetricsetSample{ - "int_gauge_metric": {Value: 2, Type: "gauge"}, - "gauge_metric": {Value: 6, Type: "gauge"}, - "int_sum_metric": {Value: 10, Type: "counter"}, - "sum_metric": {Value: 13, Type: "counter"}, + Timestamp: timestamp1, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "int_gauge_metric": {Value: 2, Type: "gauge"}, + "gauge_metric": {Value: 6, Type: "gauge"}, + "int_sum_metric": {Value: 10, Type: "counter"}, + "sum_metric": {Value: 13, Type: "counter"}, + }, }, }, { - Timestamp: timestamp1, + Agent: agent, + Service: service, Labels: common.MapStr{"k": "v2"}, - Samples: map[string]model.MetricsetSample{ - "int_gauge_metric": {Value: 4, Type: "gauge"}, - "gauge_metric": {Value: 8, Type: "gauge"}, + Timestamp: timestamp1, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "int_gauge_metric": {Value: 4, Type: "gauge"}, + "gauge_metric": {Value: 8, Type: "gauge"}, + }, }, }, { - Timestamp: timestamp1, + Agent: agent, + Service: service, Labels: common.MapStr{"k2": "v"}, - Samples: map[string]model.MetricsetSample{ - "int_sum_metric": {Value: 11, Type: "counter"}, - "sum_metric": {Value: 14, Type: "counter"}, + Timestamp: timestamp1, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "int_sum_metric": {Value: 11, Type: "counter"}, + "sum_metric": {Value: 14, Type: "counter"}, + }, }, - }}, metricsets) + }}, events) } func TestConsumeMetrics_JVM(t *testing.T) { @@ -241,51 +263,69 @@ func TestConsumeMetrics_JVM(t *testing.T) { addInt64Sum("runtime.jvm.gc.count", 2, map[string]string{"gc": "G1 Young Generation"}) addInt64Gauge("runtime.jvm.memory.area", 42, map[string]string{"area": "heap", "type": "used"}) - metricsets, _ := transformMetrics(t, metrics) - assert.Equal(t, []*model.Metricset{{ + events, _ := transformMetrics(t, metrics) + service := model.Service{Name: "unknown", Language: model.Language{Name: "unknown"}} + agent := model.Agent{Name: "otlp", Version: "unknown"} + assert.Equal(t, []model.APMEvent{{ + Agent: agent, + Service: service, Timestamp: timestamp, - Samples: map[string]model.MetricsetSample{ - "jvm.memory.heap.used": { - Type: "gauge", - Value: 42, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "jvm.memory.heap.used": { + Type: "gauge", + Value: 42, + }, }, }, }, { - Timestamp: timestamp, + Agent: agent, + Service: service, Labels: common.MapStr{"gc": "G1 Young Generation"}, - Samples: map[string]model.MetricsetSample{ - "runtime.jvm.gc.time": { - Type: "counter", - Value: 9, - }, - "runtime.jvm.gc.count": { - Type: "counter", - Value: 2, + Timestamp: timestamp, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "runtime.jvm.gc.time": { + Type: "counter", + Value: 9, + }, + "runtime.jvm.gc.count": { + Type: "counter", + Value: 2, + }, }, }, }, { - Timestamp: timestamp, + Agent: agent, + Service: service, Labels: common.MapStr{"name": "G1 Young Generation"}, - Samples: map[string]model.MetricsetSample{ - "jvm.gc.time": { - Type: "counter", - Value: 9, - }, - "jvm.gc.count": { - Type: "counter", - Value: 2, + Timestamp: timestamp, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "jvm.gc.time": { + Type: "counter", + Value: 9, + }, + "jvm.gc.count": { + Type: "counter", + Value: 2, + }, }, }, }, { - Timestamp: timestamp, + Agent: agent, + Service: service, Labels: common.MapStr{"area": "heap", "type": "used"}, - Samples: map[string]model.MetricsetSample{ - "runtime.jvm.memory.area": { - Type: "gauge", - Value: 42, + Timestamp: timestamp, + Metricset: &model.Metricset{ + Samples: map[string]model.MetricsetSample{ + "runtime.jvm.memory.area": { + Type: "gauge", + Value: 42, + }, }, }, - }}, metricsets) + }}, events) } func TestConsumeMetricsExportTimestamp(t *testing.T) { @@ -319,9 +359,9 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) { dp.SetTimestamp(pdata.TimestampFromTime(exportedDataPointTimestamp)) dp.SetValue(1) - metricsets, _ := transformMetrics(t, metrics) - require.Len(t, metricsets, 1) - assert.InDelta(t, now.Add(dataPointOffset).Unix(), metricsets[0].Timestamp.Unix(), allowedError) + events, _ := transformMetrics(t, metrics) + require.Len(t, events, 1) + assert.InDelta(t, now.Add(dataPointOffset).Unix(), events[0].Timestamp.Unix(), allowedError) } func TestMetricsLogging(t *testing.T) { @@ -339,7 +379,7 @@ func TestMetricsLogging(t *testing.T) { } } -func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]*model.Metricset, otel.ConsumerStats) { +func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]model.APMEvent, otel.ConsumerStats) { var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) @@ -347,11 +387,5 @@ func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]*model.Metricset, err := consumer.ConsumeMetrics(context.Background(), metrics) require.NoError(t, err) require.Len(t, batches, 1) - - batch := *batches[0] - metricsets := make([]*model.Metricset, len(batch)) - for i, event := range batch { - metricsets[i] = event.Metricset - } - return metricsets, consumer.Stats() + return *batches[0], consumer.Stats() } diff --git a/processor/otel/traces.go b/processor/otel/traces.go index e31c0117e41..31a99061480 100644 --- a/processor/otel/traces.go +++ b/processor/otel/traces.go @@ -198,7 +198,6 @@ func (c *Consumer) convertSpan( if endTime.After(startTime) { durationMillis = endTime.Sub(startTime).Seconds() * 1000 } - timestamp := startTime.Add(timeDelta) // Message consumption results in either a transaction or a span based // on whether the consumption is active or passive. Otel spans @@ -207,33 +206,37 @@ func (c *Consumer) convertSpan( // therefore start a transaction whenever span kind == consumer. name := otelSpan.Name() event := baseEvent + event.Labels = initEventLabels(event.Labels) + event.Timestamp = startTime.Add(timeDelta) if root || otelSpan.Kind() == pdata.SpanKindServer || otelSpan.Kind() == pdata.SpanKindConsumer { event.Transaction = &model.Transaction{ - ID: spanID, - ParentID: parentID, - TraceID: traceID, - Timestamp: timestamp, - Duration: durationMillis, - Name: name, - Sampled: true, - Outcome: spanStatusOutcome(otelSpan.Status()), + ID: spanID, + ParentID: parentID, + TraceID: traceID, + Duration: durationMillis, + Name: name, + Sampled: true, + Outcome: spanStatusOutcome(otelSpan.Status()), } translateTransaction(otelSpan, otelLibrary, &event) } else { event.Span = &model.Span{ - ID: spanID, - ParentID: parentID, - TraceID: traceID, - Timestamp: timestamp, - Duration: durationMillis, - Name: name, - Outcome: spanStatusOutcome(otelSpan.Status()), + ID: spanID, + ParentID: parentID, + TraceID: traceID, + Duration: durationMillis, + Name: name, + Outcome: spanStatusOutcome(otelSpan.Status()), } translateSpan(otelSpan, &event) } + if len(event.Labels) == 0 { + event.Labels = nil + } *out = append(*out, event) events := otelSpan.Events() + event.Labels = baseEvent.Labels // only copy common labels to span events for i := 0; i < events.Len(); i++ { convertSpanEvent(logger, events.At(i), event, timeDelta, out) } @@ -245,7 +248,6 @@ func translateTransaction( event *model.APMEvent, ) { isJaeger := strings.HasPrefix(event.Agent.Name, "Jaeger") - labels := make(common.MapStr) var ( netHostName string @@ -289,11 +291,11 @@ func translateTransaction( k := replaceDots(kDots) switch v.Type() { case pdata.AttributeValueTypeArray: - labels[k] = ifaceAnyValueArray(v.ArrayVal()) + event.Labels[k] = ifaceAnyValueArray(v.ArrayVal()) case pdata.AttributeValueTypeBool: - labels[k] = v.BoolVal() + event.Labels[k] = v.BoolVal() case pdata.AttributeValueTypeDouble: - labels[k] = v.DoubleVal() + event.Labels[k] = v.DoubleVal() case pdata.AttributeValueTypeInt: switch kDots { case conventions.AttributeHTTPStatusCode: @@ -307,7 +309,7 @@ func translateTransaction( case "rpc.grpc.status_code": event.Transaction.Result = codes.Code(v.IntVal()).String() default: - labels[k] = v.IntVal() + event.Labels[k] = v.IntVal() } case pdata.AttributeValueTypeString: stringval := truncate(v.StringVal()) @@ -335,7 +337,7 @@ func translateTransaction( case "http.protocol": if !strings.HasPrefix(stringval, "HTTP/") { // Unexpected, store in labels for debugging. - labels[k] = stringval + event.Labels[k] = stringval break } stringval = strings.TrimPrefix(stringval, "HTTP/") @@ -410,7 +412,7 @@ func translateTransaction( component = stringval fallthrough default: - labels[k] = stringval + event.Labels[k] = stringval } } return true @@ -480,7 +482,7 @@ func translateTransaction( if samplerType != (pdata.AttributeValue{}) { // The client has reported its sampling rate, so we can use it to extrapolate span metrics. - parseSamplerAttributes(samplerType, samplerParam, &event.Transaction.RepresentativeCount, labels) + parseSamplerAttributes(samplerType, samplerParam, &event.Transaction.RepresentativeCount, event.Labels) } else { event.Transaction.RepresentativeCount = 1 } @@ -492,12 +494,10 @@ func translateTransaction( event.Service.Framework.Name = name event.Service.Framework.Version = library.Version() } - event.Transaction.Labels = labels } func translateSpan(span pdata.Span, event *model.APMEvent) { isJaeger := strings.HasPrefix(event.Agent.Name, "Jaeger") - labels := make(common.MapStr) var ( netPeerName string @@ -542,11 +542,11 @@ func translateSpan(span pdata.Span, event *model.APMEvent) { k := replaceDots(kDots) switch v.Type() { case pdata.AttributeValueTypeArray: - labels[k] = ifaceAnyValueArray(v.ArrayVal()) + event.Labels[k] = ifaceAnyValueArray(v.ArrayVal()) case pdata.AttributeValueTypeBool: - labels[k] = v.BoolVal() + event.Labels[k] = v.BoolVal() case pdata.AttributeValueTypeDouble: - labels[k] = v.DoubleVal() + event.Labels[k] = v.DoubleVal() case pdata.AttributeValueTypeInt: switch kDots { case "http.status_code": @@ -558,7 +558,7 @@ func translateSpan(span pdata.Span, event *model.APMEvent) { case "rpc.grpc.status_code": // Ignored for spans. default: - labels[k] = v.IntVal() + event.Labels[k] = v.IntVal() } case pdata.AttributeValueTypeString: stringval := truncate(v.StringVal()) @@ -661,7 +661,7 @@ func translateSpan(span pdata.Span, event *model.APMEvent) { component = stringval fallthrough default: - labels[k] = stringval + event.Labels[k] = stringval } } return true @@ -795,12 +795,10 @@ func translateSpan(span pdata.Span, event *model.APMEvent) { if samplerType != (pdata.AttributeValue{}) { // The client has reported its sampling rate, so we can use it to extrapolate transaction metrics. - parseSamplerAttributes(samplerType, samplerParam, &event.Span.RepresentativeCount, labels) + parseSamplerAttributes(samplerType, samplerParam, &event.Span.RepresentativeCount, event.Labels) } else { event.Span.RepresentativeCount = 1 } - - event.Span.Labels = labels } func parseSamplerAttributes(samplerType, samplerParam pdata.AttributeValue, representativeCount *float64, labels common.MapStr) { @@ -823,7 +821,7 @@ func parseSamplerAttributes(samplerType, samplerParam pdata.AttributeValue, repr func convertSpanEvent( logger *logp.Logger, - event pdata.SpanEvent, + spanEvent pdata.SpanEvent, parent model.APMEvent, // either span or transaction timeDelta time.Duration, out *model.Batch, @@ -831,7 +829,7 @@ func convertSpanEvent( var e *model.Error isJaeger := strings.HasPrefix(parent.Agent.Name, "Jaeger") if isJaeger { - e = convertJaegerErrorSpanEvent(logger, event) + e = convertJaegerErrorSpanEvent(logger, spanEvent) } else { // Translate exception span events to errors. // @@ -839,14 +837,14 @@ func convertSpanEvent( // // TODO(axw) we don't currently support arbitrary events, we only look // for exceptions and convert those to Elastic APM error events. - if event.Name() != "exception" { + if spanEvent.Name() != "exception" { // Per OpenTelemetry semantic conventions: // `The name of the event MUST be "exception"` return } var exceptionEscaped bool var exceptionMessage, exceptionStacktrace, exceptionType string - event.Attributes().Range(func(k string, v pdata.AttributeValue) bool { + spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool { switch k { case conventions.AttributeExceptionMessage: exceptionMessage = v.StringVal() @@ -866,10 +864,7 @@ func convertSpanEvent( // - exception.message` return } - timestamp := event.Timestamp().AsTime() - timestamp = timestamp.Add(timeDelta) e = convertOpenTelemetryExceptionSpanEvent( - timestamp, exceptionType, exceptionMessage, exceptionStacktrace, exceptionEscaped, parent.Service.Language.Name, ) @@ -877,6 +872,7 @@ func convertSpanEvent( if e != nil { event := parent event.Error = e + event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta) if parent.Transaction != nil { event.Transaction = nil addTransactionCtxToErr(parent.Transaction, event.Error) @@ -931,9 +927,7 @@ func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *mo logger.Debugf("Cannot convert span event (name=%q) into elastic apm error: %v", event.Name()) return nil } - e := &model.Error{ - Timestamp: event.Timestamp().AsTime(), - } + e := &model.Error{} if logMessage != "" { e.Log = &model.Log{Message: logMessage} } diff --git a/processor/otel/traces_test.go b/processor/otel/traces_test.go index 2125c440eb5..79eee7d9487 100644 --- a/processor/otel/traces_test.go +++ b/processor/otel/traces_test.go @@ -438,7 +438,7 @@ func TestDatabaseSpan(t *testing.T) { assert.Equal(t, common.MapStr{ "db_connection_string": connectionString, "net_transport": "IP.TCP", - }, event.Span.Labels) + }, event.Labels) assert.Equal(t, &model.Destination{ Address: "shopdb.example.com", @@ -478,7 +478,7 @@ func TestRPCTransaction(t *testing.T) { }) assert.Equal(t, "request", event.Transaction.Type) assert.Equal(t, "Unavailable", event.Transaction.Result) - assert.Empty(t, event.Transaction.Labels) + assert.Empty(t, event.Labels) assert.Equal(t, model.Client{ Domain: "peer_name", IP: net.ParseIP("10.20.30.40"), @@ -497,7 +497,7 @@ func TestRPCSpan(t *testing.T) { }) assert.Equal(t, "external", event.Span.Type) assert.Equal(t, "grpc", event.Span.Subtype) - assert.Empty(t, event.Span.Labels) + assert.Empty(t, event.Labels) assert.Equal(t, &model.Destination{ Address: "10.20.30.40", Port: 123, @@ -520,7 +520,7 @@ func TestMessagingTransaction(t *testing.T) { s.SetParentSpanID(pdata.NewSpanID([8]byte{3})) }) assert.Equal(t, "messaging", event.Transaction.Type) - assert.Empty(t, event.Transaction.Labels) + assert.Empty(t, event.Labels) assert.Equal(t, &model.Message{ QueueName: "myQueue", }, event.Transaction.Message) @@ -538,7 +538,7 @@ func TestMessagingSpan(t *testing.T) { assert.Equal(t, "messaging", event.Span.Type) assert.Equal(t, "kafka", event.Span.Subtype) assert.Equal(t, "send", event.Span.Action) - assert.Empty(t, event.Span.Labels) + assert.Empty(t, event.Labels) assert.Equal(t, &model.Destination{ Address: "10.20.30.40", Port: 123, @@ -590,7 +590,7 @@ func TestArrayLabels(t *testing.T) { assert.Equal(t, common.MapStr{ "bool_array": []interface{}{false, true}, "string_array": []interface{}{"string1", "string2"}, - }, txEvent.Transaction.Labels) + }, txEvent.Labels) spanEvent := transformSpanWithAttributes(t, map[string]pdata.AttributeValue{ "string_array": stringArray, @@ -599,7 +599,7 @@ func TestArrayLabels(t *testing.T) { assert.Equal(t, common.MapStr{ "bool_array": []interface{}{false, true}, "string_array": []interface{}{"string1", "string2"}, - }, spanEvent.Span.Labels) + }, spanEvent.Labels) } func TestConsumeTracesExportTimestamp(t *testing.T) { @@ -656,9 +656,9 @@ func TestConsumeTracesExportTimestamp(t *testing.T) { require.Len(t, batch, 3) // Give some leeway for one event, and check other events' timestamps relative to that one. - assert.InDelta(t, now.Add(transactionOffset).Unix(), batch[0].Transaction.Timestamp.Unix(), allowedError) - assert.Equal(t, spanOffset-transactionOffset, batch[1].Span.Timestamp.Sub(batch[0].Transaction.Timestamp)) - assert.Equal(t, exceptionOffset-transactionOffset, batch[2].Error.Timestamp.Sub(batch[0].Transaction.Timestamp)) + assert.InDelta(t, now.Add(transactionOffset).Unix(), batch[0].Timestamp.Unix(), allowedError) + assert.Equal(t, spanOffset-transactionOffset, batch[1].Timestamp.Sub(batch[0].Timestamp)) + assert.Equal(t, exceptionOffset-transactionOffset, batch[2].Timestamp.Sub(batch[0].Timestamp)) // Durations should be unaffected. assert.Equal(t, float64(transactionDuration.Milliseconds()), batch[0].Transaction.Duration) diff --git a/processor/stream/processor.go b/processor/stream/processor.go index ae102cd3f89..0dfcce4e144 100644 --- a/processor/stream/processor.go +++ b/processor/stream/processor.go @@ -22,7 +22,6 @@ import ( "context" "io" "sync" - "time" "go.elastic.co/apm" @@ -134,7 +133,6 @@ func (p *Processor) identifyEventType(body []byte) []byte { // the error err. func (p *Processor) readBatch( ctx context.Context, - requestTime time.Time, baseEvent model.APMEvent, batchSize int, batch *model.Batch, @@ -160,7 +158,7 @@ func (p *Processor) readBatch( // required for backwards compatibility - sending empty lines was permitted in previous versions continue } - input := modeldecoder.Input{RequestTime: requestTime, Base: baseEvent, Config: p.Mconfig} + input := modeldecoder.Input{Base: baseEvent, Config: p.Mconfig} switch eventType := p.identifyEventType(body); string(eventType) { case errorEventType: err = v2.DecodeNestedError(reader, &input, batch) @@ -216,15 +214,14 @@ func (p *Processor) HandleStream( // no point in continuing if we couldn't read the metadata return err } - - requestTime := utility.RequestTime(ctx) + baseEvent.Timestamp = utility.RequestTime(ctx) sp, ctx := apm.StartSpan(ctx, "Stream", "Reporter") defer sp.End() for { var batch model.Batch - n, readErr := p.readBatch(ctx, requestTime, baseEvent, batchSize, &batch, sr, result) + n, readErr := p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result) if n > 0 { // NOTE(axw) ProcessBatch takes ownership of batch, which means we cannot reuse // the slice memory. We should investigate alternative interfaces between the diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go index 3361a18b4be..6f70926c461 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go @@ -258,14 +258,14 @@ type spanMetrics struct { func makeMetricset(timestamp time.Time, key aggregationKey, metrics spanMetrics, interval int64) model.APMEvent { out := model.APMEvent{ - Agent: model.Agent{Name: key.agentName}, + Timestamp: timestamp, + Agent: model.Agent{Name: key.agentName}, Service: model.Service{ Name: key.serviceName, Environment: key.serviceEnvironment, }, Metricset: &model.Metricset{ - Timestamp: timestamp, - Name: metricsetName, + Name: metricsetName, Event: model.MetricsetEventCategorization{ Outcome: key.outcome, }, diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go index a89fe6422eb..fc39c617f68 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go @@ -298,8 +298,8 @@ func batchMetricsets(t testing.TB, batch model.Batch) []model.APMEvent { if event.Metricset == nil { continue } - require.NotZero(t, event.Metricset.Timestamp) - event.Metricset.Timestamp = time.Time{} + require.NotZero(t, event.Timestamp) + event.Timestamp = time.Time{} metricsets = append(metricsets, event) } return metricsets diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator.go b/x-pack/apm-server/aggregation/txmetrics/aggregator.go index 8c6a33c8edc..319613fdc2b 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator.go @@ -369,6 +369,7 @@ func makeMetricset( timeseriesInstanceID.WriteString(fmt.Sprintf("%x", hash)) return model.APMEvent{ + Timestamp: ts, Agent: model.Agent{Name: key.agentName}, Container: model.Container{ID: key.containerID}, Kubernetes: model.Kubernetes{PodName: key.kubernetesPodName}, @@ -381,8 +382,7 @@ func makeMetricset( Hostname: key.hostname, }, Metricset: &model.Metricset{ - Timestamp: ts, - Name: metricsetName, + Name: metricsetName, Event: model.MetricsetEventCategorization{ Outcome: key.transactionOutcome, }, diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go index 8e5b81c8150..dc712ac472f 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go @@ -289,7 +289,7 @@ func TestAggregateRepresentativeCount(t *testing.T) { }) require.NotNil(t, m.Metricset) - m.Metricset.Timestamp = time.Time{} + m.Timestamp = time.Time{} assert.Equal(t, model.APMEvent{ Metricset: &model.Metricset{ Name: "transaction", @@ -529,8 +529,8 @@ func batchMetricsets(t testing.TB, batch model.Batch) []model.APMEvent { if event.Metricset == nil { continue } - require.NotZero(t, event.Metricset.Timestamp) - event.Metricset.Timestamp = time.Time{} + require.NotZero(t, event.Timestamp) + event.Timestamp = time.Time{} metricsets = append(metricsets, event) } return metricsets