Skip to content

Commit

Permalink
Extend allowed DataSources
Browse files Browse the repository at this point in the history
This PR extends the supported DataSources handled by the
external-provisioner to include PVCs (Clone).

It removes the restriction of of the provisioner erroring when a
DataSource.Kind of anything other than VolumeSnapshot is supplied
and adds handling for a DataSource.Kind of PersistentVolumeClaim.

This implements the ability to specify a clone, and pass that dataSource
on to plugins that report support for the capability.

Addresses: Issue kubernetes-csi#172
  • Loading branch information
j-griffith committed May 30, 2019
1 parent 967b7a3 commit 33ea9a5
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 16 deletions.
104 changes: 89 additions & 15 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ const (
defaultFSType = "ext4"

snapshotKind = "VolumeSnapshot"
snapshotAPIGroup = snapapi.GroupName // "snapshot.storage.k8s.io"
snapshotAPIGroup = snapapi.GroupName // "snapshot.storage.k8s.io"
pvcKind = "PersistentVolumeClaim" // Native types don't require an API group

tokenPVNameKey = "pv.name"
tokenPVCNameKey = "pvc.name"
Expand Down Expand Up @@ -146,6 +147,12 @@ var (
}
)

// requiredCapabilities provides a set of extra capabilities required for special/optional features provided by a plugin
type requiredCapabilities struct {
snapshot bool
clone bool
}

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
Expand Down Expand Up @@ -239,7 +246,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
// This function get called before any attempt to communicate with the driver.
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
func (p *csiProvisioner) checkDriverCapabilities(needSnapshotSupport bool) error {
func (p *csiProvisioner) checkDriverCapabilities(rc *requiredCapabilities) error {
if !p.pluginCapabilities[csi.PluginCapability_Service_CONTROLLER_SERVICE] {
return fmt.Errorf("CSI driver does not support dynamic provisioning: plugin CONTROLLER_SERVICE capability is not reported")
}
Expand All @@ -248,13 +255,20 @@ func (p *csiProvisioner) checkDriverCapabilities(needSnapshotSupport bool) error
return fmt.Errorf("CSI driver does not support dynamic provisioning: controller CREATE_DELETE_VOLUME capability is not reported")
}

if needSnapshotSupport {
if rc.snapshot {
// Check whether plugin supports create snapshot
// If not, create volume from snapshot cannot proceed
if !p.controllerCapabilities[csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT] {
return fmt.Errorf("CSI driver does not support snapshot restore: controller CREATE_DELETE_SNAPSHOT capability is not reported")
}
}
if rc.clone {
// Check whether plugin supports clone operations
// If not, create volume from pvc cannot proceed
if !p.controllerCapabilities[csi.ControllerServiceCapability_RPC_CLONE_VOLUME] {
return fmt.Errorf("CSI driver does not support clone operations: controller CLONE_VOLUME capability is not reported")
}
}

return nil
}
Expand Down Expand Up @@ -353,22 +367,27 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
}
}

var needSnapshotSupport bool
// Make sure the plugin is capable of fulfilling the requested options
rc := &requiredCapabilities{}
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
if options.PVC.Spec.DataSource.Name == "" {
return nil, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
}
if options.PVC.Spec.DataSource.Kind != snapshotKind {
return nil, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind)
}
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))

switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
}
rc.snapshot = true
case pvcKind:
rc.clone = true
default:
klog.Infof("non csi DataSource specified (%s), the provisioner won't act on this request", options.PVC.Spec.DataSource.Kind)
}
needSnapshotSupport = true
}

if err := p.checkDriverCapabilities(needSnapshotSupport); err != nil {
if err := p.checkDriverCapabilities(rc); err != nil {
return nil, err
}

Expand Down Expand Up @@ -418,10 +437,10 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
},
}

if needSnapshotSupport {
if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) {
volumeContentSource, err := p.getVolumeContentSource(options)
if err != nil {
return nil, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err)
return nil, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
}
req.VolumeContentSource = volumeContentSource
}
Expand Down Expand Up @@ -598,7 +617,61 @@ func removePrefixedParameters(param map[string]string) (map[string]string, error
return newParam, nil
}

// getVolumeContentSource is a helper function to process provisioning requests that include a DataSource
// currently we provide Snapshot and PVC, the default case allows the provisioner to still create a volume
// so that an external controller can act upon it. Additional DataSource types can be added here with
// an appropriate implementation function
func (p *csiProvisioner) getVolumeContentSource(options controller.ProvisionOptions) (*csi.VolumeContentSource, error) {
switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
return p.getSnapshotSource(options)
case pvcKind:
return p.getPVCSource(options)
default:
// For now we shouldn't pass other things to this function, but treat it as a noop and extend as needed
return nil, nil
}
}

// getPVCSource verifies DataSource.Kind of type PersistentVolumeClaim, making sure that the requested PVC is available/ready
// returns the VolumeContentSource for the requested PVC
func (p *csiProvisioner) getPVCSource(options controller.ProvisionOptions) (*csi.VolumeContentSource, error) {
sourcePVC, err := p.client.CoreV1().PersistentVolumeClaims(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting PVC %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
}
if string(sourcePVC.Status.Phase) != "Bound" {
return nil, fmt.Errorf("the PVC DataSource %s must have a status of Bound. Got %v", options.PVC.Spec.DataSource.Name, sourcePVC.Status)
}
if sourcePVC.ObjectMeta.DeletionTimestamp != nil {
return nil, fmt.Errorf("the PVC DataSource %s is currently being deleted", options.PVC.Spec.DataSource.Name)
}
if sourcePVC.Spec.StorageClassName != options.PVC.Spec.StorageClassName {
return nil, fmt.Errorf("the source PVC and destination PVCs must be in the same storage class for cloning. Source is in %v, but new PVC is in %v",
sourcePVC.Spec.StorageClassName, options.PVC.Spec.StorageClassName)
}
capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedSize := capacity.Value()
if requestedSize < int64(sourcePVC.Spec.Size()) {
return nil, fmt.Errorf("error, new PVC request must be greater than or equal in size to the specified PVC data source, requested %v but source is %v", requestedSize, sourcePVC.Spec.Size())
}

volumeSource := csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: sourcePVC.Spec.VolumeName,
},
}
klog.V(5).Infof("VolumeContentSource_Volume %+v", volumeSource)

volumeContentSource := &csi.VolumeContentSource{
Type: &volumeSource,
}
return volumeContentSource, nil
}

// getSnapshotSource verifies DataSource.Kind of type VolumeSnapshot, making sure that the requested Snapshot is available/ready
// returns the VolumeContentSource for the requested snapshot
func (p *csiProvisioner) getSnapshotSource(options controller.ProvisionOptions) (*csi.VolumeContentSource, error) {
snapshotObj, err := p.snapshotClient.VolumesnapshotV1alpha1().VolumeSnapshots(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting snapshot %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
Expand Down Expand Up @@ -676,7 +749,8 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {

volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

if err = p.checkDriverCapabilities(false); err != nil {
rc := &requiredCapabilities{}
if err := p.checkDriverCapabilities(rc); err != nil {
return err
}

Expand Down
170 changes: 169 additions & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,21 @@ func provisionWithTopologyCapabilities() (connection.PluginCapabilitySet, connec
}
}

func provisionFromPVCCapabilities() (connection.PluginCapabilitySet, connection.ControllerCapabilitySet) {
return connection.PluginCapabilitySet{
csi.PluginCapability_Service_CONTROLLER_SERVICE: true,
}, connection.ControllerCapabilitySet{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME: true,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME: true,
}
}

// Minimal PVC required for tests to function
func createFakePVC(requestBytes int64) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
UID: "testid",
UID: "testid",
Name: "fake-pvc",
},
Spec: v1.PersistentVolumeClaimSpec{
Selector: nil, // Provisioner doesn't support selector
Expand All @@ -479,6 +489,39 @@ func createFakePVCWithVolumeMode(requestBytes int64, volumeMode v1.PersistentVol
return claim
}

// fakeClaim returns a valid PVC with the requested settings
func fakeClaim(name, claimUID, capacity, boundToVolume string, phase v1.PersistentVolumeClaimPhase, class *string) *v1.PersistentVolumeClaim {
claim := v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "testns",
UID: types.UID(claimUID),
ResourceVersion: "1",
SelfLink: "/api/v1/namespaces/testns/persistentvolumeclaims/" + name,
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce, v1.ReadOnlyMany},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(capacity),
},
},
VolumeName: boundToVolume,
StorageClassName: class,
},
Status: v1.PersistentVolumeClaimStatus{
Phase: phase,
},
}

if phase == v1.ClaimBound {
claim.Status.AccessModes = claim.Spec.AccessModes
claim.Status.Capacity = claim.Spec.Resources.Requests
}

return &claim

}
func TestGetSecretReference(t *testing.T) {
testcases := map[string]struct {
secretParams secretParamsMap
Expand Down Expand Up @@ -2086,3 +2129,128 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
t.Errorf("test %q: got error: %v", k, err)
}
}

// TestProvisionFromPVC tests create volume clone
func TestProvisionFromPVC(t *testing.T) {
var requestedBytes int64 = 1000
srcName := "fake-pvc"
deletePolicy := v1.PersistentVolumeReclaimDelete

type pvSpec struct {
Name string
ReclaimPolicy v1.PersistentVolumeReclaimPolicy
AccessModes []v1.PersistentVolumeAccessMode
Capacity v1.ResourceList
CSIPVS *v1.CSIPersistentVolumeSource
}

testcases := map[string]struct {
volOpts controller.ProvisionOptions
restoredVolSizeSmall bool
wrongDataSource bool
pvcStatusReady bool
expectedPVSpec *pvSpec
expectErr bool
}{
"provision with pvc data source": {
volOpts: controller.ProvisionOptions{
StorageClass: &storagev1.StorageClass{
ReclaimPolicy: &deletePolicy,
Parameters: map[string]string{},
},
PVName: "test-name",
PVC: &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
UID: "testid",
},
Spec: v1.PersistentVolumeClaimSpec{
Selector: nil,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)),
},
},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
DataSource: &v1.TypedLocalObjectReference{
Name: srcName,
Kind: "PersistentVolumeClaim",
},
},
},
},
pvcStatusReady: true,
expectedPVSpec: &pvSpec{
Name: "test-testi",
ReclaimPolicy: v1.PersistentVolumeReclaimDelete,
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(requestedBytes),
},
CSIPVS: &v1.CSIPersistentVolumeSource{
Driver: "test-driver",
VolumeHandle: "test-volume-id",
FSType: "ext4",
VolumeAttributes: map[string]string{
"storage.kubernetes.io/csiProvisionerIdentity": "test-provisioner",
},
},
},
},
}

tmpdir := tempDir(t)
defer os.RemoveAll(tmpdir)
mockController, driver, _, controllerServer, csiConn, err := createMockServer(t, tmpdir)
if err != nil {
t.Fatal(err)
}
defer mockController.Finish()
defer driver.Stop()

for k, tc := range testcases {
var requestedBytes int64 = 1000
var clientSet *fakeclientset.Clientset

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: requestedBytes,
VolumeId: "test-volume-id",
},
}

controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1)
clientSet = fakeclientset.NewSimpleClientset()

claim := fakeClaim(srcName, "fake-claim-uid", "1Gi", "test-pv", v1.ClaimBound, nil)
clientSet = fakeclientset.NewSimpleClientset(claim)

pluginCaps, controllerCaps := provisionFromPVCCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1)

pv, err := csiProvisioner.Provision(tc.volOpts)
if tc.expectErr && err == nil {
t.Errorf("test %q: Expected error, got none", k)
}

if !tc.expectErr && err != nil {
t.Errorf("test %q: got error: %v", k, err)
}

if tc.expectedPVSpec != nil {
if pv.Name != tc.expectedPVSpec.Name {
t.Errorf("test %q: expected PV name: %q, got: %q", k, tc.expectedPVSpec.Name, pv.Name)
}

if !reflect.DeepEqual(pv.Spec.Capacity, tc.expectedPVSpec.Capacity) {
t.Errorf("test %q: expected capacity: %v, got: %v", k, tc.expectedPVSpec.Capacity, pv.Spec.Capacity)
}

if tc.expectedPVSpec.CSIPVS != nil {
if !reflect.DeepEqual(pv.Spec.PersistentVolumeSource.CSI, tc.expectedPVSpec.CSIPVS) {
t.Errorf("test %q: expected PV: %v, got: %v", k, tc.expectedPVSpec.CSIPVS, pv.Spec.PersistentVolumeSource.CSI)
}
}
}
}
}

0 comments on commit 33ea9a5

Please sign in to comment.