[exporter/datasetexporter]: Add support for sending logs #21513

Closed
569 changes: 569 additions & 0 deletions cmd/configschema/go.sum

Large diffs are not rendered by default.

274 changes: 274 additions & 0 deletions cmd/mdatagen/go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions cmd/otelcontribcol/go.mod
@@ -317,6 +317,7 @@ require (
github.com/containerd/ttrpc v1.1.0 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -549,6 +550,7 @@ require (
github.com/rs/cors v1.9.0 // indirect
github.com/samber/lo v1.37.0 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.14 // indirect
github.com/scalyr/dataset-go v0.0.6 // indirect
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.4.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
576 changes: 576 additions & 0 deletions cmd/otelcontribcol/go.sum

Large diffs are not rendered by default.

563 changes: 563 additions & 0 deletions cmd/oteltestbedcol/go.sum

Large diffs are not rendered by default.

300 changes: 300 additions & 0 deletions cmd/telemetrygen/go.sum

Large diffs are not rendered by default.

128 changes: 128 additions & 0 deletions confmap/provider/s3provider/go.sum

Large diffs are not rendered by default.

276 changes: 276 additions & 0 deletions connector/countconnector/go.sum

Large diffs are not rendered by default.

242 changes: 242 additions & 0 deletions connector/servicegraphconnector/go.sum

Large diffs are not rendered by default.

273 changes: 273 additions & 0 deletions connector/spanmetricsconnector/go.sum

Large diffs are not rendered by default.

187 changes: 187 additions & 0 deletions examples/demo/client/go.sum

Large diffs are not rendered by default.

187 changes: 187 additions & 0 deletions examples/demo/server/go.sum

Large diffs are not rendered by default.

341 changes: 341 additions & 0 deletions exporter/alibabacloudlogserviceexporter/go.sum

Large diffs are not rendered by default.

253 changes: 253 additions & 0 deletions exporter/awscloudwatchlogsexporter/go.sum

Large diffs are not rendered by default.

251 changes: 251 additions & 0 deletions exporter/awsemfexporter/go.sum

Large diffs are not rendered by default.

307 changes: 307 additions & 0 deletions exporter/awskinesisexporter/go.sum

Large diffs are not rendered by default.

272 changes: 272 additions & 0 deletions exporter/awss3exporter/go.sum

Large diffs are not rendered by default.

253 changes: 253 additions & 0 deletions exporter/awsxrayexporter/go.sum

Large diffs are not rendered by default.

256 changes: 256 additions & 0 deletions exporter/azuredataexplorerexporter/go.sum

Large diffs are not rendered by default.

261 changes: 261 additions & 0 deletions exporter/azuremonitorexporter/go.sum

Large diffs are not rendered by default.

256 changes: 256 additions & 0 deletions exporter/carbonexporter/go.sum

Large diffs are not rendered by default.

256 changes: 256 additions & 0 deletions exporter/cassandraexporter/go.sum

Large diffs are not rendered by default.

289 changes: 289 additions & 0 deletions exporter/clickhouseexporter/go.sum

Large diffs are not rendered by default.

253 changes: 253 additions & 0 deletions exporter/coralogixexporter/go.sum

Large diffs are not rendered by default.

280 changes: 280 additions & 0 deletions exporter/datadogexporter/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion exporter/datasetexporter/README.md
@@ -3,7 +3,7 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: traces, logs |
| Stability | [development]: logs, traces |
| Distributions | [] |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
72 changes: 54 additions & 18 deletions exporter/datasetexporter/config.go
@@ -17,19 +17,34 @@ package datasetexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datasetexporter"
import (
"fmt"
"os"
"strconv"
"time"

"github.com/scalyr/dataset-go/pkg/buffer"
datasetConfig "github.com/scalyr/dataset-go/pkg/config"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const maxDelayMs = "15000"
const maxDelay = 15 * time.Millisecond
const tracesMaxWait = 5 * time.Second

type TracesSettings struct {
MaxWait time.Duration `mapstructure:"max_wait"`
}

// NewDefaultTracesSettings returns the default settings for TracesSettings.
func NewDefaultTracesSettings() TracesSettings {
return TracesSettings{
MaxWait: tracesMaxWait,
}
}

type Config struct {
DatasetURL string `mapstructure:"dataset_url"`
APIKey string `mapstructure:"api_key"`
MaxDelayMs string `mapstructure:"max_delay_ms"`
GroupBy []string `mapstructure:"group_by"`
DatasetURL string `mapstructure:"dataset_url"`
APIKey string `mapstructure:"api_key"`
MaxDelay time.Duration `mapstructure:"max_delay"`
GroupBy []string `mapstructure:"group_by"`
TracesSettings `mapstructure:"traces"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.TimeoutSettings `mapstructure:"timeout"`
@@ -47,8 +62,12 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error {
c.APIKey = os.Getenv("DATASET_API_KEY")
}

if len(c.MaxDelayMs) == 0 {
c.MaxDelayMs = maxDelayMs
if c.MaxDelay == 0 {
c.MaxDelay = maxDelay
}

if c.TracesSettings.MaxWait == 0 {
c.TracesSettings.MaxWait = tracesMaxWait
}

return nil
@@ -64,15 +83,6 @@ func (c *Config) Validate() error {
return fmt.Errorf("dataset_url is required")
}

_, err := strconv.Atoi(c.MaxDelayMs)
if err != nil {
return fmt.Errorf(
"max_delay_ms must be integer, but %s was used: %w",
c.MaxDelayMs,
err,
)
}

return nil
}

@@ -81,11 +91,37 @@ func (c *Config) Validate() error {
func (c *Config) String() string {
s := ""
s += fmt.Sprintf("%s: %s; ", "DatasetURL", c.DatasetURL)
s += fmt.Sprintf("%s: %s; ", "MaxDelayMs", c.MaxDelayMs)
s += fmt.Sprintf("%s: %s; ", "MaxDelay", c.MaxDelay)
s += fmt.Sprintf("%s: %s; ", "GroupBy", c.GroupBy)
s += fmt.Sprintf("%s: %+v; ", "TracesSettings", c.TracesSettings)
s += fmt.Sprintf("%s: %+v; ", "RetrySettings", c.RetrySettings)
s += fmt.Sprintf("%s: %+v; ", "QueueSettings", c.QueueSettings)
s += fmt.Sprintf("%s: %+v", "TimeoutSettings", c.TimeoutSettings)

return s
}

func (c *Config) Convert() (*ExporterConfig, error) {
err := c.Validate()
if err != nil {
return nil, fmt.Errorf("config is not valid: %w", err)
}

return &ExporterConfig{
datasetConfig: &datasetConfig.DataSetConfig{
Endpoint: c.DatasetURL,
Tokens: datasetConfig.DataSetTokens{WriteLog: c.APIKey},
MaxBufferDelay: c.MaxDelay,
MaxPayloadB: buffer.LimitBufferSize,
GroupBy: c.GroupBy,
RetryBase: 5 * time.Second,
},
tracesSettings: c.TracesSettings,
},
nil
}

type ExporterConfig struct {
datasetConfig *datasetConfig.DataSetConfig
tracesSettings TracesSettings
}
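
For reference, a minimal sketch of how the reworked settings behave: `Unmarshal` fills in the duration-based defaults and `Convert` maps the collector config onto the dataset-go client configuration. It assumes it runs inside the `datasetexporter` package (the `ExporterConfig` fields are unexported) and that testify's `require` is available; the test name and assertions are illustrative only, not part of this PR.

```go
package datasetexporter

import (
	"testing"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/confmap"
)

// Sketch only: Unmarshal applies the duration-based defaults and Convert maps
// the collector config onto the dataset-go client configuration.
func TestConvertAppliesDefaultsSketch(t *testing.T) {
	conf := confmap.NewFromStringMap(map[string]interface{}{
		"dataset_url": "https://example.com",
		"api_key":     "secret",
		// max_delay and traces.max_wait are intentionally omitted.
	})

	cfg := Config{}
	require.NoError(t, cfg.Unmarshal(conf))
	require.Equal(t, maxDelay, cfg.MaxDelay)                    // default buffer delay constant
	require.Equal(t, tracesMaxWait, cfg.TracesSettings.MaxWait) // default traces wait constant

	exporterCfg, err := cfg.Convert()
	require.NoError(t, err)
	require.Equal(t, "https://example.com", exporterCfg.datasetConfig.Endpoint)
	require.Equal(t, cfg.MaxDelay, exporterCfg.datasetConfig.MaxBufferDelay)
}
```

Switching the string-typed `max_delay_ms` to a `time.Duration`-typed `max_delay` lets collector configs use values such as `15s` directly and removes the manual `strconv.Atoi` validation.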
41 changes: 18 additions & 23 deletions exporter/datasetexporter/config_test.go
@@ -17,6 +17,7 @@ package datasetexporter
import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/suite"
"go.opentelemetry.io/collector/confmap"
@@ -77,19 +78,19 @@ func (s *SuiteConfig) TestConfigUseEnvWhenSet() {
s.Equal("api_key", config.APIKey)
}

func (s *SuiteConfig) TestConfigUseDefaultForMaxDelay() {
func (s *SuiteConfig) TestConfigUseDefaults() {
config := Config{}
conf := confmap.NewFromStringMap(map[string]interface{}{
"dataset_url": "https://example.com",
"api_key": "secret",
"max_delay_ms": "",
"dataset_url": "https://example.com",
"api_key": "secret",
})
err := config.Unmarshal(conf)
s.Nil(err)

s.Equal("https://example.com", config.DatasetURL)
s.Equal("secret", config.APIKey)
s.Equal("15000", config.MaxDelayMs)
s.Equal(maxDelay, config.MaxDelay)
s.Equal(tracesMaxWait, config.TracesSettings.MaxWait)
}

func (s *SuiteConfig) TestConfigValidate() {
@@ -103,35 +104,26 @@ func (s *SuiteConfig) TestConfigValidate() {
config: Config{
DatasetURL: "https://example.com",
APIKey: "secret",
MaxDelayMs: "12345",
MaxDelay: 123 * time.Millisecond,
},
expected: nil,
},
{
name: "missing api_key",
config: Config{
DatasetURL: "https://example.com",
MaxDelayMs: "15000",
MaxDelay: maxDelay,
},
expected: fmt.Errorf("api_key is required"),
},
{
name: "missing dataset_url",
config: Config{
APIKey: "1234",
MaxDelayMs: "15000",
APIKey: "1234",
MaxDelay: maxDelay,
},
expected: fmt.Errorf("dataset_url is required"),
},
{
name: "invalid max_delay_ms",
config: Config{
DatasetURL: "https://example.com",
APIKey: "1234",
MaxDelayMs: "abc",
},
expected: fmt.Errorf("max_delay_ms must be integer, but abc was used: strconv.Atoi: parsing \"abc\": invalid syntax"),
},
}

for _, tt := range tests {
@@ -148,17 +140,20 @@ func (s *SuiteConfig) TestConfigValidate() {

func (s *SuiteConfig) TestConfigString() {
config := Config{
DatasetURL: "https://example.com",
APIKey: "secret",
MaxDelayMs: "1234",
GroupBy: []string{"field1", "field2"},
DatasetURL: "https://example.com",
APIKey: "secret",
MaxDelay: 123,
GroupBy: []string{"field1", "field2"},
TracesSettings: TracesSettings{
MaxWait: 45 * time.Second,
},
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
}

s.Equal(
"DatasetURL: https://example.com; MaxDelayMs: 1234; GroupBy: [field1 field2]; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
"DatasetURL: https://example.com; MaxDelay: 123ns; GroupBy: [field1 field2]; TracesSettings: {MaxWait:45s}; RetrySettings: {Enabled:true InitialInterval:5s RandomizationFactor:0.5 Multiplier:1.5 MaxInterval:30s MaxElapsedTime:5m0s}; QueueSettings: {Enabled:true NumConsumers:10 QueueSize:1000 StorageID:<nil>}; TimeoutSettings: {Timeout:5s}",
config.String(),
)
}
124 changes: 80 additions & 44 deletions exporter/datasetexporter/datasetexporter.go
@@ -15,71 +15,107 @@
package datasetexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datasetexporter"

import (
"context"
"fmt"
"sync"
"net/http"
"reflect"
"strconv"
"time"

"github.com/google/uuid"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"github.com/scalyr/dataset-go/pkg/api/add_events"
"github.com/scalyr/dataset-go/pkg/client"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type datasetExporter struct {
limiter *rate.Limiter
logger *zap.Logger
session string
type DatasetExporter struct {
client *client.DataSetClient
limiter *rate.Limiter
logger *zap.Logger
session string
spanTracker spanTracker
}

var exporterInstance *datasetExporter
func NewDatasetExporter(entity string, config *Config, logger *zap.Logger) (*DatasetExporter, error) {
logger.Info("Creating new DataSetExporter",
zap.String("config", config.String()),
zap.String("entity", entity),
)
exporterCfg, err := config.Convert()
if err != nil {
return nil, fmt.Errorf(
"cannot convert config: %s; %w",
config.String(), err,
)
}

func newDatasetExporter(logger *zap.Logger) (*datasetExporter, error) {
logger.Info("Creating new DataSet Exporter with config")
if logger == nil {
return nil, fmt.Errorf("logger has to be set")
client, err := client.NewClient(
exporterCfg.datasetConfig,
&http.Client{Timeout: time.Second * 60},
logger,
)
if err != nil {
logger.Error("Cannot create DataSetClient: ", zap.Error(err))
return nil, fmt.Errorf("cannot create newDatasetExporter: %w", err)
}

return &datasetExporter{
limiter: rate.NewLimiter(100*rate.Every(1*time.Minute), 100), // 100 requests / minute
session: uuid.New().String(),
logger: logger,
return &DatasetExporter{
client: client,
limiter: rate.NewLimiter(100*rate.Every(1*time.Minute), 100), // 100 requests / minute
session: uuid.New().String(),
logger: logger,
spanTracker: newSpanTracker(exporterCfg.tracesSettings.MaxWait),
}, nil
}

var lock = &sync.Mutex{}
func (e *DatasetExporter) shutdown() {
e.client.SendAllAddEventsBuffers()
}

func getDatasetExporter(entity string, config *Config, logger *zap.Logger) (*datasetExporter, error) {
logger.Info(
"Get logger for: ",
zap.String("entity", entity),
)
// TODO: create exporter per config
if exporterInstance == nil {
lock.Lock()
defer lock.Unlock()
if exporterInstance == nil {
logger.Info(
"DataSetExport is using config: ",
zap.String("config", config.String()),
zap.String("entity", entity),
)
instance, err := newDatasetExporter(logger)
if err != nil {
return nil, fmt.Errorf("cannot create new dataset exporter: %w", err)
}
exporterInstance = instance
}
func sendBatch(events []*add_events.EventBundle, client *client.DataSetClient) error {
return client.AddEvents(events)
}

func buildKey(prefix string, separator string, key string, depth int) string {
res := prefix
if depth > 0 && len(prefix) > 0 {
res += separator
}
res += key
return res
}

return exporterInstance, nil
func updateWithPrefixedValuesMap(target map[string]interface{}, prefix string, separator string, source map[string]interface{}, depth int) {
for k, v := range source {
key := buildKey(prefix, separator, k, depth)
updateWithPrefixedValues(target, key, separator, v, depth+1)
}
}

func (e *datasetExporter) consumeLogs(ctx context.Context, ld plog.Logs) error {
return nil
func updateWithPrefixedValuesArray(target map[string]interface{}, prefix string, separator string, source []interface{}, depth int) {
for i, v := range source {
key := buildKey(prefix, separator, strconv.FormatInt(int64(i), 10), depth)
updateWithPrefixedValues(target, key, separator, v, depth+1)
}
}

func (e *datasetExporter) consumeTraces(ctx context.Context, ld ptrace.Traces) error {
return nil
func updateWithPrefixedValues(target map[string]interface{}, prefix string, separator string, source interface{}, depth int) {
st := reflect.TypeOf(source)
switch st.Kind() {
case reflect.Map:
updateWithPrefixedValuesMap(target, prefix, separator, source.(map[string]interface{}), depth)
case reflect.Array, reflect.Slice:
updateWithPrefixedValuesArray(target, prefix, separator, source.([]interface{}), depth)
default:
for {
_, found := target[prefix]
if found {
prefix += separator
} else {
target[prefix] = source
break
}
}

}
}
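
The new `updateWithPrefixedValues*` helpers flatten nested attribute maps and slices into single-level keys joined by the caller-supplied separator; on a key collision the separator is appended until a free key is found. A minimal sketch of that behaviour follows — it assumes it sits inside the `datasetexporter` package as a test (the helper is unexported), and the sample data and test name are illustrative only:

```go
package datasetexporter

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// Sketch only: nested maps and slices are flattened into separator-joined keys.
func TestUpdateWithPrefixedValuesSketch(t *testing.T) {
	source := map[string]interface{}{
		"host": map[string]interface{}{"name": "web-1"},
		"tags": []interface{}{"prod", "eu"},
	}

	target := map[string]interface{}{}
	// An empty prefix and depth 0 mirror how a top-level attribute map is passed in.
	updateWithPrefixedValues(target, "", ".", source, 0)

	assert.Equal(t, map[string]interface{}{
		"host.name": "web-1", // nested map keys joined with the separator
		"tags.0":    "prod",  // slice elements keyed by their index
		"tags.1":    "eu",
	}, target)
}
```

Because `buildKey` only inserts the separator when both the depth is greater than zero and the prefix is non-empty, top-level keys stay unprefixed while every nested level gets the separator.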
3 changes: 3 additions & 0 deletions exporter/datasetexporter/examples/e2e/.dockerignore
@@ -0,0 +1,3 @@
docker-compose.yml
Dockerfile
otel-config.yaml