Skip to content

Commit

Permalink
support auto delete WorkloadRebalancer when time up.
Browse files Browse the repository at this point in the history
Signed-off-by: chaosi-zju <[email protected]>
  • Loading branch information
chaosi-zju committed Apr 30, 2024
1 parent 9bbbc2c commit b016c25
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 8 deletions.
14 changes: 14 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -16759,6 +16759,11 @@
"workloads"
],
"properties": {
"ttlMinutesAfterFinished": {
"description": "ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer that has finished execution (means each target workload is finished with result of Successful or Failed). If this field is set, ttlSecondsAfterFinished after the WorkloadRebalancer finishes, it is eligible to be automatically deleted. If this field is unset, the WorkloadRebalancer won't be automatically deleted. If this field is set to zero, the WorkloadRebalancer becomes eligible to be deleted immediately after it finishes.",
"type": "integer",
"format": "int32"
},
"workloads": {
"description": "Workloads used to specify the list of expected resource. Nil or empty list is not allowed.",
"type": "array",
Expand All @@ -16773,6 +16778,15 @@
"description": "WorkloadRebalancerStatus contains information about the current status of a WorkloadRebalancer updated periodically by schedule trigger controller.",
"type": "object",
"properties": {
"lastUpdateTime": {
"description": "LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself. optional",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
"observedGeneration": {
"description": "ObservedGeneration is the generation(.metadata.generation) observed by the controller. If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed the rebalance result or hasn't done the rebalance yet. optional",
"type": "integer",
"format": "int64"
},
"observedWorkloads": {
"description": "ObservedWorkloads contains information about the execution states and messages of target resources.",
"type": "array",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ spec:
description: Spec represents the specification of the desired behavior
of WorkloadRebalancer.
properties:
ttlMinutesAfterFinished:
description: ttlMinutesAfterFinished limits the lifetime of a WorkloadRebalancer
that has finished execution (means each target workload is finished
with result of Successful or Failed). If this field is set, ttlSecondsAfterFinished
after the WorkloadRebalancer finishes, it is eligible to be automatically
deleted. If this field is unset, the WorkloadRebalancer won't be
automatically deleted. If this field is set to zero, the WorkloadRebalancer
becomes eligible to be deleted immediately after it finishes.
format: int32
type: integer
workloads:
description: Workloads used to specify the list of expected resource.
Nil or empty list is not allowed.
Expand Down Expand Up @@ -69,6 +79,18 @@ spec:
status:
description: Status represents the status of WorkloadRebalancer.
properties:
lastUpdateTime:
description: LastUpdateTime represents the last update time of any
field in WorkloadRebalancerStatus other than itself. optional
format: date-time
type: string
observedGeneration:
description: ObservedGeneration is the generation(.metadata.generation)
observed by the controller. If ObservedGeneration is less than the
generation in metadata means the controller hasn't confirmed the
rebalance result or hasn't done the rebalance yet. optional
format: int64
type: integer
observedWorkloads:
description: ObservedWorkloads contains information about the execution
states and messages of target resources.
Expand Down
5 changes: 4 additions & 1 deletion cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ func startRemedyController(ctx controllerscontext.Context) (enabled bool, err er

func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled bool, err error) {
workloadRebalancer := workloadrebalancer.RebalancerController{
Client: ctx.Mgr.GetClient(),
Client: ctx.Mgr.GetClient(),
ControlPlaneClient: ctx.ControlPlaneClient,
}
err = workloadRebalancer.SetupWithManager(ctx.Mgr)
if err != nil {
Expand All @@ -727,6 +728,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
kubeClientSet := kubeclientset.NewForConfigOrDie(restConfig)
controlPlaneClient := gclient.NewForConfigOrDie(restConfig)

overrideManager := overridemanager.New(mgr.GetClient(), mgr.GetEventRecorderFor(overridemanager.OverrideManagerName))
skippedResourceConfig := util.NewSkippedResourceConfig()
Expand Down Expand Up @@ -812,6 +814,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,
KubeClientSet: kubeClientSet,
ControlPlaneClient: controlPlaneClient,
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/apps/v1alpha1/workloadrebalancer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ type WorkloadRebalancerStatus struct {
// ObservedWorkloads contains information about the execution states and messages of target resources.
// +optional
ObservedWorkloads []ObservedWorkload `json:"observedWorkloads,omitempty"`

// ObservedGeneration is the generation(.metadata.generation) observed by the controller.
// If ObservedGeneration is less than the generation in metadata means the controller hasn't confirmed
// the rebalance result or hasn't done the rebalance yet.
// optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

// LastUpdateTime represents the last update time of any field in WorkloadRebalancerStatus other than itself.
// optional
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
}

// ObservedWorkload the observed resource.
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/karmada-io/karmada/pkg/controllers/federatedhpa/config"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
Expand Down Expand Up @@ -109,6 +110,7 @@ type Context struct {
StopChan <-chan struct{}
DynamicClientSet dynamic.Interface
KubeClientSet clientset.Interface
ControlPlaneClient client.Client
OverrideManager overridemanager.OverrideManager
ControlPlaneInformerManager genericmanager.SingleClusterInformerManager
ResourceInterpreter resourceinterpreter.ResourceInterpreter
Expand Down
120 changes: 119 additions & 1 deletion pkg/controllers/workloadrebalancer/ttlafterfinished_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,126 @@ limitations under the License.

package workloadrebalancer

import "github.com/karmada-io/karmada/pkg/util"
import (
"context"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
)

func (c *RebalancerController) deleteExpiredRebalancer(key util.QueueKey) error {
objKey, ok := key.(client.ObjectKey)
if !ok {
klog.Errorf("Invalid object key: %+v", key)
return nil
}
klog.V(4).Infof("Checking if WorkloadRebalancer(%s) is ready for cleanup", objKey.Name)

// 1. Get WorkloadRebalancer from cache.
rebalancer := &appsv1alpha1.WorkloadRebalancer{}
if err := c.Client.Get(context.TODO(), objKey, rebalancer); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

// 2. Use the WorkloadRebalancer from cache to see if the TTL expires.
if expiredAt, err := c.processTTL(rebalancer); err != nil {
return err
} else if expiredAt == nil {
return nil
}

// 3. The WorkloadRebalancer's TTL is assumed to have expired, but the WorkloadRebalancer TTL might be stale.
// Before deleting the WorkloadRebalancer, do a final sanity check.
// If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
fresh := &appsv1alpha1.WorkloadRebalancer{}
if err := c.ControlPlaneClient.Get(context.TODO(), objKey, fresh); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

// 4. Use the latest WorkloadRebalancer directly from api server to see if the TTL truly expires.
if expiredAt, err := c.processTTL(fresh); err != nil {
return err
} else if expiredAt == nil {
return nil
}

// 5. deletes the WorkloadRebalancer if TTL truly expires.
options := &client.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &fresh.ResourceVersion}}
if err := c.ControlPlaneClient.Delete(context.TODO(), fresh, options); err != nil {
klog.Errorf("Cleaning up WorkloadRebalancer(%s) failed: %+v", fresh.Name, err)
return err
}
klog.V(4).Infof("Cleaning up WorkloadRebalancer(%s) successful.", fresh.Name)

return nil
}

// processTTL checks whether a given WorkloadRebalancer's TTL has expired, and add it to the queue after
// the TTL is expected to expire if the TTL will expire later.
func (c *RebalancerController) processTTL(r *appsv1alpha1.WorkloadRebalancer) (expiredAt *time.Time, err error) {
if !needsCleanup(r) {
return nil, nil
}

remainingTTL, expireAt, err := timeLeft(r)
if err != nil {
return nil, err
}

// TTL has expired
if *remainingTTL <= 0 {
return expireAt, nil
}

c.ttlAfterFinishedWorker.AddAfter(client.ObjectKey{Name: r.Name}, *remainingTTL)
return nil, nil
}

func timeLeft(r *appsv1alpha1.WorkloadRebalancer) (*time.Duration, *time.Time, error) {
now := time.Now()
finishAt := r.Status.LastUpdateTime.Time
expireAt := finishAt.Add(time.Duration(*r.Spec.TTLMinutesAfterFinished) * time.Second)

if finishAt.After(now) {
klog.Infof("Found Rebalancer(%s) finished in the future. This is likely due to time skew in the cluster, cleanup will be deferred.", r.Name)
}

remainingTTL := expireAt.Sub(now)
klog.V(4).Infof("Found Rebalancer(%s) finished, finishTime: %+v, remainingTTL: %+v, startTime: %+v, deadlineTTL: %+v",
r.Name, finishAt.UTC(), remainingTTL, now.UTC(), expireAt.UTC())

return &remainingTTL, &expireAt, nil
}

// needsCleanup checks whether a WorkloadRebalancer has finished and has a TTL set.
func needsCleanup(r *appsv1alpha1.WorkloadRebalancer) bool {
return r.Spec.TTLMinutesAfterFinished != nil && isRebalancerFinished(r)
}

// isRebalancerFinished checks whether the given WorkloadRebalancer has finished execution.
// It does not discriminate between successful and failed terminations.
func isRebalancerFinished(r *appsv1alpha1.WorkloadRebalancer) bool {
// if a finished WorkloadRebalancer is updated and didn't have time to refresh the status,
// it is regarded as non-finished since observedGeneration not equal to generation.
if r.Status.ObservedGeneration != r.Generation {
return false
}
for _, workload := range r.Status.ObservedWorkloads {
if workload.Result != appsv1alpha1.RebalanceSuccessful && workload.Result != appsv1alpha1.RebalanceFailed {
return false
}
}
return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ const (

// RebalancerController is to handle a rebalance to workloads selected by WorkloadRebalancer object.
type RebalancerController struct {
Client client.Client
Client client.Client // used to operate WorkloadRebalancer resources from cache.
ControlPlaneClient client.Client // used to fetch arbitrary resources from api server.

ttlAfterFinishedWorker util.AsyncWorker
}
Expand All @@ -56,13 +57,13 @@ type RebalancerController struct {
func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error {
var predicateFunc = predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
c.ttlAfterFinishedWorker.Add(e.Object)
c.ttlAfterFinishedWorker.Add(client.ObjectKey{Name: e.Object.GetName()})
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj := e.ObjectOld.(*appsv1alpha1.WorkloadRebalancer)
newObj := e.ObjectNew.(*appsv1alpha1.WorkloadRebalancer)
c.ttlAfterFinishedWorker.Add(newObj)
c.ttlAfterFinishedWorker.Add(client.ObjectKey{Name: newObj.GetName()})
return !reflect.DeepEqual(oldObj.Spec, newObj.Spec)
},
DeleteFunc: func(event.DeleteEvent) bool { return false },
Expand Down Expand Up @@ -122,7 +123,7 @@ func (c *RebalancerController) buildWorkloadRebalancerStatus(rebalancer *appsv1a
Workload: resource,
})
}
return appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads}
return appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads, ObservedGeneration: rebalancer.Generation}
}

// When spec filed of WorkloadRebalancer updated, we shall refresh the workload list in status.observedWorkloads:
Expand Down Expand Up @@ -155,7 +156,7 @@ func (c *RebalancerController) syncWorkloadsFromSpecToStatus(rebalancer *appsv1a
observedWorkloads = append(observedWorkloads, appsv1alpha1.ObservedWorkload{Workload: workload})
}

return appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads}
return appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads, ObservedGeneration: rebalancer.Generation}
}

func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) (successNum int64, retryNum int64) {
Expand Down Expand Up @@ -239,7 +240,13 @@ func (c *RebalancerController) recordAndCountRebalancerFailed(resource *appsv1al
}

func (c *RebalancerController) updateWorkloadRebalancerStatus(ctx context.Context, oldRebalancer, rebalancer *appsv1alpha1.WorkloadRebalancer) error {
if reflect.DeepEqual(oldRebalancer.Status, rebalancer.Status) {
return nil
}

rebalancerPatch := client.MergeFrom(oldRebalancer)
lastUpdateTime := metav1.Now()
rebalancer.Status.LastUpdateTime = &lastUpdateTime

return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
klog.V(4).Infof("Start to patch WorkloadRebalancer(%s) status", rebalancer.Name)
Expand Down
22 changes: 21 additions & 1 deletion pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b016c25

Please sign in to comment.