Skip to content

Commit

Permalink
Refactor field indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
afritzler committed Feb 15, 2022
1 parent d07e00e commit ddff8b6
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 65 deletions.
18 changes: 9 additions & 9 deletions controllers/storage/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package storage

import (
"context"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/onmetal/onmetal-api/pkg/fieldindexer"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -108,25 +108,25 @@ func SetupTest(ctx context.Context) *corev1.Namespace {
})
Expect(err).ToNot(HaveOccurred())

var indexedFields = &sets.String{}
// index fields here
fieldIndexer := fieldindexer.NewIndexer(k8sManager)
Expect(fieldIndexer.IndexFieldForVolume()).ToNot(HaveOccurred())
Expect(fieldIndexer.IndexFieldForVolumeClaim()).ToNot(HaveOccurred())

// register reconciler here
Expect((&VolumeClaimScheduler{
Client: k8sManager.GetClient(),
EventRecorder: k8sManager.GetEventRecorderFor("volume-claim-scheduler"),
IndexedFields: indexedFields,
}).SetupWithManager(k8sManager)).To(Succeed())

Expect((&VolumeReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
IndexedFields: indexedFields,
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)).To(Succeed())

Expect((&VolumeClaimReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
IndexedFields: indexedFields,
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)).To(Succeed())

Expect((&VolumeScheduler{
Expand Down
20 changes: 3 additions & 17 deletions controllers/storage/volume_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"fmt"
"github.com/go-logr/logr"
storagev1alpha1 "github.com/onmetal/onmetal-api/apis/storage/v1alpha1"
"github.com/onmetal/onmetal-api/pkg/fieldindexer"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -38,8 +38,7 @@ import (
// VolumeReconciler reconciles a Volume object
type VolumeReconciler struct {
client.Client
Scheme *runtime.Scheme
IndexedFields *sets.String
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=storage.onmetal.de,resources=volumes,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -122,19 +121,6 @@ func (r *VolumeReconciler) updateVolumePhase(ctx context.Context, log logr.Logge
// SetupWithManager sets up the controller with the Manager.
func (r *VolumeReconciler) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.Background()
if !r.IndexedFields.Has(volumeSpecVolumeClaimNameRefField) {
if err := mgr.GetFieldIndexer().IndexField(ctx, &storagev1alpha1.Volume{},
volumeSpecVolumeClaimNameRefField, func(object client.Object) []string {
volume := object.(*storagev1alpha1.Volume)
if volume.Spec.ClaimRef.Name == "" {
return nil
}
return []string{volume.Spec.ClaimRef.Name}
}); err != nil {
return err
}
r.IndexedFields.Insert(volumeSpecVolumeClaimNameRefField)
}
return ctrl.NewControllerManagedBy(mgr).
Named("volume-controller").
For(&storagev1alpha1.Volume{}).
Expand All @@ -143,7 +129,7 @@ func (r *VolumeReconciler) SetupWithManager(mgr ctrl.Manager) error {
volumeClaim := object.(*storagev1alpha1.VolumeClaim)
volumes := &storagev1alpha1.VolumeList{}
if err := r.List(ctx, volumes, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(volumeSpecVolumeClaimNameRefField, volumeClaim.GetName()),
FieldSelector: fields.OneTermEqualSelector(fieldindexer.VolumeSpecVolumeClaimNameRefField, volumeClaim.GetName()),
Namespace: volumeClaim.GetNamespace(),
}); err != nil {
return []reconcile.Request{}
Expand Down
18 changes: 2 additions & 16 deletions controllers/storage/volumeclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"github.com/go-logr/logr"
"github.com/onmetal/onmetal-api/pkg/fieldindexer"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,8 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const volumeSpecVolumeClaimNameRefField = ".spec.claimRef.name"

// VolumeClaimReconciler reconciles a VolumeClaim object
type VolumeClaimReconciler struct {
client.Client
Expand Down Expand Up @@ -125,19 +124,6 @@ func (r *VolumeClaimReconciler) updateVolumeClaimPhase(ctx context.Context, log
// SetupWithManager sets up the controller with the Manager.
func (r *VolumeClaimReconciler) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.Background()
if !r.IndexedFields.Has(volumeClaimSpecVolumeRefNameField) {
if err := mgr.GetFieldIndexer().IndexField(ctx, &storagev1alpha1.VolumeClaim{},
volumeClaimSpecVolumeRefNameField, func(object client.Object) []string {
claim := object.(*storagev1alpha1.VolumeClaim)
if claim.Spec.VolumeRef.Name == "" {
return nil
}
return []string{claim.Spec.VolumeRef.Name}
}); err != nil {
return err
}
r.IndexedFields.Insert(volumeClaimSpecVolumeRefNameField)
}
return ctrl.NewControllerManagedBy(mgr).
Named("volumeclaim-controller").
For(&storagev1alpha1.VolumeClaim{}).
Expand All @@ -146,7 +132,7 @@ func (r *VolumeClaimReconciler) SetupWithManager(mgr ctrl.Manager) error {
volume := object.(*storagev1alpha1.Volume)
claims := &storagev1alpha1.VolumeClaimList{}
if err := r.List(ctx, claims, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(volumeClaimSpecVolumeRefNameField, volume.GetName()),
FieldSelector: fields.OneTermEqualSelector(fieldindexer.VolumeClaimSpecVolumeRefNameField, volume.GetName()),
Namespace: volume.GetNamespace(),
}); err != nil {
return []reconcile.Request{}
Expand Down
16 changes: 2 additions & 14 deletions controllers/storage/volumeclaim_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"fmt"
"github.com/go-logr/logr"
storagev1alpha1 "github.com/onmetal/onmetal-api/apis/storage/v1alpha1"
"github.com/onmetal/onmetal-api/pkg/fieldindexer"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -34,12 +34,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

const volumeClaimSpecVolumeRefNameField = ".spec.volumeRef.name"

type VolumeClaimScheduler struct {
client.Client
record.EventRecorder
IndexedFields *sets.String
}

//+kubebuilder:rbac:groups=storage.onmetal.de,resources=volumeclaims,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -154,15 +151,6 @@ func (s *VolumeClaimScheduler) volumeSatisfiesClaim(volume *storagev1alpha1.Volu
func (s *VolumeClaimScheduler) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.Background()
log := ctrl.Log.WithName("volume-claim-scheduler").WithName("setup")
if !s.IndexedFields.Has(volumeClaimSpecVolumeRefNameField) {
if err := mgr.GetFieldIndexer().IndexField(ctx, &storagev1alpha1.VolumeClaim{}, volumeClaimSpecVolumeRefNameField, func(object client.Object) []string {
volumeClaim := object.(*storagev1alpha1.VolumeClaim)
return []string{volumeClaim.Spec.VolumeRef.Name}
}); err != nil {
return err
}
s.IndexedFields.Insert(volumeClaimSpecVolumeRefNameField)
}
return ctrl.NewControllerManagedBy(mgr).
Named("volume-claim-scheduler").
For(&storagev1alpha1.VolumeClaim{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool {
Expand Down Expand Up @@ -190,7 +178,7 @@ func (s *VolumeClaimScheduler) SetupWithManager(mgr ctrl.Manager) error {
}
volumeClaims := &storagev1alpha1.VolumeClaimList{}
if err := s.List(ctx, volumeClaims, client.InNamespace(volume.Namespace), client.MatchingFields{
volumeClaimSpecVolumeRefNameField: "",
fieldindexer.VolumeClaimSpecVolumeRefNameField: "",
}); err != nil {
log.Error(err, "could not list empty VolumeClaims", "Namespace", volume.Namespace)
return nil
Expand Down
29 changes: 20 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package main
import (
"flag"
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/onmetal/onmetal-api/pkg/fieldindexer"
"os"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand Down Expand Up @@ -123,8 +123,22 @@ func main() {
os.Exit(1)
}

var indexedFields = &sets.String{}
// Index fields
fieldIndexer := fieldindexer.NewIndexer(mgr)
if controllers.Enabled(volumeController) || controllers.Enabled(volumeClaimScheduler) {
if err := fieldIndexer.IndexFieldForVolume(); err != nil {
setupLog.Error(err, "unable to create fieldindex", "IndexField", fieldindexer.VolumeClaimSpecVolumeRefNameField)
os.Exit(1)
}
}
if controllers.Enabled(volumeClaimController) {
if err := fieldIndexer.IndexFieldForVolumeClaim(); err != nil {
setupLog.Error(err, "unable to create fieldindex", "IndexField", fieldindexer.VolumeSpecVolumeClaimNameRefField)
os.Exit(1)
}
}

// Register controllers
if controllers.Enabled(machineClassController) {
if err = (&computecontrollers.MachineClassReconciler{
Client: mgr.GetClient(),
Expand Down Expand Up @@ -172,19 +186,17 @@ func main() {
}
if controllers.Enabled(volumeController) {
if err = (&storagecontrollers.VolumeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
IndexedFields: indexedFields,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Volume")
os.Exit(1)
}
}
if controllers.Enabled(volumeClaimController) {
if err = (&storagecontrollers.VolumeClaimReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
IndexedFields: indexedFields,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VolumeClaim")
os.Exit(1)
Expand All @@ -201,7 +213,6 @@ func main() {
if err = (&storagecontrollers.VolumeClaimScheduler{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("volume-claim-scheduler"),
IndexedFields: indexedFields,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "VolumeClaimScheduler")
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/fieldindexer/fieldindexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2022 by the OnMetal authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fieldindexer

import (
"context"
storagev1alpha1 "github.com/onmetal/onmetal-api/apis/storage/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

const (
VolumeSpecVolumeClaimNameRefField = ".spec.claimRef.name"
VolumeClaimSpecVolumeRefNameField = ".spec.volumeRef.name"
)

type FieldIndexer struct {
manager.Manager
}

func NewIndexer(mgr manager.Manager) *FieldIndexer {
return &FieldIndexer{mgr}
}

func (i *FieldIndexer) IndexFieldForVolumeClaim() error {
return i.indexField(&storagev1alpha1.VolumeClaim{}, VolumeClaimSpecVolumeRefNameField, func(object client.Object) []string {
claim := object.(*storagev1alpha1.VolumeClaim)
if claim.Spec.VolumeRef.Name == "" {
return nil
}
return []string{claim.Spec.VolumeRef.Name}
})
}

func (i *FieldIndexer) IndexFieldForVolume() error {
return i.indexField(&storagev1alpha1.Volume{}, VolumeSpecVolumeClaimNameRefField, func(object client.Object) []string {
volume := object.(*storagev1alpha1.Volume)
if volume.Spec.ClaimRef.Name == "" {
return nil
}
return []string{volume.Spec.ClaimRef.Name}
})
}

func (i *FieldIndexer) indexField(object client.Object, field string, indexFunc func(object client.Object) []string) error {
ctx := context.Background()
if err := i.Manager.GetFieldIndexer().IndexField(ctx, object, field, indexFunc); err != nil {
return err
}
return nil
}

0 comments on commit ddff8b6

Please sign in to comment.