Skip to content
This repository was archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Verify informer caches have synced before booting
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco committed May 29, 2019
1 parent 8aa213f commit 886f598
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 25 deletions.
30 changes: 19 additions & 11 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
Expand Down Expand Up @@ -166,19 +167,31 @@ func main() {
TLSHostname: *tillerTLSHostname,
})

// The status updater, to keep track the release status for each
// HelmRelease. It runs as a separate loop for now.
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))

// setup shared informer for HelmReleases
nsOpt := ifinformers.WithNamespace(*namespace)
ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt)
fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases()
go ifInformerFactory.Start(shutdown)

// wait for the caches to be synced before starting workers
mainLogger.Log("info", "waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(shutdown, fhrInformer.Informer().HasSynced); !ok {
mainLogger.Log("error", "failed to wait for caches to sync")
os.Exit(1)
}
mainLogger.Log("info", "informer caches synced")

// the status updater, to keep track of the release status for
// every HelmRelease
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "status"))

queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease")

// release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes
// release instance is needed during the sync of git chart changes
// and during the sync of HelmRelease changes
rel := release.New(log.With(logger, "component", "release"), helmClient)
chartSync := chartsync.New(
log.With(logger, "component", "chartsync"),
Expand All @@ -192,17 +205,12 @@ func main() {

// start FluxRelease informer
opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, queue, chartSync)
checkpoint.CheckForUpdates(product, version, nil, log.With(logger, "component", "checkpoint"))

// start HTTP server
go daemonhttp.ListenAndServe(*listenAddr, chartSync, log.With(logger, "component", "daemonhttp"), shutdown)

// start operator
go func() {
if err = opr.Run(1, shutdown, shutdownWg); err != nil {
errc <- fmt.Errorf(ErrOperatorFailure, err)
}
}()
go opr.Run(1, shutdown, shutdownWg)

shutdownErr := <-errc
logger.Log("exiting...", shutdownErr)
Expand Down
3 changes: 2 additions & 1 deletion integrations/helm/chartsync/chartsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type ChartChangeSync struct {
kubeClient kubernetes.Clientset
ifClient ifclientset.Clientset
fhrLister iflister.HelmReleaseLister
fhrSynced cache.InformerSynced
release *release.Release
releaseQueue ReleaseQueue
config Config
Expand Down Expand Up @@ -149,7 +150,7 @@ func New(logger log.Logger, clients Clients, release *release.Release, releaseQu
// Helm releases in the cluster, what HelmRelease declare, and
// changes in the git repos mentioned by any HelmRelease.
func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *sync.WaitGroup) {
chs.logger.Log("info", "starting git chart sync loop")
chs.logger.Log("info", "starting git chart sync")

wg.Add(1)
go func() {
Expand Down
14 changes: 1 addition & 13 deletions integrations/helm/operator/operator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package operator

import (
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -50,7 +49,6 @@ type Controller struct {
logDiffs bool

fhrLister iflister.HelmReleaseLister
fhrSynced cache.InformerSynced

sync *chartsync.ChartChangeSync

Expand Down Expand Up @@ -86,7 +84,6 @@ func New(
logger: logger,
logDiffs: logReleaseDiffs,
fhrLister: fhrInformer.Lister(),
fhrSynced: fhrInformer.Informer().HasSynced,
releaseWorkqueue: releaseWorkqueue,
recorder: recorder,
sync: sync,
Expand Down Expand Up @@ -121,18 +118,11 @@ func New(
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitGroup) {
defer runtime.HandleCrash()
defer c.releaseWorkqueue.ShutDown()

c.logger.Log("info", "starting operator")
// Wait for the caches to be synced before starting workers
c.logger.Log("info", "waiting for informer caches to sync")

if ok := cache.WaitForCacheSync(stopCh, c.fhrSynced); !ok {
return errors.New("failed to wait for caches to sync")
}
c.logger.Log("info", "unformer caches synced")

c.logger.Log("info", "starting workers")
for i := 0; i < threadiness; i++ {
Expand All @@ -145,8 +135,6 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}, wg *sync.WaitG
wg.Done()
}
c.logger.Log("info", "stopping workers")

return nil
}

// runWorker is a long-running function calling the
Expand Down

0 comments on commit 886f598

Please sign in to comment.