Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

Commit

Permalink
bugfix: propogate errors from goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
yevgenypats committed Feb 28, 2021
1 parent a6c4e6f commit d8677ae
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 13 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ require (
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4
github.com/mitchellh/mapstructure v1.3.3
go.uber.org/zap v1.16.0
golang.org/x/sync v0.0.0-20190423024810-112230192c58
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
32 changes: 19 additions & 13 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"github.com/cloudquery/cq-provider-aws/redshift"
"github.com/cloudquery/cq-provider-aws/s3"
"github.com/cloudquery/cq-provider-aws/sns"
"golang.org/x/sync/errgroup"

"github.com/cloudquery/cloudquery/cqlog"
"github.com/cloudquery/cloudquery/sdk"
"github.com/cloudquery/cq-provider-aws/autoscaling"
"github.com/cloudquery/cq-provider-aws/cloudtrail"
"log"
"strings"
"sync"

Expand Down Expand Up @@ -285,12 +285,18 @@ func (p *Provider) Fetch(data []byte) error {
}

p.initRegionalClients()
var wg sync.WaitGroup
g := errgroup.Group{}
for _, resource := range p.config.Resources {
wg.Add(1)
go p.collectResource(&wg, resource.Name, resource.Other)
resourceName := resource.Name
resourceConfig := resource.Other
g.Go(func() error {
return p.collectResource(resourceName, resourceConfig)
})
}
wg.Wait()
if err := g.Wait(); err != nil {
return err
}

}
globalCollectedResources = map[string]bool{}
p.resourceClients = map[string]resource.ClientInterface{}
Expand All @@ -309,11 +315,10 @@ func (p *Provider) initRegionalClients() {

var lock = sync.RWMutex{}

func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string, config interface{}) {
defer wg.Done()
func (p *Provider) collectResource(fullResourceName string, config interface{}) error {
resourcePath := strings.Split(fullResourceName, ".")
if len(resourcePath) != 2 {
log.Fatalf("resource %s should be in format {service}.{resource}", fullResourceName)
return fmt.Errorf("resource %s should be in format {service}.{resource}", fullResourceName)
}
service := resourcePath[0]
resourceName := resourcePath[1]
Expand All @@ -322,7 +327,7 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string,
lock.Lock()
if globalCollectedResources[fullResourceName] {
lock.Unlock()
return
return nil
}
globalCollectedResources[fullResourceName] = true
if p.resourceClients[service] == nil {
Expand All @@ -334,7 +339,7 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string,
}

if p.resourceClients[service] == nil {
log.Fatalf("unsupported service %s for resource %s", service, resourceName)
return fmt.Errorf("unsupported service %s for resource %s", service, resourceName)
}

err := p.resourceClients[service].CollectResource(resourceName, config)
Expand All @@ -344,14 +349,15 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string,
switch ae.ErrorCode() {
case "AccessDenied", "AccessDeniedException", "UnauthorizedOperation":
p.log.Info("Skipping resource. Access denied", zap.String("account_id", p.accountID), zap.String("region", p.region), zap.String("resource", fullResourceName), zap.Error(err))
return
return nil
case "OptInRequired", "SubscriptionRequiredException":
p.log.Info("Skipping resource. Service disabled", zap.String("account_id", p.accountID), zap.String("region", p.region), zap.String("resource", fullResourceName), zap.Error(err))
return
return nil
}
}
log.Fatal(err)
return err
}
return nil
}

func main() {
Expand Down

0 comments on commit d8677ae

Please sign in to comment.