diff --git a/receiver/fluentforwardreceiver/conversion.go b/receiver/fluentforwardreceiver/conversion.go index 3832ddeb0e5b..35a9d50560f7 100644 --- a/receiver/fluentforwardreceiver/conversion.go +++ b/receiver/fluentforwardreceiver/conversion.go @@ -17,7 +17,6 @@ package fluentforwardreceiver import ( "bytes" "compress/gzip" - "encoding/json" "fmt" "io" "time" @@ -81,31 +80,55 @@ func (em EventMode) String() string { } } -func insertToAttributeMap(key string, val interface{}, dest *pdata.AttributeMap) { +// parseInterfaceToMap takes map of interface objects and returns +// AttributeValueMap +func parseInterfaceToMap(msi map[string]interface{}) pdata.AttributeValue { + rv := pdata.NewAttributeValueMap() + am := rv.MapVal() + am.EnsureCapacity(len(msi)) + for k, value := range msi { + am.Insert(k, parseToAttributeValue(value)) + } + return rv +} + +// parseInterfaceToArray takes array of interface objects and returns +// AttributeValueArray +func parseInterfaceToArray(ai []interface{}) pdata.AttributeValue { + iv := pdata.NewAttributeValueArray() + av := iv.ArrayVal() + av.EnsureCapacity(len(ai)) + for _, value := range ai { + parseToAttributeValue(value).CopyTo(av.AppendEmpty()) + } + return iv +} + +// parseToAttributeValue converts interface object to AttributeValue +func parseToAttributeValue(val interface{}) pdata.AttributeValue { // See https://github.com/tinylib/msgp/wiki/Type-Mapping-Rules switch r := val.(type) { case bool: - dest.InsertBool(key, r) + return pdata.NewAttributeValueBool(r) case string: - dest.InsertString(key, r) + return pdata.NewAttributeValueString(r) case uint64: - dest.InsertInt(key, int64(r)) + return pdata.NewAttributeValueInt(int64(r)) case int64: - dest.InsertInt(key, r) + return pdata.NewAttributeValueInt(r) + // Sometimes strings come in as bytes array case []byte: - dest.InsertString(key, string(r)) - case map[string]interface{}, []interface{}: - encoded, err := json.Marshal(r) - if err != nil { - dest.InsertString(key, err.Error()) - } - dest.InsertString(key, string(encoded)) + return pdata.NewAttributeValueString(string(r)) + case map[string]interface{}: + return parseInterfaceToMap(r) + case []interface{}: + return parseInterfaceToArray(r) case float32: - dest.InsertDouble(key, float64(r)) + return pdata.NewAttributeValueDouble(float64(r)) case float64: - dest.InsertDouble(key, r) + return pdata.NewAttributeValueDouble(r) default: - dest.InsertString(key, fmt.Sprintf("%v", r)) + return pdata.NewAttributeValueString(fmt.Sprintf("%v", val)) } } @@ -160,19 +183,13 @@ func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { return msgp.WrapError(err, "Record", key) } + av := parseToAttributeValue(val) + // fluentd uses message, fluentbit log. if key == "message" || key == "log" { - switch v := val.(type) { - case string: - lr.Body().SetStringVal(v) - case []uint8: - // Sometimes strings come in as uint8's. - lr.Body().SetStringVal(string(v)) - default: - return fmt.Errorf("cannot convert message type %T to string", val) - } + av.CopyTo(lr.Body()) } else { - insertToAttributeMap(key, val, &attrs) + attrs.Insert(key, av) } } diff --git a/receiver/fluentforwardreceiver/conversion_test.go b/receiver/fluentforwardreceiver/conversion_test.go index a1321cc24804..7af6371e6455 100644 --- a/receiver/fluentforwardreceiver/conversion_test.go +++ b/receiver/fluentforwardreceiver/conversion_test.go @@ -100,6 +100,12 @@ func TestAttributeTypeConversion(t *testing.T) { le := event.LogRecords().At(0) le.Attributes().Sort() + + nv := pdata.NewAttributeValueArray() + nv.ArrayVal().EnsureCapacity(2) + nv.ArrayVal().AppendEmpty().SetStringVal("first") + nv.ArrayVal().AppendEmpty().SetStringVal("second") + require.EqualValues(t, Logs( Log{ Timestamp: 5000000000000, @@ -119,7 +125,7 @@ func TestAttributeTypeConversion(t *testing.T) { "k": pdata.NewAttributeValueInt(-1), "l": pdata.NewAttributeValueString("(0+0i)"), "m": pdata.NewAttributeValueString("\001e\002"), - "n": pdata.NewAttributeValueString(`["first","second"]`), + "n": nv, "o": pdata.NewAttributeValueString("cde"), }, }, @@ -213,3 +219,60 @@ func TestPackedForwardEventConversionWithErrors(t *testing.T) { print(err.Error()) }) } + +func TestBodyConversion(t *testing.T) { + var b []byte + + b = msgp.AppendArrayHeader(b, 3) + b = msgp.AppendString(b, "my-tag") + b = msgp.AppendInt(b, 5000) + b = msgp.AppendMapHeader(b, 1) + b = msgp.AppendString(b, "log") + b = msgp.AppendMapHeader(b, 3) + b = msgp.AppendString(b, "a") + b = msgp.AppendString(b, "value") + b = msgp.AppendString(b, "b") + b = msgp.AppendArrayHeader(b, 2) + b = msgp.AppendString(b, "first") + b = msgp.AppendString(b, "second") + b = msgp.AppendString(b, "c") + b = msgp.AppendMapHeader(b, 1) + b = msgp.AppendString(b, "d") + b = msgp.AppendInt(b, 24) + b = msgp.AppendString(b, "o") + b, err := msgp.AppendIntf(b, []uint8{99, 100, 101}) + + require.NoError(t, err) + + reader := msgp.NewReader(bytes.NewReader(b)) + + var event MessageEventLogRecord + err = event.DecodeMsg(reader) + require.Nil(t, err) + + le := event.LogRecords().At(0) + le.Attributes().Sort() + + body := pdata.NewAttributeValueMap() + body.MapVal().InsertString("a", "value") + + bv := pdata.NewAttributeValueArray() + bv.ArrayVal().EnsureCapacity(2) + bv.ArrayVal().AppendEmpty().SetStringVal("first") + bv.ArrayVal().AppendEmpty().SetStringVal("second") + body.MapVal().Insert("b", bv) + + cv := pdata.NewAttributeValueMap() + cv.MapVal().InsertInt("d", 24) + body.MapVal().Insert("c", cv) + + require.EqualValues(t, Logs( + Log{ + Timestamp: 5000000000000, + Body: body, + Attributes: map[string]pdata.AttributeValue{ + "fluent.tag": pdata.NewAttributeValueString("my-tag"), + }, + }, + ).ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0), le) +}