Skip to content

Commit

Permalink
feat: ability to suspend Work
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Alavi <[email protected]>
  • Loading branch information
a7i committed May 16, 2024
1 parent 6cfed59 commit d854b00
Show file tree
Hide file tree
Showing 28 changed files with 354 additions and 11 deletions.
12 changes: 12 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19100,6 +19100,10 @@
"schedulerName": {
"description": "SchedulerName represents which scheduler to proceed the scheduling. If specified, the policy will be dispatched by specified scheduler. If not specified, the policy will be dispatched by default scheduler.",
"type": "string"
},
"suspend": {
"description": "Suspend will instruct all work objects to pause propagation to member clusters. Defaults to false.",
"type": "boolean"
}
}
},
Expand Down Expand Up @@ -19701,6 +19705,10 @@
"description": "WorkSpec defines the desired state of Work.",
"type": "object",
"properties": {
"suspend": {
"description": "Suspend tells the controller to suspend subsequent executions. Defaults to false.",
"type": "boolean"
},
"workload": {
"description": "Workload represents the manifest workload to be deployed on managed cluster.",
"default": {},
Expand Down Expand Up @@ -20152,6 +20160,10 @@
"schedulerName": {
"description": "SchedulerName represents which scheduler to proceed the scheduling. It inherits directly from the associated PropagationPolicy(or ClusterPropagationPolicy).",
"type": "string"
},
"suspend": {
"description": "Suspend tells the controller to suspend subsequent executions. Defaults to false.",
"type": "boolean"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,12 @@ spec:
If specified, the policy will be dispatched by specified scheduler.
If not specified, the policy will be dispatched by default scheduler.
type: string
suspend:
default: false
description: |-
Suspend will instruct all work objects to pause propagation to member clusters.
Defaults to false.
type: boolean
required:
- resourceSelectors
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,12 @@ spec:
If specified, the policy will be dispatched by specified scheduler.
If not specified, the policy will be dispatched by default scheduler.
type: string
suspend:
default: false
description: |-
Suspend will instruct all work objects to pause propagation to member clusters.
Defaults to false.
type: boolean
required:
- resourceSelectors
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ spec:
- kind
- name
type: object
suspend:
default: false
description: |-
Suspend will instruct all work objects to pause propagation to member clusters.
Defaults to false.
type: boolean
required:
- resource
type: object
Expand Down Expand Up @@ -1173,6 +1179,12 @@ spec:
SchedulerName represents which scheduler to proceed the scheduling.
It inherits directly from the associated PropagationPolicy(or ClusterPropagationPolicy).
type: string
suspend:
default: false
description: |-
Suspend tells the controller to suspend subsequent executions.
Defaults to false.
type: boolean
required:
- resource
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ spec:
- kind
- name
type: object
suspend:
default: false
description: |-
Suspend will instruct all work objects to pause propagation to member clusters.
Defaults to false.
type: boolean
required:
- resource
type: object
Expand Down Expand Up @@ -1173,6 +1179,12 @@ spec:
SchedulerName represents which scheduler to proceed the scheduling.
It inherits directly from the associated PropagationPolicy(or ClusterPropagationPolicy).
type: string
suspend:
default: false
description: |-
Suspend tells the controller to suspend subsequent executions.
Defaults to false.
type: boolean
required:
- resource
type: object
Expand Down
6 changes: 6 additions & 0 deletions charts/karmada/_crds/bases/work/work.karmada.io_works.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ spec:
spec:
description: Spec represents the desired behavior of Work.
properties:
suspend:
default: false
description: |-
Suspend tells the controller to suspend subsequent executions.
Defaults to false.
type: boolean
workload:
description: Workload represents the manifest workload to be deployed
on managed cluster.
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/policy/v1alpha1/propagation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ type PropagationSpec struct {
// +kubebuilder:validation:Enum=Lazy
// +optional
ActivationPreference ActivationPreference `json:"activationPreference,omitempty"`

// Suspend will instruct all work objects to pause propagation to member clusters.
// Defaults to false.
//
// +kubebuilder:default=false
// +optional
Suspend bool `json:"suspend,omitempty"`
}

// ResourceSelector the resources will be selected.
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/work/v1alpha1/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type ResourceBindingSpec struct {
// Clusters represents target member clusters where the resource to be deployed.
// +optional
Clusters []TargetCluster `json:"clusters,omitempty"`
// Suspend will instruct all work objects to pause propagation to member clusters.
// Defaults to false.
//
// +kubebuilder:default=false
// +optional
Suspend bool `json:"suspend,omitempty"`
}

// ObjectReference contains enough information to locate the referenced object inside current cluster.
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/work/v1alpha1/binding_types_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func ConvertBindingSpecToHub(src *ResourceBindingSpec, dst *workv1alpha2.Resourc
for i := range src.Clusters {
dst.Clusters = append(dst.Clusters, workv1alpha2.TargetCluster(src.Clusters[i]))
}

dst.Suspend = src.Suspend
}

// ConvertBindingStatusToHub converts ResourceBindingStatus to the Hub version.
Expand Down Expand Up @@ -124,6 +126,8 @@ func ConvertBindingSpecFromHub(src *workv1alpha2.ResourceBindingSpec, dst *Resou
for i := range src.Clusters {
dst.Clusters = append(dst.Clusters, TargetCluster(src.Clusters[i]))
}

dst.Suspend = src.Suspend
}

// ConvertBindingStatusFromHub converts ResourceBindingStatus from the Hub version.
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/work/v1alpha1/work_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type Work struct {
type WorkSpec struct {
// Workload represents the manifest workload to be deployed on managed cluster.
Workload WorkloadTemplate `json:"workload,omitempty"`

// Suspend tells the controller to suspend subsequent executions.
// Defaults to false.
//
// +kubebuilder:default=false
// +optional
Suspend bool `json:"suspend,omitempty"`
}

// WorkloadTemplate represents the manifest workload to be deployed on managed cluster.
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/work/v1alpha2/binding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ type ResourceBindingSpec struct {
// It is represented in RFC3339 form (like '2006-01-02T15:04:05Z') and is in UTC.
// +optional
RescheduleTriggeredAt *metav1.Time `json:"rescheduleTriggeredAt,omitempty"`

// Suspend tells the controller to suspend subsequent executions.
// Defaults to false.
//
// +kubebuilder:default=false
// +optional
Suspend bool `json:"suspend,omitempty"`
}

// ObjectReference contains enough information to locate the referenced object inside current cluster.
Expand Down
5 changes: 4 additions & 1 deletion pkg/controllers/binding/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func ensureWork(
var requiredByBindingSnapshot []workv1alpha2.BindingSnapshot
var replicas int32
var conflictResolutionInBinding policyv1alpha1.ConflictResolution
var suspend bool
switch scope {
case apiextensionsv1.NamespaceScoped:
bindingObj := binding.(*workv1alpha2.ResourceBinding)
Expand All @@ -51,13 +52,15 @@ func ensureWork(
placement = bindingObj.Spec.Placement
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspend = bindingObj.Spec.Suspend
case apiextensionsv1.ClusterScoped:
bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
targetClusters = bindingObj.Spec.Clusters
requiredByBindingSnapshot = bindingObj.Spec.RequiredBy
placement = bindingObj.Spec.Placement
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspend = bindingObj.Spec.Suspend
}

targetClusters = mergeTargetClusters(targetClusters, requiredByBindingSnapshot)
Expand Down Expand Up @@ -126,7 +129,7 @@ func ensureWork(
Annotations: annotations,
}

if err = helper.CreateOrUpdateWork(c, workMeta, clonedWorkload); err != nil {
if err = helper.CreateOrUpdateWork(c, workMeta, clonedWorkload, suspend); err != nil {
return err
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/execution/execution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
return c.removeFinalizer(work)
}

if work.Spec.Suspend {
klog.V(4).Infof("Skip syncing work(%s/%s) for cluster(%s) as work is suspended.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{}, nil
}

if !util.IsClusterReady(&cluster.Status) {
klog.Errorf("Stop syncing the work(%s/%s) for the cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
return controllerruntime.Result{}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
Expand Down
118 changes: 118 additions & 0 deletions pkg/controllers/execution/execution_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package execution

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
)

func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
c Controller
work *workv1alpha1.Work
ns string
expectRes controllerruntime.Result
existErr bool
}{
{
name: "work is suspended, no error, no apply",
c: newController(newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)),
work: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Name: "work",
Namespace: "karmada-es-cluster",
},
Spec: workv1alpha1.WorkSpec{
Suspend: true,
},
Status: workv1alpha1.WorkStatus{
Conditions: []metav1.Condition{
{
Type: workv1alpha1.WorkApplied,
Status: metav1.ConditionTrue,
},
},
},
},
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := controllerruntime.Request{
NamespacedName: types.NamespacedName{
Name: "work",
Namespace: tt.ns,
},
}

if err := tt.c.Client.Create(context.Background(), tt.work); err != nil {
t.Fatalf("Failed to create cluster: %v", err)
}

res, err := tt.c.Reconcile(context.Background(), req)
assert.Equal(t, tt.expectRes, res)
if tt.existErr {
assert.NotEmpty(t, err)
} else {
assert.Empty(t, err)
}
})
}
}

func newController(objects ...client.Object) Controller {
return Controller{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(objects...).Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
}
}

func newCluster(name string, clusterType string, clusterStatus metav1.ConditionStatus) *clusterv1alpha1.Cluster {
return &clusterv1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: clusterv1alpha1.ClusterSpec{},
Status: clusterv1alpha1.ClusterStatus{
Conditions: []metav1.Condition{
{
Type: clusterType,
Status: clusterStatus,
},
},
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *SyncController) buildWorks(quota *policyv1alpha1.FederatedResourceQuota
},
}

err = helper.CreateOrUpdateWork(c.Client, objectMeta, resourceQuotaObj)
err = helper.CreateOrUpdateWork(c.Client, objectMeta, resourceQuotaObj, false)
if err != nil {
errs = append(errs, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func reportEndpointSlice(c client.Client, endpointSlice *unstructured.Unstructur
return err
}

if err := helper.CreateOrUpdateWork(c, workMeta, endpointSlice); err != nil {
if err := helper.CreateOrUpdateWork(c, workMeta, endpointSlice, false); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func reportEndpointSlice(c client.Client, endpointSlice *unstructured.Unstructur
return err
}

if err := helper.CreateOrUpdateWork(c, workMeta, endpointSlice); err != nil {
if err := helper.CreateOrUpdateWork(c, workMeta, endpointSlice, false); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(mcs *networkin
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(c.Client, workMeta, unstructuredEPS); err != nil {
if err := helper.CreateOrUpdateWork(c.Client, workMeta, unstructuredEPS, false); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
return err
Expand Down
Loading

0 comments on commit d854b00

Please sign in to comment.