Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move more code from model to model processors #5687

Merged
merged 2 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package api
import (
"net/http"
"net/http/pprof"
"regexp"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -173,17 +176,34 @@ func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *strea
requestMetadataFunc = rumRequestMetadata
}
return func() (request.Handler, error) {
batchProcessor := r.batchProcessor
var batchProcessors modelprocessor.Chained
// The order of these processors is important. Source mapping must happen before identifying library frames, or
// frames to exclude from error grouping; identifying library frames must happen before updating the error culprit.
if r.sourcemapStore != nil {
batchProcessor = modelprocessor.Chained{
sourcemap.BatchProcessor{
Store: r.sourcemapStore,
Timeout: r.cfg.RumConfig.SourceMapping.Timeout,
},
batchProcessor,
batchProcessors = append(batchProcessors, sourcemap.BatchProcessor{
Store: r.sourcemapStore,
Timeout: r.cfg.RumConfig.SourceMapping.Timeout,
})
}
if r.cfg.RumConfig.LibraryPattern != "" {
re, err := regexp.Compile(r.cfg.RumConfig.LibraryPattern)
if err != nil {
return nil, errors.Wrap(err, "invalid library pattern regex")
}
batchProcessors = append(batchProcessors, modelprocessor.SetLibraryFrame{Pattern: re})
}
if r.cfg.RumConfig.ExcludeFromGrouping != "" {
re, err := regexp.Compile(r.cfg.RumConfig.ExcludeFromGrouping)
if err != nil {
return nil, errors.Wrap(err, "invalid exclude from grouping regex")
}
batchProcessors = append(batchProcessors, modelprocessor.SetExcludeFromGrouping{Pattern: re})
}
if r.sourcemapStore != nil {
batchProcessors = append(batchProcessors, modelprocessor.SetCulprit{})
}
h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessor)
batchProcessors = append(batchProcessors, r.batchProcessor) // r.batchProcessor always goes last
h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessors)
return middleware.Wrap(h, rumMiddleware(r.cfg, r.authenticator, r.ratelimitStore, intake.MonitoringMap)...)
}
}
Expand Down
5 changes: 0 additions & 5 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net"
"net/http"
"os"
"regexp"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -628,10 +627,6 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
func newTransformConfig(beatInfo beat.Info, cfg *config.Config) *transform.Config {
return &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping),
},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@
"exclude_from_grouping": false,
"filename": "/webpack/file/name.py",
"function": "foo",
"library_frame": false,
"line": {
"column": 4,
"context": "line3",
Expand Down
1 change: 0 additions & 1 deletion docs/data/elasticsearch/generated/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@
"exclude_from_grouping": false,
"filename": "/webpack/file/name.py",
"function": "foo",
"library_frame": false,
"line": {
"column": 4,
"context": "line3",
Expand Down
59 changes: 8 additions & 51 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ type Error struct {
TransactionSampled *bool
TransactionType string

// RUM records whether or not this is a RUM error,
// and should have its stack frames sourcemapped.
RUM bool

Experimental interface{}
}

Expand Down Expand Up @@ -111,7 +107,7 @@ func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, eve
}

fields := mapStr{
"error": e.fields(ctx, cfg),
"error": e.fields(),
"processor": errorProcessorEntry,
}

Expand Down Expand Up @@ -157,63 +153,24 @@ func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, eve
})
}

func (e *Error) fields(ctx context.Context, cfg *transform.Config) common.MapStr {
func (e *Error) fields() common.MapStr {
var fields mapStr
fields.maybeSetString("id", e.ID)
fields.maybeSetMapStr("page", e.Page.Fields())

exceptionChain := flattenExceptionTree(e.Exception)
if exception := e.exceptionFields(ctx, cfg, exceptionChain); len(exception) > 0 {
if exception := e.exceptionFields(exceptionChain); len(exception) > 0 {
fields.set("exception", exception)
}
fields.maybeSetMapStr("log", e.logFields(ctx, cfg))
fields.maybeSetMapStr("log", e.logFields())

e.updateCulprit()
fields.maybeSetString("culprit", e.Culprit)
fields.maybeSetMapStr("custom", customFields(e.Custom))
fields.maybeSetString("grouping_key", e.calcGroupingKey(exceptionChain))
return common.MapStr(fields)
}

// TODO(axw) introduce another processor which sets library_frame
// and exclude_from_grouping, only applied for RUM. Then we get rid
// of Error.RUM and Span.RUM.
func (e *Error) updateCulprit() {
if !e.RUM {
return
}
var fr *StacktraceFrame
if e.Log != nil {
fr = findSmappedNonLibraryFrame(e.Log.Stacktrace)
}
if fr == nil && e.Exception != nil {
fr = findSmappedNonLibraryFrame(e.Exception.Stacktrace)
}
if fr == nil {
return
}
var culprit string
if fr.Filename != "" {
culprit = fr.Filename
} else if fr.Classname != "" {
culprit = fr.Classname
}
if fr.Function != "" {
culprit += fmt.Sprintf(" in %v", fr.Function)
}
e.Culprit = culprit
}

func findSmappedNonLibraryFrame(frames []*StacktraceFrame) *StacktraceFrame {
for _, fr := range frames {
if fr.SourcemapUpdated && !fr.IsLibraryFrame() {
return fr
}
}
return nil
}

func (e *Error) exceptionFields(ctx context.Context, cfg *transform.Config, chain []Exception) []common.MapStr {
func (e *Error) exceptionFields(chain []Exception) []common.MapStr {
var result []common.MapStr
for _, exception := range chain {
var ex mapStr
Expand Down Expand Up @@ -242,7 +199,7 @@ func (e *Error) exceptionFields(ctx context.Context, cfg *transform.Config, chai
if n := len(exception.Stacktrace); n > 0 {
frames := make([]common.MapStr, n)
for i, frame := range exception.Stacktrace {
frames[i] = frame.transform(cfg, e.RUM)
frames[i] = frame.transform()
}
ex.set("stacktrace", frames)
}
Expand All @@ -252,7 +209,7 @@ func (e *Error) exceptionFields(ctx context.Context, cfg *transform.Config, chai
return result
}

func (e *Error) logFields(ctx context.Context, cfg *transform.Config) common.MapStr {
func (e *Error) logFields() common.MapStr {
if e.Log == nil {
return nil
}
Expand All @@ -261,7 +218,7 @@ func (e *Error) logFields(ctx context.Context, cfg *transform.Config) common.Map
log.maybeSetString("param_message", e.Log.ParamMessage)
log.maybeSetString("logger_name", e.Log.LoggerName)
log.maybeSetString("level", e.Log.Level)
if st := e.Log.Stacktrace.transform(ctx, cfg, e.RUM); len(st) > 0 {
if st := e.Log.Stacktrace.transform(); len(st) > 0 {
log.set("stacktrace", st)
}
return common.MapStr(log)
Expand Down
115 changes: 0 additions & 115 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ func TestEventFields(t *testing.T) {
Exception: &exception,
Log: &log,
TransactionID: trID,
RUM: true,

// Service name and version are required for sourcemapping.
Metadata: Metadata{
Expand Down Expand Up @@ -357,7 +356,6 @@ func TestEvents(t *testing.T) {
HTTP: &Http{Request: &Req{Referer: referer}},
URL: &URL{Original: url},
Custom: custom,
RUM: true,
},

Output: common.MapStr{
Expand Down Expand Up @@ -410,119 +408,6 @@ func TestEvents(t *testing.T) {
}
}

func TestCulprit(t *testing.T) {
c := "foo"
fct := "fct"
truthy := true
st := Stacktrace{
&StacktraceFrame{Filename: "a", Function: fct},
}
stUpdate := Stacktrace{
&StacktraceFrame{Filename: "a", Function: fct},
&StacktraceFrame{Filename: "a", LibraryFrame: &truthy, SourcemapUpdated: true},
&StacktraceFrame{Filename: "f", Function: fct, SourcemapUpdated: true},
&StacktraceFrame{Filename: "bar", Function: fct, SourcemapUpdated: true},
}
tests := []struct {
event Error
culprit string
msg string
}{
{
event: Error{Culprit: c, RUM: false},
culprit: "foo",
msg: "Not a RUM event",
},
{
event: Error{Culprit: c, RUM: true},
culprit: "foo",
msg: "No Stacktrace Frame given.",
},
{
event: Error{Culprit: c, RUM: true, Log: &Log{Stacktrace: st}},
culprit: "foo",
msg: "Log.StacktraceFrame has no updated frame",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{
Stacktrace: Stacktrace{
&StacktraceFrame{
Filename: "f",
Classname: "xyz",
SourcemapUpdated: true,
},
},
},
},
culprit: "f",
msg: "Adapt culprit to first valid Log.StacktraceFrame filename information.",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{
Stacktrace: Stacktrace{
&StacktraceFrame{
Classname: "xyz",
SourcemapUpdated: true,
},
},
},
},
culprit: "xyz",
msg: "Adapt culprit Log.StacktraceFrame classname information.",
},
{
event: Error{
Culprit: c,
RUM: true,
Exception: &Exception{Stacktrace: stUpdate},
},
culprit: "f in fct",
msg: "Adapt culprit to first valid Exception.StacktraceFrame information.",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{Stacktrace: st},
Exception: &Exception{Stacktrace: stUpdate},
},
culprit: "f in fct",
msg: "Log and Exception StacktraceFrame given, only one changes culprit.",
},
{
event: Error{
Culprit: c,
RUM: true,
Log: &Log{
Stacktrace: Stacktrace{
&StacktraceFrame{
Filename: "a",
Function: fct,
SourcemapUpdated: true,
},
},
},
Exception: &Exception{Stacktrace: stUpdate},
},
culprit: "a in fct",
msg: "Log Stacktrace is prioritized over Exception StacktraceFrame",
},
}
for idx, test := range tests {
t.Run(fmt.Sprint(idx), func(t *testing.T) {
test.event.updateCulprit()
assert.Equal(t, test.culprit, test.event.Culprit,
fmt.Sprintf("(%v) %s: expected <%v>, received <%v>", idx, test.msg, test.culprit, test.event.Culprit))
})
}
}

func TestErrorTransformPage(t *testing.T) {
id := "123"
urlExample := "http://example.com/path"
Expand Down
2 changes: 0 additions & 2 deletions model/modeldecoder/rumv3/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ func TestDecodeMapToErrorModel(t *testing.T) {
"Experimental",
// URL parts are derived from url (separately tested)
"URL", "Page.URL",
// RUM is set in stream processor
"RUM",
// exception.parent is only set after calling `flattenExceptionTree` (not part of decoding)
"Exception.Parent",
// stacktrace original and sourcemap values are set when sourcemapping is applied
Expand Down
4 changes: 0 additions & 4 deletions model/modeldecoder/rumv3/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ func TestDecodeMapToTransactionModel(t *testing.T) {
// URL parts are derived from page.url (separately tested)
"URL", "Page.URL",
// HTTP.Request.Referrer is derived from page.referer (separately tested)
// RUM is set in stream processor
"RUM",
} {
if strings.HasPrefix(key, s) {
return true
Expand Down Expand Up @@ -243,8 +241,6 @@ func TestDecodeMapToTransactionModel(t *testing.T) {
"Stacktrace.Sourcemap",
// ExcludeFromGrouping is set when processing the event
"Stacktrace.ExcludeFromGrouping",
// RUM is set in stream processor
"RUM",
// Transaction related information is set within the DecodeNestedTransaction method
// it is separatly tested in TestDecodeNestedTransaction
"TransactionID", "TraceID", "ParentID",
Expand Down
2 changes: 1 addition & 1 deletion model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {
}
if eventFrame.LibraryFrame.IsSet() {
val := eventFrame.LibraryFrame.Val
fr.LibraryFrame = &val
fr.LibraryFrame = val
}
if eventFrame.LineNumber.IsSet() {
val := eventFrame.LineNumber.Val
Expand Down
2 changes: 0 additions & 2 deletions model/modeldecoder/v2/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ func TestDecodeMapToErrorModel(t *testing.T) {
"Metadata",
// URL parts are derived from url (separately tested)
"Page.URL",
// RUM is set in stream processor
"RUM",
// exception.parent is only set after calling `flattenExceptionTree` (not part of decoding)
"Exception.Parent",
// stacktrace original and sourcemap values are set when sourcemapping is applied
Expand Down
2 changes: 0 additions & 2 deletions model/modeldecoder/v2/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func TestDecodeMapToSpanModel(t *testing.T) {
for _, s := range []string{
// experimental is tested in test 'experimental'
"Experimental",
// RUM is set in stream processor
"RUM",
// RepresentativeCount is tested further down in test 'sample-rate'
"RepresentativeCount"} {
if key == s {
Expand Down
Loading