diff --git a/cmd/helm-operator/main.go b/cmd/helm-operator/main.go index f2f80729be..af2e22628a 100644 --- a/cmd/helm-operator/main.go +++ b/cmd/helm-operator/main.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/kit/log" "github.com/spf13/pflag" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" "k8s.io/klog" @@ -166,19 +167,31 @@ func main() { TLSHostname: *tillerTLSHostname, }) - // The status updater, to keep track the release status for each - // HelmRelease. It runs as a separate loop for now. - statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace) - go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator")) + checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint")) + // setup shared informer for HelmReleases nsOpt := ifinformers.WithNamespace(*namespace) ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt) fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases() go ifInformerFactory.Start(shutdown) + // wait for the caches to be synced before starting workers + mainLogger.Log("info", "waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(shutdown, fhrInformer.Informer().HasSynced); !ok { + mainLogger.Log("error", "failed to wait for caches to sync") + os.Exit(1) + } + mainLogger.Log("info", "informer caches synced") + + // the status updater, to keep track of the release status for + // every HelmRelease + statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace) + go statusUpdater.Loop(shutdown, log.With(logger, "component", "status")) + queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease") - // release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes + // release instance is needed during the sync of git chart changes + // and during the sync of HelmRelease changes rel := release.New(log.With(logger, "component", "release"), helmClient) chartSync := chartsync.New( log.With(logger, "component", "chartsync"), @@ -192,17 +205,12 @@ func main() { // start FluxRelease informer opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, queue, chartSync) - checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint")) // start HTTP server go daemonhttp.ListenAndServe(*listenAddr, chartSync, log.With(logger, "component", "daemonhttp"), shutdown) // start operator - go func() { - if err = opr.Run(1, shutdown, shutdownWg); err != nil { - errc <- fmt.Errorf(ErrOperatorFailure, err) - } - }() + go opr.Run(1, shutdown, shutdownWg) shutdownErr := <-errc logger.Log("exiting...", shutdownErr) diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index df1348eecb..6879a159ce 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -118,6 +118,7 @@ type ChartChangeSync struct { kubeClient kubernetes.Clientset ifClient ifclientset.Clientset fhrLister iflister.HelmReleaseLister + fhrSynced cache.InformerSynced release *release.Release releaseQueue ReleaseQueue config Config @@ -149,7 +150,7 @@ func New(logger log.Logger, clients Clients, release *release.Release, releaseQu // Helm releases in the cluster, what HelmRelease declare, and // changes in the git repos mentioned by any HelmRelease. func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) { - chs.logger.Log("info", "starting git chart sync loop") + chs.logger.Log("info", "starting git chart sync") wg.Add(1) go func() { diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 9c3c300c3d..8070279c16 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -1,7 +1,6 @@ package operator import ( - "errors" "fmt" "sync" "time" @@ -50,7 +49,6 @@ type Controller struct { logDiffs bool fhrLister iflister.HelmReleaseLister - fhrSynced cache.InformerSynced sync *chartsync.ChartChangeSync @@ -86,7 +84,6 @@ func New( logger: logger, logDiffs: logReleaseDiffs, fhrLister: fhrInformer.Lister(), - fhrSynced: fhrInformer.Informer().HasSynced, releaseWorkqueue: releaseWorkqueue, recorder: recorder, sync: sync, @@ -121,18 +118,11 @@ func New( // as syncing informer caches and starting workers. It will block until stopCh // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. -func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) error { +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) { defer runtime.HandleCrash() defer c.releaseWorkqueue.ShutDown() c.logger.Log("info", "starting operator") - // Wait for the caches to be synced before starting workers - c.logger.Log("info", "waiting for informer caches to sync") - - if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok { - return errors.New("failed to wait for caches to sync") - } - c.logger.Log("info", "unformer caches synced") c.logger.Log("info", "starting workers") for i := 0; i < threadiness; i++ { @@ -145,8 +135,6 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG wg.Done() } c.logger.Log("info", "stopping workers") - - return nil } // runWorker is a long-running function calling the