Skip to content

Commit

Permalink
[elasticsearchexporter] [chore] refactor routing
Browse files Browse the repository at this point in the history
Create documentRouter interface which encapsulates
all logic related to document routing.
  • Loading branch information
axw committed Feb 7, 2025
1 parent 499df64 commit 096e099
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 151 deletions.
203 changes: 142 additions & 61 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"regexp"
"strings"
"time"
"unicode"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -41,74 +42,154 @@ func sanitizeDataStreamField(field, disallowed, appendSuffix string) string {
return field
}

func routeWithDefaults(defaultDSType string) func(
pcommon.Map,
pcommon.Map,
pcommon.Map,
string,
bool,
string,
) elasticsearch.Index {
return func(
recordAttr pcommon.Map,
scopeAttr pcommon.Map,
resourceAttr pcommon.Map,
fIndex string,
otel bool,
scopeName string,
) elasticsearch.Index {
// Order:
// 1. read data_stream.* from attributes
// 2. read elasticsearch.index.* from attributes
// 3. receiver-based routing
// 4. use default hardcoded data_stream.*
dataset, datasetExists := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr)
namespace, namespaceExists := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr)
dataStreamMode := datasetExists || namespaceExists
if !dataStreamMode {
prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr)
if prefixExists || suffixExists {
return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)}
}
}
// documentRouter is an interface for routing records to the appropriate
// index or data stream. The router may mutate record attributes.
type documentRouter interface {
routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error)
routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error)
routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error)
routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error)
}

// Receiver-based routing
// For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode)
// for the scope name
// github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper
if submatch := receiverRegex.FindStringSubmatch(scopeName); len(submatch) > 0 {
receiverName := submatch[1]
dataset = receiverName
func newDocumentRouter(mode MappingMode, dynamicIndex bool, defaultIndex string, cfg *Config) documentRouter {
var router documentRouter
if dynamicIndex {
router = dynamicDocumentRouter{
index: elasticsearch.Index{Index: defaultIndex},
otel: mode == MappingOTel,
}

// For dataset, the naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
// This is in order to match the built-in logs-*.otel-* index template.
var datasetSuffix string
if otel {
datasetSuffix += ".otel"
} else {
router = staticDocumentRouter{
index: elasticsearch.Index{Index: defaultIndex},
}
}
if cfg.LogstashFormat.Enabled {
router = logstashDocumentRouter{inner: router, logstashFormat: cfg.LogstashFormat}
}
return router
}

type staticDocumentRouter struct {
index elasticsearch.Index
}

func (r staticDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(resource, scope, recordAttrs)
}

func (r staticDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(resource, scope, recordAttrs)
}

func (r staticDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(resource, scope, recordAttrs)
}

func (r staticDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(resource, scope, recordAttrs)
}

func (r staticDocumentRouter) route(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.index, nil
}

type dynamicDocumentRouter struct {
index elasticsearch.Index
otel bool
}

dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix)
namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "")
return elasticsearch.NewDataStreamIndex(defaultDSType, dataset, namespace)
func (r dynamicDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeLogs), nil
}

func (r dynamicDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeMetrics), nil
}

func (r dynamicDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeTraces), nil
}

func (r dynamicDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return routeRecord(resource, scope, recordAttrs, r.index.Index, r.otel, defaultDataStreamTypeLogs), nil
}

type logstashDocumentRouter struct {
inner documentRouter
logstashFormat LogstashFormatSettings
}

func (r logstashDocumentRouter) routeLogRecord(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(r.inner.routeLogRecord(resource, scope, recordAttrs))
}

func (r logstashDocumentRouter) routeDataPoint(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(r.inner.routeDataPoint(resource, scope, recordAttrs))
}

func (r logstashDocumentRouter) routeSpan(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(r.inner.routeSpan(resource, scope, recordAttrs))
}

func (r logstashDocumentRouter) routeSpanEvent(resource pcommon.Resource, scope pcommon.InstrumentationScope, recordAttrs pcommon.Map) (elasticsearch.Index, error) {
return r.route(r.inner.routeSpanEvent(resource, scope, recordAttrs))
}

func (r logstashDocumentRouter) route(index elasticsearch.Index, err error) (elasticsearch.Index, error) {
if err != nil {
return elasticsearch.Index{}, err
}
formattedIndex, err := generateIndexWithLogstashFormat(index.Index, &r.logstashFormat, time.Now())
if err != nil {
return elasticsearch.Index{}, err
}
return elasticsearch.Index{Index: formattedIndex}, nil
}

var (
// routeLogRecord returns the name of the index to send the log record to according to data stream routing related attributes.
// This function may mutate record attributes.
routeLogRecord = routeWithDefaults(defaultDataStreamTypeLogs)
func routeRecord(
resource pcommon.Resource,
scope pcommon.InstrumentationScope,
recordAttr pcommon.Map,
index string,
otel bool,
defaultDSType string,
) elasticsearch.Index {
resourceAttr := resource.Attributes()
scopeAttr := scope.Attributes()

// Order:
// 1. read data_stream.* from attributes
// 2. read elasticsearch.index.* from attributes
// 3. receiver-based routing
// 4. use default hardcoded data_stream.*
dataset, datasetExists := getFromAttributes(elasticsearch.DataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr)
namespace, namespaceExists := getFromAttributes(elasticsearch.DataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr)
dataStreamMode := datasetExists || namespaceExists
if !dataStreamMode {
prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr)
if prefixExists || suffixExists {
return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, index, suffix)}
}
}

// routeDataPoint returns the name of the index to send the data point to according to data stream routing related attributes.
// This function may mutate record attributes.
routeDataPoint = routeWithDefaults(defaultDataStreamTypeMetrics)
// Receiver-based routing
// For example, hostmetricsreceiver (or hostmetricsreceiver.otel in the OTel output mode)
// for the scope name
// github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/cpuscraper
if submatch := receiverRegex.FindStringSubmatch(scope.Name()); len(submatch) > 0 {
receiverName := submatch[1]
dataset = receiverName
}

// routeSpan returns the name of the index to send the span to according to data stream routing related attributes.
// This function may mutate record attributes.
routeSpan = routeWithDefaults(defaultDataStreamTypeTraces)
// For dataset, the naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
// This is in order to match the built-in logs-*.otel-* index template.
var datasetSuffix string
if otel {
datasetSuffix += ".otel"
}

// routeSpanEvent returns the name of the index to send the span event to according to data stream routing related attributes.
// This function may mutate record attributes.
routeSpanEvent = routeWithDefaults(defaultDataStreamTypeLogs)
)
dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix)
namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "")
return elasticsearch.NewDataStreamIndex(defaultDSType, dataset, namespace)
}
22 changes: 19 additions & 3 deletions exporter/elasticsearchexporter/data_stream_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
Expand Down Expand Up @@ -70,7 +71,12 @@ func TestRouteLogRecord(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds := routeLogRecord(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName)
router := dynamicDocumentRouter{otel: tc.otel}
scope := pcommon.NewInstrumentationScope()
scope.SetName(tc.scopeName)

ds, err := router.routeLogRecord(pcommon.NewResource(), scope, pcommon.NewMap())
require.NoError(t, err)
assert.Equal(t, tc.want, ds)
})
}
Expand All @@ -81,7 +87,12 @@ func TestRouteDataPoint(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds := routeDataPoint(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName)
router := dynamicDocumentRouter{otel: tc.otel}
scope := pcommon.NewInstrumentationScope()
scope.SetName(tc.scopeName)

ds, err := router.routeDataPoint(pcommon.NewResource(), scope, pcommon.NewMap())
require.NoError(t, err)
assert.Equal(t, tc.want, ds)
})
}
Expand All @@ -92,7 +103,12 @@ func TestRouteSpan(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds := routeSpan(pcommon.NewMap(), pcommon.NewMap(), pcommon.NewMap(), "", tc.otel, tc.scopeName)
router := dynamicDocumentRouter{otel: tc.otel}
scope := pcommon.NewInstrumentationScope()
scope.SetName(tc.scopeName)

ds, err := router.routeSpan(pcommon.NewResource(), scope, pcommon.NewMap())
require.NoError(t, err)
assert.Equal(t, tc.want, ds)
})
}
Expand Down
Loading

0 comments on commit 096e099

Please sign in to comment.