Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove duplicated code and fix minor bugs #326

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
167 changes: 53 additions & 114 deletions internal/controller/install/armadaserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,67 +64,34 @@ type ArmadaServerReconciler struct {
// move the current state of the cluster closer to the desired state.
func (r *ArmadaServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name)

started := time.Now()
logger.Info("Reconciling ArmadaServer object")

logger.Info("Fetching ArmadaServer object from cache")
var as installv1alpha1.ArmadaServer
if err := r.Client.Get(ctx, req.NamespacedName, &as); err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("ArmadaServer not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
}

logger.Info("Reconciling object")

var server installv1alpha1.ArmadaServer
if miss, err := getObject(ctx, r.Client, &server, req.NamespacedName, logger); err != nil || miss {
return ctrl.Result{}, err
}

pc, err := installv1alpha1.BuildPortConfig(as.Spec.ApplicationConfig)
pc, err := installv1alpha1.BuildPortConfig(server.Spec.ApplicationConfig)
if err != nil {
return ctrl.Result{}, err
}
as.Spec.PortConfig = pc
server.Spec.PortConfig = pc

var components *CommonComponents
components, err = generateArmadaServerInstallComponents(&as, r.Scheme)
components, err = generateArmadaServerInstallComponents(&server, r.Scheme)
if err != nil {
return ctrl.Result{}, err
}

deletionTimestamp := as.ObjectMeta.DeletionTimestamp
// examine DeletionTimestamp to determine if object is under deletion
if deletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(&as, operatorFinalizer) {
logger.Info("Attaching finalizer to As object", "finalizer", operatorFinalizer)
controllerutil.AddFinalizer(&as, operatorFinalizer)
if err := r.Update(ctx, &as); err != nil {
return ctrl.Result{}, err
}
}
} else {
logger.Info("ArmadaServer object is being deleted", "finalizer", operatorFinalizer)
logger.Info("Namespace-scoped resources will be deleted by Kubernetes based on their OwnerReference")
// The object is being deleted
if controllerutil.ContainsFinalizer(&as, operatorFinalizer) {
// our finalizer is present, so lets handle any external dependency
logger.Info("Running cleanup function for ArmadaServer cluster-scoped components", "finalizer", operatorFinalizer)
if err := r.deleteExternalResources(ctx, components, logger); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried
return ctrl.Result{}, err
}

// remove our finalizer from the list and update it.
logger.Info("Removing finalizer from ArmadaServer object", "finalizer", operatorFinalizer)
controllerutil.RemoveFinalizer(&as, operatorFinalizer)
if err := r.Update(ctx, &as); err != nil {
return ctrl.Result{}, err
}
}

// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
cleanupF := func(ctx context.Context) error {
return r.deleteExternalResources(ctx, components, logger)
}
finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &server, operatorFinalizer, cleanupF, logger)
if err != nil || finish {
return ctrl.Result{}, err
}

componentsCopy := components.DeepCopy()
Expand All @@ -134,90 +101,58 @@ func (r *ArmadaServerReconciler) Reconcile(ctx context.Context, req ctrl.Request
return nil
}

if components.ServiceAccount != nil {
logger.Info("Upserting ArmadaServer ServiceAccount object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Secret != nil {
logger.Info("Upserting ArmadaServer Secret object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if as.Spec.PulsarInit {
for idx := range components.Jobs {
err = func() error {
if components.Jobs[idx] != nil {
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Jobs[idx], mutateFn); err != nil {
return err
}
ctxTimeout, cancel := context.WithTimeout(ctx, migrationTimeout)
defer cancel()

err := waitForJob(ctxTimeout, r.Client, components.Jobs[idx], migrationPollSleep)
if err != nil {
return err
}
if server.Spec.PulsarInit {
for _, job := range components.Jobs {
err = func(job *batchv1.Job) error {
if err := upsertObjectIfNeeded(ctx, r.Client, job, server.Kind, mutateFn, logger); err != nil {
return err
}

if err := waitForJob(ctx, r.Client, job, jobPollInterval, jobTimeout); err != nil {
return err
}
return nil
}()
}(job)
if err != nil {
return ctrl.Result{}, err
}
}
}

if components.Deployment != nil {
logger.Info("Upserting ArmadaServer Deployment object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Service != nil {
logger.Info("Upserting ArmadaServer Service object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.IngressGrpc != nil {
logger.Info("Upserting ArmadaServer GRPC Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressGrpc, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressGrpc, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.IngressHttp != nil {
logger.Info("Upserting ArmadaServer REST Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressHttp, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressHttp, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.PodDisruptionBudget != nil {
logger.Info("Upserting ArmadaServer PodDisruptionBudget object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PodDisruptionBudget, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.PodDisruptionBudget, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.PrometheusRule != nil {
logger.Info("Upserting ArmadaServer PrometheusRule object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.PrometheusRule, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.PrometheusRule, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.ServiceMonitor != nil {
logger.Info("Upserting ArmadaServer ServiceMonitor object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceMonitor, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceMonitor, server.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

logger.Info("Successfully reconciled ArmadaServer object", "durationMillis", time.Since(started).Milliseconds())
Expand Down Expand Up @@ -283,13 +218,17 @@ func generateArmadaServerInstallComponents(as *installv1alpha1.ArmadaServer, sch
}

var pr *monitoringv1.PrometheusRule
var sm *monitoringv1.ServiceMonitor
if as.Spec.Prometheus != nil && as.Spec.Prometheus.Enabled {
pr = createServerPrometheusRule(as.Name, as.Namespace, as.Spec.Prometheus.ScrapeInterval, as.Spec.Labels, as.Spec.Prometheus.Labels)
}
if err := controllerutil.SetOwnerReference(as, pr, scheme); err != nil {
return nil, err
}

sm := createServiceMonitor(as)
if err := controllerutil.SetOwnerReference(as, sm, scheme); err != nil {
return nil, err
sm = createServiceMonitor(as)
if err := controllerutil.SetOwnerReference(as, sm, scheme); err != nil {
return nil, err
}
}

jobs := []*batchv1.Job{{}}
Expand Down Expand Up @@ -331,12 +270,12 @@ func createArmadaServerMigrationJobs(as *installv1alpha1.ArmadaServer) ([]*batch

appConfig, err := builders.ConvertRawExtensionToYaml(as.Spec.ApplicationConfig)
if err != nil {
return []*batchv1.Job{}, err
return nil, err
}
var asConfig AppConfig
err = yaml.Unmarshal([]byte(appConfig), &asConfig)
if err != nil {
return []*batchv1.Job{}, err
return nil, err
}

// First job is to poll/wait for Pulsar to be fully started
Expand Down
111 changes: 30 additions & 81 deletions internal/controller/install/binoculars_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,13 @@ type BinocularsReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithValues("namespace", req.Namespace, "name", req.Name)

started := time.Now()
logger.Info("Reconciling Binoculars object")

logger.Info("Fetching Binoculars object from cache")
logger.Info("Reconciling Binoculars object")

var binoculars installv1alpha1.Binoculars
if err := r.Client.Get(ctx, req.NamespacedName, &binoculars); err != nil {
if k8serrors.IsNotFound(err) {
logger.Info("Binoculars not found in cache, ending reconcile...", "namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
}
if miss, err := getObject(ctx, r.Client, &binoculars, req.NamespacedName, logger); err != nil || miss {
return ctrl.Result{}, err
}

Expand All @@ -87,41 +83,12 @@ func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

deletionTimestamp := binoculars.ObjectMeta.DeletionTimestamp
// examine DeletionTimestamp to determine if object is under deletion
if deletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object. This is equivalent
// registering our finalizer.
if !controllerutil.ContainsFinalizer(&binoculars, operatorFinalizer) {
logger.Info("Attaching finalizer to Binoculars object", "finalizer", operatorFinalizer)
controllerutil.AddFinalizer(&binoculars, operatorFinalizer)
if err := r.Update(ctx, &binoculars); err != nil {
return ctrl.Result{}, err
}
}
} else {
logger.Info("Binoculars object is being deleted", "finalizer", operatorFinalizer)
// The object is being deleted
if controllerutil.ContainsFinalizer(&binoculars, operatorFinalizer) {
// our finalizer is present, so lets handle any external dependency
logger.Info("Running cleanup function for Binoculars object", "finalizer", operatorFinalizer)
if err := r.deleteExternalResources(ctx, components); err != nil {
// if fail to delete the external dependency here, return with error
// so that it can be retried
return ctrl.Result{}, err
}

// remove our finalizer from the list and update it.
logger.Info("Removing finalizer from Binoculars object", "finalizer", operatorFinalizer)
controllerutil.RemoveFinalizer(&binoculars, operatorFinalizer)
if err := r.Update(ctx, &binoculars); err != nil {
return ctrl.Result{}, err
}
}

// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
cleanupF := func(ctx context.Context) error {
return r.deleteExternalResources(ctx, components)
}
finish, err := checkAndHandleObjectDeletion(ctx, r.Client, &binoculars, operatorFinalizer, cleanupF, logger)
if err != nil || finish {
return ctrl.Result{}, err
}

componentsCopy := components.DeepCopy()
Expand All @@ -131,58 +98,40 @@ func (r *BinocularsReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return nil
}

if components.ServiceAccount != nil {
logger.Info("Upserting Binoculars ServiceAccount object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ServiceAccount, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ServiceAccount, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.ClusterRole != nil {
logger.Info("Upserting Binoculars ClusterRole object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ClusterRole, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.ClusterRole, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.ClusterRoleBindings != nil && len(components.ClusterRoleBindings) > 0 {
logger.Info("Upserting Binoculars ClusterRoleBinding object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.ClusterRoleBindings[0], mutateFn); err != nil {
return ctrl.Result{}, err
if len(components.ClusterRoleBindings) > 0 {
for _, crb := range components.ClusterRoleBindings {
if err := upsertObjectIfNeeded(ctx, r.Client, crb, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}
}
}

if components.Secret != nil {
logger.Info("Upserting Binoculars Secret object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Secret, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Secret, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Deployment != nil {
logger.Info("Upserting Binoculars Deployment object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Deployment, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Deployment, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

if components.Service != nil {
logger.Info("Upserting Binoculars Service object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.Service, mutateFn); err != nil {
return ctrl.Result{}, err
}
if err := upsertObjectIfNeeded(ctx, r.Client, components.Service, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}
if components.IngressGrpc != nil {
logger.Info("Upserting GRPC Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressGrpc, mutateFn); err != nil {
return ctrl.Result{}, err
}

if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressGrpc, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}
if components.IngressHttp != nil {
logger.Info("Upserting REST Ingress object")
if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, components.IngressHttp, mutateFn); err != nil {
return ctrl.Result{}, err
}

if err := upsertObjectIfNeeded(ctx, r.Client, components.IngressHttp, binoculars.Kind, mutateFn, logger); err != nil {
return ctrl.Result{}, err
}

logger.Info("Successfully reconciled Binoculars object", "durationMillis", time.Since(started).Milliseconds())
Expand Down
Loading
Loading