From 7e8f21bc693ed674e8e845c9c1f50ab2bdec2529 Mon Sep 17 00:00:00 2001
From: Chief-Rishab
Date: Sun, 20 Aug 2023 08:11:36 +0530
Subject: [PATCH] refactor: consistent err handling for OpenTelemetry

---
 agent/agent.go                    |   4 +-
 cmd/run.go                        |   9 +--
 metrics/opentelemetry_test.go     |   6 +-
 metrics/otel_monitor.go           |  34 +++++-----
 metrics/otelmw/processorsmw.go    |  33 +++++-----
 metrics/otelmw/sinksmw.go         |  35 +++++-----
 plugins/extractors/kafka/kafka.go | 104 +++++++++++++++++++-----------
 7 files changed, 124 insertions(+), 101 deletions(-)

diff --git a/agent/agent.go b/agent/agent.go
index 03d2cf7b9..b685bfcae 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -236,7 +236,7 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str
 		return fmt.Errorf("find processor %q: %w", pr.Name, err)
 	}
 
-	proc, err = otelmw.WithProcessorMW(proc, pr.Name, recipeName)
+	proc = otelmw.WithProcessor(pr.Name, recipeName)(proc)
 	if err != nil {
 		return fmt.Errorf("wrap processor %q: %w", pr.Name, err)
 	}
@@ -269,7 +269,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
 		return fmt.Errorf("find sink %q: %w", sr.Name, err)
 	}
 
-	sink, err = otelmw.WithSinkMW(sink, sr.Name, recipeName)
+	sink = otelmw.WithSink(sr.Name, recipeName)(sink)
 	if err != nil {
 		return fmt.Errorf("wrap otel sink %q: %w", sr.Name, err)
 	}
diff --git a/cmd/run.go b/cmd/run.go
index 1e6c0b889..37f16177a 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -88,14 +88,7 @@ func RunCmd() *cobra.Command {
 			}
 			defer doneOtlp()
 
-			mt, err := metrics.NewOtelMonitor()
-			if err != nil {
-				return err
-			}
-
-			if mt != nil {
-				mts = append(mts, mt)
-			}
+			mts = append(mts, metrics.NewOtelMonitor())
 		}
 
 		runner := agent.NewAgent(agent.Config{
diff --git a/metrics/opentelemetry_test.go b/metrics/opentelemetry_test.go
index 5e67e742c..4c7c0ac1f 100644
--- a/metrics/opentelemetry_test.go
+++ b/metrics/opentelemetry_test.go
@@ -32,11 +32,10 @@ func TestOtelMonitor_RecordRun(t *testing.T) {
 		defer done()
 		assert.Nil(t, err)
 
-		monitor, err := metrics.NewOtelMonitor()
+		monitor := metrics.NewOtelMonitor()
 		monitor.RecordRun(ctx, agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: false})
 
-		assert.Nil(t, err)
 		assert.NotNil(t, monitor)
 		assert.NotNil(t, done)
 	})
@@ -62,7 +61,7 @@ func TestOtelMonitor_RecordPlugin(t *testing.T) {
 		defer done()
 		assert.Nil(t, err)
 
-		monitor, err := metrics.NewOtelMonitor()
+		monitor := metrics.NewOtelMonitor()
 
 		monitor.RecordPlugin(context.Background(), agent.PluginInfo{
@@ -71,7 +70,6 @@ func TestOtelMonitor_RecordPlugin(t *testing.T) {
 			PluginType: "sink",
 			Success:    true,
 		})
-		assert.Nil(t, err)
 		assert.NotNil(t, monitor)
 		assert.NotNil(t, done)
 	})
diff --git a/metrics/otel_monitor.go b/metrics/otel_monitor.go
index d9d8fb198..7a88449f3 100644
--- a/metrics/otel_monitor.go
+++ b/metrics/otel_monitor.go
@@ -11,47 +11,39 @@ import (
 
 // OtelMonitor represents the otel monitor.
 type OtelMonitor struct {
-	recipeDuration   metric.Int64Histogram
+	recipeDuration   metric.Float64Histogram
 	extractorRetries metric.Int64Counter
 	assetsExtracted  metric.Int64Counter
 	sinkRetries      metric.Int64Counter
 }
 
-func NewOtelMonitor() (*OtelMonitor, error) {
+func NewOtelMonitor() *OtelMonitor {
 	// init meters
-	meter := otel.Meter("")
-	recipeDuration, err := meter.Int64Histogram("meteor.recipe.duration", metric.WithUnit("ms"))
-	if err != nil {
-		return nil, err
-	}
+	meter := otel.Meter("github.com/raystack/meteor/metrics")
+	recipeDuration, err := meter.Float64Histogram("meteor.recipe.duration", metric.WithUnit("s"))
+	handleOtelErr(err)
 
 	extractorRetries, err := meter.Int64Counter("meteor.extractor.retries")
-	if err != nil {
-		return nil, err
-	}
+	handleOtelErr(err)
 
 	assetsExtracted, err := meter.Int64Counter("meteor.assets.extracted")
-	if err != nil {
-		return nil, err
-	}
+	handleOtelErr(err)
 
 	sinkRetries, err := meter.Int64Counter("meteor.sink.retries")
-	if err != nil {
-		return nil, err
-	}
+	handleOtelErr(err)
 
 	return &OtelMonitor{
 		recipeDuration:   recipeDuration,
 		extractorRetries: extractorRetries,
 		assetsExtracted:  assetsExtracted,
 		sinkRetries:      sinkRetries,
-	}, nil
+	}
 }
 
 // RecordRun records a run behavior
 func (m *OtelMonitor) RecordRun(ctx context.Context, run agent.Run) {
 	m.recipeDuration.Record(ctx,
-		int64(run.DurationInMs),
+		float64(run.DurationInMs)/1000.0,
 		metric.WithAttributes(
 			attribute.String("recipe_name", run.Recipe.Name),
 			attribute.String("extractor", run.Recipe.Source.Name),
@@ -89,3 +81,9 @@ func (m *OtelMonitor) RecordSinkRetryCount(ctx context.Context, pluginInfo agent
 			attribute.Int64("batch_size", int64(pluginInfo.BatchSize)),
 		))
 }
+
+func handleOtelErr(err error) {
+	if err != nil {
+		otel.Handle(err)
+	}
+}
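A minimal sketch, not part of this patch: because NewOtelMonitor now reports instrument-creation failures through otel.Handle instead of returning an error, a process that wants to see those failures can register the global OpenTelemetry error handler once during startup. The logger wiring below is illustrative only; otel.SetErrorHandler and otel.ErrorHandlerFunc come from go.opentelemetry.io/otel.

    // Route anything passed to otel.Handle (e.g. a failed meter instrument) to a logger.
    otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
        log.Printf("opentelemetry: %v", err) // assumes the standard-library logger; any sink works
    }))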
diff --git a/metrics/otelmw/processorsmw.go b/metrics/otelmw/processorsmw.go
index bb21cc153..1e78f1f2a 100644
--- a/metrics/otelmw/processorsmw.go
+++ b/metrics/otelmw/processorsmw.go
@@ -11,42 +11,43 @@ import (
 	"go.opentelemetry.io/otel/metric"
 )
 
-type ProcessorMW struct {
+type Processor struct {
 	next       plugins.Processor
 	duration   metric.Int64Histogram
 	pluginName string
 	recipeName string
 }
 
-func WithProcessorMW(p plugins.Processor, pluginName, recipeName string) (plugins.Processor, error) {
-	meter := otel.Meter("")
-
-	processorDuration, err := meter.Int64Histogram("meteor.processor.duration", metric.WithUnit("ms"))
+func WithProcessor(pluginName, recipeName string) func(plugins.Processor) plugins.Processor {
+	processorDuration, err := otel.Meter("github.com/raystack/meteor/metrics/otelmw").
+		Int64Histogram("meteor.processor.duration", metric.WithUnit("ms"))
 	if err != nil {
-		return nil, err
+		otel.Handle(err)
 	}
 
-	return &ProcessorMW{
-		next:       p,
-		duration:   processorDuration,
-		pluginName: pluginName,
-		recipeName: recipeName,
-	}, nil
+	return func(p plugins.Processor) plugins.Processor {
+		return &Processor{
+			next:       p,
+			duration:   processorDuration,
+			pluginName: pluginName,
+			recipeName: recipeName,
+		}
+	}
 }
 
-func (mw *ProcessorMW) Init(ctx context.Context, cfg plugins.Config) error {
+func (mw *Processor) Init(ctx context.Context, cfg plugins.Config) error {
 	return mw.next.Init(ctx, cfg)
 }
 
-func (mw *ProcessorMW) Info() plugins.Info {
+func (mw *Processor) Info() plugins.Info {
 	return mw.next.Info()
 }
 
-func (mw *ProcessorMW) Validate(cfg plugins.Config) error {
+func (mw *Processor) Validate(cfg plugins.Config) error {
 	return mw.next.Validate(cfg)
 }
 
-func (mw *ProcessorMW) Process(ctx context.Context, src models.Record) (dst models.Record, err error) {
+func (mw *Processor) Process(ctx context.Context, src models.Record) (dst models.Record, err error) {
 	defer func(start time.Time) {
 		mw.duration.Record(ctx,
 			time.Since(start).Milliseconds(),
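WithProcessor is now a decorator factory: it creates the histogram once and returns a func(plugins.Processor) plugins.Processor, so wiring it up can no longer fail recipe setup. A hedged usage sketch follows; the plugin and recipe names are made up, not taken from the patch:

    wrap := otelmw.WithProcessor("enrich", "sample-recipe") // hypothetical names
    proc = wrap(proc)                                       // proc still satisfies plugins.Processor, now with duration recording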
diff --git a/metrics/otelmw/sinksmw.go b/metrics/otelmw/sinksmw.go
index 2ebffad86..601996f10 100644
--- a/metrics/otelmw/sinksmw.go
+++ b/metrics/otelmw/sinksmw.go
@@ -11,46 +11,47 @@ import (
 	"go.opentelemetry.io/otel/metric"
 )
 
-type SinksMW struct {
+type Sinks struct {
 	next       plugins.Syncer
 	duration   metric.Int64Histogram
 	pluginName string
 	recipeName string
 }
 
-func WithSinkMW(s plugins.Syncer, pluginName, recipeName string) (plugins.Syncer, error) {
-	meter := otel.Meter("")
-
-	sinkDuration, err := meter.Int64Histogram("meteor.sink.duration", metric.WithUnit("ms"))
+func WithSink(pluginName, recipeName string) func(plugins.Syncer) plugins.Syncer {
+	sinkDuration, err := otel.Meter("github.com/raystack/meteor/metrics/otelmw").
+		Int64Histogram("meteor.sink.duration", metric.WithUnit("ms"))
 	if err != nil {
-		return nil, err
+		otel.Handle(err)
 	}
 
-	return &SinksMW{
-		next:       s,
-		duration:   sinkDuration,
-		pluginName: pluginName,
-		recipeName: recipeName,
-	}, nil
+	return func(s plugins.Syncer) plugins.Syncer {
+		return &Sinks{
+			next:       s,
+			duration:   sinkDuration,
+			pluginName: pluginName,
+			recipeName: recipeName,
+		}
+	}
 }
 
-func (mw *SinksMW) Init(ctx context.Context, cfg plugins.Config) error {
+func (mw *Sinks) Init(ctx context.Context, cfg plugins.Config) error {
 	return mw.next.Init(ctx, cfg)
 }
 
-func (mw *SinksMW) Info() plugins.Info {
+func (mw *Sinks) Info() plugins.Info {
 	return mw.next.Info()
 }
 
-func (mw *SinksMW) Validate(cfg plugins.Config) error {
+func (mw *Sinks) Validate(cfg plugins.Config) error {
 	return mw.next.Validate(cfg)
 }
 
-func (mw *SinksMW) Close() error {
+func (mw *Sinks) Close() error {
 	return mw.next.Close()
 }
 
-func (mw *SinksMW) Sink(ctx context.Context, batch []models.Record) (err error) {
+func (mw *Sinks) Sink(ctx context.Context, batch []models.Record) (err error) {
 	defer func(start time.Time) {
 		mw.duration.Record(ctx,
 			time.Since(start).Milliseconds(),
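Both middlewares time the wrapped call with a deferred Record that fires when Sink or Process returns; the hunk above ends mid-call, so the following is only a sketch of how the pattern completes, with illustrative attribute keys rather than the patch's exact ones:

    func (mw *Sinks) Sink(ctx context.Context, batch []models.Record) (err error) {
        defer func(start time.Time) {
            // named return err is captured here, so success reflects the actual outcome
            mw.duration.Record(ctx, time.Since(start).Milliseconds(),
                metric.WithAttributes(
                    attribute.String("recipe_name", mw.recipeName), // illustrative key
                    attribute.Bool("success", err == nil),          // illustrative key
                ))
        }(time.Now())
        return mw.next.Sink(ctx, batch)
    }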
diff --git a/plugins/extractors/kafka/kafka.go b/plugins/extractors/kafka/kafka.go
index 242edcb72..8c87faa02 100644
--- a/plugins/extractors/kafka/kafka.go
+++ b/plugins/extractors/kafka/kafka.go
@@ -5,29 +5,32 @@ import (
 	"crypto/tls"
 	"crypto/x509"
 	_ "embed" // used to print the embedded assets
+	"errors"
+	"fmt"
 	"os"
+	"strings"
 	"time"
 
-	"github.com/pkg/errors"
-	"google.golang.org/protobuf/types/known/anypb"
-	"google.golang.org/protobuf/types/known/structpb"
-
 	"github.com/raystack/meteor/models"
 	v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
 	"github.com/raystack/meteor/plugins"
 	"github.com/raystack/meteor/registry"
-	"github.com/segmentio/kafka-go"
-
 	"github.com/raystack/salt/log"
+	"github.com/segmentio/kafka-go"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	"google.golang.org/protobuf/types/known/anypb"
+	"google.golang.org/protobuf/types/known/structpb"
 )
 
 //go:embed README.md
 var summary string
 
 // default topics map to skip
-var defaultTopics = map[string]byte{
-	"__consumer_offsets": 0,
-	"_schemas":           0,
+var defaultTopics = map[string]struct{}{
+	"__consumer_offsets": {},
+	"_schemas":           {},
 }
 
 // Config holds the set of configuration for the kafka extractor
@@ -71,9 +74,10 @@ var info = plugins.Info{
 type Extractor struct {
 	plugins.BaseExtractor
 	// internal states
-	conn   *kafka.Conn
-	logger log.Logger
-	config Config
+	conn       *kafka.Conn
+	logger     log.Logger
+	config     Config
+	clientDurn metric.Int64Histogram
 }
 
 // New returns a pointer to an initialized Extractor Object
 func New(logger log.Logger) *Extractor {
@@ -87,8 +91,16 @@ func New(logger log.Logger) *Extractor {
 }
 
 // Init initializes the extractor
-func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error) {
-	if err = e.BaseExtractor.Init(ctx, config); err != nil {
+func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
+	clientDurn, err := otel.Meter("github.com/raystack/meteor/plugins/extractors/kafka").
+		Int64Histogram("meteor.kafka.client.duration", metric.WithUnit("ms"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	e.clientDurn = clientDurn
+
+	if err := e.BaseExtractor.Init(ctx, config); err != nil {
 		return err
 	}
@@ -101,7 +113,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
 	if e.config.Auth.TLS.Enabled {
 		tlsConfig, err := e.createTLSConfig()
 		if err != nil {
-			return errors.Wrap(err, "failed to create tls config")
+			return fmt.Errorf("create tls config: %w", err)
 		}
 
 		dialer.TLS = tlsConfig
@@ -110,30 +122,25 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
 	// create connection
 	e.conn, err = dialer.DialContext(ctx, "tcp", e.config.Broker)
 	if err != nil {
-		return errors.Wrap(err, "failed to create connection")
+		return fmt.Errorf("create connection: %w", err)
 	}
 
-	return
+	return nil
 }
 
 // Extract checks if the extractor is ready to extract
 // if so, then extracts metadata from the kafka broker
-func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
+func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
 	defer e.conn.Close()
 
-	partitions, err := e.conn.ReadPartitions()
+	partitions, err := e.readPartitions(ctx)
 	if err != nil {
-		return errors.Wrap(err, "failed to fetch partitions")
+		return fmt.Errorf("fetch partitions: %w", err)
 	}
 
 	// collect topic list from partition list
 	topics := map[string]int{}
 	for _, p := range partitions {
-		_, ok := topics[p.Topic]
-		if !ok {
-			topics[p.Topic] = 0
-		}
-
 		topics[p.Topic]++
 	}
@@ -144,22 +151,23 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
 		if isDefaultTopic {
 			continue
 		}
+
 		asset, err := e.buildAsset(topic, numOfPartitions)
 		if err != nil {
 			e.logger.Error("failed to build asset", "err", err, "topic", topic)
 			continue
 		}
-		record := models.NewRecord(asset)
-		emit(record)
+		emit(models.NewRecord(asset))
 	}
 
-	return
+	return nil
 }
 
-func (e *Extractor) createTLSConfig() (tlsConfig *tls.Config, err error) {
+func (e *Extractor) createTLSConfig() (*tls.Config, error) {
 	authConfig := e.config.Auth.TLS
 	if authConfig.CertFile == "" || authConfig.KeyFile == "" || authConfig.CAFile == "" {
+		//nolint:gosec
 		return &tls.Config{
 			InsecureSkipVerify: e.config.Auth.TLS.InsecureSkipVerify,
 		}, nil
@@ -167,28 +175,27 @@
 
 	cert, err := tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile)
 	if err != nil {
-		return nil, errors.Wrap(err, "unable to create cert")
+		return nil, fmt.Errorf("create cert: %w", err)
 	}
 
 	caCert, err := os.ReadFile(authConfig.CAFile)
 	if err != nil {
-		return nil, errors.Wrap(err, "unable to read ca cert file")
+		return nil, fmt.Errorf("read ca cert file: %w", err)
 	}
 
 	caCertPool := x509.NewCertPool()
 	caCertPool.AppendCertsFromPEM(caCert)
 
-	tlsConfig = &tls.Config{
+	//nolint:gosec
+	return &tls.Config{
 		Certificates:       []tls.Certificate{cert},
 		RootCAs:            caCertPool,
 		InsecureSkipVerify: e.config.Auth.TLS.InsecureSkipVerify,
-	}
-
-	return tlsConfig, nil
+	}, nil
 }
 
 // Build topic metadata model using a topic and number of partitions
-func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (asset *v1beta2.Asset, err error) {
+func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2.Asset, error) {
 	topic, err := anypb.New(&v1beta2.Topic{
 		Profile: &v1beta2.TopicProfile{
 			NumberOfPartitions: int64(numOfPartitions),
@@ -208,6 +215,31 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (asset *v1
 	}, nil
 }
 
+func (e *Extractor) readPartitions(ctx context.Context) (partitions []kafka.Partition, err error) {
+	defer func(start time.Time) {
+		attributes := []attribute.KeyValue{
+			attribute.String("kafka.broker", e.config.Broker),
+			attribute.Bool("success", err == nil),
+		}
+		if err != nil {
+			errorCode := "UNKNOWN"
+			var kErr kafka.Error
+			if errors.As(err, &kErr) {
+				errorCode = strings.ReplaceAll(
+					strings.ToUpper(kErr.Title()), " ", "_",
+				)
+			}
+			attributes = append(attributes, attribute.String("kafka.error_code", errorCode))
+		}
+
+		e.clientDurn.Record(
+			ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
+		)
+	}(time.Now())
+
+	return e.conn.ReadPartitions()
+}
+
 func init() {
 	if err := registry.Extractors.Register("kafka", func() plugins.Extractor {
 		return New(plugins.GetLog())