diff --git a/go.mod b/go.mod index 33536d4b1..02c020a1e 100644 --- a/go.mod +++ b/go.mod @@ -32,5 +32,6 @@ require ( github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4 github.com/mitchellh/mapstructure v1.3.3 go.uber.org/zap v1.16.0 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) diff --git a/go.sum b/go.sum index cc61c35f7..9a2ff4962 100644 --- a/go.sum +++ b/go.sum @@ -490,6 +490,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/provider.go b/provider.go index 0232d22ef..93412c5cf 100644 --- a/provider.go +++ b/provider.go @@ -26,12 +26,12 @@ import ( "github.com/cloudquery/cq-provider-aws/redshift" "github.com/cloudquery/cq-provider-aws/s3" "github.com/cloudquery/cq-provider-aws/sns" + "golang.org/x/sync/errgroup" "github.com/cloudquery/cloudquery/cqlog" "github.com/cloudquery/cloudquery/sdk" "github.com/cloudquery/cq-provider-aws/autoscaling" "github.com/cloudquery/cq-provider-aws/cloudtrail" - "log" "strings" "sync" @@ -285,12 +285,18 @@ func (p *Provider) Fetch(data []byte) error { } p.initRegionalClients() - var wg sync.WaitGroup + g := errgroup.Group{} for _, resource := range p.config.Resources { - wg.Add(1) - go p.collectResource(&wg, resource.Name, resource.Other) + resourceName := resource.Name + resourceConfig := resource.Other + g.Go(func() error { + return p.collectResource(resourceName, resourceConfig) + }) } - wg.Wait() + if err := g.Wait(); err != nil { + return err + } + } globalCollectedResources = map[string]bool{} p.resourceClients = map[string]resource.ClientInterface{} @@ -309,11 +315,10 @@ func (p *Provider) initRegionalClients() { var lock = sync.RWMutex{} -func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string, config interface{}) { - defer wg.Done() +func (p *Provider) collectResource(fullResourceName string, config interface{}) error { resourcePath := strings.Split(fullResourceName, ".") if len(resourcePath) != 2 { - log.Fatalf("resource %s should be in format {service}.{resource}", fullResourceName) + return fmt.Errorf("resource %s should be in format {service}.{resource}", fullResourceName) } service := resourcePath[0] resourceName := resourcePath[1] @@ -322,7 +327,7 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string, lock.Lock() if globalCollectedResources[fullResourceName] { lock.Unlock() - return + return nil } globalCollectedResources[fullResourceName] = true if p.resourceClients[service] == nil { @@ -334,7 +339,7 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string, } if p.resourceClients[service] == nil { - log.Fatalf("unsupported service %s for resource %s", service, resourceName) + return fmt.Errorf("unsupported service %s for resource %s", service, resourceName) } err := p.resourceClients[service].CollectResource(resourceName, config) @@ -344,14 +349,15 @@ func (p *Provider) collectResource(wg *sync.WaitGroup, fullResourceName string, switch ae.ErrorCode() { case "AccessDenied", "AccessDeniedException", "UnauthorizedOperation": p.log.Info("Skipping resource. Access denied", zap.String("account_id", p.accountID), zap.String("region", p.region), zap.String("resource", fullResourceName), zap.Error(err)) - return + return nil case "OptInRequired", "SubscriptionRequiredException": p.log.Info("Skipping resource. Service disabled", zap.String("account_id", p.accountID), zap.String("region", p.region), zap.String("resource", fullResourceName), zap.Error(err)) - return + return nil } } - log.Fatal(err) + return err } + return nil } func main() {