Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SC-20509 #4

Open
wants to merge 54 commits into
base: add-sematext-exporter
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
7098a62
Start working on structure for logs
AkhigbeEromo Nov 19, 2024
2160a71
Work on logs config
AkhigbeEromo Nov 22, 2024
c0ca7cb
Removed LogStash settings
AkhigbeEromo Nov 22, 2024
9179c5e
This is what our logs config will look like
AkhigbeEromo Nov 22, 2024
915f691
Add queue settings to log config
AkhigbeEromo Nov 22, 2024
3d8390a
Correct Metrics config
AkhigbeEromo Nov 22, 2024
cf6f43e
remove unnecessary things
AkhigbeEromo Nov 22, 2024
e86c09f
complete Logs Endpoint
AkhigbeEromo Nov 22, 2024
02d5643
A lot of experimentation (Still in progress) is being done, so code i…
AkhigbeEromo Nov 27, 2024
a94b20b
Fixed some typos
AkhigbeEromo Nov 28, 2024
ff00870
Another heavy commit
AkhigbeEromo Nov 29, 2024
a8b67a6
First test failed so i had to do some tinkering and refining and put …
AkhigbeEromo Dec 2, 2024
17c525b
Add hostname tag
AkhigbeEromo Dec 3, 2024
e0b95cd
Clean up code
AkhigbeEromo Dec 3, 2024
5cfc6e1
Work on config.go test
AkhigbeEromo Dec 4, 2024
9ad149b
Add tests for logs support part of writer.go
AkhigbeEromo Dec 4, 2024
e999c1f
Write tests for falt_formatter.go
AkhigbeEromo Dec 4, 2024
5303ef0
Fix lint issues
AkhigbeEromo Dec 4, 2024
14d74d2
Fix lint issues
AkhigbeEromo Dec 5, 2024
d3bb694
Remove unnecessary alias
AkhigbeEromo Dec 5, 2024
b31b205
Add mapstructure for write events
AkhigbeEromo Dec 5, 2024
886b1ac
Fix otel issues
AkhigbeEromo Dec 5, 2024
17e4f25
Fix more lint issues
AkhigbeEromo Dec 5, 2024
8260265
Fix CI issues
AkhigbeEromo Dec 5, 2024
75725c0
App token is not being passed as ._index
AkhigbeEromo Dec 6, 2024
4513518
Fix CI issues, removed "custom" from being an option in the regions
AkhigbeEromo Dec 9, 2024
27ceed9
Fix CI issues
AkhigbeEromo Dec 9, 2024
9bd25b1
Fix lint issues
AkhigbeEromo Dec 9, 2024
5035763
Fix CI issues
AkhigbeEromo Dec 9, 2024
ef17936
Update gendistributions
AkhigbeEromo Dec 9, 2024
8bdc051
Remove unnecessary error return value
AkhigbeEromo Dec 9, 2024
11ba415
Make anchor links lowercase
AkhigbeEromo Dec 9, 2024
91c7494
Fix anchor links
AkhigbeEromo Dec 9, 2024
8be0231
Fix anchor link
AkhigbeEromo Dec 9, 2024
cde2d8f
remove links
AkhigbeEromo Dec 9, 2024
ddd7911
Write more unit tests
AkhigbeEromo Dec 10, 2024
7b06688
Remove unused prameter
AkhigbeEromo Dec 10, 2024
5978836
Fix lint issues
AkhigbeEromo Dec 12, 2024
acfa996
Implement fix that Akshat suggested
AkhigbeEromo Dec 18, 2024
7bc9caf
Correct endpoints
AkhigbeEromo Dec 18, 2024
d4a6453
Fix lint issues
AkhigbeEromo Dec 18, 2024
1fbb2bc
Remove getHostName
AkhigbeEromo Dec 19, 2024
b0c1208
Remove deprecated library
AkhigbeEromo Dec 23, 2024
f087bd5
Replace logrus with zap
AkhigbeEromo Dec 23, 2024
748519a
Fix lint issues
AkhigbeEromo Dec 23, 2024
8c698d3
Add more tests
AkhigbeEromo Dec 24, 2024
de4f331
Remove logrus
AkhigbeEromo Dec 27, 2024
63406b6
Handle errors
AkhigbeEromo Dec 27, 2024
814b006
Fix issue with writer
AkhigbeEromo Dec 27, 2024
032d36e
Remove flatwriter
AkhigbeEromo Dec 27, 2024
3f84e20
Fix lint issues
AkhigbeEromo Dec 27, 2024
5e33287
Remove unnecessary things from exporter
AkhigbeEromo Dec 27, 2024
bad6f1a
Trim Noisy Tags
AkhigbeEromo Jan 8, 2025
4d96cfa
Fix lint issues
AkhigbeEromo Jan 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove deprecated library
  • Loading branch information
AkhigbeEromo committed Dec 23, 2024
commit b0c1208835fdb9e749b1d3321cd70fcdc99ed23c
98 changes: 51 additions & 47 deletions exporter/sematextexporter/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"reflect"
"strings"
"time"
"bytes"
"log"

json "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/sirupsen/logrus"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"github.com/sirupsen/logrus" is in maintenance mode. Lets use something that is being used by other exporters as well like "go.uber.org/zap". See them for the appropriate usage.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"golang.org/x/net/context"
)

// artificialDocType designates a syntenic doc type for ES documents
const artificialDocType = "_doc"

type group struct {
client *elastic.Client
client *elasticsearch.Client
token string
}

Expand All @@ -43,11 +45,12 @@ func newClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client

// client for shipping to logsene
if config.LogsConfig.AppToken != "" {
c, err := elastic.NewClient(elastic.SetURL(config.LogsEndpoint), elastic.SetSniff(false), elastic.SetHealthcheckTimeout(time.Second*2))
c, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{config.LogsEndpoint},
})
if err != nil {
return nil, err
}
defer c.Stop()
clients[config.LogsEndpoint] = group{
client: c,
token: config.LogsConfig.AppToken,
Expand All @@ -69,57 +72,58 @@ func newClient(config *Config, logger *logrus.Logger, writer FlatWriter) (Client

// Bulk processes a batch of documents and sends them to the specified LogsEndpoint.
func (c *client) Bulk(body any, config *Config) error {
if grp, ok := c.clients[config.LogsEndpoint]; ok {
bulkRequest := grp.client.Bulk()
if reflect.TypeOf(body).Kind() == reflect.Slice {
v := reflect.ValueOf(body)
for i := 0; i < v.Len(); i++ {
doc := v.Index(i).Interface()
if docMap, ok := doc.(map[string]any); ok {
docMap["os.host"] = c.hostname
}

req := elastic.NewBulkIndexRequest().
Index(grp.token).
Type(artificialDocType).
Doc(doc)
bulkRequest.Add(req)
grp, ok := c.clients[config.LogsEndpoint]
if !ok {
return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint)
}

var bulkBuffer bytes.Buffer

if reflect.TypeOf(body).Kind() == reflect.Slice {
v := reflect.ValueOf(body)
for i := 0; i < v.Len(); i++ {
doc := v.Index(i).Interface()
if docMap, ok := doc.(map[string]any); ok {
docMap["os.host"] = c.hostname
}
}

if bulkRequest.NumberOfActions() > 0 {
payloadBytes, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("failed to serialize payload: %w", err)
meta := map[string]map[string]string{
"index": {"_index": grp.token},
}
metaBytes, _ := json.Marshal(meta)
docBytes, _ := json.Marshal(doc)

// Print or log the payload(Will delete this once everything is good)
fmt.Printf("Payload being sent to Sematext:\n%s\n", string(payloadBytes))
bulkBuffer.Write(metaBytes)
bulkBuffer.WriteByte('\n')
bulkBuffer.Write(docBytes)
bulkBuffer.WriteByte('\n')
}
}

if c.config.LogRequests {
c.logger.Infof("Sending bulk to %s", config.LogsEndpoint)
}
if bulkBuffer.Len() > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
res, err := bulkRequest.Do(ctx)
if err != nil {
c.writePayload(string(payloadBytes), err.Error())
return err
}
if res.Errors {
for _, item := range res.Failed() {
if item.Error != nil {
c.logger.Errorf("Document %s failed to index: %s - %s", item.Id, item.Error.Type, item.Error.Reason)
}
}
}
req := esapi.BulkRequest{
Body: bytes.NewReader(bulkBuffer.Bytes()),
}

c.writePayload(string(payloadBytes), "200")
return nil
res, err := req.Do(ctx, grp.client)
if err != nil {
log.Printf("Bulk request failed: %v", err)
return err
}
defer res.Body.Close()

if res.IsError() {
log.Printf("Bulk request returned error: %s", res.String())
return fmt.Errorf("bulk request error: %s", res.String())
}

log.Printf("Bulk request successful: %s", res.String())
}
return fmt.Errorf("no client known for %s endpoint", config.LogsEndpoint)

return nil
}

// writePayload writes a formatted payload along with its status to the configured writer.
Expand Down
4 changes: 2 additions & 2 deletions exporter/sematextexporter/es_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io"
"testing"

"github.com/olivere/elastic/v7"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -98,7 +98,7 @@ type mockElasticClient struct {
func (m *mockElasticClient) Stop() {
close(m.done)
}
func (m *mockElasticClient) Bulk() *elastic.BulkService {
func (m *mockElasticClient) Bulk() *esapi.Bulk {
m.BulkCalled = true
return nil
}
Expand Down
28 changes: 1 addition & 27 deletions exporter/sematextexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,35 +107,9 @@ func (e *sematextLogsExporter) Start(_ context.Context, _ component.Host) error
e.logger.Error("Sematext client is not initialized (nil)")
return fmt.Errorf("sematext client is not initialized")
}

// Assign the client and logger to the exporter
e.client = client
e.logger = logger

// Log a success message
e.logger.Info("Sematext Logs Exporter successfully started")
return nil
}

// Shutdown gracefully shuts down the Sematext Logs Exporter.
func (e *sematextLogsExporter) Shutdown(_ context.Context) error {
if e.logger == nil {
return fmt.Errorf("logger is not initialized")
}

e.logger.Info("Shutting down Sematext Logs Exporter...")

// Stop ElasticSearch client's background goroutines
if e.client != nil {
for endpoint, grp := range e.client.(*client).clients {
if grp.client != nil {
e.logger.Debugf("Stopping ElasticSearch client for endpoint: %s", endpoint)
grp.client.Stop() // Stop the ElasticSearch client's healthchecker goroutines
}
}
}

// Log completion of shutdown
e.logger.Info("Sematext Logs Exporter shutdown complete")
return nil
}
}
1 change: 0 additions & 1 deletion exporter/sematextexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,5 @@ func createLogsExporter(
exporter.pushLogsData, // Function to process and send logs
exporterhelper.WithRetry(cf.BackOffConfig),
exporterhelper.WithStart(exporter.Start),
exporterhelper.WithShutdown(exporter.Shutdown),
)
}
7 changes: 3 additions & 4 deletions exporter/sematextexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/semate
go 1.22.0

require (
github.com/elastic/go-elasticsearch v0.0.0
github.com/elastic/go-elasticsearch/v8 v8.17.0
github.com/influxdata/influxdb-observability/common v0.5.8
github.com/olivere/elastic/v7 v7.0.32
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
go.opentelemetry.io/collector/config/confighttp v0.110.0
go.opentelemetry.io/collector/config/configopaque v1.16.0
Expand All @@ -15,15 +16,13 @@ require (
)

require (
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.opentelemetry.io/collector/component/componentprofiles v0.110.0 // indirect
go.opentelemetry.io/collector/consumer/consumertest v0.110.0 // indirect
go.opentelemetry.io/collector/exporter/exporterprofiles v0.110.0 // indirect
Expand Down
16 changes: 6 additions & 10 deletions exporter/sematextexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.