Skip to content

Commit

Permalink
First test failed so i had to do some tinkering and refining and put …
Browse files Browse the repository at this point in the history
…some logging statements

Will clean code up once i am done
  • Loading branch information
AkhigbeEromo committed Dec 2, 2024
1 parent ff00870 commit a8b67a6
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 32 deletions.
4 changes: 2 additions & 2 deletions exporter/sematextexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func (cfg *Config) Validate() error {
}
if strings.ToLower(cfg.Region) == "eu" {
cfg.MetricsEndpoint ="https://spm-receiver.eu.sematext.com"
cfg.LogsEndpoint ="logsene-receiver.eu.sematext.com/_bulk"
cfg.LogsEndpoint ="https://logsene-receiver.eu.sematext.com"
}
if strings.ToLower(cfg.Region) == "us"{
cfg.MetricsEndpoint ="https://spm-receiver.sematext.com"
cfg.LogsEndpoint = "logsene-receiver.sematext.com/_bulk"
cfg.LogsEndpoint = "https://logsene-receiver.sematext.com"
}

return nil
Expand Down
15 changes: 5 additions & 10 deletions exporter/sematextexporter/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ import (
json "github.com/json-iterator/go"
)

const (
// artificialDocType designates a syntenic doc type for ES documents
artificialDocType = "_doc"
)

type group struct {
client *elastic.Client
token string
Expand Down Expand Up @@ -49,7 +44,6 @@ func NewClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client
if err != nil {
return nil, err
}
// clients := map[string]group{}
clients[config.LogsEndpoint] = group{
client: c,
token: config.LogsConfig.AppToken,
Expand All @@ -74,9 +68,8 @@ func (c *client) Bulk(body interface{}, config *Config) error {
v := reflect.ValueOf(body)
for i := 0; i < v.Len(); i++ {
req := elastic.NewBulkIndexRequest().
Index(grp.token).
Type(artificialDocType).
Doc(v.Index(i).Interface())
Index(grp.token).
Doc(v.Index(i).Interface())
bulkRequest.Add(req)
}
}
Expand Down Expand Up @@ -125,7 +118,9 @@ func (c *client) Bulk(body interface{}, config *Config) error {
func (c *client) writePayload(payload string, status string) {
if c.config.WriteEvents.Load() {
c.writer.Write(Formatl(payload, status))
}
} else {
c.logger.Debugf("WriteEvents disabled. Payload: %s, Status: %s", payload, status)
}
}
// Formatl delimits and formats the response returned by receiver.
func Formatl(payload string, status string) string {
Expand Down
52 changes: 33 additions & 19 deletions exporter/sematextexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sematextexporter

import (
"context"
// "fmt"
"fmt"
"time"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/component"
Expand All @@ -17,7 +17,7 @@ type sematextLogsExporter struct {
logger *logrus.Logger
}

func NewExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter {
func newExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter {
logger := logrus.New()
logger.SetFormatter(&FlatFormatter{})

Expand All @@ -37,22 +37,27 @@ func NewExporter(cfg *Config, set exporter.Settings) *sematextLogsExporter {


func (e *sematextLogsExporter) pushLogsData(ctx context.Context, logs plog.Logs) error {
// Convert logs to Bulk API payload
bulkPayload, err := convertLogsToBulkPayload(logs)
if err != nil {
e.logger.Errorf("Failed to convert logs: %v", err)
return err
}
// Convert logs to bulk payload
bulkPayload, err := convertLogsToBulkPayload(logs, e.config.LogsConfig.AppToken)
if err != nil {
e.logger.Errorf("Failed to convert logs: %v", err)
return err
}

// Send logs using the Sematext client
if err := e.client.Bulk(bulkPayload, e.config); err != nil {
e.logger.Errorf("Failed to send logs to Sematext: %v", err)
return err
}
// Debug: Print the bulk payload
for _, payload := range bulkPayload {
fmt.Printf("Bulk payload: %v\n", payload)
}

return nil
// Send the bulk payload to Sematext
if err := e.client.Bulk(bulkPayload, e.config); err != nil {
e.logger.Errorf("Failed to send logs to Sematext: %v", err)
return err
}

return nil
}
func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error) {
func convertLogsToBulkPayload(logs plog.Logs, appToken string) ([]map[string]interface{}, error) {
var bulkPayload []map[string]interface{}

resourceLogs := logs.ResourceLogs()
Expand All @@ -65,13 +70,20 @@ func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error)
for k := 0; k < logRecords.Len(); k++ {
record := logRecords.At(k)

// Add metadata for indexing
meta := map[string]interface{}{
"index": map[string]interface{}{
"_index": appToken,
},
}
bulkPayload = append(bulkPayload, meta)

// Build the log entry
logEntry := map[string]interface{}{
"@timestamp": record.Timestamp().AsTime().Format(time.RFC3339),
"message": record.Body().AsString(),
"severity": record.SeverityText(),
}

bulkPayload = append(bulkPayload, logEntry)
}
}
Expand All @@ -80,10 +92,12 @@ func convertLogsToBulkPayload(logs plog.Logs) ([]map[string]interface{}, error)
return bulkPayload, nil
}
func (e *sematextLogsExporter) Start(ctx context.Context, host component.Host) error {
e.logger.Info("Starting Sematext Logs Exporter...")
return nil
if e.client == nil {
return fmt.Errorf("sematext client is not initialized")
}
e.logger.Info("Starting Sematext Logs Exporter...")
return nil
}

func (e *sematextLogsExporter) Shutdown(ctx context.Context) error {
e.logger.Info("Shutting down Sematext Logs Exporter...")
return nil
Expand Down
2 changes: 1 addition & 1 deletion exporter/sematextexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func createLogsExporter(
set.Logger.Info("Creating Sematext Logs Exporter")

// Create the Sematext logs exporter
exporter := NewExporter(cf, set)
exporter := newExporter(cf, set)

// Wrap the exporter with OpenTelemetry helper functions
return exporterhelper.NewLogsExporter(
Expand Down

0 comments on commit a8b67a6

Please sign in to comment.