Skip to content

Commit

Permalink
Move more code from model to model processors (#5687) (#5724)
Browse files Browse the repository at this point in the history
Move more logic out of model transformation,
into dedicated model processors:

 - modelprocessor.SetExcludeFromGrouping matches
   stack frames against a regular expression,
   setting the "exclude_from_grouping" field

 - modelprocessor.SetLibraryFrame matches stack
   frames against a regular expression, setting the
   "library_frame" field

 - modelprocessor.SetCulprit identifies the first
   source mapped, non-library, stack frame and uses
   it to set or update the error culprit

The "library_frame" field is now only set if it is
true. The UI uses this field but takes omission to
mean the same thing as false (i.e. "falsy")

The new model processors are only installed for
the RUM intake APIs. With these changes, we no
longer need to identify events as originating from
the RUM endpoint, so we remove the model.Span.RUM
and model.Error.RUM fields.

(cherry picked from commit ce483db)

Co-authored-by: Andrew Wilkins <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
mergify[bot] and axw authored Jul 15, 2021
1 parent c35dd1b commit b8e06dc
Show file tree
Hide file tree
Showing 30 changed files with 599 additions and 447 deletions.
36 changes: 28 additions & 8 deletions beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package api
import (
"net/http"
"net/http/pprof"
"regexp"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -173,17 +176,34 @@ func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *strea
requestMetadataFunc = rumRequestMetadata
}
return func() (request.Handler, error) {
batchProcessor := r.batchProcessor
var batchProcessors modelprocessor.Chained
// The order of these processors is important. Source mapping must happen before identifying library frames, or
// frames to exclude from error grouping; identifying library frames must happen before updating the error culprit.
if r.sourcemapStore != nil {
batchProcessor = modelprocessor.Chained{
sourcemap.BatchProcessor{
Store: r.sourcemapStore,
Timeout: r.cfg.RumConfig.SourceMapping.Timeout,
},
batchProcessor,
batchProcessors = append(batchProcessors, sourcemap.BatchProcessor{
Store: r.sourcemapStore,
Timeout: r.cfg.RumConfig.SourceMapping.Timeout,
})
}
if r.cfg.RumConfig.LibraryPattern != "" {
re, err := regexp.Compile(r.cfg.RumConfig.LibraryPattern)
if err != nil {
return nil, errors.Wrap(err, "invalid library pattern regex")
}
batchProcessors = append(batchProcessors, modelprocessor.SetLibraryFrame{Pattern: re})
}
if r.cfg.RumConfig.ExcludeFromGrouping != "" {
re, err := regexp.Compile(r.cfg.RumConfig.ExcludeFromGrouping)
if err != nil {
return nil, errors.Wrap(err, "invalid exclude from grouping regex")
}
batchProcessors = append(batchProcessors, modelprocessor.SetExcludeFromGrouping{Pattern: re})
}
if r.sourcemapStore != nil {
batchProcessors = append(batchProcessors, modelprocessor.SetCulprit{})
}
h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessor)
batchProcessors = append(batchProcessors, r.batchProcessor) // r.batchProcessor always goes last
h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessors)
return middleware.Wrap(h, rumMiddleware(r.cfg, r.authenticator, r.ratelimitStore, intake.MonitoringMap)...)
}
}
Expand Down
5 changes: 0 additions & 5 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net"
"net/http"
"os"
"regexp"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -628,10 +627,6 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config {
return &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping),
},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@
"exclude_from_grouping": false,
"filename": "/webpack/file/name.py",
"function": "foo",
"library_frame": false,
"line": {
"column": 4,
"context": "line3",
Expand Down
1 change: 0 additions & 1 deletion docs/data/elasticsearch/generated/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@
"exclude_from_grouping": false,
"filename": "/webpack/file/name.py",
"function": "foo",
"library_frame": false,
"line": {
"column": 4,
"context": "line3",
Expand Down
59 changes: 8 additions & 51 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ type Error struct {
TransactionSampled *bool
TransactionType string

// RUM records whether or not this is a RUM error,
// and should have its stack frames sourcemapped.
RUM bool

Experimental interface{}
}

Expand Down Expand Up @@ -111,7 +107,7 @@ func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, eve
}

fields := mapStr{
"error": e.fields(ctx, cfg),
"error": e.fields(),
"processor": errorProcessorEntry,
}

Expand Down Expand Up @@ -157,63 +153,24 @@ func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, eve
})
}

func (e *Error) fields(ctx context.Context, cfg *transform.Config) common.MapStr {
func (e *Error) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("id", e.ID)
fields.maybeSetMapStr("page", e.Page.Fields())

exceptionChain := flattenExceptionTree(e.Exception)
if exception := e.exceptionFields(ctx, cfg, exceptionChain); len(exception) > 0 {
if exception := e.exceptionFields(exceptionChain); len(exception) > 0 {
fields.set("exception", exception)
}
fields.maybeSetMapStr("log", e.logFields(ctx, cfg))
fields.maybeSetMapStr("log", e.logFields())

e.updateCulprit()
fields.maybeSetString("culprit", e.Culprit)
fields.maybeSetMapStr("custom", customFields(e.Custom))
fields.maybeSetString("grouping_key", e.calcGroupingKey(exceptionChain))
return common.MapStr(fields)
}

// TODO(axw) introduce another processor which sets library_frame
// and exclude_from_grouping, only applied for RUM. Then we get rid
// of Error.RUM and Span.RUM.
func (e *Error) updateCulprit() {
if !e.RUM {
return
}
var fr *StacktraceFrame
if e.Log != nil {
fr = findSmappedNonLibraryFrame(e.Log.Stacktrace)
}
if fr == nil && e.Exception != nil {
fr = findSmappedNonLibraryFrame(e.Exception.Stacktrace)
}
if fr == nil {
return
}
var culprit string
if fr.Filename != "" {
culprit = fr.Filename
} else if fr.Classname != "" {
culprit = fr.Classname
}
if fr.Function != "" {
culprit += fmt.Sprintf(" in %v", fr.Function)
}
e.Culprit = culprit
}

func findSmappedNonLibraryFrame(frames []*StacktraceFrame) *StacktraceFrame {
for _, fr := range frames {
if fr.SourcemapUpdated && !fr.IsLibraryFrame() {
return fr
}
}
return nil
}

func (e *Error) exceptionFields(ctx context.Context, cfg *transform.Config, chain []Exception) []common.MapStr {
func (e *Error) exceptionFields(chain []Exception) []common.MapStr {
var result []common.MapStr
for _, exception := range chain {
var ex mapStr
Expand Down Expand Up @@ -242,7 +199,7 @@ func (e *Error) exceptionFields(ctx context.Context, cfg *transform.Config, chai
if n := len(exception.Stacktrace); n > 0 {
frames := make([]common.MapStr, n)
for i, frame := range exception.Stacktrace {
frames[i] = frame.transform(cfg, e.RUM)
frames[i] = frame.transform()
}
ex.set("stacktrace", frames)
}
Expand All @@ -252,7 +209,7 @@ func (e *Error) exceptionFields(ctx context.Context, cfg *transform.Config, chai
return result
}

func (e *Error) logFields(ctx context.Context, cfg *transform.Config) common.MapStr {
func (e *Error) logFields() common.MapStr {
if e.Log == nil {
return nil
}
Expand All @@ -261,7 +218,7 @@ func (e *Error) logFields(ctx context.Context, cfg *transform.Config) common.Map
log.maybeSetString("param_message", e.Log.ParamMessage)
log.maybeSetString("logger_name", e.Log.LoggerName)
log.maybeSetString("level", e.Log.Level)
if st := e.Log.Stacktrace.transform(ctx, cfg, e.RUM); len(st) > 0 {
if st := e.Log.Stacktrace.transform(); len(st) > 0 {
log.set("stacktrace", st)
}
return common.MapStr(log)
Expand Down
115 changes: 0 additions & 115 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ func TestEventFields(t *testing.T) {
Exception: &exception,
Log: &log,
TransactionID: trID,
RUM: true,

// Service name and version are required for sourcemapping.
Metadata: Metadata{
Expand Down Expand Up @@ -357,7 +356,6 @@ func TestEvents(t *testing.T) {
HTTP: &Http{Request: &Req{Referer: referer}},
URL: &URL{Original: url},
Custom: custom,
RUM: true,
},

Output: common.MapStr{
Expand Down Expand Up @@ -410,119 +408,6 @@ func TestEvents(t *testing.T) {
}
}

func TestCulprit(t *testing.T) {
c := "foo"
fct := "fct"
truthy := true
st := Stacktrace{
&StacktraceFrame{Filename: "a", Function: fct},
}
stUpdate := Stacktrace{
&StacktraceFrame{Filename: "a", Function: fct},
&StacktraceFrame{Filename: "a", LibraryFrame: &truthy, SourcemapUpdated: true},
&StacktraceFrame{Filename: "f", Function: fct, SourcemapUpdated: true},
&StacktraceFrame{Filename: "bar", Function: fct, SourcemapUpdated: true},
}
tests := []struct {
event Error
culprit string
msg string
}{
{
event: Error{Culprit: c, RUM: false},
culprit: "foo",
msg: "Not a RUM event",
},
{
event: Error{Culprit: c, RUM: true},
culprit: "foo",
msg: "No Stacktrace Frame given.",
},
{
event: Error{Culprit: c, RUM: true, Log: &Log{Stacktrace: st}},
culprit: "foo",
msg: "Log.StacktraceFrame has no updated frame",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{
Stacktrace: Stacktrace{
&StacktraceFrame{
Filename: "f",
Classname: "xyz",
SourcemapUpdated: true,
},
},
},
},
culprit: "f",
msg: "Adapt culprit to first valid Log.StacktraceFrame filename information.",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{
Stacktrace: Stacktrace{
&StacktraceFrame{
Classname: "xyz",
SourcemapUpdated: true,
},
},
},
},
culprit: "xyz",
msg: "Adapt culprit Log.StacktraceFrame classname information.",
},
{
event: Error{
Culprit: c,
RUM: true,
Exception: &Exception{Stacktrace: stUpdate},
},
culprit: "f in fct",
msg: "Adapt culprit to first valid Exception.StacktraceFrame information.",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{Stacktrace: st},
Exception: &Exception{Stacktrace: stUpdate},
},
culprit: "f in fct",
msg: "Log and Exception StacktraceFrame given, only one changes culprit.",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{
Stacktrace: Stacktrace{
&StacktraceFrame{
Filename: "a",
Function: fct,
SourcemapUpdated: true,
},
},
},
Exception: &Exception{Stacktrace: stUpdate},
},
culprit: "a in fct",
msg: "Log Stacktrace is prioritized over Exception StacktraceFrame",
},
}
for idx, test := range tests {
t.Run(fmt.Sprint(idx), func(t *testing.T) {
test.event.updateCulprit()
assert.Equal(t, test.culprit, test.event.Culprit,
fmt.Sprintf("(%v) %s: expected <%v>, received <%v>", idx, test.msg, test.culprit, test.event.Culprit))
})
}
}

func TestErrorTransformPage(t *testing.T) {
id := "123"
urlExample := "http://example.com/path"
Expand Down
2 changes: 0 additions & 2 deletions model/modeldecoder/rumv3/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ func TestDecodeMapToErrorModel(t *testing.T) {
"Experimental",
// URL parts are derived from url (separately tested)
"URL", "Page.URL",
// RUM is set in stream processor
"RUM",
// exception.parent is only set after calling `flattenExceptionTree` (not part of decoding)
"Exception.Parent",
// stacktrace original and sourcemap values are set when sourcemapping is applied
Expand Down
4 changes: 0 additions & 4 deletions model/modeldecoder/rumv3/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ func TestDecodeMapToTransactionModel(t *testing.T) {
// URL parts are derived from page.url (separately tested)
"URL", "Page.URL",
// HTTP.Request.Referrer is derived from page.referer (separately tested)
// RUM is set in stream processor
"RUM",
} {
if strings.HasPrefix(key, s) {
return true
Expand Down Expand Up @@ -243,8 +241,6 @@ func TestDecodeMapToTransactionModel(t *testing.T) {
"Stacktrace.Sourcemap",
// ExcludeFromGrouping is set when processing the event
"Stacktrace.ExcludeFromGrouping",
// RUM is set in stream processor
"RUM",
// Transaction related information is set within the DecodeNestedTransaction method
// it is separatly tested in TestDecodeNestedTransaction
"TransactionID", "TraceID", "ParentID",
Expand Down
2 changes: 1 addition & 1 deletion model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {
}
if eventFrame.LibraryFrame.IsSet() {
val := eventFrame.LibraryFrame.Val
fr.LibraryFrame = &val
fr.LibraryFrame = val
}
if eventFrame.LineNumber.IsSet() {
val := eventFrame.LineNumber.Val
Expand Down
2 changes: 0 additions & 2 deletions model/modeldecoder/v2/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ func TestDecodeMapToErrorModel(t *testing.T) {
"Metadata",
// URL parts are derived from url (separately tested)
"Page.URL",
// RUM is set in stream processor
"RUM",
// exception.parent is only set after calling `flattenExceptionTree` (not part of decoding)
"Exception.Parent",
// stacktrace original and sourcemap values are set when sourcemapping is applied
Expand Down
2 changes: 0 additions & 2 deletions model/modeldecoder/v2/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func TestDecodeMapToSpanModel(t *testing.T) {
for _, s := range []string{
// experimental is tested in test 'experimental'
"Experimental",
// RUM is set in stream processor
"RUM",
// RepresentativeCount is tested further down in test 'sample-rate'
"RepresentativeCount"} {
if key == s {
Expand Down
Loading

0 comments on commit b8e06dc

Please sign in to comment.