From 97dc2625f641d4c5f804cf4e961104c7fb745b9f Mon Sep 17 00:00:00 2001 From: mlmhl Date: Thu, 10 Jan 2019 19:18:37 +0800 Subject: [PATCH] avoid concurrent processing of same PVC --- pkg/controller/controller.go | 29 ++++++++++++++++++------- pkg/util/util.go | 38 ++++++++++++++++++++++++++++++-- pkg/util/util_test.go | 42 ++++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 10 deletions(-) create mode 100644 pkg/util/util_test.go diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 738ab2d72..eac9f4ba0 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -24,7 +24,7 @@ import ( "github.com/kubernetes-csi/external-resizer/pkg/resizer" "github.com/kubernetes-csi/external-resizer/pkg/util" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -145,6 +145,19 @@ func (ctrl *resizeController) updatePVC(oldObj, newObj interface{}) { // in-tree resizer name to CSI driver name. if newSize.Cmp(oldSize) > 0 || newResizerName != oldResizerName { ctrl.addPVC(newObj) + } else { + // PVC's size not changed, so this Update event maybe caused by: + // + // 1. Administrators or users introduce other changes(such as add labels, modify annotations, etc.) + // unrelated to volume resize. + // 2. Informer resynced the PVC and send this Update event without any changes. + // + // If it is case 1, we can just discard this event. If case 2, we need to put it into the queue to + // perform a resync operation. + if newPVC.ResourceVersion == oldPVC.ResourceVersion { + // This is case 2. + ctrl.addPVC(newObj) + } } } @@ -366,12 +379,13 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient) } -func (ctrl *resizeController) markPVCResizeFinished(pvc *v1.PersistentVolumeClaim, newSize resource.Quantity) error { +func (ctrl *resizeController) markPVCResizeFinished( + pvc *v1.PersistentVolumeClaim, + newSize resource.Quantity) error { newPVC := pvc.DeepCopy() newPVC.Status.Capacity[v1.ResourceStorage] = newSize newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{}) - _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient) - if err != nil { + if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil { klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err) return err } @@ -392,15 +406,14 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume newPVC := pvc.DeepCopy() newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions, []v1.PersistentVolumeClaimCondition{pvcCondition}) - _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient) - if err != nil { + + if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil { klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err) return err } - klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc)) ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.FileSystemResizeRequired, "Require file system resize of volume on node") - return err + return nil } diff --git a/pkg/util/util.go b/pkg/util/util.go index 829be56fe..080f2b1ff 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -22,7 +22,9 @@ import ( "regexp" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" @@ -96,7 +98,7 @@ func PatchPVCStatus( oldPVC *v1.PersistentVolumeClaim, newPVC *v1.PersistentVolumeClaim, kubeClient kubernetes.Interface) (*v1.PersistentVolumeClaim, error) { - patchBytes, err := getPatchData(oldPVC, newPVC) + patchBytes, err := getPVCPatchData(oldPVC, newPVC) if err != nil { return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", PVCKey(oldPVC), err) } @@ -108,6 +110,38 @@ func PatchPVCStatus( return updatedClaim, nil } +func getPVCPatchData(oldPVC, newPVC *v1.PersistentVolumeClaim) ([]byte, error) { + patchBytes, err := getPatchData(oldPVC, newPVC) + if err != nil { + return patchBytes, err + } + + patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion) + if err != nil { + return nil, fmt.Errorf("apply ResourceVersion to patch data failed: %v", err) + } + return patchBytes, nil +} + +func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) { + var patchMap map[string]interface{} + err := json.Unmarshal(patchBytes, &patchMap) + if err != nil { + return nil, fmt.Errorf("error unmarshalling patch with %v", err) + } + u := unstructured.Unstructured{Object: patchMap} + a, err := meta.Accessor(&u) + if err != nil { + return nil, fmt.Errorf("error creating accessor with %v", err) + } + a.SetResourceVersion(resourceVersion) + versionBytes, err := json.Marshal(patchMap) + if err != nil { + return nil, fmt.Errorf("error marshalling json patch with %v", err) + } + return versionBytes, nil +} + // UpdatePVCapacity updates PVC capacity with requested size. func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error { newPV := pv.DeepCopy() @@ -130,7 +164,7 @@ func getPatchData(oldObj, newObj interface{}) ([]byte, error) { } newData, err := json.Marshal(newObj) if err != nil { - return nil, fmt.Errorf("mashal new object failed: %v", err) + return nil, fmt.Errorf("marshal new object failed: %v", err) } patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldObj) if err != nil { diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go new file mode 100644 index 000000000..6bfd599b3 --- /dev/null +++ b/pkg/util/util_test.go @@ -0,0 +1,42 @@ +package util + +import ( + "encoding/json" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestGetPVCPatchData(t *testing.T) { + for i, c := range []struct { + OldPVC *v1.PersistentVolumeClaim + }{ + {&v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, + {&v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, + } { + newPVC := c.OldPVC.DeepCopy() + newPVC.Status.Conditions = append(newPVC.Status.Conditions, + v1.PersistentVolumeClaimCondition{Type: VolumeResizing, Status: v1.ConditionTrue}) + patchBytes, err := getPVCPatchData(c.OldPVC, newPVC) + if err != nil { + t.Errorf("Case %d: Get patch data failed: %v", i, err) + } + + var patchMap map[string]interface{} + err = json.Unmarshal(patchBytes, &patchMap) + if err != nil { + t.Errorf("Case %d: unmarshalling json patch failed: %v", i, err) + } + + metadata, exist := patchMap["metadata"].(map[string]interface{}) + if !exist { + t.Errorf("Case %d: ResourceVersion should exist in patch data", i) + } + resourceVersion := metadata["resourceVersion"].(string) + if resourceVersion != c.OldPVC.ResourceVersion { + t.Errorf("Case %d: ResourceVersion should be %s, got %s", + i, c.OldPVC.ResourceVersion, resourceVersion) + } + } +}