Skip to content

Commit

Permalink
[exporter/elasticsearch] Set body.* for log body in OTel mode (#35771)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Update OTel mode to implementation to serialize log body into body.*
fields

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
carsonip authored Oct 15, 2024
1 parent 8b67271 commit fb114a6
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 43 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_otel-mode-log-body.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Set body.* for log body in OTel mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35771]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Log record body in OTel mapping mode will be stored in body.text, body.structured, body.flattened based on body value type and presence of event.name attribute

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
118 changes: 92 additions & 26 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,38 +293,104 @@ func TestExporterLogs(t *testing.T) {
})

t.Run("publish otel mapping mode", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.LogsDynamicIndex.Enabled = true
cfg.Mapping.Mode = "otel"
})
mustSendLogs(t, exporter, newLogsWithAttributes(
map[string]any{
"data_stream.dataset": "attr.dataset",
"attr.foo": "attr.foo.value",
for _, tc := range []struct {
body pcommon.Value
isEvent bool
wantDocument []byte
}{
{
body: func() pcommon.Value {
return pcommon.NewValueStr("foo")
}(),
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"text":"foo"}}`),
},
nil,
map[string]any{
"data_stream.dataset": "resource.attribute.dataset",
"data_stream.namespace": "resource.attribute.namespace",
"resource.attr.foo": "resource.attr.foo.value",
{
body: func() pcommon.Value {
vm := pcommon.NewValueMap()
m := vm.SetEmptyMap()
m.PutBool("true", true)
m.PutBool("false", false)
m.PutEmptyMap("inner").PutStr("foo", "bar")
return vm
}(),
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`),
},
{
body: func() pcommon.Value {
vm := pcommon.NewValueMap()
m := vm.SetEmptyMap()
m.PutBool("true", true)
m.PutBool("false", false)
m.PutEmptyMap("inner").PutStr("foo", "bar")
return vm
}(),
isEvent: true,
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`),
},
))
rec.WaitItems(1)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0}`),
body: func() pcommon.Value {
vs := pcommon.NewValueSlice()
s := vs.Slice()
s.AppendEmpty().SetStr("foo")
s.AppendEmpty().SetBool(false)
s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar")
return vs
}(),
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"flattened":{"value":["foo",false,{"foo":"bar"}]}}}`),
},
{
body: func() pcommon.Value {
vs := pcommon.NewValueSlice()
s := vs.Slice()
s.AppendEmpty().SetStr("foo")
s.AppendEmpty().SetBool(false)
s.AppendEmpty().SetEmptyMap().PutStr("foo", "bar")
return vs
}(),
isEvent: true,
wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`),
},
} {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.LogsDynamicIndex.Enabled = true
cfg.Mapping.Mode = "otel"
})
recordAttrs := map[string]any{
"data_stream.dataset": "attr.dataset",
"attr.foo": "attr.foo.value",
}
if tc.isEvent {
recordAttrs["event.name"] = "foo"
}
logs := newLogsWithAttributes(
recordAttrs,
nil,
map[string]any{
"data_stream.dataset": "resource.attribute.dataset",
"data_stream.namespace": "resource.attribute.namespace",
"resource.attr.foo": "resource.attr.foo.value",
},
)
tc.body.CopyTo(logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body())
mustSendLogs(t, exporter, logs)
rec.WaitItems(1)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"logs-attr.dataset.otel-resource.attribute.namespace"}}`),
Document: tc.wantDocument,
},
}

assertItemsEqual(t, expected, rec.Items(), false)
}

assertItemsEqual(t, expected, rec.Items(), false)
})

t.Run("retry http request", func(t *testing.T) {
Expand Down
51 changes: 34 additions & 17 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,36 +160,53 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

// Body
setOTelLogBody(&document, record.Body())
setOTelLogBody(&document, record.Body(), record.Attributes())

return document
}

func setOTelLogBody(doc *objmodel.Document, body pcommon.Value) {
func setOTelLogBody(doc *objmodel.Document, body pcommon.Value, attributes pcommon.Map) {
// Determine if this log record is an event, as they are mapped differently
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/events.md
_, isEvent := attributes.Get("event.name")

switch body.Type() {
case pcommon.ValueTypeMap:
doc.AddAttribute("body_structured", body)
if isEvent {
doc.AddAttribute("body.structured", body)
} else {
doc.AddAttribute("body.flattened", body)
}
case pcommon.ValueTypeSlice:
slice := body.Slice()
for i := 0; i < slice.Len(); i++ {
switch slice.At(i).Type() {
case pcommon.ValueTypeMap, pcommon.ValueTypeSlice:
doc.AddAttribute("body_structured", body)
return
// output must be an array of objects due to ES limitations
// otherwise, wrap the array in an object
s := body.Slice()
allMaps := true
for i := 0; i < s.Len(); i++ {
if s.At(i).Type() != pcommon.ValueTypeMap {
allMaps = false
}
}

bodyTextVal := pcommon.NewValueSlice()
bodyTextSlice := bodyTextVal.Slice()
bodyTextSlice.EnsureCapacity(slice.Len())
var outVal pcommon.Value
if allMaps {
outVal = body
} else {
vm := pcommon.NewValueMap()
m := vm.SetEmptyMap()
body.Slice().CopyTo(m.PutEmptySlice("value"))
outVal = vm
}

for i := 0; i < slice.Len(); i++ {
elem := slice.At(i)
bodyTextSlice.AppendEmpty().SetStr(elem.AsString())
if isEvent {
doc.AddAttribute("body.structured", outVal)
} else {
doc.AddAttribute("body.flattened", outVal)
}
doc.AddAttribute("body_text", bodyTextVal)
case pcommon.ValueTypeStr:
doc.AddString("body.text", body.Str())
default:
doc.AddString("body_text", body.AsString())
doc.AddString("body.text", body.AsString())
}
}

Expand Down

0 comments on commit fb114a6

Please sign in to comment.