Skip to content

Commit

Permalink
Elasticsearch exporter: Init JSON encoding support (#3101)
Browse files Browse the repository at this point in the history
**Description:** The change adds support for encoding OpenTelemetry log records to JSON.
The encoder tries to remove duplicate entries in case the attribute map
(which is an array if key value pairs) contains duplicates.

Mixed style attributes, with key names having dots and other fields
having attribute maps as value will be normalized, such that the JSON
encoding will be either completely flat, or values are properly merged
into a single JSON object (when dedot is enabled). The normalization
helps with deduplication, and (not yet implemented) dedot support will
allow us to present a well formated JSON event if Ingest Node is used
(The dedotting in Elasticsearch does happen after Ingest Node).

Next:
- Dedotting support
- Custom (configurable) field mapping
- `publishLogs` unit testing
- Integration tests

**Link to tracking Issue:** #1800

**Testing:**
The internal document type with field deduplication is fully tested (89%) via unit tests.

The change also hooks up `publishLogs`, but this functionality is not covered by tests yet, as the PR has already grown quite a bit (I tested locally with a custom otel collector distribution). I would like to add additional tests in a separate PR, to keep focused on the JSON encoding only here.
  • Loading branch information
Steffen Siering authored May 20, 2021
1 parent f89ebd2 commit 9404a13
Show file tree
Hide file tree
Showing 6 changed files with 870 additions and 13 deletions.
54 changes: 41 additions & 13 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
esutil7 "github.com/elastic/go-elasticsearch/v7/esutil"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand All @@ -46,6 +47,7 @@ type elasticsearchExporter struct {

client *esClientCurrent
bulkIndexer esBulkIndexerCurrent
model mappingModel
}

var retryOnStatus = []int{500, 502, 503, 504, 429}
Expand All @@ -72,13 +74,17 @@ func newExporter(logger *zap.Logger, cfg *Config) (*elasticsearchExporter, error
maxAttempts = cfg.Retry.MaxRequests
}

// TODO: Apply encoding and field mapping settings.
model := &encodeModel{dedup: true, dedot: false}

return &elasticsearchExporter{
logger: logger,
client: client,
bulkIndexer: bulkIndexer,

index: cfg.Index,
maxAttempts: maxAttempts,
model: model,
}, nil
}

Expand All @@ -87,7 +93,36 @@ func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
}

func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld pdata.Logs) error {
panic("TODO")
var errs []error

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
resource := rl.Resource()
ills := rl.InstrumentationLibraryLogs()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(i).Logs()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k)); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}

errs = append(errs, err)
}
}
}
}

return multierr.Combine(errs...)
}

func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pdata.Resource, record pdata.LogRecord) error {
document, err := e.model.encodeLog(resource, record)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
}
return e.pushEvent(ctx, document)
}

func (e *elasticsearchExporter) pushEvent(ctx context.Context, document []byte) error {
Expand Down Expand Up @@ -184,17 +219,10 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren

// maxRetries configures the maximum number of event publishing attempts,
// including the first send and additional retries.
// Issue: https://github.com/elastic/go-elasticsearch/issues/232
//
// The elasticsearch7.Client retry requires the count to be >= 1, otherwise
// it defaults to 3. Internally the Clients starts the number of send attempts with 1.
// When maxRetries is 1, retries are disabled, meaning that the event is
// dropped if the first HTTP request failed.
//
// Once the issue is resolved we want `maxRetries = config.Retry.MaxRequests - 1`.
maxRetries := config.Retry.MaxRequests
if maxRetries < 1 || !config.Retry.Enabled {
maxRetries = 1
maxRetries := config.Retry.MaxRequests - 1
retryDisabled := !config.Retry.Enabled || maxRetries <= 0
if retryDisabled {
maxRetries = 0
}

return elasticsearch7.NewClient(esConfigCurrent{
Expand All @@ -210,7 +238,7 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren

// configure retry behavior
RetryOnStatus: retryOnStatus,
DisableRetry: !config.Retry.Enabled,
DisableRetry: retryDisabled,
EnableRetryOnTimeout: config.Retry.Enabled,
MaxRetries: maxRetries,
RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),
Expand Down
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/armon/go-metrics v0.3.3 // indirect
github.com/cenkalti/backoff/v4 v4.1.0
github.com/elastic/go-elasticsearch/v7 v7.12.0
github.com/elastic/go-structform v0.0.8
github.com/gogo/googleapis v1.3.0 // indirect
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
Expand All @@ -15,6 +16,7 @@ require (
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.27.0
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.16.0
gopkg.in/ini.v1 v1.57.0 // indirect
)
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b h1:WR1qVJzbvrVywhAk4kMQKRPx09AZVI0NdEdYs59iHcA=
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -232,6 +233,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s=
github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elastic/go-structform v0.0.8 h1:U0qnb9Zqig7w+FhF+sLI3VZPPi/+2aJ0bIEW6R1z6Tk=
github.com/elastic/go-structform v0.0.8/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
Expand Down
Loading

0 comments on commit 9404a13

Please sign in to comment.