fix: work status sync when work dispatching is suspended #5403

Merged
merged 1 commit on Aug 26, 2024
19 changes: 15 additions & 4 deletions pkg/controllers/status/work_status_controller.go
@@ -222,12 +222,25 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
return nil
}

if err := c.updateResource(ctx, observedObj, workObject, fedKey); err != nil {
return err
}

klog.Infof("Reflecting the resource(kind=%s, %s/%s) status to the Work(%s/%s).", observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName)
return c.reflectStatus(ctx, workObject, observedObj)
}

func (c *WorkStatusController) updateResource(ctx context.Context, observedObj *unstructured.Unstructured, workObject *workv1alpha1.Work, fedKey keys.FederatedKey) error {
Member:
It looks great to have this method dedicated to updating. Thanks.

if helper.IsWorkSuspendDispatching(workObject) {
Member:
Hi @a7i, if we add this check at this position, we also discard the status collection.

Contributor Author:
good catch, updated.

return nil
}

desiredObj, err := c.getRawManifest(workObject.Spec.Workload.Manifests, observedObj)
if err != nil {
return err
}

clusterName, err := names.GetClusterName(workNamespace)
clusterName, err := names.GetClusterName(workObject.Namespace)
if err != nil {
klog.Errorf("Failed to get member cluster name: %v", err)
return err
@@ -255,9 +268,7 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
// also needs to update again. The update operation will be a non-operation if the event triggered by Service's
// status changes.
}

klog.Infof("Reflecting the resource(kind=%s, %s/%s) status to the Work(%s/%s).", observedObj.GetKind(), observedObj.GetNamespace(), observedObj.GetName(), workNamespace, workName)
return c.reflectStatus(ctx, workObject, observedObj)
return nil
}

func (c *WorkStatusController) handleDeleteEvent(ctx context.Context, key keys.FederatedKey) error {
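
To make the new control flow concrete, here is a small, self-contained Go sketch of what the refactor above achieves. The types and helpers below are simplified stand-ins for illustration only (the real controller works with workv1alpha1.Work, unstructured objects, helper.IsWorkSuspendDispatching, and the reflectStatus method shown in the diff): status collection always runs, while pushing changes to the member cluster is skipped when dispatching is suspended.

package main

import "fmt"

// Work and observedResource are simplified stand-ins for the real Karmada types.
type Work struct {
	SuspendDispatching *bool
	Status             string
}

type observedResource struct {
	Namespace, Name, Status string
}

// isSuspendDispatching mirrors the idea behind helper.IsWorkSuspendDispatching:
// the Work asks for dispatching to be suspended.
func isSuspendDispatching(w *Work) bool {
	return w.SuspendDispatching != nil && *w.SuspendDispatching
}

// updateResource stands in for the new method added in this PR: it returns early
// when dispatching is suspended instead of pushing the desired manifest.
func updateResource(w *Work, obj *observedResource) error {
	if isSuspendDispatching(w) {
		return nil // skip the update, but do not abort the sync
	}
	fmt.Printf("updating %s/%s in the member cluster\n", obj.Namespace, obj.Name)
	return nil
}

// syncWorkStatus reflects the observed status into the Work regardless of whether
// the update step above was skipped.
func syncWorkStatus(w *Work, obj *observedResource) error {
	if err := updateResource(w, obj); err != nil {
		return err
	}
	w.Status = obj.Status // status collection still happens
	return nil
}

func main() {
	suspended := true
	w := &Work{SuspendDispatching: &suspended}
	obj := &observedResource{Namespace: "default", Name: "pod", Status: "Running"}
	_ = syncWorkStatus(w, obj)
	fmt.Println("work status after sync:", w.Status) // prints "Running"
}

Running the sketch prints the observed status even though dispatching is suspended, which matches the review discussion above: because the suspend check lives inside updateResource rather than at the top of syncWorkStatus, reflectStatus still executes and the Work's status stays in sync.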
71 changes: 40 additions & 31 deletions pkg/controllers/status/work_status_controller_test.go
@@ -32,16 +32,16 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
@@ -571,18 +571,16 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod *corev1.Pod
raw []byte
controllerWithoutInformer bool
workWithRightNS bool
expectedError bool
workWithDeletionTimestamp bool
wrongWorkNS bool
workApplyFunc func(work *workv1alpha1.Work)
}{
{
name: "failed to exec NeedUpdate",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true,
},
{
@@ -591,7 +589,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true,
},
{
@@ -600,15 +597,13 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: false,
workWithRightNS: true,
expectedError: true,
},
{
name: "obj not found in informer, wrong dynamicClientSet without pod",
obj: newPodObj("karmada-es-cluster"),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: false,
},
{
@@ -617,7 +612,6 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName, true),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: false,
},
{
@@ -626,16 +620,17 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRightNS: false,
expectedError: false,
workApplyFunc: func(work *workv1alpha1.Work) {
work.SetName(fmt.Sprintf("%v-test", workNs))
},
},
{
name: "failed to getRawManifest, wrong Manifests in work",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod1","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true,
},
{
@@ -644,10 +639,31 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
workWithRightNS: true,
expectedError: true,
wrongWorkNS: true,
},
{
name: "skips work with suspend dispatching",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
expectedError: false,
workApplyFunc: func(work *workv1alpha1.Work) {
work.Spec.SuspendDispatching = ptr.To(true)
},
},
{
name: "skips work with deletion timestamp",
obj: newPodObj("karmada-es-cluster"),
pod: newPod(workNs, workName),
raw: []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`),
controllerWithoutInformer: true,
expectedError: false,
workApplyFunc: func(work *workv1alpha1.Work) {
work.SetDeletionTimestamp(ptr.To(metav1.Now()))
},
},
}

for _, tt := range tests {
@@ -671,11 +687,9 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {
c = newWorkStatusController(cluster)
}

var work *workv1alpha1.Work
if tt.workWithRightNS {
work = testhelper.NewWork(workName, workNs, workUID, tt.raw)
} else {
work = testhelper.NewWork(workName, fmt.Sprintf("%v-test", workNs), workUID, tt.raw)
work := testhelper.NewWork(workName, workNs, workUID, tt.raw)
if tt.workApplyFunc != nil {
tt.workApplyFunc(work)
}

key, _ := generateKey(tt.obj)
@@ -686,23 +700,24 @@ func TestWorkStatusController_syncWorkStatus(t *testing.T) {

err := c.syncWorkStatus(key)
if tt.expectedError {
assert.NotEmpty(t, err)
assert.Error(t, err)
} else {
assert.Empty(t, err)
assert.NoError(t, err)
}
})
}
}

func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets ...*dynamicfake.FakeDynamicClient) WorkStatusController {
c := WorkStatusController{
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).Build(),
Client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster).WithStatusSubresource().Build(),
InformerManager: genericmanager.GetInstance(),
PredicateFunc: helper.NewClusterPredicateOnAgent("test"),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
EventRecorder: record.NewFakeRecorder(1024),
RESTMapper: func() meta.RESTMapper {
m := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
m.Add(corev1.SchemeGroupVersion.WithKind("Pod"), meta.RESTScopeNamespace)
@@ -711,21 +726,15 @@ func newWorkStatusController(cluster *clusterv1alpha1.Cluster, dynamicClientSets
}

if len(dynamicClientSets) > 0 {
c.ResourceInterpreter = FakeResourceInterpreter{DefaultInterpreter: native.NewDefaultInterpreter()}
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter)

// Generate InformerManager
clusterName := cluster.Name
dynamicClientSet := dynamicClientSets[0]
// Generate ResourceInterpreter and ObjectWatcher
stopCh := make(chan struct{})
defer close(stopCh)

controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopCh)
controlPlaneKubeClientSet := kubernetesfake.NewSimpleClientset()
sharedFactory := informers.NewSharedInformerFactory(controlPlaneKubeClientSet, 0)
serviceLister := sharedFactory.Core().V1().Services().Lister()

c.ResourceInterpreter = resourceinterpreter.NewResourceInterpreter(controlPlaneInformerManager, serviceLister)
c.ObjectWatcher = objectwatcher.NewObjectWatcher(c.Client, c.RESTMapper, util.NewClusterDynamicClientSetForAgent, c.ResourceInterpreter)

// Generate InformerManager
m := genericmanager.NewMultiClusterInformerManager(stopCh)
m.ForCluster(clusterName, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods")) // register pod informer
m.Start(clusterName)
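
The test changes follow a single pattern: per-case booleans such as workWithRightNS are replaced by an optional workApplyFunc hook that mutates the freshly built Work before the sync runs. Below is a minimal, runnable Go sketch of that table-driven pattern with simplified stand-in types; the real test builds the Work with testhelper.NewWork and uses ptr.To from k8s.io/utils/ptr.

package main

import (
	"fmt"
	"time"
)

// Work is a simplified stand-in for workv1alpha1.Work.
type Work struct {
	Name, Namespace    string
	SuspendDispatching *bool
	DeletionTimestamp  *time.Time
}

func newWork(name, ns string) *Work { return &Work{Name: name, Namespace: ns} }

// ptrTo is a stand-in for ptr.To from k8s.io/utils/ptr.
func ptrTo[T any](v T) *T { return &v }

func main() {
	tests := []struct {
		name          string
		workApplyFunc func(w *Work) // optional per-case mutation of the Work
	}{
		{name: "default work"},
		{
			name:          "skips work with suspend dispatching",
			workApplyFunc: func(w *Work) { w.SuspendDispatching = ptrTo(true) },
		},
		{
			name:          "skips work with deletion timestamp",
			workApplyFunc: func(w *Work) { w.DeletionTimestamp = ptrTo(time.Now()) },
		},
	}

	for _, tt := range tests {
		work := newWork("work", "karmada-es-cluster")
		if tt.workApplyFunc != nil {
			tt.workApplyFunc(work) // replaces ad-hoc flags like workWithRightNS
		}
		fmt.Printf("%s: suspended=%v, deleting=%v\n",
			tt.name, work.SuspendDispatching != nil, work.DeletionTimestamp != nil)
	}
}

The hook keeps the test table flat: the two new scenarios (suspend dispatching and deletion timestamp) only need to set a field on the Work, with no new columns on the test case struct.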