From 391e45346757e514d52c3b59bb2bf59bbc1a735f Mon Sep 17 00:00:00 2001 From: cwiklik Date: Wed, 29 May 2024 09:40:24 -0400 Subject: [PATCH] cleanup, added README's, renamed clustermetrics-controller to quotamanager-controller Signed-off-by: cwiklik --- README.md | 7 +- .../cluster-metrics/templates/operator.yaml | 2 +- .../config/manager/kustomization.yaml | 2 +- kueue-ks/README.md | 47 +++++-------- .../controllers/admissioncheck_controller.go | 10 +-- ...ntroller.go => quotamanager_controller.go} | 67 ++++++++----------- kueue-ks/controllers/workload_controller.go | 15 +---- scripts/kueue/README.md | 40 +++++++++++ 8 files changed, 98 insertions(+), 92 deletions(-) rename kueue-ks/controllers/{clustermetrics_controller.go => quotamanager_controller.go} (70%) create mode 100644 scripts/kueue/README.md diff --git a/README.md b/README.md index f2ff4d1..57bf62c 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,7 @@ Additional modules, tools and documentation to facilitate KubeStellar integration with other community projects. -This project includes bash-based scripts to replicate demos and PoCs such as KFP + KubeStellar integration -and Argo Workflows + KubeStellar integration. +This project includes bash-based scripts to replicate demos and PoCs such as KFP + KubeStellar integration, Argo Workflows + KubeStellar, and Kueue + KubeStellar integrations. - [suspend-webook](./suspend-webhook/) webook used to suspend argo workflows (and in the future other types of workloads supporting the suspend flag) @@ -16,6 +15,7 @@ sync/status sycn mechanisms. - [mc-scheduling](./mc-scheduling/) -A Multi-cluster scheduling framework supporting pluggable schedulers. +- [kueue-ks](./kueue-ks/) -Set of controllers enabling integration of Kueue with KubeStellar. ## KubeFlow Pipelines v2 @@ -25,3 +25,6 @@ Check out this [instructions](./scripts/kfp/) ## Argo Workflows Check out this [instructions](./scripts/argo-wf/) + +## kueue-ks +Check out this [instructions](./scripts/kueue/) diff --git a/charts/cluster-metrics/templates/operator.yaml b/charts/cluster-metrics/templates/operator.yaml index 407e5bf..95b8e66 100644 --- a/charts/cluster-metrics/templates/operator.yaml +++ b/charts/cluster-metrics/templates/operator.yaml @@ -297,7 +297,7 @@ spec: - --metrics-bind-address=127.0.0.1:8080 - --leader-elect - --metrics-name={{.Values.clusterName}} - image: ko.local/cluster-metrics:102f815 + image: ko.local/cluster-metrics:15960ac livenessProbe: httpGet: path: /healthz diff --git a/clustermetrics/config/manager/kustomization.yaml b/clustermetrics/config/manager/kustomization.yaml index 9f8cb89..d521993 100644 --- a/clustermetrics/config/manager/kustomization.yaml +++ b/clustermetrics/config/manager/kustomization.yaml @@ -5,4 +5,4 @@ kind: Kustomization images: - name: controller newName: ko.local/cluster-metrics - newTag: 102f815 + newTag: 15960ac diff --git a/kueue-ks/README.md b/kueue-ks/README.md index e6eddcd..efa83b8 100644 --- a/kueue-ks/README.md +++ b/kueue-ks/README.md @@ -1,44 +1,33 @@ -# kueue-ks -// TODO(user): Add simple overview of use/purpose +# Multi-cluster Job Workload Management with Kueue and KubeStellar +This project aims to simplify the deployment and management of batch workloads across multiple Kubernetes clusters using [Kueue](https://kueue.sigs.k8s.io) for job queueing and [KubeStellar](https://docs.kubestellar.io) for multi-cluster configuration management. 
-## Description
-// TODO(user): An in-depth paragraph about your project and overview of use
-## Getting Started
-You’ll need a Kubernetes cluster to run against. You can use [KIND](https://sigs.k8s.io/kind) to get a local cluster for testing, or run against a remote cluster.
-**Note:** Your controller will automatically use the current context in your kubeconfig file (i.e. whatever cluster `kubectl cluster-info` shows).
+## Overview
+This repository contains two core controllers:
-### Running on the cluster
-1. Install Instances of Custom Resources:
+- *WorkloadController:* watches for Kueue `Workload` objects and orchestrates the downsync and deployment of the corresponding jobs to worker clusters managed by KubeStellar.
+- *QuotaManagerController:* monitors [ClusterMetrics](https://github.com/kubestellar/galaxy/tree/main/clustermetrics) from each worker cluster and dynamically updates Kueue's global resource quotas as needed.
-```sh
-kubectl apply -f config/samples/
-```
+## Description
+In multi-cluster Kubernetes environments, managing batch workloads and ensuring efficient resource utilization across clusters can be a complex challenge. Organizations often face issues such as resource contention, over-provisioning, and inefficient workload distribution, leading to suboptimal resource utilization and increased costs.
-2. Build and push your image to the location specified by `IMG`:
-
-```sh
-make docker-build docker-push IMG=/kueue-ks:tag
-```
-
-3. Deploy the controller to the cluster with the image specified by `IMG`:
+The kueue-ks project addresses these challenges by leveraging Kueue's quota management capabilities and integrating with KubeStellar for multi-cluster configuration management. Its primary goal is to enable centralized management and intelligent distribution of batch workloads across multiple clusters based on the available resource quotas.
-```sh
-make deploy IMG=/kueue-ks:tag
-```
-### Uninstall CRDs
-To delete the CRDs from the cluster:
+## Getting Started
+You’ll need a Kubernetes cluster to run against. You can use [K3D](https://k3d.io) to get a local cluster for testing.
+
+### Running on the K3D cluster
+1. Follow these [instructions](./scripts/kueue/) to set up the environment.
+
+2. Run the job examples:
 
 ```sh
-make uninstall
+kubectl create -f examples/batch-job.yaml
 ```
 
-### Undeploy controller
-UnDeploy the controller to the cluster:
-
 ```sh
-make undeploy
+kubectl create -f examples/pytorch-simple-job.yaml
 ```
 
 ## Contributing
diff --git a/kueue-ks/controllers/admissioncheck_controller.go b/kueue-ks/controllers/admissioncheck_controller.go
index 95e2b54..5367f24 100644
--- a/kueue-ks/controllers/admissioncheck_controller.go
+++ b/kueue-ks/controllers/admissioncheck_controller.go
@@ -57,13 +57,9 @@ const (
 //+kubebuilder:rbac:groups=kueue.x-k8s.io.galaxy.kubestellar.io,resources=admissionchecks/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=kueue.x-k8s.io.galaxy.kubestellar.io,resources=admissionchecks/finalizers,verbs=update
 
-// Reconcile is part of the main kubernetes reconciliation loop which aims to
-// move the current state of the cluster closer to the desired state.
-// TODO(user): Modify the Reconcile function to compare the state specified by
-// the AdmissionCheck object against the actual cluster state, and then
-// perform operations to make the cluster state reflect the state specified by
-// the user.
-//
+
+// Reconcile reconciles a Kueue AdmissionCheck object.
+
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.1/pkg/reconcile
 func (r *AdmissionCheckReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
diff --git a/kueue-ks/controllers/clustermetrics_controller.go b/kueue-ks/controllers/quotamanager_controller.go
similarity index 70%
rename from kueue-ks/controllers/clustermetrics_controller.go
rename to kueue-ks/controllers/quotamanager_controller.go
index b783754..3d2fad1 100644
--- a/kueue-ks/controllers/clustermetrics_controller.go
+++ b/kueue-ks/controllers/quotamanager_controller.go
@@ -25,34 +25,35 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
-	//"k8s.io/client-go/rest"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	v1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 )
 
-// ClusterMetricsReconciler reconciles a ClusterMetrics object
+const (
+	CPU    = "cpu"
+	Memory = "memory"
+)
+
+// ClusterMetricsReconciler reconciles the ClusterMetrics objects reported by each worker cluster and
+// updates the global quota managed by Kueue in a ClusterQueue. As new clusters join or new nodes
+// are added, this controller increases the quota accordingly. Decreasing the quota is not as
+// straightforward: although the decrease can technically be done, Kueue will not preempt
+// any jobs even when the reduced quota would require it. The only way a decrease can
+// work is to stop accepting new jobs, drain, decrease the quota, and then reopen the gate
+// for new jobs.
 type ClusterMetricsReconciler struct {
 	client.Client
 	Scheme         *runtime.Scheme
 	WorkerClusters map[string]clustermetrics.ClusterMetrics
 	ClusterQueue   string
 }
+
 //+kubebuilder:rbac:groups=galaxy.kubestellar.io.galaxy.kubestellar.io,resources=clustermetrics,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=galaxy.kubestellar.io.galaxy.kubestellar.io,resources=clustermetrics/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=galaxy.kubestellar.io.galaxy.kubestellar.io,resources=clustermetrics/finalizers,verbs=update
 
-/*
-func NewClusterMetricsReconciler(c client.Client, s *runtime.Scheme, q string, cfg *rest.Config) *ClusterMetricsReconciler {
-	return &ClusterMetricsReconciler{
-		Client:         c,
-		Scheme:         s,
-		WorkerClusters: make(map[string]clustermetrics.ClusterMetrics),
-		ClusterQueue:   q,
-	}
-}
-*/
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by @@ -82,16 +83,15 @@ func (r *ClusterMetricsReconciler) Reconcile(ctx context.Context, req ctrl.Reque available := map[string]*resource.Quantity{} for _, cm := range r.WorkerClusters { for _, node := range cm.Status.Nodes { - //log.Info("%%%%%%%%%% ", "Cluster", cm.Name) - if available["cpu"] == nil { - available["cpu"] = resource.NewQuantity(0, resource.BinarySI) + if available[CPU] == nil { + available[CPU] = resource.NewQuantity(0, resource.BinarySI) } - available["cpu"].Add(*node.AllocatableResources.Cpu()) + available[CPU].Add(*node.AllocatableResources.Cpu()) - if available["memory"] == nil { - available["memory"] = resource.NewQuantity(0, resource.BinarySI) + if available[Memory] == nil { + available[Memory] = resource.NewQuantity(0, resource.BinarySI) } - available["memory"].Add(*node.AllocatableResources.Memory()) + available[Memory].Add(*node.AllocatableResources.Memory()) } } clusterQueue := v1beta1.ClusterQueue{} @@ -104,38 +104,27 @@ func (r *ClusterMetricsReconciler) Reconcile(ctx context.Context, req ctrl.Reque } Default := 0 update := false - //log.Info("Clusterqueue :::::::", "Resources", clusterQueue.Spec.ResourceGroups[0].Flavors[Default]) queueNominalCpuCount := clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].NominalQuota - if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].Name == "cpu" { - // log.Info("Clusterqueue nominal ---- CPU") - if available["cpu"] != nil { - // log.Info("Clusterqueue nominal cpus ----", - // "", queueNominalCpuCount, - // "", queueNominalCpuCount.Format) - if available["cpu"].Value() > queueNominalCpuCount.Value() { + if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].Name == CPU { + if available[CPU] != nil { + if available[CPU].Value() > queueNominalCpuCount.Value() { update = true - delta := available["cpu"].DeepCopy() + delta := available[CPU].DeepCopy() delta.Sub(queueNominalCpuCount) queueNominalCpuCount.Add(delta) - // log.Info("ClusterQueue New CPU Quota ----", "", queueNominalCpuCount.Value()) clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].NominalQuota = queueNominalCpuCount } } } - if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].Name == "memory" { - // log.Info("Clusterqueue nominal ---- MEMORY") + if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].Name == Memory { queueNominalMemoryQuota := clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].NominalQuota //.ScaledValue(resource.Giga) - if available["memory"] != nil { - // log.Info("Clusterqueue nominal memory ----", - // "", queueNominalMemoryQuota, - // "", queueNominalMemoryQuota.Format) - if available["memory"].ScaledValue(resource.Kilo) > queueNominalMemoryQuota.ScaledValue(resource.Kilo) { + if available[Memory] != nil { + if available[Memory].ScaledValue(resource.Kilo) > queueNominalMemoryQuota.ScaledValue(resource.Kilo) { update = true - delta := available["memory"].DeepCopy() + delta := available[Memory].DeepCopy() delta.Sub(queueNominalMemoryQuota) queueNominalMemoryQuota.Add(delta) - // log.Info("ClusterQueue New Memory Quota ----", "", queueNominalMemoryQuota) clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].NominalQuota = queueNominalMemoryQuota } } @@ -154,6 +143,6 @@ func (r *ClusterMetricsReconciler) Reconcile(ctx context.Context, req ctrl.Reque // SetupWithManager sets up the controller with the Manager. 
 func (r *ClusterMetricsReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
-		For(&clustermetrics.ClusterMetrics{}).
+		For(&clustermetrics.ClusterMetrics{}).
 		Complete(r)
 }
diff --git a/kueue-ks/controllers/workload_controller.go b/kueue-ks/controllers/workload_controller.go
index b64604c..1f9eb0b 100644
--- a/kueue-ks/controllers/workload_controller.go
+++ b/kueue-ks/controllers/workload_controller.go
@@ -81,13 +81,8 @@ func NewWorkloadReconciler(c client.Client, kueueClient *kueueClient.Clientset,
 }
 
-// Reconcile is part of the main kubernetes reconciliation loop which aims to
-// move the current state of the cluster closer to the desired state.
-// TODO(user): Modify the Reconcile function to compare the state specified by
-// the Workload object against the actual cluster state, and then
-// perform operations to make the cluster state reflect the state specified by
-// the user.
-//
+// Reconcile reconciles a Kueue Workload object and, if quota is available, downsyncs the corresponding job to a worker cluster.
+
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.1/pkg/reconcile
 func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -100,13 +95,11 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return reconcile.Result{}, client.IgnoreNotFound(err)
 	}
 	if !workload.HasQuotaReservation(wl) {
-		//1.2 workload has no reservation
 		log.Info("workload with no reservation, delete owned requests")
 		return reconcile.Result{}, r.evictJob(ctx, wl)
 	}
 
 	if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
-		//1.2 workload has no reservation or is finished
 		log.Info("remote workload has completed")
 		return reconcile.Result{}, nil
 	}
@@ -130,15 +123,12 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	}
 	// jobs with assigned cluster have already been scheduled to run
 	if _, exists := jobObject.GetLabels()[AssignedClusterLabel]; exists {
-		log.Info("............ Cluster Assignment Present")
-
 		if workload.HasAllChecksReady(wl) {
 			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
 				wl := &kueue.Workload{}
 				if err := r.Client.Get(ctx, req.NamespacedName, wl); err != nil {
 					log.Error(err, "Error when fetching Workload object ")
 					return err
-					//return reconcile.Result{}, client.IgnoreNotFound(err)
 				}
 				log.Info("............ All Checks Ready")
 				newCondition := metav1.Condition{
@@ -167,7 +157,6 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 			log.Info("New BindingPolicy created for object", "Name", meta.Name)
 
 		} else {
-			log.Info("............ Not All Checks Ready")
 			relevantChecks, err := admissioncheck.FilterForController(ctx, r.Client, wl.Status.AdmissionChecks, ControllerName)
 			if err != nil {
 				return reconcile.Result{}, err
diff --git a/scripts/kueue/README.md b/scripts/kueue/README.md
new file mode 100644
index 0000000..ec900df
--- /dev/null
+++ b/scripts/kueue/README.md
@@ -0,0 +1,40 @@
+# README
+
+These scripts currently support deploying KubeStellar, Kueue, and the integration controllers on K3D Kubernetes clusters only.
+
+## Preparation
+
+Before running the scripts, make sure to increase your ulimits and
+fs.inotify.max_user_watches and fs.inotify.max_user_instances for the machine
+running the clusters. See [these instructions](https://kind.sigs.k8s.io/docs/user/known-issues/#pod-errors-due-to-too-many-open-files) for more info.
+For Rancher Desktop, follow [these instructions](https://docs.rancherdesktop.io/how-to-guides/increasing-open-file-limit)
+and use the following config:
+
+```yaml
+provision:
+- mode: system
+  script: |
+    #!/bin/sh
+    cat <<'EOF' > /etc/security/limits.d/rancher-desktop.conf
+    * soft nofile 82920
+    * hard nofile 82920
+    EOF
+    sysctl -w vm.max_map_count=262144
+    sysctl -w fs.inotify.max_user_instances=8192
+    sysctl -w fs.inotify.max_user_watches=1048576
+```
+
+Before running the scripts, also make sure to run `go mod tidy`.
+
+
+## Running the scripts
+
+Note: at the start, each script deletes your current kubeflex, cluster1, and cluster2 clusters, and
+backs up and deletes your default kubeconfig in ~/.kube/config.
+
+To install, run the `install-all` script:
+
+```shell
+./install-all.sh
+```
\ No newline at end of file
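
The job examples referenced in the kueue-ks README above (`examples/batch-job.yaml`, `examples/pytorch-simple-job.yaml`) are not included in this patch. For orientation only, below is a minimal sketch of the kind of Kueue-managed batch Job such an example typically contains; the LocalQueue name `user-queue`, the namespace, the image, and the resource requests are assumed placeholders rather than values taken from this repository. Only the `kueue.x-k8s.io/queue-name` label and the `suspend` field reflect standard Kueue usage.

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  generateName: sample-batch-job-
  namespace: default
  labels:
    # Standard Kueue label; "user-queue" is an assumed LocalQueue name, not taken from this repo.
    kueue.x-k8s.io/queue-name: user-queue
spec:
  parallelism: 1
  completions: 1
  # Created suspended so that Kueue, not the kubelet, decides when the job may start.
  suspend: true
  template:
    spec:
      containers:
      - name: main
        image: busybox:1.36
        command: ["sh", "-c", "echo running && sleep 30"]
        resources:
          requests:
            cpu: "1"
            memory: 200Mi
      restartPolicy: Never
```

Submitted with `kubectl create -f`, a Job like this stays suspended until Kueue admits it against the ClusterQueue quota; per the controller descriptions above, the WorkloadController then downsyncs the admitted job to a worker cluster through KubeStellar.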