diff --git a/.chloggen/elasticsearchexporter_otel-mode-log-body.yaml b/.chloggen/elasticsearchexporter_otel-mode-log-body.yaml new file mode 100644 index 000000000000..aa11a7c95622 --- /dev/null +++ b/.chloggen/elasticsearchexporter_otel-mode-log-body.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Set body.* for log body in OTel mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35771] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Log record body in OTel mapping mode will be stored in body.text, body.structured, body.flattened based on body value type and presence of event.name attribute + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 413853cfcd51..a84ef00193a4 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -293,38 +293,104 @@ func TestExporterLogs(t *testing.T) { }) t.Run("publish otel mapping mode", func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsDynamicIndex.Enabled = true - cfg.Mapping.Mode = "otel" - }) - mustSendLogs(t, exporter, newLogsWithAttributes( - map[string]any{ - "data_stream.dataset": "attr.dataset", - "attr.foo": "attr.foo.value", + for _, tc := range []struct { + body pcommon.Value + isEvent bool + wantDocument []byte + }{ + { + body: func() pcommon.Value { + return pcommon.NewValueStr("foo") + }(), + wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"text":"foo"}}`), }, - nil, - map[string]any{ - "data_stream.dataset": "resource.attribute.dataset", - "data_stream.namespace": "resource.attribute.namespace", - "resource.attr.foo": "resource.attr.foo.value", + { + body: func() pcommon.Value { + vm := pcommon.NewValueMap() + m := vm.SetEmptyMap() + m.PutBool("true", true) + m.PutBool("false", false) + m.PutEmptyMap("inner").PutStr("foo", "bar") + return vm + }(), + wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), + }, + { + body: func() pcommon.Value { + vm := pcommon.NewValueMap() + m := vm.SetEmptyMap() + m.PutBool("true", true) + m.PutBool("false", false) + m.PutEmptyMap("inner").PutStr("foo", "bar") + return vm + }(), + isEvent: true, + wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), }, - )) - rec.WaitItems(1) - - expected := []itemRequest{ { - Action: []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}`), + body: func() pcommon.Value { + vs := pcommon.NewValueSlice() + s := vs.Slice() + s.AppendEmpty().SetStr("foo") + s.AppendEmpty().SetBool(false) + s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar") + return vs + }(), + wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"value":["foo",false,{"foo":"bar"}]}}}`), }, + { + body: func() pcommon.Value { + vs := pcommon.NewValueSlice() + s := vs.Slice() + s.AppendEmpty().SetStr("foo") + s.AppendEmpty().SetBool(false) + s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar") + return vs + }(), + isEvent: true, + wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`), + }, + } { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicIndex.Enabled = true + cfg.Mapping.Mode = "otel" + }) + recordAttrs := map[string]any{ + "data_stream.dataset": "attr.dataset", + "attr.foo": "attr.foo.value", + } + if tc.isEvent { + recordAttrs["event.name"] = "foo" + } + logs := newLogsWithAttributes( + recordAttrs, + nil, + map[string]any{ + "data_stream.dataset": "resource.attribute.dataset", + "data_stream.namespace": "resource.attribute.namespace", + "resource.attr.foo": "resource.attr.foo.value", + }, + ) + tc.body.CopyTo(logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body()) + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + + expected := []itemRequest{ + { + Action: []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`), + Document: tc.wantDocument, + }, + } + + assertItemsEqual(t, expected, rec.Items(), false) } - assertItemsEqual(t, expected, rec.Items(), false) }) t.Run("retry http request", func(t *testing.T) { diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index ce4b9a3a22da..4bf95be05f3d 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -160,36 +160,53 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) // Body - setOTelLogBody(&document, record.Body()) + setOTelLogBody(&document, record.Body(), record.Attributes()) return document } -func setOTelLogBody(doc *objmodel.Document, body pcommon.Value) { +func setOTelLogBody(doc *objmodel.Document, body pcommon.Value, attributes pcommon.Map) { + // Determine if this log record is an event, as they are mapped differently + // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/events.md + _, isEvent := attributes.Get("event.name") + switch body.Type() { case pcommon.ValueTypeMap: - doc.AddAttribute("body_structured", body) + if isEvent { + doc.AddAttribute("body.structured", body) + } else { + doc.AddAttribute("body.flattened", body) + } case pcommon.ValueTypeSlice: - slice := body.Slice() - for i := 0; i < slice.Len(); i++ { - switch slice.At(i).Type() { - case pcommon.ValueTypeMap, pcommon.ValueTypeSlice: - doc.AddAttribute("body_structured", body) - return + // output must be an array of objects due to ES limitations + // otherwise, wrap the array in an object + s := body.Slice() + allMaps := true + for i := 0; i < s.Len(); i++ { + if s.At(i).Type() != pcommon.ValueTypeMap { + allMaps = false } } - bodyTextVal := pcommon.NewValueSlice() - bodyTextSlice := bodyTextVal.Slice() - bodyTextSlice.EnsureCapacity(slice.Len()) + var outVal pcommon.Value + if allMaps { + outVal = body + } else { + vm := pcommon.NewValueMap() + m := vm.SetEmptyMap() + body.Slice().CopyTo(m.PutEmptySlice("value")) + outVal = vm + } - for i := 0; i < slice.Len(); i++ { - elem := slice.At(i) - bodyTextSlice.AppendEmpty().SetStr(elem.AsString()) + if isEvent { + doc.AddAttribute("body.structured", outVal) + } else { + doc.AddAttribute("body.flattened", outVal) } - doc.AddAttribute("body_text", bodyTextVal) + case pcommon.ValueTypeStr: + doc.AddString("body.text", body.Str()) default: - doc.AddString("body_text", body.AsString()) + doc.AddString("body.text", body.AsString()) } }