From fb114a67422b8acc56b96ffd106d4d6ac21aec3a Mon Sep 17 00:00:00 2001
From: Carson Ip <carsonip@users.noreply.github.com>
Date: Tue, 15 Oct 2024 10:57:04 +0100
Subject: [PATCH] [exporter/elasticsearch] Set body.* for log body in OTel mode
 (#35771)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Update the OTel mode implementation to serialize the log body into
body.* fields

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
---
 ...sticsearchexporter_otel-mode-log-body.yaml |  27 ++++
 .../elasticsearchexporter/exporter_test.go    | 118 ++++++++++++++----
 exporter/elasticsearchexporter/model.go       |  51 +++++---
 3 files changed, 153 insertions(+), 43 deletions(-)
 create mode 100644 .chloggen/elasticsearchexporter_otel-mode-log-body.yaml

diff --git a/.chloggen/elasticsearchexporter_otel-mode-log-body.yaml b/.chloggen/elasticsearchexporter_otel-mode-log-body.yaml
new file mode 100644
index 000000000000..aa11a7c95622
--- /dev/null
+++ b/.chloggen/elasticsearchexporter_otel-mode-log-body.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change.  Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Set body.* for log body in OTel mode
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35771]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: Log record body in OTel mapping mode will be stored in body.text, body.structured, or body.flattened, depending on the body value type and the presence of the event.name attribute
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go
index 413853cfcd51..a84ef00193a4 100644
--- a/exporter/elasticsearchexporter/exporter_test.go
+++ b/exporter/elasticsearchexporter/exporter_test.go
@@ -293,38 +293,104 @@ func TestExporterLogs(t *testing.T) {
 	})
 
 	t.Run("publish otel mapping mode", func(t *testing.T) {
-		rec := newBulkRecorder()
-		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
-			rec.Record(docs)
-			return itemsAllOK(docs)
-		})
-
-		exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
-			cfg.LogsDynamicIndex.Enabled = true
-			cfg.Mapping.Mode = "otel"
-		})
-		mustSendLogs(t, exporter, newLogsWithAttributes(
-			map[string]any{
-				"data_stream.dataset": "attr.dataset",
-				"attr.foo":            "attr.foo.value",
+		for _, tc := range []struct {
+			body         pcommon.Value
+			isEvent      bool
+			wantDocument []byte
+		}{
+			{
+				body: func() pcommon.Value {
+					return pcommon.NewValueStr("foo")
+				}(),
+				wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"text":"foo"}}`),
 			},
-			nil,
-			map[string]any{
-				"data_stream.dataset":   "resource.attribute.dataset",
-				"data_stream.namespace": "resource.attribute.namespace",
-				"resource.attr.foo":     "resource.attr.foo.value",
+			{
+				body: func() pcommon.Value {
+					vm := pcommon.NewValueMap()
+					m := vm.SetEmptyMap()
+					m.PutBool("true", true)
+					m.PutBool("false", false)
+					m.PutEmptyMap("inner").PutStr("foo", "bar")
+					return vm
+				}(),
+				wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`),
+			},
+			{
+				body: func() pcommon.Value {
+					vm := pcommon.NewValueMap()
+					m := vm.SetEmptyMap()
+					m.PutBool("true", true)
+					m.PutBool("false", false)
+					m.PutEmptyMap("inner").PutStr("foo", "bar")
+					return vm
+				}(),
+				isEvent:      true,
+				wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`),
 			},
-		))
-		rec.WaitItems(1)
-
-		expected := []itemRequest{
 			{
-				Action:   []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`),
-				Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}`),
+				body: func() pcommon.Value {
+					vs := pcommon.NewValueSlice()
+					s := vs.Slice()
+					s.AppendEmpty().SetStr("foo")
+					s.AppendEmpty().SetBool(false)
+					s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar")
+					return vs
+				}(),
+				wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"value":["foo",false,{"foo":"bar"}]}}}`),
 			},
+			{
+				body: func() pcommon.Value {
+					vs := pcommon.NewValueSlice()
+					s := vs.Slice()
+					s.AppendEmpty().SetStr("foo")
+					s.AppendEmpty().SetBool(false)
+					s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar")
+					return vs
+				}(),
+				isEvent:      true,
+				wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`),
+			},
+		} {
+			rec := newBulkRecorder()
+			server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
+				rec.Record(docs)
+				return itemsAllOK(docs)
+			})
+
+			exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
+				cfg.LogsDynamicIndex.Enabled = true
+				cfg.Mapping.Mode = "otel"
+			})
+			recordAttrs := map[string]any{
+				"data_stream.dataset": "attr.dataset",
+				"attr.foo":            "attr.foo.value",
+			}
+			if tc.isEvent {
+				recordAttrs["event.name"] = "foo"
+			}
+			logs := newLogsWithAttributes(
+				recordAttrs,
+				nil,
+				map[string]any{
+					"data_stream.dataset":   "resource.attribute.dataset",
+					"data_stream.namespace": "resource.attribute.namespace",
+					"resource.attr.foo":     "resource.attr.foo.value",
+				},
+			)
+			tc.body.CopyTo(logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body())
+			mustSendLogs(t, exporter, logs)
+			rec.WaitItems(1)
+
+			expected := []itemRequest{
+				{
+					Action:   []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`),
+					Document: tc.wantDocument,
+				},
+			}
+
+			assertItemsEqual(t, expected, rec.Items(), false)
 		}
 
-		assertItemsEqual(t, expected, rec.Items(), false)
 	})
 
 	t.Run("retry http request", func(t *testing.T) {
diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go
index ce4b9a3a22da..4bf95be05f3d 100644
--- a/exporter/elasticsearchexporter/model.go
+++ b/exporter/elasticsearchexporter/model.go
@@ -160,36 +160,53 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem
 	m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
 
 	// Body
-	setOTelLogBody(&document, record.Body())
+	setOTelLogBody(&document, record.Body(), record.Attributes())
 
 	return document
 }
 
-func setOTelLogBody(doc *objmodel.Document, body pcommon.Value) {
+func setOTelLogBody(doc *objmodel.Document, body pcommon.Value, attributes pcommon.Map) {
+	// Determine if this log record is an event, as they are mapped differently
+	// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/events.md
+	_, isEvent := attributes.Get("event.name")
+
 	switch body.Type() {
 	case pcommon.ValueTypeMap:
-		doc.AddAttribute("body_structured", body)
+		if isEvent {
+			doc.AddAttribute("body.structured", body)
+		} else {
+			doc.AddAttribute("body.flattened", body)
+		}
 	case pcommon.ValueTypeSlice:
-		slice := body.Slice()
-		for i := 0; i < slice.Len(); i++ {
-			switch slice.At(i).Type() {
-			case pcommon.ValueTypeMap, pcommon.ValueTypeSlice:
-				doc.AddAttribute("body_structured", body)
-				return
+		// output must be an array of objects due to ES limitations
+		// otherwise, wrap the array in an object
+		s := body.Slice()
+		allMaps := true
+		for i := 0; i < s.Len(); i++ {
+			if s.At(i).Type() != pcommon.ValueTypeMap {
+				allMaps = false
 			}
 		}
 
-		bodyTextVal := pcommon.NewValueSlice()
-		bodyTextSlice := bodyTextVal.Slice()
-		bodyTextSlice.EnsureCapacity(slice.Len())
+		var outVal pcommon.Value
+		if allMaps {
+			outVal = body
+		} else {
+			vm := pcommon.NewValueMap()
+			m := vm.SetEmptyMap()
+			body.Slice().CopyTo(m.PutEmptySlice("value"))
+			outVal = vm
+		}
 
-		for i := 0; i < slice.Len(); i++ {
-			elem := slice.At(i)
-			bodyTextSlice.AppendEmpty().SetStr(elem.AsString())
+		if isEvent {
+			doc.AddAttribute("body.structured", outVal)
+		} else {
+			doc.AddAttribute("body.flattened", outVal)
 		}
-		doc.AddAttribute("body_text", bodyTextVal)
+	case pcommon.ValueTypeStr:
+		doc.AddString("body.text", body.Str())
 	default:
-		doc.AddString("body_text", body.AsString())
+		doc.AddString("body.text", body.AsString())
 	}
 }