Skip to content

Commit

Permalink
Enhance conversion error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkroh committed May 7, 2019
1 parent d347147 commit 40a02f9
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 53 deletions.
128 changes: 79 additions & 49 deletions libbeat/processors/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"strconv"
"strings"

"github.com/pkg/errors"

Expand Down Expand Up @@ -79,26 +80,12 @@ func resetValues(s []interface{}) {
func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
defer resetValues(p.converted)

// Validate the conversions.
for i, conv := range p.Fields {
v, err := convertField(event, conv.From, conv.Type)
if err != nil {
switch cause := errors.Cause(err); cause {
case common.ErrKeyNotFound:
if !p.IgnoreMissing && p.FailOnError {
return event, annotateError(p.Tag, errors.Errorf("field [%v] is missing, cannot be converted to type [%v]", conv.From, conv.Type))
}
default:
if p.FailOnError {
return event, annotateError(p.Tag, errors.Wrapf(err, "unable to convert field [%v] value [%v] to [%v]", conv.From, v, conv.Type))
}
}
p.converted[i] = ignoredFailure
continue
}
p.converted[i] = v
// Convert the fields and write the results to temporary storage.
if err := p.convertFields(event); err != nil {
return event, err
}

// Backup original event.
saved := *event
if len(p.Fields) > 1 && p.FailOnError {
// Clone the fields to allow the processor to undo the operation on
Expand All @@ -109,47 +96,76 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
saved.Meta = event.Meta.Clone()
}

// Update the event with the converted values.
if err := p.writeToEvent(event); err != nil {
return &saved, err
}

return event, nil
}

func (p *processor) convertFields(event *beat.Event) error {
// Write conversion results to temporary storage.
for i, conv := range p.Fields {
v, err := p.convertField(event, conv)
if err != nil {
if p.FailOnError {
return err
}
v = ignoredFailure
}
p.converted[i] = v
}

return nil
}

func (p *processor) convertField(event *beat.Event, conversion field) (interface{}, error) {
v, err := event.GetValue(conversion.From)
if err != nil {
if p.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return ignoredFailure, nil
}
return nil, newConvertError(conversion, err, p.Tag, "field [%v] is missing", conversion.From)
}

if conversion.Type > unset {
t, err := transformType(conversion.Type, v)
if err != nil {
return nil, newConvertError(conversion, err, p.Tag, "unable to convert value [%v]", v)
}
v = t
}

return v, nil
}

func (p *processor) writeToEvent(event *beat.Event) error {
for i, conversion := range p.Fields {
v := p.converted[i]
if v == ignoredFailure {
continue
}

if conv.To != "" {
if conversion.To != "" {
switch p.Mode {
case renameMode:
if _, err := event.PutValue(conv.To, v); err != nil && p.FailOnError {
return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To))
if _, err := event.PutValue(conversion.To, v); err != nil && p.FailOnError {
return newConvertError(conversion, err, p.Tag, "failed to put field [%v]", conversion.To)
}
event.Delete(conv.From)
event.Delete(conversion.From)
case copyMode:
if _, err := event.PutValue(conv.To, cloneValue(v)); err != nil && p.FailOnError {
return &saved, annotateError(p.Tag, errors.Wrapf(err, "failed to put field [%v]", conv.To))
if _, err := event.PutValue(conversion.To, cloneValue(v)); err != nil && p.FailOnError {
return newConvertError(conversion, err, p.Tag, "failed to put field [%v]", conversion.To)
}
}
} else {
// In-place conversion.
event.PutValue(conv.From, v)
}
}

return event, nil
}

func convertField(event *beat.Event, from string, typ dataType) (interface{}, error) {
v, err := event.GetValue(from)
if err != nil {
return nil, err
}

if typ > unset {
v, err = transformType(typ, v)
if err != nil {
return nil, err
event.PutValue(conversion.From, v)
}
}

return v, nil
return nil
}

func transformType(typ dataType, value interface{}) (interface{}, error) {
Expand Down Expand Up @@ -337,18 +353,32 @@ func toIP(value interface{}) (string, error) {
if net.ParseIP(v) != nil {
return v, nil
}
return "", errors.New("value is not a valid IP address")
default:
return "", errors.Errorf("invalid conversion of [%T] to IP", value)
}
return "", errors.Errorf("invalid conversion of [%T] to IP", value)
}

func annotateError(id string, err error) error {
if err == nil {
return nil
func newConvertError(conversion field, cause error, tag string, message string, params ...interface{}) error {
var buf strings.Builder
buf.WriteString("failed in processor.convert")
if tag != "" {
buf.WriteString(" with instance_id=")
buf.WriteString(tag)
}
if id != "" {
return errors.Wrapf(err, "failed in processor.convert with instance_id=%v", id)
buf.WriteString(": conversion of field [")
buf.WriteString(conversion.From)
buf.WriteString("] to type [")
buf.WriteString(conversion.Type.String())
buf.WriteString("]")
if conversion.To != "" {
buf.WriteString(" with target field [")
buf.WriteString(conversion.To)
buf.WriteString("]")
}
return errors.Wrap(err, "failed in processor.convert")
buf.WriteString(" failed: ")
fmt.Fprintf(&buf, message, params...)
return errors.Wrapf(cause, buf.String())
}

// cloneValue returns a shallow copy of a map. All other types are passed
Expand Down
31 changes: 27 additions & 4 deletions libbeat/processors/convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ func TestConvert(t *testing.T) {

func TestConvertRun(t *testing.T) {
tests := map[string]struct {
config common.MapStr
input beat.Event
expected beat.Event
fail bool
config common.MapStr
input beat.Event
expected beat.Event
fail bool
errContains string
}{
"missing field": {
config: common.MapStr{
Expand Down Expand Up @@ -213,6 +214,25 @@ func TestConvertRun(t *testing.T) {
},
fail: true,
},
"invalid conversion": {
config: common.MapStr{
"fields": []common.MapStr{
{"from": "address", "to": "ip", "type": "ip"},
},
},
input: beat.Event{
Fields: common.MapStr{
"address": "-",
},
},
expected: beat.Event{
Fields: common.MapStr{
"address": "-",
},
},
fail: true,
errContains: "unable to convert value [-]: value is not a valid IP address",
},
}

for title, tt := range tests {
Expand All @@ -230,6 +250,9 @@ func TestConvertRun(t *testing.T) {
if tt.fail {
assert.Error(t, err)
t.Log("got expected error", err)
if tt.errContains != "" {
assert.Contains(t, err.Error(), tt.errContains)
}
return
}
assert.NoError(t, err)
Expand Down

0 comments on commit 40a02f9

Please sign in to comment.