diff --git a/agent/agent.go b/agent/agent.go index f7f9ac1e9..6380aa1e2 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -14,9 +14,7 @@ import ( "github.com/pkg/errors" ) -const ( - defaultBatchSize = 1 -) +const defaultBatchSize = 1 // Agent runs recipes for specified plugins. type Agent struct { @@ -110,20 +108,20 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) { ) runExtractor, err := r.setupExtractor(ctx, recipe.Source, stream) if err != nil { - run.Error = err + run.Error = errors.Wrap(err, "failed to setup extractor") return } for _, pr := range recipe.Processors { if err := r.setupProcessor(ctx, pr, stream); err != nil { - run.Error = err + run.Error = errors.Wrap(err, "failed to setup processor") return } } for _, sr := range recipe.Sinks { if err := r.setupSink(ctx, sr, stream); err != nil { - run.Error = err + run.Error = errors.Wrap(err, "failed to setup sink") return } } @@ -145,14 +143,14 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) { }() err = runExtractor() if err != nil { - run.Error = err + run.Error = errors.Wrap(err, "failed to run extractor") } }() // start listening. // this process is blocking if err := stream.broadcast(); err != nil { - run.Error = err + run.Error = errors.Wrap(err, "failed to stream") } // code will reach here stream.Listen() is done. @@ -178,15 +176,13 @@ func (r *Agent) setupExtractor(ctx context.Context, sr recipe.SourceRecipe, str err = errors.Wrapf(err, "could not find extractor \"%s\"", sr.Type) return } - err = extractor.Init(ctx, sr.Config) - if err != nil { + if err = extractor.Init(ctx, sr.Config); err != nil { err = errors.Wrapf(err, "could not initiate extractor \"%s\"", sr.Type) return } runFn = func() (err error) { - err = extractor.Extract(ctx, str.push) - if err != nil { + if err = extractor.Extract(ctx, str.push); err != nil { err = errors.Wrapf(err, "error running extractor \"%s\"", sr.Type) } @@ -197,15 +193,11 @@ func (r *Agent) setupExtractor(ctx context.Context, sr recipe.SourceRecipe, str func (r *Agent) setupProcessor(ctx context.Context, pr recipe.ProcessorRecipe, str *stream) (err error) { var proc plugins.Processor - proc, err = r.processorFactory.Get(pr.Name) - if err != nil { - err = errors.Wrapf(err, "could not find processor \"%s\"", pr.Name) - return + if proc, err = r.processorFactory.Get(pr.Name); err != nil { + return errors.Wrapf(err, "could not find processor \"%s\"", pr.Name) } - err = proc.Init(ctx, pr.Config) - if err != nil { - err = errors.Wrapf(err, "could not initiate processor \"%s\"", pr.Name) - return + if err = proc.Init(ctx, pr.Config); err != nil { + return errors.Wrapf(err, "could not initiate processor \"%s\"", pr.Name) } str.setMiddleware(func(src models.Record) (dst models.Record, err error) { @@ -223,22 +215,16 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.ProcessorRecipe, s func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *stream) (err error) { var sink plugins.Syncer - sink, err = r.sinkFactory.Get(sr.Name) - if err != nil { - err = errors.Wrapf(err, "could not find sink \"%s\"", sr.Name) - return + if sink, err = r.sinkFactory.Get(sr.Name); err != nil { + return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name) } - err = sink.Init(ctx, sr.Config) - if err != nil { - err = errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name) - return + if err = sink.Init(ctx, sr.Config); err != nil { + return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name) } stream.subscribe(func(records []models.Record) (err error) { - err = sink.Sink(ctx, records) - if err != nil { - err = errors.Wrapf(err, "error running sink \"%s\"", sr.Name) - return + if err = sink.Sink(ctx, records); err != nil { + return errors.Wrapf(err, "error running sink \"%s\"", sr.Name) } return diff --git a/agent/batch.go b/agent/batch.go index f771acc38..c3d61bc7e 100644 --- a/agent/batch.go +++ b/agent/batch.go @@ -6,26 +6,30 @@ import ( "github.com/odpf/meteor/models" ) +// batch type batch struct { data []models.Record capacity int } +// newBatch returns a new batch func newBatch(capacity int) *batch { return &batch{ capacity: capacity, } } +// add appends a record to the batch func (b *batch) add(d models.Record) error { if b.isFull() { - return errors.New("batch: cannot add, batch is full!") + return errors.New("batch: cannot add, batch is full") } b.data = append(b.data, d) return nil } +// flush removes all records from the batch func (b *batch) flush() []models.Record { data := b.data b.data = []models.Record{} @@ -33,6 +37,7 @@ func (b *batch) flush() []models.Record { return data } +// isFull returns true if the batch is full func (b *batch) isFull() bool { // size 0 means there is no limit, hence will not ever be full if b.capacity == 0 { @@ -42,6 +47,7 @@ func (b *batch) isFull() bool { return len(b.data) >= b.capacity } +// isEmpty returns true if the batch is empty func (b *batch) isEmpty() bool { return len(b.data) == 0 } diff --git a/agent/run.go b/agent/run.go index d539d994f..83f91b796 100644 --- a/agent/run.go +++ b/agent/run.go @@ -2,7 +2,7 @@ package agent import "github.com/odpf/meteor/recipe" -// TaskType is the type of a task +// TaskType is the type of task type TaskType string const ( diff --git a/agent/stream.go b/agent/stream.go index 63c56a843..dc3b04dd7 100644 --- a/agent/stream.go +++ b/agent/stream.go @@ -55,7 +55,9 @@ func (s *stream) broadcast() error { batch := newBatch(l.batchSize) // listen to channel and emit data to subscriber callback if batch is full for d := range l.channel { - batch.add(d) + if err := batch.add(d); err != nil { + s.closeWithError(err) + } if batch.isFull() { if err := l.callback(batch.flush()); err != nil { s.closeWithError(err) @@ -78,7 +80,7 @@ func (s *stream) broadcast() error { } // push() will run the record through all the registered middleware -// and emit the record to all registerd subscribers. +// and emit the record to all registered subscribers. func (s *stream) push(data models.Record) { data, err := s.runMiddlewares(data) if err != nil { diff --git a/generator/recipe.go b/generator/recipe.go index f3d893e8c..c48150a6d 100644 --- a/generator/recipe.go +++ b/generator/recipe.go @@ -2,6 +2,7 @@ package generator import ( "embed" + "github.com/pkg/errors" "os" "strings" "text/template" @@ -32,45 +33,39 @@ func Recipe(name string, source string, sinks []string, processors []string) (er if source != "" { tem.Source = make(map[string]string) - sinfo, err := registry.Extractors.Info(source) + sourceInfo, err := registry.Extractors.Info(source) if err != nil { - return err + return errors.Wrap(err, "failed to provide extractor information") } - tem.Source[source] = sinfo.SampleConfig + tem.Source[source] = indent.String(sourceInfo.SampleConfig, size) } if len(sinks) > 0 { tem.Sinks = make(map[string]string) for _, sink := range sinks { info, err := registry.Sinks.Info(sink) if err != nil { - return err + return errors.Wrap(err, "failed to provide sink information") } tem.Sinks[sink] = info.SampleConfig } } - if len(processors) > 0 { tem.Processors = make(map[string]string) for _, procc := range processors { info, err := registry.Processors.Info(procc) if err != nil { - return err + return errors.Wrap(err, "failed to provide processor information") } tem.Processors[procc] = info.SampleConfig } } - tmpl := template.Must( - template.New("recipe.yaml").Funcs(templateFuncs).ParseFS(file, "*"), - ) - + tmpl, err := template.ParseFS(file, "*") if err != nil { - return err + return errors.Wrap(err, "failed to parse file") } - - err = tmpl.Execute(os.Stdout, tem) - if err != nil { - return err + if err = tmpl.Execute(os.Stdout, tem); err != nil { + return errors.Wrap(err, "failed to execute template") } return nil } diff --git a/main.go b/main.go index d24a8f243..3f8043350 100644 --- a/main.go +++ b/main.go @@ -48,7 +48,7 @@ func main() { command := cmd.New(lg, monitor) if err := command.Execute(); err != nil { - if strings.HasPrefix(err.Error(), "unknown command ") { + if strings.HasPrefix(err.Error(), "unknown command") { if !strings.HasSuffix(err.Error(), "\n") { fmt.Println() } diff --git a/metrics/statsd.go b/metrics/statsd.go index b91367880..4bc784978 100644 --- a/metrics/statsd.go +++ b/metrics/statsd.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "github.com/pkg/errors" "net" "strconv" @@ -16,7 +17,7 @@ var ( runMetricName = "run" ) -// StatsdMonitor reprsents the statsd monitor. +// StatsdMonitor represents the statsd monitor. type StatsdMonitor struct { client statsdClient prefix string @@ -72,10 +73,12 @@ type statsdClient interface { func NewStatsdClient(statsdAddress string) (c *statsd.StatsdClient, err error) { statsdHost, statsdPortStr, err := net.SplitHostPort(statsdAddress) if err != nil { + err = errors.Wrap(err, "failed to split the network address") return } statsdPort, err := strconv.Atoi(statsdPortStr) if err != nil { + err = errors.Wrap(err, "failed to convert port type") return } c = statsd.New(statsdHost, statsdPort) diff --git a/models/metadata.go b/models/metadata.go index 3bda3ea7e..291168e14 100644 --- a/models/metadata.go +++ b/models/metadata.go @@ -5,6 +5,7 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" ) +// Metadata is a wrapper for the meta type Metadata interface { GetResource() *common.Resource GetProperties() *facets.Properties diff --git a/models/record.go b/models/record.go index 801b6f0d2..4ad42da88 100644 --- a/models/record.go +++ b/models/record.go @@ -1,13 +1,16 @@ package models +// Record represents the metadata of a record type Record struct { data Metadata } +// NewRecord creates a new record func NewRecord(data Metadata) Record { return Record{data: data} } +// Data returns the record data func (r Record) Data() Metadata { return r.data } diff --git a/plugins/external/plugin.go b/plugins/external/plugin.go index 7992ee9e1..3f0f28848 100644 --- a/plugins/external/plugin.go +++ b/plugins/external/plugin.go @@ -1,7 +1,7 @@ package plugins import ( - "errors" + "github.com/pkg/errors" "os/exec" "github.com/hashicorp/go-hclog" @@ -48,11 +48,13 @@ func dispense(client *plugin.Client) (processor Processor, err error) { // Connect via RPC rpcClient, err := client.Client() if err != nil { + err = errors.Wrap(err, "failed to connect client") return } // Request the plugin raw, err := rpcClient.Dispense(processorPluginKey) if err != nil { + err = errors.Wrap(err, "failed to dispense a new instance of the plugin") return } diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index ed2c1862b..eb15bc188 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -81,7 +81,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } -// Extract checks if the table is valid and extracts the table schema +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { err = utils.BuildConfig(configMap, &e.config) if err != nil { @@ -90,7 +90,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) e.client, err = e.createClient(ctx) if err != nil { - return errors.Wrap(err, "error creating client") + return errors.Wrap(err, "failed to create client") } return @@ -106,7 +106,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) break } if err != nil { - return errors.Wrapf(err, "failed to fetch dataset") + return errors.Wrap(err, "failed to fetch dataset") } e.extractTable(ctx, ds, emit) } @@ -133,13 +133,13 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit break } if err != nil { - e.logger.Error("failed to scan, skipping table", "err", err) + e.logger.Error("failed to get table, skipping table", "err", err) continue } e.logger.Debug("extracting table", "table", table.FullyQualifiedName()) tmd, err := table.Metadata(ctx) if err != nil { - e.logger.Error("failed to fetch table's metadata, skipping table", "err", err, "table", table.FullyQualifiedName()) + e.logger.Error("failed to fetch table metadata", "err", err, "table", table.FullyQualifiedName()) continue } diff --git a/plugins/extractors/cassandra/cassandra.go b/plugins/extractors/cassandra/cassandra.go index 7ecb3a21d..818fd0b1a 100644 --- a/plugins/extractors/cassandra/cassandra.go +++ b/plugins/extractors/cassandra/cassandra.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" // used to print the embedded assets "fmt" + "github.com/pkg/errors" "github.com/gocql/gocql" "github.com/odpf/meteor/models" @@ -94,10 +95,8 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) cluster.Consistency = gocql.Quorum cluster.ProtoVersion = 4 cluster.Port = e.config.Port - e.session, err = cluster.CreateSession() - if err != nil { - fmt.Printf("show the error %s\n", err) - return err + if e.session, err = cluster.CreateSession(); err != nil { + return errors.Wrap(err, "failed to create session") } return @@ -118,7 +117,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) for scanner.Next() { var keyspace string if err = scanner.Scan(&keyspace); err != nil { - return err + return errors.Wrapf(err, "failed to iterate over %s", keyspace) } // skip if database is default @@ -126,7 +125,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) continue } if err = e.extractTables(keyspace); err != nil { - return err + return errors.Wrapf(err, "failed to extract tables from %s", keyspace) } } @@ -143,10 +142,10 @@ func (e *Extractor) extractTables(keyspace string) (err error) { for scanner.Next() { var tableName string if err = scanner.Scan(&tableName); err != nil { - return err + return errors.Wrapf(err, "failed to iterate over %s", tableName) } if err = e.processTable(keyspace, tableName); err != nil { - return err + return errors.Wrap(err, "failed to process table") } } @@ -158,7 +157,7 @@ func (e *Extractor) processTable(keyspace string, tableName string) (err error) var columns []*facets.Column columns, err = e.extractColumns(keyspace, tableName) if err != nil { - return + return errors.Wrap(err, "failed to extract columns") } // push table to channel @@ -188,9 +187,9 @@ func (e *Extractor) extractColumns(keyspace string, tableName string) (columns [ for scanner.Next() { var fieldName, dataType string - err = scanner.Scan(&fieldName, &dataType) - if err != nil { - return + if err = scanner.Scan(&fieldName, &dataType); err != nil { + e.logger.Error("failed to get fields", "error", err) + continue } columns = append(columns, &facets.Column{ diff --git a/plugins/extractors/clickhouse/clickhouse.go b/plugins/extractors/clickhouse/clickhouse.go index d3b1ef37f..bfa2d57b2 100644 --- a/plugins/extractors/clickhouse/clickhouse.go +++ b/plugins/extractors/clickhouse/clickhouse.go @@ -5,6 +5,7 @@ import ( "database/sql" _ "embed" // used to print the embedded assets "fmt" + "github.com/pkg/errors" _ "github.com/ClickHouse/clickhouse-go" // clickhouse driver "github.com/odpf/meteor/models" @@ -62,15 +63,16 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { - err = utils.BuildConfig(configMap, &e.config) - if err != nil { + if err = utils.BuildConfig(configMap, &e.config); err != nil { return plugins.InvalidConfigError{} } - e.db, err = sql.Open("clickhouse", fmt.Sprintf("tcp://%s?username=%s&password=%s&debug=true", e.config.Host, e.config.UserID, e.config.Password)) - if err != nil { - return + if e.db, err = sql.Open("clickhouse", + fmt.Sprintf("tcp://%s?username=%s&password=%s&debug=true", e.config.Host, e.config.UserID, e.config.Password)); + err != nil { + return errors.Wrap(err, "failed to create a client") } return @@ -82,16 +84,17 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { err = e.extractTables(emit) if err != nil { - return + return errors.Wrap(err, "failed to extract tables") } return } +// extractTables extract tables from a given database func (e *Extractor) extractTables(emit plugins.Emit) (err error) { res, err := e.db.Query("SELECT name, database FROM system.tables WHERE database not like 'system'") if err != nil { - return + return errors.Wrap(err, "failed to execute query") } for res.Next() { var dbName, tableName string @@ -123,7 +126,8 @@ func (e *Extractor) getColumnsInfo(dbName string, tableName string) (result []*f rows, err := e.db.Query(sqlStr) if err != nil { - return + err = errors.Wrapf(err, "failed to execute query %s", sqlStr) + return } for rows.Next() { var colName, colDesc, dataType string diff --git a/plugins/extractors/elastic/elastic.go b/plugins/extractors/elastic/elastic.go index 1933f98c0..9a3779557 100644 --- a/plugins/extractors/elastic/elastic.go +++ b/plugins/extractors/elastic/elastic.go @@ -5,6 +5,7 @@ import ( _ "embed" "encoding/json" "fmt" + "github.com/pkg/errors" "reflect" "github.com/elastic/go-elasticsearch/v8" @@ -28,23 +29,25 @@ type Config struct { } var sampleConfig = ` -# Elasticsearch configuration -user: "elastic" -password: "changeme" -host: elastic_server` + user: "elastic" + password: "changeme" + host: elastic_server` +// Extractor manages the extraction of data from elastic type Extractor struct { config Config logger log.Logger client *elasticsearch.Client } +// New returns a pointer to an initialized Extractor Object func New(logger log.Logger) *Extractor { return &Extractor{ logger: logger, } } +// Info returns the brief information about the extractor func (e *Extractor) Info() plugins.Info { return plugins.Info{ Description: "Search engine based on the Lucene library.", @@ -54,10 +57,12 @@ func (e *Extractor) Info() plugins.Info { } } +// Validate validates the configuration of the extractor func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { //build config err = utils.BuildConfig(configMap, &e.config) @@ -73,20 +78,21 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) Username: e.config.User, Password: e.config.Password, } - e.client, err = elasticsearch.NewClient(cfg) - if err != nil { - return + if e.client, err = elasticsearch.NewClient(cfg); err != nil { + return errors.Wrap(err, "failed to create client") } return } +// Extract extracts the data from the elastic server +// and collected through the emitter func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { res, err := e.client.Cluster.Health( e.client.Cluster.Health.WithLevel("indices"), ) if err != nil { - return + return errors.Wrap(err, "failed to fetch cluster information") } var r map[string]interface{} err = json.NewDecoder(res.Body).Decode(&r) @@ -139,12 +145,14 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) return } +// listIndexInfo returns the properties of the index func (e *Extractor) listIndexInfo(index string) (result map[string]interface{}, err error) { var r map[string]interface{} res, err := e.client.Indices.GetMapping( e.client.Indices.GetMapping.WithIndex(index), ) if err != nil { + err = errors.Wrap(err, "failed to retrieve index") return } err = json.NewDecoder(res.Body).Decode(&r) @@ -157,7 +165,7 @@ func (e *Extractor) listIndexInfo(index string) (result map[string]interface{}, return } -// Register the extractor to catalog +// init registers the extractor to catalog func init() { if err := registry.Extractors.Register("elastic", func() plugins.Extractor { return New(plugins.GetLog()) diff --git a/plugins/extractors/gcs/gcs.go b/plugins/extractors/gcs/gcs.go index 3fba5c2fd..70d7edb27 100644 --- a/plugins/extractors/gcs/gcs.go +++ b/plugins/extractors/gcs/gcs.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" // used to print the embedded assets "fmt" + "github.com/pkg/errors" "github.com/odpf/meteor/models" "github.com/odpf/meteor/models/odpf/assets" @@ -78,6 +79,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { // build config err = utils.BuildConfig(configMap, &e.config) @@ -88,7 +90,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) // create client e.client, err = e.createClient(ctx) if err != nil { - return + return errors.Wrap(err, "failed to create client") } return @@ -102,14 +104,14 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) break } if err != nil { - return err + return errors.Wrapf(err, "failed to iterate over %s", bucket.Name) } var blobs []*assets.Blob if e.config.ExtractBlob { blobs, err = e.extractBlobs(ctx, bucket.Name, e.config.ProjectID) if err != nil { - return err + return errors.Wrapf(err, "failed to extract blobs from %s", bucket.Name) } } diff --git a/plugins/extractors/github/github.go b/plugins/extractors/github/github.go index c96df295c..280aba390 100644 --- a/plugins/extractors/github/github.go +++ b/plugins/extractors/github/github.go @@ -3,6 +3,7 @@ package github import ( "context" _ "embed" // used to print the embedded assets + "github.com/pkg/errors" "github.com/google/go-github/v37/github" "github.com/odpf/meteor/models" @@ -50,6 +51,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { err = utils.BuildConfig(configMap, &e.config) if err != nil { @@ -71,11 +73,12 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) users, _, err := e.client.Organizations.ListMembers(ctx, e.config.Org, nil) if err != nil { - return err + return errors.Wrap(err, "failed to fetch organizations") } for _, user := range users { usr, _, err := e.client.Users.Get(ctx, *user.Login) if err != nil { + e.logger.Error("failed to fetch user" , "error", err) continue } emit(models.NewRecord(&assets.User{ @@ -92,7 +95,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) return nil } -// Register the extractor to catalog +// init registers the extractor to catalog func init() { if err := registry.Extractors.Register("github", func() plugins.Extractor { return &Extractor{ diff --git a/plugins/extractors/grafana/grafana.go b/plugins/extractors/grafana/grafana.go index 9564c5948..26323bbed 100644 --- a/plugins/extractors/grafana/grafana.go +++ b/plugins/extractors/grafana/grafana.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" // used to print the embedded assets "fmt" + "github.com/pkg/errors" "net/http" "github.com/odpf/meteor/models" @@ -57,6 +58,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { // build config err = utils.BuildConfig(configMap, &e.config) @@ -75,11 +77,11 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { uids, err := e.client.SearchAllDashboardUIDs() if err != nil { - return + return errors.Wrap(err, "failed to fetch dashboards") } dashboardDetails, err := e.client.GetAllDashboardDetails(uids) if err != nil { - return + return errors.Wrap(err, "failed to fetch dashboard details") } for _, dashboardDetail := range dashboardDetails { @@ -90,6 +92,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) return } +// grafanaDashboardToMeteorDashboard converts a grafana dashboard to a meteor dashboard func (e *Extractor) grafanaDashboardToMeteorDashboard(dashboard DashboardDetail) *assets.Dashboard { charts := make([]*assets.Chart, len(dashboard.Dashboard.Panels)) for i, panel := range dashboard.Dashboard.Panels { @@ -108,6 +111,7 @@ func (e *Extractor) grafanaDashboardToMeteorDashboard(dashboard DashboardDetail) } } +// grafanaPanelToMeteorChart converts a grafana panel to a meteor chart func (e *Extractor) grafanaPanelToMeteorChart(panel Panel, dashboardUID string, metaURL string) assets.Chart { var rawQuery string if len(panel.Targets) > 0 { @@ -127,7 +131,7 @@ func (e *Extractor) grafanaPanelToMeteorChart(panel Panel, dashboardUID string, } } -// Register the extractor to catalog +// init registers the extractor to catalog func init() { if err := registry.Extractors.Register("grafana", func() plugins.Extractor { return New(plugins.GetLog()) diff --git a/plugins/extractors/kafka/kafka.go b/plugins/extractors/kafka/kafka.go index f2fe4fbcd..f34d8d6dd 100644 --- a/plugins/extractors/kafka/kafka.go +++ b/plugins/extractors/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "context" _ "embed" // used to print the embedded assets + "github.com/pkg/errors" "github.com/odpf/meteor/models" "github.com/odpf/meteor/models/odpf/assets" @@ -30,7 +31,6 @@ broker: "localhost:9092"` // from a kafka broker type Extractor struct { // internal states - out chan<- models.Record conn *kafka.Conn logger log.Logger config Config @@ -58,16 +58,17 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { err = utils.BuildConfig(configMap, &e.config) if err != nil { return plugins.InvalidConfigError{} } - // create conn + // create connection e.conn, err = kafka.Dial("tcp", e.config.Broker) if err != nil { - return err + return errors.Wrap(err, "failed to create connection") } return @@ -80,7 +81,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) partitions, err := e.conn.ReadPartitions() if err != nil { - return + return errors.Wrap(err, "failed to fetch partitions") } // collect topic list from partition list diff --git a/plugins/extractors/metabase/metabase.go b/plugins/extractors/metabase/metabase.go index 0cb7fe39e..a122a6605 100644 --- a/plugins/extractors/metabase/metabase.go +++ b/plugins/extractors/metabase/metabase.go @@ -6,6 +6,7 @@ import ( _ "embed" // used to print the embedded assets "encoding/json" "fmt" + "github.com/pkg/errors" "io/ioutil" "net/http" "strconv" @@ -68,7 +69,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { } func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { - // build and validateconfig + // build and validate config err = utils.BuildConfig(configMap, &e.config) if err != nil { return plugins.InvalidConfigError{} @@ -79,9 +80,8 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) } // get session id for further api calls in metabase - e.sessionID, err = e.getSessionID() - if err != nil { - return + if e.sessionID, err = e.getSessionID(); err != nil { + return errors.Wrap(err, "failed to fetch session ID") } return nil @@ -91,12 +91,12 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { dashboards, err := e.getDashboardsList() if err != nil { - return + return errors.Wrap(err, "failed to fetch dashboard list") } for _, dashboard := range dashboards { data, err := e.buildDashboard(strconv.Itoa(dashboard.ID), dashboard.Name) if err != nil { - return err + return errors.Wrap(err, "failed to fetch dashboard data") } emit(models.NewRecord(data)) } @@ -105,8 +105,8 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) func (e *Extractor) buildDashboard(id string, name string) (data *assets.Dashboard, err error) { var dashboard Dashboard - err = e.makeRequest("GET", e.config.Host+"/api/dashboard/"+id, nil, &dashboard) - if err != nil { + if err = e.makeRequest("GET", e.config.Host+"/api/dashboard/"+id, nil, &dashboard); err != nil { + err = errors.Wrap(err, "failed to fetch dashboard") return } var tempCards []*assets.Chart @@ -161,12 +161,12 @@ func (e *Extractor) getSessionID() (sessionID string, err error) { func (e *Extractor) makeRequest(method, url string, payload interface{}, data interface{}) (err error) { jsonifyPayload, err := json.Marshal(payload) if err != nil { - return + return errors.Wrap(err, "failed to encode the payload JSON") } body := bytes.NewBuffer(jsonifyPayload) req, err := http.NewRequest(method, url, body) if err != nil { - return + return errors.Wrap(err, "failed to create request") } if body != nil { req.Header.Set("Content-Type", "application/json") @@ -176,14 +176,15 @@ func (e *Extractor) makeRequest(method, url string, payload interface{}, data in } res, err := e.client.Do(req) if err != nil { - fmt.Println(err) - return + return errors.Wrap(err, "failed to generate response") } b, err := ioutil.ReadAll(res.Body) if err != nil { - return + return errors.Wrap(err, "failed to read response body") + } + if err = json.Unmarshal(b, &data); err != nil { + return errors.Wrapf(err, "failed to parse: %s", string(b)) } - err = json.Unmarshal(b, &data) return } diff --git a/plugins/extractors/mongodb/mongodb.go b/plugins/extractors/mongodb/mongodb.go index c34406032..5e30e535a 100644 --- a/plugins/extractors/mongodb/mongodb.go +++ b/plugins/extractors/mongodb/mongodb.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" // used to print the embedded assets "fmt" + "github.com/pkg/errors" "sort" "github.com/odpf/meteor/models" @@ -43,7 +44,6 @@ password: "1234"` // Extractor manages the communication with the mongo server type Extractor struct { // internal states - out chan<- models.Record client *mongo.Client excluded map[string]bool logger log.Logger @@ -83,26 +83,24 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) // setup client uri := fmt.Sprintf("mongodb://%s:%s@%s", e.config.UserID, e.config.Password, e.config.Host) - e.client, err = createAndConnnectClient(ctx, uri) - if err != nil { - return + if e.client, err = createAndConnnectClient(ctx, uri); err != nil { + return errors.Wrap(err, "failed to create client") } return } -// Extract extracts the data from the mongo server -// and outputs the data to the out channel +// Extract collects metadata of each database through emitter func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { databases, err := e.client.ListDatabaseNames(ctx, bson.M{}) if err != nil { - return + return errors.Wrap(err, "failed to list database names") } for _, dbName := range databases { database := e.client.Database(dbName) if err := e.extractCollections(ctx, database, emit); err != nil { - return err + return errors.Wrap(err, "failed to extract collections") } } @@ -113,7 +111,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) func (e *Extractor) extractCollections(ctx context.Context, db *mongo.Database, emit plugins.Emit) (err error) { collections, err := db.ListCollectionNames(ctx, bson.D{}) if err != nil { - return + return errors.Wrap(err, "failed to list collection names") } // we need to sort the collections for testing purpose @@ -128,7 +126,7 @@ func (e *Extractor) extractCollections(ctx context.Context, db *mongo.Database, table, err := e.buildTable(ctx, db, collectionName) if err != nil { - return err + return errors.Wrap(err, "failed to build table") } emit(models.NewRecord(table)) @@ -142,6 +140,7 @@ func (e *Extractor) buildTable(ctx context.Context, db *mongo.Database, collecti // get total rows totalRows, err := db.Collection(collectionName).EstimatedDocumentCount(ctx) if err != nil { + err = errors.Wrap(err, "failed to fetch total no of rows") return } diff --git a/plugins/extractors/mssql/mssql.go b/plugins/extractors/mssql/mssql.go index 6b2cb94af..73f44b942 100644 --- a/plugins/extractors/mssql/mssql.go +++ b/plugins/extractors/mssql/mssql.go @@ -5,6 +5,7 @@ import ( "database/sql" _ "embed" // used to print the embedded assets "fmt" + "github.com/pkg/errors" "github.com/odpf/salt/log" @@ -72,6 +73,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { err = utils.BuildConfig(configMap, &e.config) if err != nil { @@ -82,9 +84,8 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) e.buildExcludedDBs() // create client - e.db, err = sql.Open("mssql", fmt.Sprintf("sqlserver://%s:%s@%s/", e.config.UserID, e.config.Password, e.config.Host)) - if err != nil { - return + if e.db, err = sql.Open("mssql", fmt.Sprintf("sqlserver://%s:%s@%s/", e.config.UserID, e.config.Password, e.config.Host)); err != nil { + return errors.Wrap(err, "failed to create client") } return @@ -98,23 +99,23 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) res, err := e.db.Query("SELECT name FROM sys.databases;") if err != nil { - return + return errors.Wrap(err, "failed to fetch databases") } for res.Next() { var database string if err := res.Scan(&database); err != nil { - return err + return errors.Wrapf(err, "failed to iterate over %s", database) } if err := e.extractTables(database); err != nil { - return err + return errors.Wrapf(err, "failed to extract tables from %s", database) } } return } -// Extract tables from a given database +// extractTables extract tables from a given database func (e *Extractor) extractTables(database string) (err error) { // skip if database is excluded if e.isExcludedDB(database) { @@ -132,21 +133,22 @@ func (e *Extractor) extractTables(database string) (err error) { for rows.Next() { var tableName string if err := rows.Scan(&tableName); err != nil { - return err + return errors.Wrapf(err, "failed to iterate over %s", tableName) } if err := e.processTable(database, tableName); err != nil { - return err + return errors.Wrap(err, "failed to process Table") } } return } +// processTable builds and push table to emitter func (e *Extractor) processTable(database string, tableName string) (err error) { columns, err := e.getColumns(database, tableName) if err != nil { - return + return errors.Wrap(err, "failed to get columns") } // push table to channel @@ -163,6 +165,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error) return } +// getColumns extract columns from the given table func (e *Extractor) getColumns(database, tableName string) (columns []*facets.Column, err error) { query := fmt.Sprintf( `SELECT COLUMN_NAME, DATA_TYPE, @@ -172,15 +175,16 @@ func (e *Extractor) getColumns(database, tableName string) (columns []*facets.Co ORDER BY COLUMN_NAME ASC`, database) rows, err := e.db.Query(query, tableName) if err != nil { + err = errors.Wrap(err, "failed to execute query") return } for rows.Next() { var fieldName, dataType, isNullableString string var length int - err = rows.Scan(&fieldName, &dataType, &isNullableString, &length) - if err != nil { - return + if err = rows.Scan(&fieldName, &dataType, &isNullableString, &length); err != nil { + e.logger.Error("failed to scan fields", "error", err) + continue } columns = append(columns, &facets.Column{ Name: fieldName, @@ -192,7 +196,7 @@ func (e *Extractor) getColumns(database, tableName string) (columns []*facets.Co return } - +// isExcludedDB checks if the given db is in the list of excluded databases func (e *Extractor) buildExcludedDBs() { excludedMap := make(map[string]bool) for _, db := range defaultDBList { @@ -202,15 +206,18 @@ func (e *Extractor) buildExcludedDBs() { e.excludedDbs = excludedMap } +// isExcludedDB checks if the given db is in the list of excluded databases func (e *Extractor) isExcludedDB(database string) bool { _, ok := e.excludedDbs[database] return ok } +// isNullable checks if the given string is null or not func (e *Extractor) isNullable(value string) bool { return value == "YES" } +// init register the extractor to the catalog func init() { if err := registry.Extractors.Register("mssql", func() plugins.Extractor { return New(plugins.GetLog()) diff --git a/plugins/extractors/mysql/mysql.go b/plugins/extractors/mysql/mysql.go index 18d78b45c..15832fa27 100644 --- a/plugins/extractors/mysql/mysql.go +++ b/plugins/extractors/mysql/mysql.go @@ -5,6 +5,7 @@ import ( "database/sql" _ "embed" // used to print the embedded assets "fmt" + "github.com/pkg/errors" _ "github.com/go-sql-driver/mysql" "github.com/odpf/meteor/models" @@ -70,9 +71,9 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) { - err = utils.BuildConfig(configMap, &e.config) - if err != nil { + if err = utils.BuildConfig(configMap, &e.config); err != nil { return plugins.InvalidConfigError{} } @@ -80,32 +81,33 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) e.buildExcludedDBs() // create client - e.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/", e.config.UserID, e.config.Password, e.config.Host)) - if err != nil { - return + if e.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/", e.config.UserID, e.config.Password, e.config.Host)); err != nil { + return errors.Wrap(err, "failed to create client") } return } // Extract extracts the data from the MySQL server -// and collected through the out channel +// and collected through the emitter func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { defer e.db.Close() e.emit = emit res, err := e.db.Query("SHOW DATABASES;") if err != nil { - return + return errors.Wrap(err, "failed to fetch databases") } for res.Next() { var database string if err := res.Scan(&database); err != nil { - return err + e.logger.Error("failed to connect, skipping database", "error", err) + continue } if err := e.extractTables(database); err != nil { - return err + e.logger.Error("failed to get tables, skipping database", "error", err) + continue } } @@ -122,34 +124,33 @@ func (e *Extractor) extractTables(database string) (err error) { // extract tables _, err = e.db.Exec(fmt.Sprintf("USE %s;", database)) if err != nil { - return + return errors.Wrapf(err, "failed to iterate over %s", database) } rows, err := e.db.Query("SHOW TABLES;") if err != nil { - return + return errors.Wrapf(err, "failed to show tables of %s", database) } // process each rows for rows.Next() { var tableName string if err := rows.Scan(&tableName); err != nil { - return err + return errors.Wrapf(err, "failed to iterate over %s", tableName) } if err := e.processTable(database, tableName); err != nil { - return err + return errors.Wrap(err, "failed to process table") } } return } -// Build and push table to out channel +// processTable builds and push table to emitter func (e *Extractor) processTable(database string, tableName string) (err error) { var columns []*facets.Column - columns, err = e.extractColumns(tableName) - if err != nil { - return + if columns, err = e.extractColumns(tableName); err != nil { + return errors.Wrap(err, "failed to extract columns") } // push table to channel @@ -175,15 +176,16 @@ func (e *Extractor) extractColumns(tableName string) (columns []*facets.Column, ORDER BY COLUMN_NAME ASC` rows, err := e.db.Query(query, tableName) if err != nil { + err = errors.Wrap(err, "failed to execute query") return } for rows.Next() { var fieldName, fieldDesc, dataType, isNullableString string var length int - err = rows.Scan(&fieldName, &fieldDesc, &dataType, &isNullableString, &length) - if err != nil { - return + if err = rows.Scan(&fieldName, &fieldDesc, &dataType, &isNullableString, &length); err != nil { + e.logger.Error("failed to get fields", "error", err) + continue } columns = append(columns, &facets.Column{ @@ -198,6 +200,7 @@ func (e *Extractor) extractColumns(tableName string) (columns []*facets.Column, return } +// buildExcludedDBs builds the list of excluded databases func (e *Extractor) buildExcludedDBs() { excludedMap := make(map[string]bool) for _, db := range defaultDBList { @@ -207,16 +210,18 @@ func (e *Extractor) buildExcludedDBs() { e.excludedDbs = excludedMap } +// isExcludedDB checks if the given db is in the list of excluded databases func (e *Extractor) isExcludedDB(database string) bool { _, ok := e.excludedDbs[database] return ok } +// isNullable checks if the given string is null or not func (e *Extractor) isNullable(value string) bool { return value == "YES" } -// Register the extractor to catalog +// init register the extractor to the catalog func init() { if err := registry.Extractors.Register("mysql", func() plugins.Extractor { return New(plugins.GetLog()) diff --git a/plugins/extractors/postgres/postgres.go b/plugins/extractors/postgres/postgres.go index a80e43251..106406f75 100644 --- a/plugins/extractors/postgres/postgres.go +++ b/plugins/extractors/postgres/postgres.go @@ -5,6 +5,7 @@ import ( "database/sql" _ "embed" // // used to print the embedded assets "fmt" + "github.com/pkg/errors" "strings" _ "github.com/lib/pq" // used to register the postgres driver @@ -59,7 +60,7 @@ func (e *Extractor) Info() plugins.Info { Description: "Table metadata and metrics from Postgres SQL sever.", SampleConfig: sampleConfig, Summary: summary, - Tags: []string{"oss,extractor"}, + Tags: []string{"oss", "extractor"}, } } @@ -68,8 +69,9 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) { return utils.BuildConfig(configMap, &Config{}) } +// Init initializes the extractor func (e *Extractor) Init(ctx context.Context, config map[string]interface{}) (err error) { - // Build and validate config received from receipe + // Build and validate config received from recipe if err := utils.BuildConfig(config, &e.config); err != nil { return plugins.InvalidConfigError{} } @@ -77,20 +79,20 @@ func (e *Extractor) Init(ctx context.Context, config map[string]interface{}) (er // Create database connection e.db, err = connection(e.config, e.config.Database) if err != nil { - return err + return errors.Wrap(err, "failed to create connection") } return } -// Extract collects metdata from the source. Metadata is collected through the out channel +// Extract collects metadata from the source. Metadata is collected through the emitter func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { defer e.db.Close() // Get list of databases dbs, err := e.getDatabases() if err != nil { - return err + return errors.Wrap(err, "failed to fetch databases") } // Iterate through all tables and databases @@ -100,19 +102,19 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) // information will be returned db, err := connection(e.config, database) if err != nil { - e.logger.Error("failed to connect, skipping database", err) + e.logger.Error("failed to connect, skipping database", "error", err) continue } tables, err := e.getTables(db, database) if err != nil { - e.logger.Error("failed to get tables, skipping database ", err) + e.logger.Error("failed to get tables, skipping database", "error", err) continue } for _, table := range tables { result, err := e.getTableMetadata(db, database, table) if err != nil { - e.logger.Error("failed to get table metadata, skipping table", err) + e.logger.Error("failed to get table metadata, skipping table", "error", err) continue } // Publish metadata to channel @@ -199,14 +201,14 @@ func (e *Extractor) getColumnMetadata(db *sql.DB, dbName string, tableName strin WHERE TABLE_NAME = '%s' ORDER BY COLUMN_NAME ASC;` rows, err := db.Query(fmt.Sprintf(sqlStr, tableName)) if err != nil { - return nil, err + err = errors.Wrap(err, "failed to fetch data from query") + return } for rows.Next() { var fieldName, dataType, isNullableString string var length int - err = rows.Scan(&fieldName, &dataType, &isNullableString, &length) - if err != nil { - e.logger.Error("failed to scan row, skipping", err) + if err = rows.Scan(&fieldName, &dataType, &isNullableString, &length); err != nil { + e.logger.Error("failed to get fields", "error", err) continue } result = append(result, &facets.Column{ @@ -224,7 +226,7 @@ func isNullable(value string) bool { return value == "YES" } -// Generate a connecion string +// connection generates a connection string func connection(cfg Config, database string) (db *sql.DB, err error) { connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", cfg.UserID, cfg.Password, cfg.Host, database) return sql.Open("postgres", connStr) diff --git a/plugins/extractors/superset/superset.go b/plugins/extractors/superset/superset.go index 19c4b1ba7..0d1adb0ea 100644 --- a/plugins/extractors/superset/superset.go +++ b/plugins/extractors/superset/superset.go @@ -80,12 +80,12 @@ func (e *Extractor) Init(_ context.Context, configMap map[string]interface{}) (e } // get access token for further api calls in superset if e.accessToken, err = e.getAccessToken(); err != nil { - return errors.Wrapf(err, "failed to get access token") + return errors.Wrap(err, "failed to get access token") } // get csrf token for secure api calls in superset if e.csrfToken, err = e.getCsrfToken(); err != nil { - return errors.Wrapf(err, "failed to get csrf token") + return errors.Wrap(err, "failed to get csrf token") } return } @@ -94,12 +94,12 @@ func (e *Extractor) Init(_ context.Context, configMap map[string]interface{}) (e func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) (err error) { dashboards, err := e.getDashboardsList() if err != nil { - return errors.Wrapf(err, "failed to get dashboard list") + return errors.Wrap(err, "failed to get dashboard list") } for _, dashboard := range dashboards { data, err := e.buildDashboard(dashboard.ID) if err != nil { - return errors.Wrapf(err, "failed to build dashbaord") + return errors.Wrap(err, "failed to build dashbaord") } emit(models.NewRecord(data)) } @@ -111,7 +111,8 @@ func (e *Extractor) buildDashboard(id int) (data *assets.Dashboard, err error) { var dashboard Dashboard chart, err := e.getChartsList(id) if err != nil { - return nil, errors.Wrapf(err, "failed to get chart list") + err = errors.Wrap(err, "failed to get chart list") + return } data = &assets.Dashboard{ Resource: &common.Resource{ @@ -132,7 +133,8 @@ func (e *Extractor) getDashboardsList() (dashboards []Dashboard, err error) { } var data response if err = e.makeRequest("GET", e.config.Host+"/api/v1/dashboard", nil, &data); err != nil { - return nil, errors.Wrapf(err, "failed to get dashboard") + err = errors.Wrap(err, "failed to get dashboard") + return } return data.Result, nil } @@ -145,7 +147,8 @@ func (e *Extractor) getChartsList(id int) (charts []*assets.Chart, err error) { var data responseChart if err = e.makeRequest("GET", fmt.Sprintf("%s/api/v1/dashboard/%d/charts", e.config.Host, id), nil, &data); err != nil { - return nil, errors.Wrapf(err, "failed to get list of chart details") + err = errors.Wrap(err, "failed to get list of chart details") + return } var tempCharts []*assets.Chart for _, res := range data.Result { @@ -173,7 +176,7 @@ func (e *Extractor) getAccessToken() (accessToken string, err error) { } var data responseToken if err = e.makeRequest("POST", e.config.Host+"/api/v1/security/login", payload, &data); err != nil { - return "", errors.Wrapf(err, "failed to fetch access token") + return "", errors.Wrap(err, "failed to fetch access token") } return data.Token, nil } @@ -185,7 +188,7 @@ func (e *Extractor) getCsrfToken() (csrfToken string, err error) { } var data responseCsrfToken if err = e.makeRequest("GET", e.config.Host+"/api/v1/security/csrf_token/", nil, &data); err != nil { - return "", errors.Wrapf(err, "failed to fetch csrf token") + return "", errors.Wrap(err, "failed to fetch csrf token") } return data.CsrfToken, nil } @@ -194,12 +197,12 @@ func (e *Extractor) getCsrfToken() (csrfToken string, err error) { func (e *Extractor) makeRequest(method, url string, payload interface{}, data interface{}) (err error) { jsonifyPayload, err := json.Marshal(payload) if err != nil { - return errors.Wrapf(err, "failed to encode the payload JSON") + return errors.Wrap(err, "failed to encode the payload JSON") } body := bytes.NewBuffer(jsonifyPayload) req, err := http.NewRequest(method, url, body) if err != nil { - return errors.Wrapf(err, "failed to create request") + return errors.Wrap(err, "failed to create request") } var bearer = "Bearer " + e.accessToken req.Header.Set("Content-Type", "application/json") @@ -209,14 +212,14 @@ func (e *Extractor) makeRequest(method, url string, payload interface{}, data in res, err := e.client.Do(req) if err != nil { - return errors.Wrapf(err, "failed to generate response") + return errors.Wrap(err, "failed to generate response") } if res.StatusCode < 200 || res.StatusCode >= 300 { return errors.Wrapf(err, "response failed with status code: %d", res.StatusCode) } b, err := ioutil.ReadAll(res.Body) if err != nil { - return errors.Wrapf(err, "failed to read response body") + return errors.Wrap(err, "failed to read response body") } if err = json.Unmarshal(b, &data); err != nil { return errors.Wrapf(err, "failed to parse: %s", string(b)) diff --git a/plugins/extractors/superset/superset_test.go b/plugins/extractors/superset/superset_test.go index e92af797a..724b4e108 100644 --- a/plugins/extractors/superset/superset_test.go +++ b/plugins/extractors/superset/superset_test.go @@ -226,12 +226,12 @@ func addChart(id int) (err error) { func makeRequest(method, url string, payload interface{}, data interface{}) (err error) { jsonifyPayload, err := json.Marshal(payload) if err != nil { - return errors.Wrapf(err, "failed to encode the payload JSON") + return errors.Wrap(err, "failed to encode the payload JSON") } body := bytes.NewBuffer(jsonifyPayload) req, err := http.NewRequest(method, url, body) if err != nil { - return errors.Wrapf(err, "failed to create request") + return errors.Wrap(err, "failed to create request") } var bearer = "Bearer " + accessToken req.Header.Set("Content-Type", "application/json") @@ -241,14 +241,14 @@ func makeRequest(method, url string, payload interface{}, data interface{}) (err res, err := client.Do(req) if err != nil { - return errors.Wrapf(err, "failed to generate response") + return errors.Wrap(err, "failed to generate response") } if res.StatusCode < 200 || res.StatusCode >= 300 { - return errors.Wrapf(err, "response failed with status code: %d", res.StatusCode) + return errors.Wrap(err, "response failed with status code: %d", res.StatusCode) } b, err := ioutil.ReadAll(res.Body) if err != nil { - return errors.Wrapf(err, "failed to read response body") + return errors.Wrap(err, "failed to read response body") } if err = json.Unmarshal(b, &data); err != nil { return errors.Wrapf(err, "failed to parse: %s", string(b)) diff --git a/utils/config.go b/utils/config.go index 8ee2aed57..e53b7a1e7 100644 --- a/utils/config.go +++ b/utils/config.go @@ -16,13 +16,10 @@ func init() { func BuildConfig(configMap map[string]interface{}, c interface{}) (err error) { defaults.SetDefaults(c) - err = mapstructure.Decode(configMap, c) - if err != nil { + if err = mapstructure.Decode(configMap, c); err != nil { return err } - - err = validate.Struct(c) - if err != nil { + if err = validate.Struct(c); err != nil { return err } diff --git a/utils/custom_properties.go b/utils/custom_properties.go index 0aa561fe3..16ab1e793 100644 --- a/utils/custom_properties.go +++ b/utils/custom_properties.go @@ -4,6 +4,7 @@ import ( "github.com/odpf/meteor/models" "github.com/odpf/meteor/models/odpf/assets" "github.com/odpf/meteor/models/odpf/assets/facets" + "github.com/pkg/errors" "google.golang.org/protobuf/types/known/structpb" ) @@ -24,7 +25,7 @@ func GetCustomProperties(metadata models.Metadata) map[string]interface{} { func SetCustomProperties(metadata models.Metadata, customFields map[string]interface{}) (models.Metadata, error) { properties, err := appendCustomFields(metadata, customFields) if err != nil { - return metadata, err + return metadata, errors.Wrap(err, "failed to append custom fields in metadata") } switch metadata := metadata.(type) { @@ -51,7 +52,7 @@ func appendCustomFields(metadata models.Metadata, customFields map[string]interf protoStruct, err := parseMapToProto(customFields) if err != nil { - return properties, err + return properties, errors.Wrap(err, "failed to parse map to proto") } properties.Attributes = protoStruct