cleanup, added README's, renamed clustermetrics-controller to quotamanager-controller

Signed-off-by: cwiklik <[email protected]>
cwiklik committed May 29, 2024
1 parent 15960ac commit 391e453
Showing 8 changed files with 98 additions and 92 deletions.
7 changes: 5 additions & 2 deletions README.md
@@ -2,8 +2,7 @@

Additional modules, tools and documentation to facilitate KubeStellar integration with other community projects.

This project includes bash-based scripts to replicate demos and PoCs such as KFP + KubeStellar integration
and Argo Workflows + KubeStellar integration.
This project includes bash-based scripts to replicate demos and PoCs such as the KFP + KubeStellar, Argo Workflows + KubeStellar, and Kueue + KubeStellar integrations.

- [suspend-webhook](./suspend-webhook/) webhook used to suspend argo workflows (and in the future other types of
workloads supporting the suspend flag)
@@ -16,6 +15,7 @@ sync/status sync mechanisms.

- [mc-scheduling](./mc-scheduling/) - A multi-cluster scheduling framework supporting pluggable schedulers.

- [kueue-ks](./kueue-ks/) - A set of controllers enabling integration of Kueue with KubeStellar.

## KubeFlow Pipelines v2

@@ -25,3 +25,6 @@ Check out these [instructions](./scripts/kfp/)
## Argo Workflows

Check out these [instructions](./scripts/argo-wf/)

## kueue-ks

Check out these [instructions](./scripts/kueue/)
2 changes: 1 addition & 1 deletion charts/cluster-metrics/templates/operator.yaml
@@ -297,7 +297,7 @@ spec:
- --metrics-bind-address=127.0.0.1:8080
- --leader-elect
- --metrics-name={{.Values.clusterName}}
image: ko.local/cluster-metrics:102f815
image: ko.local/cluster-metrics:15960ac
livenessProbe:
httpGet:
path: /healthz
2 changes: 1 addition & 1 deletion clustermetrics/config/manager/kustomization.yaml
@@ -5,4 +5,4 @@ kind: Kustomization
images:
- name: controller
newName: ko.local/cluster-metrics
newTag: 102f815
newTag: 15960ac
47 changes: 18 additions & 29 deletions kueue-ks/README.md
@@ -1,44 +1,33 @@
# kueue-ks
// TODO(user): Add simple overview of use/purpose
# Multi-cluster Job Workload Management with Kueue and KubeStellar
This project aims to simplify the deployment and management of batch workloads across multiple Kubernetes clusters using [Kueue](https://kueue.sigs.k8s.io) for job queueing and [KubeStellar](https://docs.kubestellar.io) for multi-cluster configuration management.

## Description
// TODO(user): An in-depth paragraph about your project and overview of use

## Getting Started
You’ll need a Kubernetes cluster to run against. You can use [KIND](https://sigs.k8s.io/kind) to get a local cluster for testing, or run against a remote cluster.
**Note:** Your controller will automatically use the current context in your kubeconfig file (i.e. whatever cluster `kubectl cluster-info` shows).
## Overview
This repository contains two core controllers:

### Running on the cluster
1. Install Instances of Custom Resources:
- *WorkloadController:* watches for Kueue `Workload` objects and orchestrates the downsync and deployment of corresponding jobs to worker clusters managed by KubeStellar.
- *QuotaManagerController:* monitors [ClusterMetrics](https://github.com/kubestellar/galaxy/tree/main/clustermetrics) from each worker cluster and dynamically updates Kueue's global resource quotas as needed (a sketch of such a quota object is shown below).
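
For context, here is a minimal sketch of the kind of Kueue `ClusterQueue` whose nominal quotas the QuotaManagerController adjusts; the queue name, flavor name, and quota values are illustrative assumptions, not taken from this repository:

```yaml
# Illustrative ClusterQueue (name, flavor, and quota values are assumptions).
# The QuotaManagerController raises the cpu/memory nominalQuota as worker
# clusters report more allocatable capacity via ClusterMetrics.
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
  name: cluster-queue
spec:
  namespaceSelector: {}        # admit workloads from all namespaces
  resourceGroups:
  - coveredResources: ["cpu", "memory"]
    flavors:
    - name: default-flavor
      resources:
      - name: "cpu"
        nominalQuota: 8        # increased as clusters/nodes join
      - name: "memory"
        nominalQuota: 16Gi
```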

```sh
kubectl apply -f config/samples/
```
## Description
In multi-cluster Kubernetes environments, managing batch workloads and ensuring efficient resource utilization across clusters can be a complex challenge. Organizations often face issues such as resource contention, over-provisioning, and inefficient workload distribution, leading to suboptimal resource utilization and increased costs.

2. Build and push your image to the location specified by `IMG`:

```sh
make docker-build docker-push IMG=<some-registry>/kueue-ks:tag
```

3. Deploy the controller to the cluster with the image specified by `IMG`:
The kueue-ks project addresses these challenges by leveraging Kueue's quota management capabilities and integrating with KubeStellar for multi-cluster configuration management. Its primary goal is to enable centralized management and intelligent distribution of batch workloads across multiple clusters based on available resource quotas.

```sh
make deploy IMG=<some-registry>/kueue-ks:tag
```

### Uninstall CRDs
To delete the CRDs from the cluster:
## Getting Started
You’ll need a Kubernetes cluster to run against. You can use [K3D](https://k3d.io) to get a local cluster for testing.

### Running on the K3D cluster
1. Check out these [instructions](./scripts/kueue/)

2. Run the job examples (an illustrative Job manifest is sketched after these commands):

```sh
make uninstall
kubectl create -f examples/batch-job.yaml
```

### Undeploy controller
UnDeploy the controller to the cluster:

```sh
make undeploy
kubectl create -f examples/pytorch-simple-job.yaml
```
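
The example manifests themselves are not part of this change; as a rough illustration under assumed names and values, a Kueue-managed batch Job submitted this way typically looks something like:

```yaml
# Hypothetical sketch of a Kueue-managed batch Job; the queue name,
# image, and resource requests are assumptions for illustration only.
apiVersion: batch/v1
kind: Job
metadata:
  generateName: sample-job-
  labels:
    kueue.x-k8s.io/queue-name: user-queue   # LocalQueue the job is submitted to
spec:
  suspend: true            # Kueue unsuspends the job once quota is admitted
  parallelism: 1
  completions: 1
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: main
        image: busybox
        command: ["sleep", "30"]
        resources:
          requests:
            cpu: "1"
            memory: 200Mi
```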

## Contributing
10 changes: 3 additions & 7 deletions kueue-ks/controllers/admissioncheck_controller.go
@@ -57,13 +57,9 @@ const (
//+kubebuilder:rbac:groups=kueue.x-k8s.io.galaxy.kubestellar.io,resources=admissionchecks/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=kueue.x-k8s.io.galaxy.kubestellar.io,resources=admissionchecks/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the AdmissionCheck object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//

// Reconciles a Kueue AdmissionCheck.

// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *AdmissionCheckReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -25,34 +25,35 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
//"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
v1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// ClusterMetricsReconciler reconciles a ClusterMetrics object
const (
CPU = "cpu"
Memory = "memory"
)

// ClusterMetricsReconciler reconciles ClusterMetrics objects from each cluster and updates
// the global quota managed by Kueue in a cluster queue. As new clusters join or new nodes
// are added, this controller increases the quota accordingly. Decreasing quota is not as
// straightforward: although technically the decrease can be done, Kueue will not preempt
// any jobs even if preemption is required due to the reduced quota. The only way a decrease
// can work is to stop accepting new jobs, drain, decrease the quota, and reopen the gate
// for new jobs.
type ClusterMetricsReconciler struct {
client.Client
Scheme *runtime.Scheme
WorkerClusters map[string]clustermetrics.ClusterMetrics
ClusterQueue string
}

//+kubebuilder:rbac:groups=galaxy.kubestellar.io.galaxy.kubestellar.io,resources=clustermetrics,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=galaxy.kubestellar.io.galaxy.kubestellar.io,resources=clustermetrics/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=galaxy.kubestellar.io.galaxy.kubestellar.io,resources=clustermetrics/finalizers,verbs=update

/*
func NewClusterMetricsReconciler(c client.Client, s *runtime.Scheme, q string, cfg *rest.Config) *ClusterMetricsReconciler {
return &ClusterMetricsReconciler{
Client: c,
Scheme: s,
WorkerClusters: make(map[string]clustermetrics.ClusterMetrics),
ClusterQueue: q,
}
}
*/
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
@@ -82,16 +83,15 @@ func (r *ClusterMetricsReconciler) Reconcile(ctx context.Context, req ctrl.Reque
available := map[string]*resource.Quantity{}
for _, cm := range r.WorkerClusters {
for _, node := range cm.Status.Nodes {
//log.Info("%%%%%%%%%% ", "Cluster", cm.Name)
if available["cpu"] == nil {
available["cpu"] = resource.NewQuantity(0, resource.BinarySI)
if available[CPU] == nil {
available[CPU] = resource.NewQuantity(0, resource.BinarySI)
}
available["cpu"].Add(*node.AllocatableResources.Cpu())
available[CPU].Add(*node.AllocatableResources.Cpu())

if available["memory"] == nil {
available["memory"] = resource.NewQuantity(0, resource.BinarySI)
if available[Memory] == nil {
available[Memory] = resource.NewQuantity(0, resource.BinarySI)
}
available["memory"].Add(*node.AllocatableResources.Memory())
available[Memory].Add(*node.AllocatableResources.Memory())
}
}
clusterQueue := v1beta1.ClusterQueue{}
@@ -104,38 +104,27 @@ func (r *ClusterMetricsReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
Default := 0
update := false
//log.Info("Clusterqueue :::::::", "Resources", clusterQueue.Spec.ResourceGroups[0].Flavors[Default])
queueNominalCpuCount := clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].NominalQuota
if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].Name == "cpu" {
// log.Info("Clusterqueue nominal ---- CPU")
if available["cpu"] != nil {
// log.Info("Clusterqueue nominal cpus ----",
// "", queueNominalCpuCount,
// "", queueNominalCpuCount.Format)
if available["cpu"].Value() > queueNominalCpuCount.Value() {
if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].Name == CPU {
if available[CPU] != nil {
if available[CPU].Value() > queueNominalCpuCount.Value() {
update = true
delta := available["cpu"].DeepCopy()
delta := available[CPU].DeepCopy()
delta.Sub(queueNominalCpuCount)
queueNominalCpuCount.Add(delta)
// log.Info("ClusterQueue New CPU Quota ----", "", queueNominalCpuCount.Value())
clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[0].NominalQuota = queueNominalCpuCount
}
}
}
if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].Name == "memory" {
// log.Info("Clusterqueue nominal ---- MEMORY")
if clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].Name == Memory {
queueNominalMemoryQuota := clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].NominalQuota //.ScaledValue(resource.Giga)

if available["memory"] != nil {
// log.Info("Clusterqueue nominal memory ----",
// "", queueNominalMemoryQuota,
// "", queueNominalMemoryQuota.Format)
if available["memory"].ScaledValue(resource.Kilo) > queueNominalMemoryQuota.ScaledValue(resource.Kilo) {
if available[Memory] != nil {
if available[Memory].ScaledValue(resource.Kilo) > queueNominalMemoryQuota.ScaledValue(resource.Kilo) {
update = true
delta := available["memory"].DeepCopy()
delta := available[Memory].DeepCopy()
delta.Sub(queueNominalMemoryQuota)
queueNominalMemoryQuota.Add(delta)
// log.Info("ClusterQueue New Memory Quota ----", "", queueNominalMemoryQuota)
clusterQueue.Spec.ResourceGroups[0].Flavors[Default].Resources[1].NominalQuota = queueNominalMemoryQuota
}
}
@@ -154,6 +143,6 @@ func (r *ClusterMetricsReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// SetupWithManager sets up the controller with the Manager.
func (r *ClusterMetricsReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&clustermetrics.ClusterMetrics{}).
For(&clustermetrics.ClusterMetrics{}).
Complete(r)
}
15 changes: 2 additions & 13 deletions kueue-ks/controllers/workload_controller.go
@@ -81,13 +81,8 @@ func NewWorkloadReconciler(c client.Client, kueueClient *kueueClient.Clientset,
}


// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Workload object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// Reconciles a Kueue Workload object and, if quota exists, downsyncs the corresponding job to a worker cluster.

// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
@@ -100,13 +95,11 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if !workload.HasQuotaReservation(wl) {
//1.2 workload has no reservation
log.Info("workload with no reservation, delete owned requests")
return reconcile.Result{}, r.evictJob(ctx, wl)
}

if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
//1.2 workload has no reservation or is finished
log.Info("remote workload has completed")
return reconcile.Result{}, nil
}
@@ -130,15 +123,12 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
// jobs with assigned cluster have already been scheduled to run
if _, exists := jobObject.GetLabels()[AssignedClusterLabel]; exists {
log.Info("............ Cluster Assignment Present")

if workload.HasAllChecksReady(wl) {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
wl := &kueue.Workload{}
if err := r.Client.Get(ctx, req.NamespacedName, wl); err != nil {
log.Error(err, "Error when fetching Workload object ")
return err
//return reconcile.Result{}, client.IgnoreNotFound(err)
}
log.Info("............ All Checks Ready")
newCondition := metav1.Condition{
@@ -167,7 +157,6 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
log.Info("New BindingPolicy created for object", "Name", meta.Name)

} else {
log.Info("............ Not All Checks Ready")
relevantChecks, err := admissioncheck.FilterForController(ctx, r.Client, wl.Status.AdmissionChecks, ControllerName)
if err != nil {
return reconcile.Result{}, err
40 changes: 40 additions & 0 deletions scripts/kueue/README.md
@@ -0,0 +1,40 @@
# README

These scripts currently support deployment of KubeStellar, Kueue, and the integration controllers on K3D Kubernetes only.

## Preparation

Before running the scripts, make sure to increase your ulimits as well as
fs.inotify.max_user_watches and fs.inotify.max_user_instances on the machine
running the clusters. See [these instructions](https://kind.sigs.k8s.io/docs/user/known-issues/#pod-errors-due-to-too-many-open-files) for more info.

For Rancher Desktop, follow [these instructions](https://docs.rancherdesktop.io/how-to-guides/increasing-open-file-limit)
and use the following config:

```yaml
provision:
- mode: system
script: |
#!/bin/sh
cat <<'EOF' > /etc/security/limits.d/rancher-desktop.conf
* soft nofile 82920
* hard nofile 82920
EOF
sysctl -w vm.max_map_count=262144
sysctl -w fs.inotify.max_user_instances=8192
sysctl -w fs.inotify.max_user_watches=1048576
```
Before running the scripts, make sure to run `go mod tidy`.


## Running the scripts

Note: at the start, each script deletes your current kubeflex, cluster1, and cluster2 clusters, and
backs up and deletes your default kubeconfig in ~/.kube/config.

To install, run the `install-all` script:

```shell
./install-all.sh
```
