Skip to content

Commit

Permalink
fix: error formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
scortier committed Oct 4, 2021
1 parent c16b018 commit 3dee4f1
Show file tree
Hide file tree
Showing 27 changed files with 231 additions and 197 deletions.
50 changes: 18 additions & 32 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
"github.com/pkg/errors"
)

const (
defaultBatchSize = 1
)
const defaultBatchSize = 1

// Agent runs recipes for specified plugins.
type Agent struct {
Expand Down Expand Up @@ -110,20 +108,20 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) {
)
runExtractor, err := r.setupExtractor(ctx, recipe.Source, stream)
if err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to setup extractor")
return
}

for _, pr := range recipe.Processors {
if err := r.setupProcessor(ctx, pr, stream); err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to setup processor")
return
}
}

for _, sr := range recipe.Sinks {
if err := r.setupSink(ctx, sr, stream); err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to setup sink")
return
}
}
Expand All @@ -145,14 +143,14 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) {
}()
err = runExtractor()
if err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to run extractor")
}
}()

// start listening.
// this process is blocking
if err := stream.broadcast(); err != nil {
run.Error = err
run.Error = errors.Wrap(err, "failed to stream")
}

// code will reach here stream.Listen() is done.
Expand All @@ -178,15 +176,13 @@ func (r *Agent) setupExtractor(ctx context.Context, sr recipe.SourceRecipe, str
err = errors.Wrapf(err, "could not find extractor \"%s\"", sr.Type)
return
}
err = extractor.Init(ctx, sr.Config)
if err != nil {
if err = extractor.Init(ctx, sr.Config); err != nil {
err = errors.Wrapf(err, "could not initiate extractor \"%s\"", sr.Type)
return
}

runFn = func() (err error) {
err = extractor.Extract(ctx, str.push)
if err != nil {
if err = extractor.Extract(ctx, str.push); err != nil {
err = errors.Wrapf(err, "error running extractor \"%s\"", sr.Type)
}

Expand All @@ -197,15 +193,11 @@ func (r *Agent) setupExtractor(ctx context.Context, sr recipe.SourceRecipe, str

func (r *Agent) setupProcessor(ctx context.Context, pr recipe.ProcessorRecipe, str *stream) (err error) {
var proc plugins.Processor
proc, err = r.processorFactory.Get(pr.Name)
if err != nil {
err = errors.Wrapf(err, "could not find processor \"%s\"", pr.Name)
return
if proc, err = r.processorFactory.Get(pr.Name); err != nil {
return errors.Wrapf(err, "could not find processor \"%s\"", pr.Name)
}
err = proc.Init(ctx, pr.Config)
if err != nil {
err = errors.Wrapf(err, "could not initiate processor \"%s\"", pr.Name)
return
if err = proc.Init(ctx, pr.Config); err != nil {
return errors.Wrapf(err, "could not initiate processor \"%s\"", pr.Name)
}

str.setMiddleware(func(src models.Record) (dst models.Record, err error) {
Expand All @@ -223,22 +215,16 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.ProcessorRecipe, s

func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *stream) (err error) {
var sink plugins.Syncer
sink, err = r.sinkFactory.Get(sr.Name)
if err != nil {
err = errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
return
if sink, err = r.sinkFactory.Get(sr.Name); err != nil {
return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
}
err = sink.Init(ctx, sr.Config)
if err != nil {
err = errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
return
if err = sink.Init(ctx, sr.Config); err != nil {
return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
}

stream.subscribe(func(records []models.Record) (err error) {
err = sink.Sink(ctx, records)
if err != nil {
err = errors.Wrapf(err, "error running sink \"%s\"", sr.Name)
return
if err = sink.Sink(ctx, records); err != nil {
return errors.Wrapf(err, "error running sink \"%s\"", sr.Name)
}

return
Expand Down
8 changes: 7 additions & 1 deletion agent/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,38 @@ import (
"github.com/odpf/meteor/models"
)

// batch
type batch struct {
data []models.Record
capacity int
}

// newBatch returns a new batch
func newBatch(capacity int) *batch {
return &batch{
capacity: capacity,
}
}

// add appends a record to the batch
func (b *batch) add(d models.Record) error {
if b.isFull() {
return errors.New("batch: cannot add, batch is full!")
return errors.New("batch: cannot add, batch is full")
}

b.data = append(b.data, d)
return nil
}

// flush removes all records from the batch
func (b *batch) flush() []models.Record {
data := b.data
b.data = []models.Record{}

return data
}

// isFull returns true if the batch is full
func (b *batch) isFull() bool {
// size 0 means there is no limit, hence will not ever be full
if b.capacity == 0 {
Expand All @@ -42,6 +47,7 @@ func (b *batch) isFull() bool {
return len(b.data) >= b.capacity
}

// isEmpty returns true if the batch is empty
func (b *batch) isEmpty() bool {
return len(b.data) == 0
}
2 changes: 1 addition & 1 deletion agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package agent

import "github.com/odpf/meteor/recipe"

// TaskType is the type of a task
// TaskType is the type of task
type TaskType string

const (
Expand Down
6 changes: 4 additions & 2 deletions agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func (s *stream) broadcast() error {
batch := newBatch(l.batchSize)
// listen to channel and emit data to subscriber callback if batch is full
for d := range l.channel {
batch.add(d)
if err := batch.add(d); err != nil {
s.closeWithError(err)
}
if batch.isFull() {
if err := l.callback(batch.flush()); err != nil {
s.closeWithError(err)
Expand All @@ -78,7 +80,7 @@ func (s *stream) broadcast() error {
}

// push() will run the record through all the registered middleware
// and emit the record to all registerd subscribers.
// and emit the record to all registered subscribers.
func (s *stream) push(data models.Record) {
data, err := s.runMiddlewares(data)
if err != nil {
Expand Down
25 changes: 10 additions & 15 deletions generator/recipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package generator

import (
"embed"
"github.com/pkg/errors"
"os"
"strings"
"text/template"
Expand Down Expand Up @@ -32,45 +33,39 @@ func Recipe(name string, source string, sinks []string, processors []string) (er

if source != "" {
tem.Source = make(map[string]string)
sinfo, err := registry.Extractors.Info(source)
sourceInfo, err := registry.Extractors.Info(source)
if err != nil {
return err
return errors.Wrap(err, "failed to provide extractor information")
}
tem.Source[source] = sinfo.SampleConfig
tem.Source[source] = indent.String(sourceInfo.SampleConfig, size)
}
if len(sinks) > 0 {
tem.Sinks = make(map[string]string)
for _, sink := range sinks {
info, err := registry.Sinks.Info(sink)
if err != nil {
return err
return errors.Wrap(err, "failed to provide sink information")
}
tem.Sinks[sink] = info.SampleConfig
}
}

if len(processors) > 0 {
tem.Processors = make(map[string]string)
for _, procc := range processors {
info, err := registry.Processors.Info(procc)
if err != nil {
return err
return errors.Wrap(err, "failed to provide processor information")
}
tem.Processors[procc] = info.SampleConfig
}
}

tmpl := template.Must(
template.New("recipe.yaml").Funcs(templateFuncs).ParseFS(file, "*"),
)

tmpl, err := template.ParseFS(file, "*")
if err != nil {
return err
return errors.Wrap(err, "failed to parse file")
}

err = tmpl.Execute(os.Stdout, tem)
if err != nil {
return err
if err = tmpl.Execute(os.Stdout, tem); err != nil {
return errors.Wrap(err, "failed to execute template")
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
command := cmd.New(lg, monitor)

if err := command.Execute(); err != nil {
if strings.HasPrefix(err.Error(), "unknown command ") {
if strings.HasPrefix(err.Error(), "unknown command") {
if !strings.HasSuffix(err.Error(), "\n") {
fmt.Println()
}
Expand Down
5 changes: 4 additions & 1 deletion metrics/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"fmt"
"github.com/pkg/errors"
"net"
"strconv"

Expand All @@ -16,7 +17,7 @@ var (
runMetricName = "run"
)

// StatsdMonitor reprsents the statsd monitor.
// StatsdMonitor represents the statsd monitor.
type StatsdMonitor struct {
client statsdClient
prefix string
Expand Down Expand Up @@ -72,10 +73,12 @@ type statsdClient interface {
func NewStatsdClient(statsdAddress string) (c *statsd.StatsdClient, err error) {
statsdHost, statsdPortStr, err := net.SplitHostPort(statsdAddress)
if err != nil {
err = errors.Wrap(err, "failed to split the network address")
return
}
statsdPort, err := strconv.Atoi(statsdPortStr)
if err != nil {
err = errors.Wrap(err, "failed to convert port type")
return
}
c = statsd.New(statsdHost, statsdPort)
Expand Down
1 change: 1 addition & 0 deletions models/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/odpf/meteor/models/odpf/assets/facets"
)

// Metadata is a wrapper for the meta
type Metadata interface {
GetResource() *common.Resource
GetProperties() *facets.Properties
Expand Down
3 changes: 3 additions & 0 deletions models/record.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package models

// Record represents the metadata of a record
type Record struct {
data Metadata
}

// NewRecord creates a new record
func NewRecord(data Metadata) Record {
return Record{data: data}
}

// Data returns the record data
func (r Record) Data() Metadata {
return r.data
}
4 changes: 3 additions & 1 deletion plugins/external/plugin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package plugins

import (
"errors"
"github.com/pkg/errors"
"os/exec"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -48,11 +48,13 @@ func dispense(client *plugin.Client) (processor Processor, err error) {
// Connect via RPC
rpcClient, err := client.Client()
if err != nil {
err = errors.Wrap(err, "failed to connect client")
return
}
// Request the plugin
raw, err := rpcClient.Dispense(processorPluginKey)
if err != nil {
err = errors.Wrap(err, "failed to dispense a new instance of the plugin")
return
}

Expand Down
10 changes: 5 additions & 5 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (e *Extractor) Validate(configMap map[string]interface{}) (err error) {
return utils.BuildConfig(configMap, &Config{})
}

// Extract checks if the table is valid and extracts the table schema
// Init initializes the extractor
func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{}) (err error) {
err = utils.BuildConfig(configMap, &e.config)
if err != nil {
Expand All @@ -90,7 +90,7 @@ func (e *Extractor) Init(ctx context.Context, configMap map[string]interface{})

e.client, err = e.createClient(ctx)
if err != nil {
return errors.Wrap(err, "error creating client")
return errors.Wrap(err, "failed to create client")
}

return
Expand All @@ -106,7 +106,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
break
}
if err != nil {
return errors.Wrapf(err, "failed to fetch dataset")
return errors.Wrap(err, "failed to fetch dataset")
}
e.extractTable(ctx, ds, emit)
}
Expand All @@ -133,13 +133,13 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
break
}
if err != nil {
e.logger.Error("failed to scan, skipping table", "err", err)
e.logger.Error("failed to get table, skipping table", "err", err)
continue
}
e.logger.Debug("extracting table", "table", table.FullyQualifiedName())
tmd, err := table.Metadata(ctx)
if err != nil {
e.logger.Error("failed to fetch table's metadata, skipping table", "err", err, "table", table.FullyQualifiedName())
e.logger.Error("failed to fetch table metadata", "err", err, "table", table.FullyQualifiedName())
continue
}

Expand Down
Loading

0 comments on commit 3dee4f1

Please sign in to comment.