Skip to content

Commit

Permalink
Another heavy commit
Browse files Browse the repository at this point in the history
Had to setup factory settings for the exporter
  • Loading branch information
AkhigbeEromo committed Nov 29, 2024
1 parent a94b20b commit ff00870
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 54 deletions.
9 changes: 9 additions & 0 deletions exporter/sematextexporter/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Sematext Exporter
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: metrics, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fsematext%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fsematext) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fsematext%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fsematext) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@Eromosele-SM](https://github.com/Eromosele-SM) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending metrics to [Sematext Cloud](https://sematext.com/) in Influx line protocol format
Expand Down
4 changes: 2 additions & 2 deletions exporter/sematextexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type MetricsConfig struct {
type LogsConfig struct {
AppToken string `mapstructure:"app_token"`
LogsEndpoint string `mapstructure:"logs_endpoint"`
LogRequests bool
LogRequests bool `mapstructure:"logs_requests"`
LogMaxAge int `mapstructure:"logs_max_age"`
LogMaxBackups int `mapstructure:"logs_max_backups"`
LogMaxSize int `mapstructure:"logs_max_size"`
// WriteEvents determines if events are logged
WriteEvents a.Bool
WriteEvents a.Bool
}


Expand Down
95 changes: 55 additions & 40 deletions exporter/sematextexporter/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,48 +65,63 @@ func NewClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client
}

func (c *client) Bulk(body interface{}, config *Config) error {
// lookup for client by endpoint
if grp, ok := c.clients[config.LogsEndpoint]; ok {
// build bulk request for each element
// in the underlying slice
bulkRequest := grp.client.Bulk()
if reflect.TypeOf(body).Kind() == reflect.Slice {
v := reflect.ValueOf(body)
for i := 0; i < v.Len(); i++ {
req := elastic.NewBulkIndexRequest().Index(grp.token).Type(artificialDocType).Doc(v.Index(i).Interface())
bulkRequest.Add(req)
}
}
if bulkRequest.NumberOfActions() > 0 {
if c.config.LogRequests {
c.logger.Infof("sending bulk to %s", config.LogsEndpoint)
}
// required for writing events to log file
p, err := json.Marshal(body)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
res, err := bulkRequest.Do(ctx)
if err != nil {
c.writePayload(string(p), err.Error())
return err
}
if res.Errors {
for _, item := range res.Failed() {
if item.Error != nil {
c.logger.Errorf("document %s failed to index: %s - %s", item.Id, item.Error.Type, item.Error.Reason)
}
}
}
c.writePayload(string(p), "200")
return nil
}
}
return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint)
// Lookup for client by endpoint
if grp, ok := c.clients[config.LogsEndpoint]; ok {
bulkRequest := grp.client.Bulk()

// Dynamically process the body as a slice
if reflect.TypeOf(body).Kind() == reflect.Slice {
v := reflect.ValueOf(body)
for i := 0; i < v.Len(); i++ {
req := elastic.NewBulkIndexRequest().
Index(grp.token).
Type(artificialDocType).
Doc(v.Index(i).Interface())
bulkRequest.Add(req)
}
}

if bulkRequest.NumberOfActions() > 0 {
// Serialize the payload for debugging or printing
payloadBytes, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("failed to serialize payload: %w", err)
}

// Print or log the payload
fmt.Printf("Payload being sent to Sematext:\n%s\n", string(payloadBytes))

if c.config.LogRequests {
c.logger.Infof("Sending bulk to %s", config.LogsEndpoint)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Send the payload
res, err := bulkRequest.Do(ctx)
if err != nil {
c.writePayload(string(payloadBytes), err.Error())
return err
}

// Check for errors in the response
if res.Errors {
for _, item := range res.Failed() {
if item.Error != nil {
c.logger.Errorf("Document %s failed to index: %s - %s", item.Id, item.Error.Type, item.Error.Reason)
}
}
}

c.writePayload(string(payloadBytes), "200")
return nil
}
}
return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint)
}


func (c *client) writePayload(payload string, status string) {
if c.config.WriteEvents.Load() {
c.writer.Write(Formatl(payload, status))
Expand Down
90 changes: 90 additions & 0 deletions exporter/sematextexporter/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package sematextexporter

import (
"context"
// "fmt"
"time"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)

type sematextLogsExporter struct {
config *Config
client Client
logger *logrus.Logger
}

func NewExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter {
logger := logrus.New()
logger.SetFormatter(&FlatFormatter{})

// Initialize Sematext client
client, err := NewClient(cfg, logger, FlatWriter{})
if err != nil {
set.Logger.Error("Failed to create Sematext client", zap.Error(err))
return nil
}

return &sematextLogsExporter{
config: cfg,
client: client,
logger: logger,
}
}


func (e *sematextLogsExporter) pushLogsData(ctx context.Context, logs plog.Logs) error {
// Convert logs to Bulk API payload
bulkPayload, err := convertLogsToBulkPayload(logs)
if err != nil {
e.logger.Errorf("Failed to convert logs: %v", err)
return err
}

// Send logs using the Sematext client
if err := e.client.Bulk(bulkPayload, e.config); err != nil {
e.logger.Errorf("Failed to send logs to Sematext: %v", err)
return err
}

return nil
}
func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error) {
var bulkPayload []map[string]interface{}

resourceLogs := logs.ResourceLogs()

// Iterate through logs to prepare the Bulk payload
for i := 0; i < resourceLogs.Len(); i++ {
scopeLogs := resourceLogs.At(i).ScopeLogs()
for j := 0; j < scopeLogs.Len(); j++ {
logRecords := scopeLogs.At(j).LogRecords()
for k := 0; k < logRecords.Len(); k++ {
record := logRecords.At(k)

// Build the log entry
logEntry := map[string]interface{}{
"@timestamp": record.Timestamp().AsTime().Format(time.RFC3339),
"message": record.Body().AsString(),
"severity": record.SeverityText(),
}

bulkPayload = append(bulkPayload, logEntry)
}
}
}

return bulkPayload, nil
}
func (e *sematextLogsExporter) Start(ctx context.Context, host component.Host) error {
e.logger.Info("Starting Sematext Logs Exporter...")
return nil
}

func (e *sematextLogsExporter) Shutdown(ctx context.Context) error {
e.logger.Info("Shutting down Sematext Logs Exporter...")
return nil
}
44 changes: 41 additions & 3 deletions exporter/sematextexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,32 @@ import (
"time"

"github.com/influxdata/influxdb-observability/otel2influx"
"sync/atomic"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/influxdata/influxdb-observability/common"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sematextexporter/internal/metadata"
)

const appToken string = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
// NewFactory creates a factory for the Sematext metrics exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
metadata.Type,
createDefaultConfig,
exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
exporter.WithLogs(createLogsExporter, metadata.LogsStability),
)
}

func createDefaultConfig() component.Config {
return &Config{

cfg := &Config{
ClientConfig: confighttp.ClientConfig{
Timeout: 5 * time.Second,
Headers: map[string]configopaque.String{
Expand All @@ -46,10 +49,19 @@ func createDefaultConfig() component.Config {
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
PayloadMaxLines: 1_000,
PayloadMaxBytes: 300_000,
},
},
LogsConfig: LogsConfig{
AppToken: appToken,
LogRequests:true,
LogMaxAge: 2,
LogMaxSize: 10,
WriteEvents: atomic.Bool{},
},
BackOffConfig: configretry.NewDefaultBackOffConfig(),
Region: "custom",
}
cfg.LogsConfig.WriteEvents.Store(false)
return cfg
}

func createMetricsExporter(
Expand Down Expand Up @@ -91,3 +103,29 @@ func createMetricsExporter(
exporterhelper.WithStart(writer.Start),
)
}
// createLogsExporter creates a new logs exporter for Sematext.
func createLogsExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Logs, error) {
cf := cfg.(*Config)

// Log the creation of the exporter
set.Logger.Info("Creating Sematext Logs Exporter")

// Create the Sematext logs exporter
exporter := NewExporter(cf, set)

// Wrap the exporter with OpenTelemetry helper functions
return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
exporter.pushLogsData, // Function to process and send logs
exporterhelper.WithQueue(cf.MetricsConfig.QueueSettings), // Optional queue settings
exporterhelper.WithRetry(cf.BackOffConfig), // Optional retry settings
exporterhelper.WithStart(exporter.Start), // Lifecycle start function
exporterhelper.WithShutdown(exporter.Shutdown), // Lifecycle shutdown function
)
}
7 changes: 7 additions & 0 deletions exporter/sematextexporter/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions exporter/sematextexporter/generated_package_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exporter/sematextexporter/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ type: sematext
status:
class: exporter
stability:
development: [metrics]
development: [metrics,logs]
distributions: [contrib]
codeowners:
active: [Eromosele-SM]
Expand Down
12 changes: 6 additions & 6 deletions exporter/sematextexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ sematext/override-config:
max_interval: 3s
max_elapsed_time: 10s
metrics:
app_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
app_token: 5cf92744-fefb-4f4f-9d6b-63eb194690f8
sending_queue:
enabled: true
num_consumers: 3
queue_size: 10
payload_max_lines: 72
payload_max_bytes: 27
logs:
app_token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
flush:
bytes: 10485760
sending_queue:
enabled: true
app_token: 7f7ff081-fcb7-4172-9942-eff5295fa2f6
logs_requests: true
logs_max_age : 2
logs_max_backups: 10




Expand Down

0 comments on commit ff00870

Please sign in to comment.