Skip to content

Commit

Permalink
avoid concurrent processing of same PVC
Browse files Browse the repository at this point in the history
  • Loading branch information
mlmhl committed Jul 1, 2019
1 parent 24780ba commit 97dc262
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 10 deletions.
29 changes: 21 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/kubernetes-csi/external-resizer/pkg/resizer"
"github.com/kubernetes-csi/external-resizer/pkg/util"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -145,6 +145,19 @@ func (ctrl *resizeController) updatePVC(oldObj, newObj interface{}) {
// in-tree resizer name to CSI driver name.
if newSize.Cmp(oldSize) > 0 || newResizerName != oldResizerName {
ctrl.addPVC(newObj)
} else {
// PVC's size not changed, so this Update event maybe caused by:
//
// 1. Administrators or users introduce other changes(such as add labels, modify annotations, etc.)
// unrelated to volume resize.
// 2. Informer resynced the PVC and send this Update event without any changes.
//
// If it is case 1, we can just discard this event. If case 2, we need to put it into the queue to
// perform a resync operation.
if newPVC.ResourceVersion == oldPVC.ResourceVersion {
// This is case 2.
ctrl.addPVC(newObj)
}
}
}

Expand Down Expand Up @@ -366,12 +379,13 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
return util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
}

func (ctrl *resizeController) markPVCResizeFinished(pvc *v1.PersistentVolumeClaim, newSize resource.Quantity) error {
func (ctrl *resizeController) markPVCResizeFinished(
pvc *v1.PersistentVolumeClaim,
newSize resource.Quantity) error {
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
if err != nil {
if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as resize finished failed: %v", util.PVCKey(pvc), err)
return err
}
Expand All @@ -392,15 +406,14 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
newPVC := pvc.DeepCopy()
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
[]v1.PersistentVolumeClaimCondition{pvcCondition})
_, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient)
if err != nil {

if _, err := util.PatchPVCStatus(pvc, newPVC, ctrl.kubeClient); err != nil {
klog.Errorf("Mark PVC %q as file system resize required failed: %v", util.PVCKey(pvc), err)
return err
}

klog.V(4).Infof("Mark PVC %q as file system resize required", util.PVCKey(pvc))
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
util.FileSystemResizeRequired, "Require file system resize of volume on node")

return err
return nil
}
38 changes: 36 additions & 2 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"regexp"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -96,7 +98,7 @@ func PatchPVCStatus(
oldPVC *v1.PersistentVolumeClaim,
newPVC *v1.PersistentVolumeClaim,
kubeClient kubernetes.Interface) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := getPatchData(oldPVC, newPVC)
patchBytes, err := getPVCPatchData(oldPVC, newPVC)
if err != nil {
return nil, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", PVCKey(oldPVC), err)
}
Expand All @@ -108,6 +110,38 @@ func PatchPVCStatus(
return updatedClaim, nil
}

func getPVCPatchData(oldPVC, newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
patchBytes, err := getPatchData(oldPVC, newPVC)
if err != nil {
return patchBytes, err
}

patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
if err != nil {
return nil, fmt.Errorf("apply ResourceVersion to patch data failed: %v", err)
}
return patchBytes, nil
}

func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patchBytes, &patchMap)
if err != nil {
return nil, fmt.Errorf("error unmarshalling patch with %v", err)
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, fmt.Errorf("error creating accessor with %v", err)
}
a.SetResourceVersion(resourceVersion)
versionBytes, err := json.Marshal(patchMap)
if err != nil {
return nil, fmt.Errorf("error marshalling json patch with %v", err)
}
return versionBytes, nil
}

// UpdatePVCapacity updates PVC capacity with requested size.
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
newPV := pv.DeepCopy()
Expand All @@ -130,7 +164,7 @@ func getPatchData(oldObj, newObj interface{}) ([]byte, error) {
}
newData, err := json.Marshal(newObj)
if err != nil {
return nil, fmt.Errorf("mashal new object failed: %v", err)
return nil, fmt.Errorf("marshal new object failed: %v", err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldObj)
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package util

import (
"encoding/json"
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetPVCPatchData(t *testing.T) {
for i, c := range []struct {
OldPVC *v1.PersistentVolumeClaim
}{
{&v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{&v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
} {
newPVC := c.OldPVC.DeepCopy()
newPVC.Status.Conditions = append(newPVC.Status.Conditions,
v1.PersistentVolumeClaimCondition{Type: VolumeResizing, Status: v1.ConditionTrue})
patchBytes, err := getPVCPatchData(c.OldPVC, newPVC)
if err != nil {
t.Errorf("Case %d: Get patch data failed: %v", i, err)
}

var patchMap map[string]interface{}
err = json.Unmarshal(patchBytes, &patchMap)
if err != nil {
t.Errorf("Case %d: unmarshalling json patch failed: %v", i, err)
}

metadata, exist := patchMap["metadata"].(map[string]interface{})
if !exist {
t.Errorf("Case %d: ResourceVersion should exist in patch data", i)
}
resourceVersion := metadata["resourceVersion"].(string)
if resourceVersion != c.OldPVC.ResourceVersion {
t.Errorf("Case %d: ResourceVersion should be %s, got %s",
i, c.OldPVC.ResourceVersion, resourceVersion)
}
}
}

0 comments on commit 97dc262

Please sign in to comment.