diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index d3bced17a28c..3b8b36bee445 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -29,6 +29,7 @@ import ( elasticsearch7 "github.com/elastic/go-elasticsearch/v7" esutil7 "github.com/elastic/go-elasticsearch/v7/esutil" "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -46,6 +47,7 @@ type elasticsearchExporter struct { client *esClientCurrent bulkIndexer esBulkIndexerCurrent + model mappingModel } var retryOnStatus = []int{500, 502, 503, 504, 429} @@ -72,6 +74,9 @@ func newExporter(logger *zap.Logger, cfg *Config) (*elasticsearchExporter, error maxAttempts = cfg.Retry.MaxRequests } + // TODO: Apply encoding and field mapping settings. + model := &encodeModel{dedup: true, dedot: false} + return &elasticsearchExporter{ logger: logger, client: client, @@ -79,6 +84,7 @@ func newExporter(logger *zap.Logger, cfg *Config) (*elasticsearchExporter, error index: cfg.Index, maxAttempts: maxAttempts, + model: model, }, nil } @@ -87,7 +93,36 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error { } func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld pdata.Logs) error { - panic("TODO") + var errs []error + + rls := ld.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + rl := rls.At(i) + resource := rl.Resource() + ills := rl.InstrumentationLibraryLogs() + for j := 0; j < ills.Len(); j++ { + logs := ills.At(j).Logs() + for k := 0; k < logs.Len(); k++ { + if err := e.pushLogRecord(ctx, resource, logs.At(k)); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + + errs = append(errs, err) + } + } + } + } + + return multierr.Combine(errs...) +} + +func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pdata.Resource, record pdata.LogRecord) error { + document, err := e.model.encodeLog(resource, record) + if err != nil { + return fmt.Errorf("failed to encode log event: %w", err) + } + return e.pushEvent(ctx, document) } func (e *elasticsearchExporter) pushEvent(ctx context.Context, document []byte) error { @@ -184,17 +219,10 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren // maxRetries configures the maximum number of event publishing attempts, // including the first send and additional retries. - // Issue: https://github.com/elastic/go-elasticsearch/issues/232 - // - // The elasticsearch7.Client retry requires the count to be >= 1, otherwise - // it defaults to 3. Internally the Clients starts the number of send attempts with 1. - // When maxRetries is 1, retries are disabled, meaning that the event is - // dropped if the first HTTP request failed. - // - // Once the issue is resolved we want `maxRetries = config.Retry.MaxRequests - 1`.
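A small standalone sketch (editorial illustration, not part of this diff) of the retry mapping introduced below, assuming the go-elasticsearch client counts retries separately from the initial attempt: Retry.MaxRequests is the total number of send attempts, while the client's MaxRetries counts only the retries after the first attempt.

package main

import "fmt"

// retrySettings mirrors the logic introduced below: derive the client-side
// retry count from the configured total number of requests and disable
// retries entirely when they would be zero or are switched off.
func retrySettings(retryEnabled bool, maxRequests int) (maxRetries int, disableRetry bool) {
	maxRetries = maxRequests - 1
	disableRetry = !retryEnabled || maxRetries <= 0
	if disableRetry {
		maxRetries = 0
	}
	return maxRetries, disableRetry
}

func main() {
	fmt.Println(retrySettings(true, 5))  // 4 false -> one attempt plus up to 4 retries
	fmt.Println(retrySettings(true, 1))  // 0 true  -> a single attempt, no retries
	fmt.Println(retrySettings(false, 5)) // 0 true  -> retries disabled by config
}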
- maxRetries := config.Retry.MaxRequests - if maxRetries < 1 || !config.Retry.Enabled { - maxRetries = 1 + maxRetries := config.Retry.MaxRequests - 1 + retryDisabled := !config.Retry.Enabled || maxRetries <= 0 + if retryDisabled { + maxRetries = 0 } return elasticsearch7.NewClient(esConfigCurrent{ @@ -210,7 +238,7 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren // configure retry behavior RetryOnStatus: retryOnStatus, - DisableRetry: !config.Retry.Enabled, + DisableRetry: retryDisabled, EnableRetryOnTimeout: config.Retry.Enabled, MaxRetries: maxRetries, RetryBackoff: createElasticsearchBackoffFunc(&config.Retry), diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 6f47e5560742..e5ef74dafee7 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/armon/go-metrics v0.3.3 // indirect github.com/cenkalti/backoff/v4 v4.1.0 github.com/elastic/go-elasticsearch/v7 v7.12.0 + github.com/elastic/go-structform v0.0.8 github.com/gogo/googleapis v1.3.0 // indirect github.com/hashicorp/go-immutable-radix v1.2.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect @@ -15,6 +16,7 @@ require ( github.com/pelletier/go-toml v1.8.0 // indirect github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.27.0 + go.uber.org/multierr v1.5.0 go.uber.org/zap v1.16.0 gopkg.in/ini.v1 v1.57.0 // indirect ) diff --git a/exporter/elasticsearchexporter/go.sum b/exporter/elasticsearchexporter/go.sum index ce5a16732990..9f6ff9000628 100644 --- a/exporter/elasticsearchexporter/go.sum +++ b/exporter/elasticsearchexporter/go.sum @@ -191,6 +191,7 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b h1:WR1qVJzbvrVywhAk4kMQKRPx09AZVI0NdEdYs59iHcA= github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -232,6 +233,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s= github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/elastic/go-structform v0.0.8 h1:U0qnb9Zqig7w+FhF+sLI3VZPPi/+2aJ0bIEW6R1z6Tk= +github.com/elastic/go-structform v0.0.8/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go new file mode 100644 
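An aside on the go.uber.org/multierr dependency added to go.mod above (editorial sketch, not part of this diff): pushLogsData collects one error per failed log record and flattens them into a single error, so a partially failed batch still reports every failure.

package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	// Combining no errors (or only nils) yields nil, so a fully successful
	// batch returns a nil error from pushLogsData.
	fmt.Println(multierr.Combine(nil, nil)) // <nil>

	// Multiple errors are preserved and joined into one combined error.
	combined := multierr.Combine(
		errors.New("record 1: encode failed"),
		errors.New("record 3: bulk indexer rejected"),
	)
	fmt.Println(combined)                       // record 1: encode failed; record 3: bulk indexer rejected
	fmt.Println(len(multierr.Errors(combined))) // 2
}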
index 000000000000..c606385dabb7 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel.go @@ -0,0 +1,431 @@ +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package objmodel provides tools for converting OpenTelemetry log records into +// JSON documents. +// +// The JSON parser in Elasticsearch does not support JSON documents +// with duplicate fields. The fields in the document can be sorted and duplicate entries +// removed before serializing. Deduplication ensures that ambiguous +// events can still be indexed. +// +// With the attributes map encoded as a list of key-value +// pairs, we might find structured loggers that create log records with +// duplicate fields. Although the AttributeMap wrapper tries to give a +// dictionary-like view into the list, it is not 'complete'. When iterating the map +// for encoding, we will still encounter the duplicates. +// The AttributeMap helpers treat the first occurrence as the actual field. +// For high-performance structured loggers (e.g. zap) the AttributeMap +// semantics are not necessarily correct. Most often the last occurrence will be +// what we want to export, as the last occurrence represents the last overwrite +// within a context/dictionary (the leaf logger's context). +// Some loggers might even allow users to create a mix of dotted and dedotted fields. +// The Document type also tries to combine these into a proper structure, so that these mixed +// representations have a single, unique encoding, which allows us to properly remove duplicates. +// +// The `.` is special to Elasticsearch. In order to handle common prefixes and attributes +// being a mix of key-value pairs with dots and complex objects, we flatten the document first +// before we deduplicate. Final dedotting is optional and only required when +// Ingest Node is used. But either way, we try to present only well-formed +// documents to Elasticsearch. + +package objmodel + +import ( + "errors" + "io" + "math" + "sort" + "strings" + "time" + + "github.com/elastic/go-structform" + "github.com/elastic/go-structform/json" + "go.opentelemetry.io/collector/consumer/pdata" +) + +// Document is an intermediate representation for converting OpenTelemetry records with arbitrary attributes +// into a JSON document that can be processed by Elasticsearch. +type Document struct { + fields []field +} + +type field struct { + key string + value Value +} + +// Value is a value type that can be added to a Document. +type Value struct { + kind Kind + primitive uint64 + dbl float64 + str string + arr []Value + doc Document + ts time.Time +} + +// Kind represents the internal kind of a value stored in a Document.
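An editorial usage sketch (not part of this diff) of the Document API defined in this file, written as it might appear in an example test for this package; the attribute values are illustrative, and the expected JSON follows from the Dedup and Serialize implementations below.

package objmodel

import (
	"fmt"
	"strings"

	"go.opentelemetry.io/collector/consumer/pdata"
)

// ExampleDocument_Dedup builds a document from attributes that collide after
// flattening, deduplicates it, and serializes it without dedotting.
func ExampleDocument_Dedup() {
	attrs := pdata.NewAttributeMap()
	attrs.InsertString("container.name", "web") // dotted key ...
	nested := pdata.NewAttributeValueMap()
	nested.MapVal().InsertString("name", "frontend") // ... colliding with a nested map
	attrs.Insert("container", nested)

	doc := DocumentFromAttributes(attrs) // flattens to two "container.name" fields
	doc.Dedup()                          // sorts keys and keeps only the last duplicate

	var buf strings.Builder
	if err := doc.Serialize(&buf, false); err != nil {
		panic(err)
	}
	fmt.Println(buf.String())
	// Output: {"container.name":"frontend"}
}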
+type Kind uint8 + +const ( + KindNil Kind = iota + KindBool + KindInt + KindDouble + KindString + KindArr + KindObject + KindTimestamp + KindIgnore +) + +const tsLayout = "2006-01-02T15:04:05.000000000Z" + +var nilValue = Value{kind: KindNil} +var ignoreValue = Value{kind: KindIgnore} + +type idValue interface { + IsEmpty() bool + HexString() string +} + +// DocumentFromAttributes creates a document from a OpenTelemetry attribute +// map. All nested maps will be flattened, with keys being joined using a `.` symbol. +func DocumentFromAttributes(am pdata.AttributeMap) Document { + return DocumentFromAttributesWithPath("", am) +} + +// DocumentFromAttributesWithPath creates a document from a OpenTelemetry attribute +// map. All nested maps will be flattened, with keys being joined using a `.` symbol. +// +// All keys in the map will be prefixed with path. +func DocumentFromAttributesWithPath(path string, am pdata.AttributeMap) Document { + if am.Len() == 0 { + return Document{} + } + + fields := make([]field, 0, am.Len()) + fields = appendAttributeFields(fields, path, am) + return Document{fields} +} + +// AddTimestamp adds a raw timestamp value to the Document. +func (doc *Document) AddTimestamp(key string, ts pdata.Timestamp) { + doc.Add(key, TimestampValue(ts.AsTime())) +} + +// Add adds a converted value to the document. +func (doc *Document) Add(key string, v Value) { + doc.fields = append(doc.fields, field{key: key, value: v}) +} + +// AddString adds a string to the document. +func (doc *Document) AddString(key string, v string) { + if v != "" { + doc.Add(key, StringValue(v)) + } +} + +// AddID adds the hex presentation of an id value to the document. If the ID +// is empty, no value will be added. +func (doc *Document) AddID(key string, id idValue) { + if !id.IsEmpty() { + doc.AddString(key, id.HexString()) + } +} + +// AddInt adds an integer value to the document. +func (doc *Document) AddInt(key string, value int64) { + doc.Add(key, IntValue(value)) +} + +// AddAttributes expands and flattens all key-value pairs from the input attribute map into +// the document. +func (doc *Document) AddAttributes(key string, attributes pdata.AttributeMap) { + doc.fields = appendAttributeFields(doc.fields, key, attributes) +} + +// AddAttribute converts and adds a AttributeValue to the document. If the attribute represents a map, +// the fields will be flattened. +func (doc *Document) AddAttribute(key string, attribute pdata.AttributeValue) { + switch attribute.Type() { + case pdata.AttributeValueTypeNull: + // do not add 'null' + case pdata.AttributeValueTypeMap: + doc.AddAttributes(key, attribute.MapVal()) + default: + doc.Add(key, ValueFromAttribute(attribute)) + } +} + +// Sort sorts all fields in the document by key name. +func (doc *Document) Sort() { + sort.SliceStable(doc.fields, func(i, j int) bool { + return doc.fields[i].key < doc.fields[j].key + }) + + for i := range doc.fields { + fld := &doc.fields[i] + fld.value.Sort() + } +} + +// Dedup removes fields from the document, that have duplicate keys. +// The filtering only keeps the last value for a key. +// +// Dedup ensure that keys are sorted. +func (doc *Document) Dedup() { + // 1. Always ensure the fields are sorted, Dedup support requires + // Fields to be sorted. + doc.Sort() + + // 2. rename fields if a primitive value is overwritten by an object. + // For example the pair (path.x=1, path.x.a="test") becomes: + // (path.x.value=1, path.x.a="test"). 
+ // + // NOTE: We do the renaming in order to preserve the original value + // in case of conflicts after dedotting, which would lead to the removal of the field. + // For example, docker/k8s labels tend to use `.`, which needs to be handled in case + // the collector passes us these kinds of labels as an AttributeMap. + // + // NOTE: If the embedded document already has a field named `value`, we will remove the renamed + // field in favor of the `value` field in the document. + // + // This step removes potential conflicts when dedotting and serializing fields. + var renamed bool + for i := 0; i < len(doc.fields)-1; i++ { + key, nextKey := doc.fields[i].key, doc.fields[i+1].key + if len(key) < len(nextKey) && strings.HasPrefix(nextKey, key) && nextKey[len(key)] == '.' { + renamed = true + doc.fields[i].key = key + ".value" + } + } + if renamed { + doc.Sort() + } + + // 3. mark duplicates as 'ignore' + // + // This step ensures that we do not have duplicate field names when serializing. + // The Elasticsearch JSON parser will fail otherwise. + for i := 0; i < len(doc.fields)-1; i++ { + if doc.fields[i].key == doc.fields[i+1].key { + doc.fields[i].value = ignoreValue + } + } + + // 4. fix objects that might be stored in arrays + for i := range doc.fields { + doc.fields[i].value.Dedup() + } +} + +// Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true. +// +// NOTE: The document MUST be sorted if dedot is true. +func (doc *Document) Serialize(w io.Writer, dedot bool) error { + v := json.NewVisitor(w) + return doc.iterJSON(v, dedot) +} + +func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error { + if dedot { + return doc.iterJSONDedot(v) + } + return doc.iterJSONFlat(v) +} + +func (doc *Document) iterJSONFlat(w *json.Visitor) error { + w.OnObjectStart(-1, structform.AnyType) + defer w.OnObjectFinished() + + for i := range doc.fields { + fld := &doc.fields[i] + + // filter out empty values + if fld.value.kind == KindIgnore || + fld.value.kind == KindNil || + (fld.value.kind == KindArr && len(fld.value.arr) == 0) { + continue + } + + w.OnKey(fld.key) + if err := fld.value.iterJSON(w, true); err != nil { + return err + } + } + + return nil +} + +func (doc *Document) iterJSONDedot(w *json.Visitor) error { + return errors.New("TODO") +} + +// StringValue creates a new value from a string. +func StringValue(str string) Value { return Value{kind: KindString, str: str} } + +// IntValue creates a new value from an integer. +func IntValue(i int64) Value { return Value{kind: KindInt, primitive: uint64(i)} } + +// DoubleValue creates a new value from a double value. +func DoubleValue(d float64) Value { return Value{kind: KindDouble, dbl: d} } + +// BoolValue creates a new value from a bool value. +func BoolValue(b bool) Value { + var v uint64 + if b { + v = 1 + } + return Value{kind: KindBool, primitive: v} +} + +// ArrValue combines multiple values into an array value. +func ArrValue(values ...Value) Value { + return Value{kind: KindArr, arr: values} +} + +// TimestampValue creates a new value from a time.Time. +func TimestampValue(ts time.Time) Value { + return Value{kind: KindTimestamp, ts: ts} +} + +// ValueFromAttribute converts an AttributeValue into a value.
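// For illustration (editorial addition, based on the switch below), the mapping is roughly:
//
//	pdata.NewAttributeValueInt(42)      -> IntValue(42)
//	pdata.NewAttributeValueDouble(3.14) -> DoubleValue(3.14)
//	pdata.NewAttributeValueString("ok") -> StringValue("ok")
//	pdata.NewAttributeValueBool(true)   -> BoolValue(true)
//	pdata.NewAttributeValueArray()      -> ArrValue(...) with converted elements
//	pdata.NewAttributeValueMap()        -> Value{kind: KindObject} holding a flattened Document
//	pdata.NewAttributeValueNull()       -> nilValue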
+func ValueFromAttribute(attr pdata.AttributeValue) Value { + switch attr.Type() { + case pdata.AttributeValueTypeInt: + return IntValue(attr.IntVal()) + case pdata.AttributeValueTypeDouble: + return DoubleValue(attr.DoubleVal()) + case pdata.AttributeValueTypeString: + return StringValue(attr.StringVal()) + case pdata.AttributeValueTypeBool: + return BoolValue(attr.BoolVal()) + case pdata.AttributeValueTypeArray: + sub := arrFromAttributes(attr.ArrayVal()) + return ArrValue(sub...) + case pdata.AttributeValueTypeMap: + sub := DocumentFromAttributes(attr.MapVal()) + return Value{kind: KindObject, doc: sub} + default: + return nilValue + } +} + +// Sort recursively sorts all keys in docuemts held by the value. +func (v *Value) Sort() { + switch v.kind { + case KindObject: + v.doc.Sort() + case KindArr: + for i := range v.arr { + v.arr[i].Sort() + } + } +} + +// Dedup recursively dedups keys in stored documents. +// +// NOTE: The value MUST be sorted. +func (v *Value) Dedup() { + switch v.kind { + case KindObject: + v.doc.Dedup() + case KindArr: + for i := range v.arr { + v.arr[i].Dedup() + } + } +} + +func (v *Value) iterJSON(w *json.Visitor, dedot bool) error { + switch v.kind { + case KindNil: + return w.OnNil() + case KindBool: + return w.OnBool(v.primitive == 1) + case KindInt: + return w.OnInt64(int64(v.primitive)) + case KindDouble: + if math.IsNaN(v.dbl) || math.IsInf(v.dbl, 0) { + // NaN and Inf are undefined for JSON. Let's serialize to "null" + return w.OnNil() + } + return w.OnFloat64(v.dbl) + case KindString: + return w.OnString(v.str) + case KindTimestamp: + str := v.ts.UTC().Format(tsLayout) + return w.OnString(str) + case KindObject: + if len(v.doc.fields) == 0 { + return w.OnNil() + } + return v.doc.iterJSON(w, dedot) + case KindArr: + w.OnArrayStart(-1, structform.AnyType) + for i := range v.arr { + if err := v.arr[i].iterJSON(w, dedot); err != nil { + return err + } + } + w.OnArrayFinished() + } + + return nil +} + +func arrFromAttributes(aa pdata.AnyValueArray) []Value { + if aa.Len() == 0 { + return nil + } + + values := make([]Value, aa.Len()) + for i := 0; i < aa.Len(); i++ { + values[i] = ValueFromAttribute(aa.At(i)) + } + return values +} + +func appendAttributeFields(fields []field, path string, am pdata.AttributeMap) []field { + am.Range(func(k string, val pdata.AttributeValue) bool { + fields = appendAttributeValue(fields, path, k, val) + return true + }) + return fields +} + +func appendAttributeValue(fields []field, path string, key string, attr pdata.AttributeValue) []field { + if attr.Type() == pdata.AttributeValueTypeNull { + return fields + } + + if attr.Type() == pdata.AttributeValueTypeMap { + return appendAttributeFields(fields, flattenKey(path, key), attr.MapVal()) + } + + return append(fields, field{ + key: flattenKey(path, key), + value: ValueFromAttribute(attr), + }) +} + +func flattenKey(path, key string) string { + if path == "" { + return key + } + return path + "." + key +} diff --git a/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go new file mode 100644 index 000000000000..11c112275417 --- /dev/null +++ b/exporter/elasticsearchexporter/internal/objmodel/objmodel_test.go @@ -0,0 +1,331 @@ +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package objmodel + +import ( + "math" + "strings" + "testing" + "time" + + "github.com/elastic/go-structform/json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdata" +) + +var dijkstra = time.Date(1930, 5, 11, 16, 33, 11, 123456789, time.UTC) + +func TestObjectModel_CreateMap(t *testing.T) { + tests := map[string]struct { + build func() Document + want Document + }{ + "from empty map": { + build: func() Document { + return DocumentFromAttributes(pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{})) + }, + }, + "from map": { + build: func() Document { + return DocumentFromAttributes(pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{ + "i": pdata.NewAttributeValueInt(42), + "str": pdata.NewAttributeValueString("test"), + })) + }, + want: Document{[]field{{"i", IntValue(42)}, {"str", StringValue("test")}}}, + }, + "ignores nil values": { + build: func() Document { + return DocumentFromAttributes(pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{ + "null": pdata.NewAttributeValueNull(), + "str": pdata.NewAttributeValueString("test"), + })) + }, + want: Document{[]field{{"str", StringValue("test")}}}, + }, + "from map with prefix": { + build: func() Document { + return DocumentFromAttributesWithPath("prefix", pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{ + "i": pdata.NewAttributeValueInt(42), + "str": pdata.NewAttributeValueString("test"), + })) + }, + want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + }, + "add attributes with key": { + build: func() (doc Document) { + doc.AddAttributes("prefix", pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{ + "i": pdata.NewAttributeValueInt(42), + "str": pdata.NewAttributeValueString("test"), + })) + return doc + }, + want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + }, + "add attribute flattens a map value": { + build: func() (doc Document) { + mapVal := pdata.NewAttributeValueMap() + m := mapVal.MapVal() + m.InsertInt("i", 42) + m.InsertString("str", "test") + doc.AddAttribute("prefix", mapVal) + return doc + }, + want: Document{[]field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + doc := test.build() + doc.Sort() + assert.Equal(t, test.want, doc) + }) + } +} + +func TestDocument_Sort(t *testing.T) { + tests := map[string]struct { + build func() Document + want Document + }{ + "keys are sorted": { + build: func() (doc Document) { + doc.AddInt("z", 26) + doc.AddInt("a", 1) + return doc + }, + want: Document{[]field{{"a", IntValue(1)}, {"z", IntValue(26)}}}, + }, + "sorting is stable": { + build: func() (doc Document) { + doc.AddInt("a", 1) + doc.AddInt("c", 3) + doc.AddInt("a", 2) + return doc + }, + want: Document{[]field{{"a", IntValue(1)}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + doc := 
test.build() + doc.Sort() + assert.Equal(t, test.want, doc) + }) + } + +} + +func TestObjectModel_Dedup(t *testing.T) { + tests := map[string]struct { + build func() Document + want Document + }{ + "no duplicates": { + build: func() (doc Document) { + doc.AddInt("a", 1) + doc.AddInt("c", 3) + return doc + }, + want: Document{[]field{{"a", IntValue(1)}, {"c", IntValue(3)}}}, + }, + "duplicate keys": { + build: func() (doc Document) { + doc.AddInt("a", 1) + doc.AddInt("c", 3) + doc.AddInt("a", 2) + return doc + }, + want: Document{[]field{{"a", ignoreValue}, {"a", IntValue(2)}, {"c", IntValue(3)}}}, + }, + "duplicate after flattening from map: namespace object at end": { + build: func() Document { + namespace := pdata.NewAttributeValueMap() + namespace.MapVal().InsertInt("a", 23) + + am := pdata.NewAttributeMap() + am.InsertInt("namespace.a", 42) + am.InsertString("toplevel", "test") + am.Insert("namespace", namespace) + return DocumentFromAttributes(am) + }, + want: Document{[]field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(23)}, {"toplevel", StringValue("test")}}}, + }, + "duplicate after flattening from map: namespace object at beginning": { + build: func() Document { + namespace := pdata.NewAttributeValueMap() + namespace.MapVal().InsertInt("a", 23) + + am := pdata.NewAttributeMap() + am.Insert("namespace", namespace) + am.InsertInt("namespace.a", 42) + am.InsertString("toplevel", "test") + return DocumentFromAttributes(am) + }, + want: Document{[]field{{"namespace.a", ignoreValue}, {"namespace.a", IntValue(42)}, {"toplevel", StringValue("test")}}}, + }, + "dedup in arrays": { + build: func() (doc Document) { + var embedded Document + embedded.AddInt("a", 1) + embedded.AddInt("c", 3) + embedded.AddInt("a", 2) + + doc.Add("arr", ArrValue(Value{kind: KindObject, doc: embedded})) + return doc + }, + want: Document{[]field{{"arr", ArrValue(Value{kind: KindObject, doc: Document{[]field{ + {"a", ignoreValue}, + {"a", IntValue(2)}, + {"c", IntValue(3)}, + }}})}}}, + }, + "dedup mix of primitive and object lifts primitive": { + build: func() (doc Document) { + doc.AddInt("namespace", 1) + doc.AddInt("namespace.a", 2) + return doc + }, + want: Document{[]field{{"namespace.a", IntValue(2)}, {"namespace.value", IntValue(1)}}}, + }, + "dedup removes primitive if value exists": { + build: func() (doc Document) { + doc.AddInt("namespace", 1) + doc.AddInt("namespace.a", 2) + doc.AddInt("namespace.value", 3) + return doc + }, + want: Document{[]field{{"namespace.a", IntValue(2)}, {"namespace.value", ignoreValue}, {"namespace.value", IntValue(3)}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + doc := test.build() + doc.Sort() + doc.Dedup() + assert.Equal(t, test.want, doc) + }) + } +} + +func TestValue_FromAttribute(t *testing.T) { + tests := map[string]struct { + in pdata.AttributeValue + want Value + }{ + "null": { + in: pdata.NewAttributeValueNull(), + want: nilValue, + }, + "string": { + in: pdata.NewAttributeValueString("test"), + want: StringValue("test"), + }, + "int": { + in: pdata.NewAttributeValueInt(23), + want: IntValue(23), + }, + "double": { + in: pdata.NewAttributeValueDouble(3.14), + want: DoubleValue(3.14), + }, + "bool": { + in: pdata.NewAttributeValueBool(true), + want: BoolValue(true), + }, + "empty array": { + in: pdata.NewAttributeValueArray(), + want: Value{kind: KindArr}, + }, + "non-empty array": { + in: func() pdata.AttributeValue { + v := pdata.NewAttributeValueArray() + arr := v.ArrayVal() + 
arr.Append(pdata.NewAttributeValueInt(1)) + return v + }(), + want: ArrValue(IntValue(1)), + }, + "empty map": { + in: pdata.NewAttributeValueMap(), + want: Value{kind: KindObject}, + }, + "non-empty map": { + in: func() pdata.AttributeValue { + v := pdata.NewAttributeValueMap() + m := v.MapVal() + m.Insert("a", pdata.NewAttributeValueInt(1)) + return v + }(), + want: Value{kind: KindObject, doc: Document{[]field{{"a", IntValue(1)}}}}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + v := ValueFromAttribute(test.in) + assert.Equal(t, test.want, v) + }) + } +} + +func TestValue_Serialize(t *testing.T) { + tests := map[string]struct { + value Value + want string + }{ + "nil value": {value: nilValue, want: "null"}, + "bool value: true": {value: BoolValue(true), want: "true"}, + "bool value: false": {value: BoolValue(false), want: "false"}, + "int value": {value: IntValue(42), want: "42"}, + "double value": {value: DoubleValue(3.14), want: "3.14"}, + "NaN is undefined": {value: DoubleValue(math.NaN()), want: "null"}, + "Inf is undefined": {value: DoubleValue(math.Inf(0)), want: "null"}, + "string value": {value: StringValue("Hello World!"), want: `"Hello World!"`}, + "timestamp": { + value: TimestampValue(dijkstra), + want: `"1930-05-11T16:33:11.123456789Z"`, + }, + "array": { + value: ArrValue(BoolValue(true), IntValue(23)), + want: `[true,23]`, + }, + "object": { + value: func() Value { + doc := Document{} + doc.AddString("a", "b") + return Value{kind: KindObject, doc: doc} + }(), + want: `{"a":"b"}`, + }, + "empty object": { + value: Value{kind: KindObject, doc: Document{}}, + want: "null", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var buf strings.Builder + err := test.value.iterJSON(json.NewVisitor(&buf), false) + require.NoError(t, err) + assert.Equal(t, test.want, buf.String()) + }) + } +} diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go new file mode 100644 index 000000000000..0347e6f8034b --- /dev/null +++ b/exporter/elasticsearchexporter/model.go @@ -0,0 +1,62 @@ +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package elasticsearchexporter + +import ( + "bytes" + + "go.opentelemetry.io/collector/consumer/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" +) + +type mappingModel interface { + encodeLog(pdata.Resource, pdata.LogRecord) ([]byte, error) +} + +// encodeModel tries to keep the event as close to the original open telemetry semantics as is. +// No fields will be mapped by default. +// +// Field deduplication and dedotting of attributes is supported by the encodeModel. 
+// +// See: https://github.com/open-telemetry/oteps/blob/master/text/logs/0097-log-data-model.md +type encodeModel struct { + dedup bool + dedot bool +} + +func (m *encodeModel) encodeLog(resource pdata.Resource, record pdata.LogRecord) ([]byte, error) { + var document objmodel.Document + document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. + document.AddID("TraceId", record.TraceID()) + document.AddID("SpanId", record.SpanID()) + document.AddInt("TraceFlags", int64(record.Flags())) + document.AddString("SeverityText", record.SeverityText()) + document.AddInt("SeverityNumber", int64(record.SeverityNumber())) + document.AddString("Name", record.Name()) + document.AddAttribute("Body", record.Body()) + document.AddAttributes("Attributes", record.Attributes()) + document.AddAttributes("Resource", resource.Attributes()) + + if m.dedup { + document.Dedup() + } else if m.dedot { + document.Sort() + } + + var buf bytes.Buffer + err := document.Serialize(&buf, m.dedot) + return buf.Bytes(), err +}
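To make the new mapping concrete, a hedged end-to-end sketch (editorial, not part of this diff) of what encodeLog produces with the default dedup=true, dedot=false model. The helper name, the sample attribute values, and the expected JSON are illustrative; the zero timestamp appears because the record's timestamp is never set.

package elasticsearchexporter

import (
	"fmt"

	"go.opentelemetry.io/collector/consumer/pdata"
)

// encodeExampleLog builds a minimal resource/log pair and prints the JSON
// document produced by encodeModel.encodeLog.
func encodeExampleLog() {
	model := &encodeModel{dedup: true, dedot: false}

	resource := pdata.NewResource()
	resource.Attributes().InsertString("service.name", "my-service")

	record := pdata.NewLogRecord()
	record.SetName("http.request")
	record.SetSeverityText("INFO")
	record.Body().SetStringVal("hello world")
	record.Attributes().InsertString("event.domain", "app")

	doc, err := model.encodeLog(resource, record)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(doc))
	// Roughly (keys sorted by Dedup, empty trace/span IDs omitted):
	// {"@timestamp":"1970-01-01T00:00:00.000000000Z","Attributes.event.domain":"app","Body":"hello world","Name":"http.request","Resource.service.name":"my-service","SeverityNumber":0,"SeverityText":"INFO","TraceFlags":0}
}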