Skip to content

Commit

Permalink
Merge pull request #153 from makkes/concurrent-reconciles
Browse files Browse the repository at this point in the history
Make concurrent reconciliation configurable
  • Loading branch information
stefanprodan authored Jun 24, 2021
2 parents 6185181 + 03043e5 commit d41b3f1
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
10 changes: 9 additions & 1 deletion controllers/imagepolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -57,6 +58,10 @@ type ImagePolicyReconciler struct {
Database DatabaseReader
}

type ImagePolicyReconcilerOptions struct {
MaxConcurrentReconciles int
}

// +kubebuilder:rbac:groups=image.toolkit.fluxcd.io,resources=imagepolicies,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=image.toolkit.fluxcd.io,resources=imagepolicies/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=image.toolkit.fluxcd.io,resources=imagerepositories,verbs=get;list;watch
Expand Down Expand Up @@ -191,7 +196,7 @@ func (r *ImagePolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

func (r *ImagePolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ImagePolicyReconciler) SetupWithManager(mgr ctrl.Manager, opts ImagePolicyReconcilerOptions) error {
// index the policies by which image repo they point at, so that
// it's easy to list those out when an image repo changes.
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &imagev1.ImagePolicy{}, imageRepoKey, func(obj client.Object) []string {
Expand All @@ -207,6 +212,9 @@ func (r *ImagePolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
&source.Kind{Type: &imagev1.ImageRepository{}},
handler.EnqueueRequestsFromMapFunc(r.imagePoliciesForRepository),
).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
}).
Complete(r)
}

Expand Down
10 changes: 9 additions & 1 deletion controllers/imagerepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/fluxcd/pkg/apis/meta"
Expand Down Expand Up @@ -75,6 +76,10 @@ type ImageRepositoryReconciler struct {
}
}

type ImageRepositoryReconcilerOptions struct {
MaxConcurrentReconciles int
}

type dockerConfig struct {
Auths map[string]authn.AuthConfig
}
Expand Down Expand Up @@ -352,10 +357,13 @@ func (r *ImageRepositoryReconciler) shouldScan(repo imagev1.ImageRepository, now
return false, when, nil
}

func (r *ImageRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ImageRepositoryReconciler) SetupWithManager(mgr ctrl.Manager, opts ImageRepositoryReconcilerOptions) error {
return ctrl.NewControllerManagedBy(mgr).
For(&imagev1.ImageRepository{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
}).
Complete(r)
}

Expand Down
4 changes: 2 additions & 2 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ var _ = BeforeSuite(func(done Done) {
Scheme: scheme.Scheme,
Database: database.NewBadgerDatabase(badgerDB),
}
Expect(imageRepoReconciler.SetupWithManager(k8sMgr)).To(Succeed())
Expect(imageRepoReconciler.SetupWithManager(k8sMgr, ImageRepositoryReconcilerOptions{})).To(Succeed())

imagePolicyReconciler = &ImagePolicyReconciler{
Client: k8sMgr.GetClient(),
Scheme: scheme.Scheme,
Database: database.NewBadgerDatabase(badgerDB),
}
Expect(imagePolicyReconciler.SetupWithManager(k8sMgr)).To(Succeed())
Expect(imagePolicyReconciler.SetupWithManager(k8sMgr, ImagePolicyReconcilerOptions{})).To(Succeed())

// this must be started for the caches to be running, and thereby
// for the client to be usable.
Expand Down
10 changes: 8 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func main() {
watchAllNamespaces bool
storagePath string
storageValueLogFileSize int64
concurrent int
)

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
Expand All @@ -77,6 +78,7 @@ func main() {
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.StringVar(&storagePath, "storage-path", "/data", "Where to store the persistent database of image metadata")
flag.Int64Var(&storageValueLogFileSize, "storage-value-log-file-size", 1<<28, "Set the database's memory mapped value log file size in bytes. Effective memory usage is about two times this size.")
flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent resource reconciles.")
clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
leaderElectionOptions.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -142,7 +144,9 @@ func main() {
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
Database: db,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controllers.ImageRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", imagev1.ImageRepositoryKind)
os.Exit(1)
}
Expand All @@ -153,7 +157,9 @@ func main() {
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
Database: db,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controllers.ImagePolicyReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", imagev1.ImagePolicyKind)
os.Exit(1)
}
Expand Down

0 comments on commit d41b3f1

Please sign in to comment.