Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve error handling #238

Merged
merged 1 commit into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
"github.com/pkg/errors"
)

const (
defaultBatchSize = 1
)
const defaultBatchSize = 1

// Agent runs recipes for specified plugins.
type Agent struct {
Expand Down Expand Up @@ -117,20 +115,20 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) {
)
runExtractor, err := r.setupExtractor(ctx, recipe.Source, stream)
if err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to setup extractor")
return
}

for _, pr := range recipe.Processors {
if err := r.setupProcessor(ctx, pr, stream); err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to setup processor")
return
}
}

for _, sr := range recipe.Sinks {
if err := r.setupSink(ctx, sr, stream); err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to setup sink")
return
}
}
Expand All @@ -152,14 +150,14 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) {
}()
err = runExtractor()
if err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to run extractor")
}
}()

// start listening.
// this process is blocking
if err := stream.broadcast(); err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to broadcast stream")
}

// code will reach here stream.Listen() is done.
Expand All @@ -185,15 +183,13 @@ func (r *Agent) setupExtractor(ctx context.Context, sr recipe.SourceRecipe, str
err = errors.Wrapf(err, "could not find extractor \"%s\"", sr.Type)
return
}
err = extractor.Init(ctx, sr.Config)
if err != nil {
if err = extractor.Init(ctx, sr.Config); err != nil {
err = errors.Wrapf(err, "could not initiate extractor \"%s\"", sr.Type)
return
}

runFn = func() (err error) {
err = extractor.Extract(ctx, str.push)
if err != nil {
if err = extractor.Extract(ctx, str.push); err != nil {
err = errors.Wrapf(err, "error running extractor \"%s\"", sr.Type)
}

Expand All @@ -204,15 +200,11 @@ func (r *Agent) setupExtractor(ctx context.Context, sr recipe.SourceRecipe, str

func (r *Agent) setupProcessor(ctx context.Context, pr recipe.ProcessorRecipe, str *stream) (err error) {
var proc plugins.Processor
proc, err = r.processorFactory.Get(pr.Name)
if err != nil {
err = errors.Wrapf(err, "could not find processor \"%s\"", pr.Name)
return
if proc, err = r.processorFactory.Get(pr.Name); err != nil {
return errors.Wrapf(err, "could not find processor \"%s\"", pr.Name)
}
err = proc.Init(ctx, pr.Config)
if err != nil {
err = errors.Wrapf(err, "could not initiate processor \"%s\"", pr.Name)
return
if err = proc.Init(ctx, pr.Config); err != nil {
return errors.Wrapf(err, "could not initiate processor \"%s\"", pr.Name)
}

str.setMiddleware(func(src models.Record) (dst models.Record, err error) {
Expand All @@ -230,15 +222,11 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.ProcessorRecipe, s

func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *stream) (err error) {
var sink plugins.Syncer
sink, err = r.sinkFactory.Get(sr.Name)
if err != nil {
err = errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
return
if sink, err = r.sinkFactory.Get(sr.Name); err != nil {
return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
}
err = sink.Init(ctx, sr.Config)
if err != nil {
err = errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
return
if err = sink.Init(ctx, sr.Config); err != nil {
return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
}

retryNotification := func(e error, d time.Duration) {
Expand Down
8 changes: 7 additions & 1 deletion agent/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,38 @@ import (
"github.com/odpf/meteor/models"
)

// batch contains the configuration for a batch
type batch struct {
data []models.Record
capacity int
}

// newBatch returns a new batch
func newBatch(capacity int) *batch {
return &batch{
capacity: capacity,
}
}

// add appends a record to the batch
func (b *batch) add(d models.Record) error {
if b.isFull() {
return errors.New("batch: cannot add, batch is full!")
return errors.New("batch: cannot add, batch is full")
}

b.data = append(b.data, d)
return nil
}

// flush removes all records from the batch
func (b *batch) flush() []models.Record {
data := b.data
b.data = []models.Record{}

return data
}

// isFull returns true if the batch is full
func (b *batch) isFull() bool {
// size 0 means there is no limit, hence will not ever be full
if b.capacity == 0 {
Expand All @@ -42,6 +47,7 @@ func (b *batch) isFull() bool {
return len(b.data) >= b.capacity
}

// isEmpty returns true if the batch is empty
func (b *batch) isEmpty() bool {
return len(b.data) == 0
}
2 changes: 1 addition & 1 deletion agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package agent

import "github.com/odpf/meteor/recipe"

// TaskType is the type of a task
// TaskType is the type of task
type TaskType string

const (
Expand Down
6 changes: 4 additions & 2 deletions agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func (s *stream) broadcast() error {
batch := newBatch(l.batchSize)
// listen to channel and emit data to subscriber callback if batch is full
for d := range l.channel {
batch.add(d)
if err := batch.add(d); err != nil {
s.closeWithError(err)
}
if batch.isFull() {
if err := l.callback(batch.flush()); err != nil {
s.closeWithError(err)
Expand All @@ -78,7 +80,7 @@ func (s *stream) broadcast() error {
}

// push() will run the record through all the registered middleware
// and emit the record to all registerd subscribers.
// and emit the record to all registered subscribers.
func (s *stream) push(data models.Record) {
data, err := s.runMiddlewares(data)
if err != nil {
Expand Down
25 changes: 10 additions & 15 deletions generator/recipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package generator

import (
"embed"
"github.com/pkg/errors"
"os"
"strings"
"text/template"
Expand Down Expand Up @@ -32,45 +33,39 @@ func Recipe(name string, source string, sinks []string, processors []string) (er

if source != "" {
tem.Source = make(map[string]string)
sinfo, err := registry.Extractors.Info(source)
sourceInfo, err := registry.Extractors.Info(source)
if err != nil {
return err
return errors.Wrap(err, "failed to provide extractor information")
}
tem.Source[source] = sinfo.SampleConfig
tem.Sinks[source] = sourceInfo.SampleConfig
}
if len(sinks) > 0 {
tem.Sinks = make(map[string]string)
for _, sink := range sinks {
info, err := registry.Sinks.Info(sink)
if err != nil {
return err
return errors.Wrap(err, "failed to provide sink information")
}
tem.Sinks[sink] = info.SampleConfig
}
}

if len(processors) > 0 {
tem.Processors = make(map[string]string)
for _, procc := range processors {
info, err := registry.Processors.Info(procc)
if err != nil {
return err
return errors.Wrap(err, "failed to provide processor information")
}
tem.Processors[procc] = info.SampleConfig
}
}

tmpl := template.Must(
template.New("recipe.yaml").Funcs(templateFuncs).ParseFS(file, "*"),
)

tmpl, err := template.ParseFS(file, "*")
if err != nil {
return err
return errors.Wrap(err, "failed to parse file")
}

err = tmpl.Execute(os.Stdout, tem)
if err != nil {
return err
if err = tmpl.Execute(os.Stdout, tem); err != nil {
return errors.Wrap(err, "failed to execute template")
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/mcuadros/go-defaults v1.2.0
github.com/mitchellh/mapstructure v1.4.1
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
github.com/muesli/reflow v0.3.0
github.com/muesli/reflow v0.3.0 // indirect
github.com/odpf/salt v0.0.0-20210919015538-3fd8ab22acea
github.com/opencontainers/runc v1.0.1 // indirect
github.com/ory/dockertest/v3 v3.7.0
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {
command := cmd.New(lg, monitor, cfg)

if err := command.Execute(); err != nil {
if strings.HasPrefix(err.Error(), "unknown command ") {
if strings.HasPrefix(err.Error(), "unknown command") {
if !strings.HasSuffix(err.Error(), "\n") {
fmt.Println()
}
Expand Down
5 changes: 4 additions & 1 deletion metrics/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"github.com/pkg/errors"
"net"
"strconv"

Expand All @@ -16,7 +17,7 @@ var (
runMetricName = "run"
)

// StatsdMonitor reprsents the statsd monitor.
// StatsdMonitor represents the statsd monitor.
type StatsdMonitor struct {
client statsdClient
prefix string
Expand Down Expand Up @@ -72,10 +73,12 @@ type statsdClient interface {
func NewStatsdClient(statsdAddress string) (c *statsd.StatsdClient, err error) {
statsdHost, statsdPortStr, err := net.SplitHostPort(statsdAddress)
if err != nil {
err = errors.Wrap(err, "failed to split the network address")
return
}
statsdPort, err := strconv.Atoi(statsdPortStr)
if err != nil {
err = errors.Wrap(err, "failed to convert port type")
return
}
c = statsd.New(statsdHost, statsdPort)
Expand Down
1 change: 1 addition & 0 deletions models/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/odpf/meteor/models/odpf/assets/facets"
)

// Metadata is a wrapper for the meta
type Metadata interface {
GetResource() *common.Resource
GetProperties() *facets.Properties
Expand Down
3 changes: 3 additions & 0 deletions models/record.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package models

// Record represents the metadata of a record
type Record struct {
data Metadata
}

// NewRecord creates a new record
func NewRecord(data Metadata) Record {
return Record{data: data}
}

// Data returns the record data
func (r Record) Data() Metadata {
return r.data
}
4 changes: 3 additions & 1 deletion plugins/external/plugin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package plugins

import (
"errors"
"github.com/pkg/errors"
"os/exec"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -48,11 +48,13 @@ func dispense(client *plugin.Client) (processor Processor, err error) {
// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
err = errors.Wrap(err, "failed to connect client")
return
}
// Request the plugin
raw, err := rpcClient.Dispense(processorPluginKey)
if err != nil {
err = errors.Wrap(err, "failed to dispense a new instance of the plugin")
return
}

Expand Down
10 changes: 5 additions & 5 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) {
return utils.BuildConfig(configMap, &Config{})
}

// Extract checks if the table is valid and extracts the table schema
// Init initializes the extractor
func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) {
err = utils.BuildConfig(configMap, &e.config)
if err != nil {
Expand All @@ -90,7 +90,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})

e.client, err = e.createClient(ctx)
if err != nil {
return errors.Wrap(err, "error creating client")
return errors.Wrap(err, "failed to create client")
}

return
Expand All @@ -106,7 +106,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
break
}
if err != nil {
return errors.Wrapf(err, "failed to fetch dataset")
return errors.Wrap(err, "failed to fetch dataset")
}
e.extractTable(ctx, ds, emit)
}
Expand All @@ -133,13 +133,13 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
break
}
if err != nil {
e.logger.Error("failed to scan, skipping table", "err", err)
e.logger.Error("failed to get table, skipping table", "err", err)
continue
}
e.logger.Debug("extracting table", "table", table.FullyQualifiedName())
tmd, err := table.Metadata(ctx)
if err != nil {
e.logger.Error("failed to fetch table's metadata, skipping table", "err", err, "table", table.FullyQualifiedName())
e.logger.Error("failed to fetch table metadata", "err", err, "table", table.FullyQualifiedName())
continue
}

Expand Down
Loading