Skip to content

Commit

Permalink
Add reconciler sharding capability based on label selector
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Prodan <[email protected]>
  • Loading branch information
stefanprodan committed Mar 29, 2023
1 parent 0ba76c0 commit 74cadb4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
6 changes: 3 additions & 3 deletions internal/controllers/kustomization_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,14 @@ func TestKustomizationReconciler_FluxTransformers(t *testing.T) {
path: /metadata/labels/patch1
value: inline-json
`,
Target: kustomize.Selector{
Target: &kustomize.Selector{
LabelSelector: "app=podinfo",
},
},
{
Patch: `
apiVersion: v1
kind: Pod
apiVersion: apps/v1
kind: Deployment
metadata:
name: podinfo
labels:
Expand Down
28 changes: 23 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/runtime/acl"
Expand Down Expand Up @@ -78,8 +79,8 @@ func main() {
logOptions logger.Options
leaderElectionOptions leaderelection.Options
rateLimiterOptions runtimeCtrl.RateLimiterOptions
watchOptions runtimeCtrl.WatchOptions
aclOptions acl.Options
watchAllNamespaces bool
noRemoteBases bool
httpRetry int
defaultServiceAccount string
Expand All @@ -91,8 +92,6 @@ func main() {
flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.")
flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent kustomize reconciles.")
flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.")
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.BoolVar(&noRemoteBases, "no-remote-bases", false,
"Disallow remote bases usage in Kustomize overlays. When this flag is enabled, all resources must refer to local files included in the source artifact.")
flag.IntVar(&httpRetry, "http-retry", 9, "The maximum number of retries when failing to fetch artifacts over HTTP.")
Expand All @@ -105,6 +104,7 @@ func main() {
kubeConfigOpts.BindFlags(flag.CommandLine)
rateLimiterOptions.BindFlags(flag.CommandLine)
featureGates.BindFlags(flag.CommandLine)
watchOptions.BindFlags(flag.CommandLine)

flag.Parse()

Expand All @@ -116,10 +116,16 @@ func main() {
}

watchNamespace := ""
if !watchAllNamespaces {
if !watchOptions.AllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
}

watchSelector, err := runtimeCtrl.GetWatchSelector(watchOptions)
if err != nil {
setupLog.Error(err, "unable to configure watch label selector for manager")
os.Exit(1)
}

var disableCacheFor []ctrlclient.Object
shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
if err != nil {
Expand All @@ -130,6 +136,11 @@ func main() {
disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{})
}

leaderElectionId := fmt.Sprintf("%s-%s", controllerName, "leader-election")
if watchOptions.LabelSelector != "" {
leaderElectionId = leaderelection.GenerateID(leaderElectionId, watchOptions.LabelSelector)
}

restConfig := runtimeClient.GetConfigOrDie(clientOptions)
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Expand All @@ -141,10 +152,15 @@ func main() {
LeaseDuration: &leaderElectionOptions.LeaseDuration,
RenewDeadline: &leaderElectionOptions.RenewDeadline,
RetryPeriod: &leaderElectionOptions.RetryPeriod,
LeaderElectionID: fmt.Sprintf("%s-leader-election", controllerName),
LeaderElectionID: leaderElectionId,
Namespace: watchNamespace,
Logger: ctrl.Log,
ClientDisableCacheFor: disableCacheFor,
NewCache: ctrlcache.BuilderWithOptions(ctrlcache.Options{
SelectorsByObject: ctrlcache.SelectorsByObject{
&kustomizev1.Kustomization{}: {Label: watchSelector},
},
}),
})
if err != nil {
setupLog.Error(err, "unable to start manager")
Expand All @@ -166,9 +182,11 @@ func main() {
pollingOpts := polling.Options{
CustomStatusReaders: []engine.StatusReader{jobStatusReader},
}

if ok, _ := features.Enabled(features.DisableStatusPollerCache); ok {
pollingOpts.ClusterReaderFactory = engine.ClusterReaderFactoryFunc(clusterreader.NewDirectClusterReader)
}

if err = (&controllers.KustomizationReconciler{
ControllerName: controllerName,
DefaultServiceAccount: defaultServiceAccount,
Expand Down

0 comments on commit 74cadb4

Please sign in to comment.