Skip to content

Commit

Permalink
Extend allowed DataSources
Browse files Browse the repository at this point in the history
This PR extends the supported DataSources handled by the
external-provisioner to include PVCs (Clone).

It removes the restriction of of the provisioner erroring when a
DataSource.Kind of anything other than VolumeSnapshot is supplied
and adds handling for a DataSource.Kind of PersistentVolumeClaim.

This implements the ability to specify a clone, and pass that dataSource
on to plugins that report support for the capability.

Addresses: Issue kubernetes-csi#172
  • Loading branch information
j-griffith committed Apr 11, 2019
1 parent 5019bc9 commit 839edba
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 15 deletions.
104 changes: 89 additions & 15 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ const (
defaultFSType = "ext4"

snapshotKind = "VolumeSnapshot"
snapshotAPIGroup = snapapi.GroupName // "snapshot.storage.k8s.io"
snapshotAPIGroup = snapapi.GroupName // "snapshot.storage.k8s.io"
pvcKind = "PersistentVolumeClaim" // Native types don't require an API group

tokenPVNameKey = "pv.name"
tokenPVCNameKey = "pvc.name"
Expand Down Expand Up @@ -144,6 +145,12 @@ var (
}
)

// requiredCapabilities provides a set of extra capabilities required for special/optional features provided by a plugin
type requiredCapabilities struct {
snapshot bool
clone bool
}

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
Expand Down Expand Up @@ -237,7 +244,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
// This function get called before any attempt to communicate with the driver.
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
func (p *csiProvisioner) checkDriverCapabilities(needSnapshotSupport bool) error {
func (p *csiProvisioner) checkDriverCapabilities(rc *requiredCapabilities) error {
if !p.pluginCapabilities[csi.PluginCapability_Service_CONTROLLER_SERVICE] {
return fmt.Errorf("CSI driver does not support dynamic provisioning: plugin CONTROLLER_SERVICE capability is not reported")
}
Expand All @@ -246,13 +253,20 @@ func (p *csiProvisioner) checkDriverCapabilities(needSnapshotSupport bool) error
return fmt.Errorf("CSI driver does not support dynamic provisioning: controller CREATE_DELETE_VOLUME capability is not reported")
}

if needSnapshotSupport {
if rc.snapshot {
// Check whether plugin supports create snapshot
// If not, create volume from snapshot cannot proceed
if !p.controllerCapabilities[csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT] {
return fmt.Errorf("CSI driver does not support snapshot restore: controller CREATE_DELETE_SNAPSHOT capability is not reported")
}
}
if rc.clone {
// Check whether plugin supports clone operations
// If not, create volume from pvc cannot proceed
if !p.controllerCapabilities[csi.ControllerServiceCapability_RPC_CLONE_VOLUME] {
return fmt.Errorf("CSI driver does not support clone operations: controller CLONE_VOLUME capability is not reported")
}
}

return nil
}
Expand Down Expand Up @@ -353,22 +367,27 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
}
}

var needSnapshotSupport bool
// Make sure the plugin is capable of fulfilling the requested options
rc := &requiredCapabilities{}
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
if options.PVC.Spec.DataSource.Name == "" {
return nil, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
}
if options.PVC.Spec.DataSource.Kind != snapshotKind {
return nil, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind)
}
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))

switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
}
rc.snapshot = true
case pvcKind:
rc.clone = true
default:
klog.Infof("non csi DataSource specified (%s), the provisioner won't act on this request", options.PVC.Spec.DataSource.Kind)
}
needSnapshotSupport = true
}

if err := p.checkDriverCapabilities(needSnapshotSupport); err != nil {
if err := p.checkDriverCapabilities(rc); err != nil {
return nil, err
}

Expand Down Expand Up @@ -418,10 +437,10 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
},
}

if needSnapshotSupport {
if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) {
volumeContentSource, err := p.getVolumeContentSource(options)
if err != nil {
return nil, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err)
return nil, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
}
req.VolumeContentSource = volumeContentSource
}
Expand Down Expand Up @@ -591,7 +610,61 @@ func removePrefixedParameters(param map[string]string) (map[string]string, error
return newParam, nil
}

// getVolumeContentSource is a helper function to process provisioning requests that include a DataSource
// currently we provide Snapshot and PVC, the default case allows the provisioner to still create a volume
// so that an external controller can act upon it. Additional DataSource types can be added here with
// an appropriate implementation function
func (p *csiProvisioner) getVolumeContentSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
return p.getSnapshotSource(options)
case pvcKind:
return p.getPVCSource(options)
default:
// For now we shouldn't pass other things to this function, but treat it as a noop and extend as needed
return nil, nil
}
}

// getPVCSource verifies DataSource.Kind of type PersistentVolumeClaim, making sure that the requested PVC is available/ready
// returns the VolumeContentSource for the requested PVC
func (p *csiProvisioner) getPVCSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
sourcePVC, err := p.client.CoreV1().PersistentVolumeClaims(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting PVC %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
}
if string(sourcePVC.Status.Phase) != "Bound" {
return nil, fmt.Errorf("the PVC DataSource %s must have a status of Bound. Got %v", options.PVC.Spec.DataSource.Name, sourcePVC.Status)
}
if sourcePVC.ObjectMeta.DeletionTimestamp != nil {
return nil, fmt.Errorf("the PVC DataSource %s is currently being deleted", options.PVC.Spec.DataSource.Name)
}
if sourcePVC.Spec.StorageClassName != options.PVC.Spec.StorageClassName {
return nil, fmt.Errorf("the source PVC and destination PVCs must be in the same storage class for cloning. Source is in %v, but new PVC is in %v",
sourcePVC.Spec.StorageClassName, options.PVC.Spec.StorageClassName)
}
capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedSize := capacity.Value()
if requestedSize < int64(sourcePVC.Spec.Size()) {
return nil, fmt.Errorf("error, new PVC request must be greater than or equal in size to the specified PVC data source, requested %v but source is %v", requestedSize, sourcePVC.Spec.Size())
}

volumeSource := csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: sourcePVC.Spec.VolumeName,
},
}
klog.V(5).Infof("VolumeContentSource_Volume %+v", volumeSource)

volumeContentSource := &csi.VolumeContentSource{
Type: &volumeSource,
}
return volumeContentSource, nil
}

// getSnapshotSource verifies DataSource.Kind of type VolumeSnapshot, making sure that the requested Snapshot is available/ready
// returns the VolumeContentSource for the requested snapshot
func (p *csiProvisioner) getSnapshotSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
snapshotObj, err := p.snapshotClient.VolumesnapshotV1alpha1().VolumeSnapshots(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting snapshot %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
Expand Down Expand Up @@ -652,7 +725,8 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
}
volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

if err := p.checkDriverCapabilities(false); err != nil {
rc := &requiredCapabilities{}
if err := p.checkDriverCapabilities(rc); err != nil {
return err
}

Expand Down
168 changes: 168 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,40 @@ func provisionWithTopologyCapabilities() (connection.PluginCapabilitySet, connec
}
}

// provisionFromPVCMockServerSetupExpectations mocks plugin and controller capabilities reported
// by a CSI plugin that supports the snapshot feature
func provisionFromPVCMockServerSetupExpectations(identityServer *driver.MockIdentityServer, controllerServer *driver.MockControllerServer) {
identityServer.EXPECT().GetPluginCapabilities(gomock.Any(), gomock.Any()).Return(&csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil).Times(1)
controllerServer.EXPECT().ControllerGetCapabilities(gomock.Any(), gomock.Any()).Return(&csi.ControllerGetCapabilitiesResponse{
Capabilities: []*csi.ControllerServiceCapability{
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
},
},
},
},
}, nil).Times(1)
}

// Minimal PVC required for tests to function
func createFakePVC(requestBytes int64) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
Expand Down Expand Up @@ -1787,3 +1821,137 @@ func TestProvisionWithMountOptions(t *testing.T) {
t.Errorf("expected mount options %v; got: %v", expectedOptions, pv.Spec.MountOptions)
}
}

// TestProvisionFromPVC tests create volume from snapshot
func TestProvisionFromPVC(t *testing.T) {
var requestedBytes int64 = 1000
var pvcName = "src-pvc"
//var timeNow = time.Now().UnixNano()
//var metaTimeNowUnix = &metav1.Time{
// Time: time.Unix(0, timeNow),
// }

type pvSpec struct {
Name string
ReclaimPolicy v1.PersistentVolumeReclaimPolicy
AccessModes []v1.PersistentVolumeAccessMode
Capacity v1.ResourceList
CSIPVS *v1.CSIPersistentVolumeSource
}

testcases := map[string]struct {
volOpts controller.VolumeOptions
restoredVolSizeSmall bool
wrongDataSource bool
pvcStatusReady bool
expectedPVSpec *pvSpec
expectErr bool
}{
"provision with pvc data source": {
volOpts: controller.VolumeOptions{
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
PVName: "test-name",
PVC: &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
UID: "testid",
},
Spec: v1.PersistentVolumeClaimSpec{
Selector: nil,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)),
},
},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
DataSource: &v1.TypedLocalObjectReference{
Name: pvcName,
Kind: "PersistentVolumeClaim",
},
},
},
Parameters: map[string]string{},
},
pvcStatusReady: true,
expectedPVSpec: &pvSpec{
Name: "test-testi",
ReclaimPolicy: v1.PersistentVolumeReclaimDelete,
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(requestedBytes),
},
CSIPVS: &v1.CSIPersistentVolumeSource{
Driver: "test-driver",
VolumeHandle: "test-volume-id",
FSType: "ext4",
VolumeAttributes: map[string]string{
"storage.kubernetes.io/csiProvisionerIdentity": "test-provisioner",
},
},
},
},
}
mockController, driver, identityServer, controllerServer, csiConn, err := createMockServer(t)
if err != nil {
t.Fatal(err)
}
defer mockController.Finish()
defer driver.Stop()

for k, tc := range testcases {
var clientSet kubernetes.Interface
clientSet = fakeclientset.NewSimpleClientset()
client := &fake.Clientset{}

client.AddReactor("get", "persistentvolumeclaim", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
pvc := createFakePVC(requestedBytes)
pvc.Name = pvcName
return true, pvc, nil
})

csiProvisioner := NewCSIProvisioner(clientSet, nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: requestedBytes,
VolumeId: "test-volume-id",
},
}

if tc.wrongDataSource == false {
provisionFromPVCMockServerSetupExpectations(identityServer, controllerServer)
}
// If tc.restoredVolSizeSmall is true, or tc.wrongDataSource is true, or
// tc.snapshotStatusReady is false, create volume from snapshot operation will fail
// early and therefore CreateVolume is not expected to be called.
// When the following if condition is met, it is a valid create volume from snapshot
// operation and CreateVolume is expected to be called.
if tc.restoredVolSizeSmall == false && tc.wrongDataSource == false && tc.pvcStatusReady {
controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1)
}

pv, err := csiProvisioner.Provision(tc.volOpts)
if tc.expectErr && err == nil {
t.Errorf("test %q: Expected error, got none", k)
}

if !tc.expectErr && err != nil {
t.Errorf("test %q: got error: %v", k, err)
}

if tc.expectedPVSpec != nil {
if pv.Name != tc.expectedPVSpec.Name {
t.Errorf("test %q: expected PV name: %q, got: %q", k, tc.expectedPVSpec.Name, pv.Name)
}

if !reflect.DeepEqual(pv.Spec.Capacity, tc.expectedPVSpec.Capacity) {
t.Errorf("test %q: expected capacity: %v, got: %v", k, tc.expectedPVSpec.Capacity, pv.Spec.Capacity)
}

if tc.expectedPVSpec.CSIPVS != nil {
if !reflect.DeepEqual(pv.Spec.PersistentVolumeSource.CSI, tc.expectedPVSpec.CSIPVS) {
t.Errorf("test %q: expected PV: %v, got: %v", k, tc.expectedPVSpec.CSIPVS, pv.Spec.PersistentVolumeSource.CSI)
}
}
}
}
}

0 comments on commit 839edba

Please sign in to comment.