diff --git a/exporter/sematextexporter/README.md b/exporter/sematextexporter/README.md index 9effe2645983..7baaab2908e1 100644 --- a/exporter/sematextexporter/README.md +++ b/exporter/sematextexporter/README.md @@ -1,5 +1,14 @@ # Sematext Exporter +| Status | | +| ------------- |-----------| +| Stability | [development]: metrics, logs | +| Distributions | [contrib] | +| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fsematext%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fsematext) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fsematext%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fsematext) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@Eromosele-SM](https://www.github.com/Eromosele-SM) | + +[development]: https://github.com/open-telemetry/opentelemetry-collector#development +[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib This exporter supports sending metrics to [Sematext Cloud](https://sematext.com/) in Influx line protocol format diff --git a/exporter/sematextexporter/config.go b/exporter/sematextexporter/config.go index e314b797a3aa..65dec7f66bfc 100644 --- a/exporter/sematextexporter/config.go +++ b/exporter/sematextexporter/config.go @@ -41,12 +41,12 @@ type MetricsConfig struct { type LogsConfig struct { AppToken string `mapstructure:"app_token"` LogsEndpoint string `mapstructure:"logs_endpoint"` - LogRequests bool + LogRequests bool `mapstructure:"logs_requests"` LogMaxAge int `mapstructure:"logs_max_age"` LogMaxBackups int `mapstructure:"logs_max_backups"` LogMaxSize int `mapstructure:"logs_max_size"` // WriteEvents determines if events are logged - WriteEvents a.Bool + WriteEvents a.Bool } diff --git a/exporter/sematextexporter/es.go b/exporter/sematextexporter/es.go index 6afdabbd371e..db906f63dd98 100644 --- a/exporter/sematextexporter/es.go +++ b/exporter/sematextexporter/es.go @@ -65,48 +65,63 @@ func NewClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client } func (c *client) Bulk(body interface{}, config *Config) error { - // lookup for client by endpoint - if grp, ok := c.clients[config.LogsEndpoint]; ok { - // build bulk request for each element - // in the underlying slice - bulkRequest := grp.client.Bulk() - if reflect.TypeOf(body).Kind() == reflect.Slice { - v := reflect.ValueOf(body) - for i := 0; i < v.Len(); i++ { - req := elastic.NewBulkIndexRequest().Index(grp.token).Type(artificialDocType).Doc(v.Index(i).Interface()) - bulkRequest.Add(req) - } - } - if bulkRequest.NumberOfActions() > 0 { - if c.config.LogRequests { - c.logger.Infof("sending bulk to %s", config.LogsEndpoint) - } - // required for writing events to log file - p, err := json.Marshal(body) - if err != nil { - return err - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - res, err := bulkRequest.Do(ctx) - if err != nil { - c.writePayload(string(p), err.Error()) - return err - } - if res.Errors { - for _, item := range res.Failed() { - if item.Error != nil { - c.logger.Errorf("document %s failed to index: %s - %s", item.Id, item.Error.Type, item.Error.Reason) - } - } - } - c.writePayload(string(p), "200") - return nil - } - } - return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint) + // Lookup for client by endpoint + if grp, ok := c.clients[config.LogsEndpoint]; ok { + bulkRequest := grp.client.Bulk() + + // Dynamically process the body as a slice + if reflect.TypeOf(body).Kind() == reflect.Slice { + v := reflect.ValueOf(body) + for i := 0; i < v.Len(); i++ { + req := elastic.NewBulkIndexRequest(). + Index(grp.token). + Type(artificialDocType). + Doc(v.Index(i).Interface()) + bulkRequest.Add(req) + } + } + + if bulkRequest.NumberOfActions() > 0 { + // Serialize the payload for debugging or printing + payloadBytes, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("failed to serialize payload: %w", err) + } + + // Print or log the payload + fmt.Printf("Payload being sent to Sematext:\n%s\n", string(payloadBytes)) + + if c.config.LogRequests { + c.logger.Infof("Sending bulk to %s", config.LogsEndpoint) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Send the payload + res, err := bulkRequest.Do(ctx) + if err != nil { + c.writePayload(string(payloadBytes), err.Error()) + return err + } + + // Check for errors in the response + if res.Errors { + for _, item := range res.Failed() { + if item.Error != nil { + c.logger.Errorf("Document %s failed to index: %s - %s", item.Id, item.Error.Type, item.Error.Reason) + } + } + } + + c.writePayload(string(payloadBytes), "200") + return nil + } + } + return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint) } + func (c *client) writePayload(payload string, status string) { if c.config.WriteEvents.Load() { c.writer.Write(Formatl(payload, status)) diff --git a/exporter/sematextexporter/exporter.go b/exporter/sematextexporter/exporter.go new file mode 100644 index 000000000000..a94d31ae2318 --- /dev/null +++ b/exporter/sematextexporter/exporter.go @@ -0,0 +1,90 @@ +package sematextexporter + +import ( + "context" + // "fmt" + "time" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "github.com/sirupsen/logrus" + "go.uber.org/zap" +) + +type sematextLogsExporter struct { + config *Config + client Client + logger *logrus.Logger +} + +func NewExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter { + logger := logrus.New() + logger.SetFormatter(&FlatFormatter{}) + + // Initialize Sematext client + client, err := NewClient(cfg, logger, FlatWriter{}) + if err != nil { + set.Logger.Error("Failed to create Sematext client", zap.Error(err)) + return nil + } + + return &sematextLogsExporter{ + config: cfg, + client: client, + logger: logger, + } +} + + +func (e *sematextLogsExporter) pushLogsData(ctx context.Context, logs plog.Logs) error { + // Convert logs to Bulk API payload + bulkPayload, err := convertLogsToBulkPayload(logs) + if err != nil { + e.logger.Errorf("Failed to convert logs: %v", err) + return err + } + + // Send logs using the Sematext client + if err := e.client.Bulk(bulkPayload, e.config); err != nil { + e.logger.Errorf("Failed to send logs to Sematext: %v", err) + return err + } + + return nil +} +func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error) { + var bulkPayload []map[string]interface{} + + resourceLogs := logs.ResourceLogs() + + // Iterate through logs to prepare the Bulk payload + for i := 0; i < resourceLogs.Len(); i++ { + scopeLogs := resourceLogs.At(i).ScopeLogs() + for j := 0; j < scopeLogs.Len(); j++ { + logRecords := scopeLogs.At(j).LogRecords() + for k := 0; k < logRecords.Len(); k++ { + record := logRecords.At(k) + + // Build the log entry + logEntry := map[string]interface{}{ + "@timestamp": record.Timestamp().AsTime().Format(time.RFC3339), + "message": record.Body().AsString(), + "severity": record.SeverityText(), + } + + bulkPayload = append(bulkPayload, logEntry) + } + } + } + + return bulkPayload, nil +} +func (e *sematextLogsExporter) Start(ctx context.Context, host component.Host) error { + e.logger.Info("Starting Sematext Logs Exporter...") + return nil +} + +func (e *sematextLogsExporter) Shutdown(ctx context.Context) error { + e.logger.Info("Shutting down Sematext Logs Exporter...") + return nil +} \ No newline at end of file diff --git a/exporter/sematextexporter/factory.go b/exporter/sematextexporter/factory.go index 34f9da06ceeb..b58a0e650c3e 100644 --- a/exporter/sematextexporter/factory.go +++ b/exporter/sematextexporter/factory.go @@ -11,17 +11,18 @@ import ( "time" "github.com/influxdata/influxdb-observability/otel2influx" + "sync/atomic" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "github.com/influxdata/influxdb-observability/common" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sematextexporter/internal/metadata" ) + const appToken string = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" // NewFactory creates a factory for the Sematext metrics exporter. func NewFactory() exporter.Factory { @@ -29,11 +30,13 @@ func NewFactory() exporter.Factory { metadata.Type, createDefaultConfig, exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability), + exporter.WithLogs(createLogsExporter, metadata.LogsStability), ) } func createDefaultConfig() component.Config { - return &Config{ + + cfg := &Config{ ClientConfig: confighttp.ClientConfig{ Timeout: 5 * time.Second, Headers: map[string]configopaque.String{ @@ -46,10 +49,19 @@ func createDefaultConfig() component.Config { QueueSettings: exporterhelper.NewDefaultQueueConfig(), PayloadMaxLines: 1_000, PayloadMaxBytes: 300_000, - }, + }, + LogsConfig: LogsConfig{ + AppToken: appToken, + LogRequests:true, + LogMaxAge: 2, + LogMaxSize: 10, + WriteEvents: atomic.Bool{}, + }, BackOffConfig: configretry.NewDefaultBackOffConfig(), Region: "custom", } + cfg.LogsConfig.WriteEvents.Store(false) + return cfg } func createMetricsExporter( @@ -91,3 +103,29 @@ func createMetricsExporter( exporterhelper.WithStart(writer.Start), ) } +// createLogsExporter creates a new logs exporter for Sematext. +func createLogsExporter( + ctx context.Context, + set exporter.Settings, + cfg component.Config, +) (exporter.Logs, error) { + cf := cfg.(*Config) + + // Log the creation of the exporter + set.Logger.Info("Creating Sematext Logs Exporter") + + // Create the Sematext logs exporter + exporter := NewExporter(cf, set) + + // Wrap the exporter with OpenTelemetry helper functions + return exporterhelper.NewLogsExporter( + ctx, + set, + cfg, + exporter.pushLogsData, // Function to process and send logs + exporterhelper.WithQueue(cf.MetricsConfig.QueueSettings), // Optional queue settings + exporterhelper.WithRetry(cf.BackOffConfig), // Optional retry settings + exporterhelper.WithStart(exporter.Start), // Lifecycle start function + exporterhelper.WithShutdown(exporter.Shutdown), // Lifecycle shutdown function + ) +} \ No newline at end of file diff --git a/exporter/sematextexporter/generated_component_test.go b/exporter/sematextexporter/generated_component_test.go index b4fd792655f4..860701cca718 100644 --- a/exporter/sematextexporter/generated_component_test.go +++ b/exporter/sematextexporter/generated_component_test.go @@ -35,6 +35,13 @@ func TestComponentLifecycle(t *testing.T) { createFn func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) }{ + { + name: "logs", + createFn: func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsExporter(ctx, set, cfg) + }, + }, + { name: "metrics", createFn: func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) { diff --git a/exporter/sematextexporter/generated_package_test.go b/exporter/sematextexporter/generated_package_test.go index 770fcd6bf246..40fec63335b3 100644 --- a/exporter/sematextexporter/generated_package_test.go +++ b/exporter/sematextexporter/generated_package_test.go @@ -3,9 +3,8 @@ package sematextexporter import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/exporter/sematextexporter/internal/metadata/generated_status.go b/exporter/sematextexporter/internal/metadata/generated_status.go index e221a6ab9c5d..49e2d6d1a7ef 100644 --- a/exporter/sematextexporter/internal/metadata/generated_status.go +++ b/exporter/sematextexporter/internal/metadata/generated_status.go @@ -13,4 +13,5 @@ var ( const ( MetricsStability = component.StabilityLevelDevelopment + LogsStability = component.StabilityLevelDevelopment ) diff --git a/exporter/sematextexporter/metadata.yaml b/exporter/sematextexporter/metadata.yaml index 3dee628c6c03..6c10e0f22f0b 100644 --- a/exporter/sematextexporter/metadata.yaml +++ b/exporter/sematextexporter/metadata.yaml @@ -3,7 +3,7 @@ type: sematext status: class: exporter stability: - development: [metrics] + development: [metrics,logs] distributions: [contrib] codeowners: active: [Eromosele-SM] diff --git a/exporter/sematextexporter/testdata/config.yaml b/exporter/sematextexporter/testdata/config.yaml index 4a97aa1a07f1..e75c480d5054 100644 --- a/exporter/sematextexporter/testdata/config.yaml +++ b/exporter/sematextexporter/testdata/config.yaml @@ -8,7 +8,7 @@ sematext/override-config: max_interval: 3s max_elapsed_time: 10s metrics: - app_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + app_token: 5cf92744-fefb-4f4f-9d6b-63eb194690f8 sending_queue: enabled: true num_consumers: 3 @@ -16,11 +16,11 @@ sematext/override-config: payload_max_lines: 72 payload_max_bytes: 27 logs: - app_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx - flush: - bytes: 10485760 - sending_queue: - enabled: true + app_token: 7f7ff081-fcb7-4172-9942-eff5295fa2f6 + logs_requests: true + logs_max_age : 2 + logs_max_backups: 10 +