Skip to content

Commit

Permalink
Extend allowed DataSources
Browse files Browse the repository at this point in the history
This PR extends the supported DataSources handled by the
external-provisioner to include PVCs (Clone) and external API Object
types.

It removes the restriction of of the provisioner erroring when a
DataSource.Kind of anything other than VolumeSnapshot is supplied
and adds handling for a DataSource.Kind of PersistentVolumeClaim.

In the case of specified DataSources that the provisioner doesn't deal
with, we just log that the provisioner doesn't take any action on the
specified type, and continue with provisioning ignoring the DataSource
field.

This not only enables Clone requests, but it also allows development of
external controllers to initialize newly created volumes.  Additional
improvements/enhancements will be added via PV/PVC taints and
tolerations when they're implemented.

Addresses: Issue #172
  • Loading branch information
j-griffith committed Feb 15, 2019
1 parent cbad731 commit b5e553f
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 16 deletions.
108 changes: 92 additions & 16 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ const (

snapshotKind = "VolumeSnapshot"
snapshotAPIGroup = snapapi.GroupName // "snapshot.storage.k8s.io"

pvcKind = "PersistentVolumeclaim" // Native types don't take an API group
)

var (
Expand Down Expand Up @@ -143,6 +145,12 @@ var (
}
)

// requiredCapabilities provides a set of extra capabilities required for special/optional features provided by a plugin
type requiredCapabilities struct {
snapshot bool
clone bool
}

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
Expand All @@ -163,6 +171,7 @@ const (
PluginCapability_ACCESSIBILITY_CONSTRAINTS
ControllerCapability_CREATE_DELETE_VOLUME
ControllerCapability_CREATE_DELETE_SNAPSHOT
ControllerCapability_CLONE_VOLUME
)

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -276,6 +285,8 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME)
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
capabilities.Insert(ControllerCapability_CREATE_DELETE_SNAPSHOT)
case csi.ControllerServiceCapability_RPC_CLONE_VOLUME:
capabilities.Insert(ControllerCapability_CLONE_VOLUME)
}
}
return capabilities, nil
Expand Down Expand Up @@ -340,7 +351,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
// This function get called before any attempt to communicate with the driver.
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration, needSnapshotSupport bool) (sets.Int, error) {
func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration, caps *requiredCapabilities) (sets.Int, error) {
capabilities, err := getDriverCapabilities(grpcClient, timeout)
if err != nil {
return nil, fmt.Errorf("failed to get capabilities: %v", err)
Expand All @@ -354,17 +365,22 @@ func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration,
return nil, fmt.Errorf("no create/delete volume support detected")
}

// If PVC.Spec.DataSource is not nil, it indicates the request is to create volume
// from snapshot and therefore we should check for snapshot support;
// otherwise we don't need to check for snapshot support.
if needSnapshotSupport {
if caps.snapshot {
// Check whether plugin supports create snapshot
// If not, create volume from snapshot cannot proceed
if !capabilities.Has(ControllerCapability_CREATE_DELETE_SNAPSHOT) {
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
}
}

if caps.clone {
// Check whether plugin supports create clone
// If not, return error
if !capabilities.Has(ControllerCapability_CLONE_VOLUME) {
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
}
}

return capabilities, nil
}

Expand Down Expand Up @@ -443,21 +459,26 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
return nil, fmt.Errorf("claim Selector is not supported")
}

var needSnapshotSupport bool
rc := &requiredCapabilities{}
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
if options.PVC.Spec.DataSource.Name == "" {
return nil, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
}
if options.PVC.Spec.DataSource.Kind != snapshotKind {
return nil, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind)
}
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))

switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
}
rc.snapshot = true
case pvcKind:
rc.clone = true
default:
glog.Infof("non csi DataSource specified (%s), the provisioner won't act on this request", options.PVC.Spec.DataSource.Kind)
}
needSnapshotSupport = true
}
capabilities, err := checkDriverCapabilities(p.grpcClient, p.timeout, needSnapshotSupport)
capabilities, err := checkDriverCapabilities(p.grpcClient, p.timeout, rc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -505,10 +526,10 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
},
}

if needSnapshotSupport {
if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) {
volumeContentSource, err := p.getVolumeContentSource(options)
if err != nil {
return nil, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err)
return nil, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
}
req.VolumeContentSource = volumeContentSource
}
Expand Down Expand Up @@ -665,7 +686,25 @@ func removePrefixedParameters(param map[string]string) (map[string]string, error
return newParam, nil
}

// getVolumeContentSource is a helper function to process provisioning requests that include a DataSource
// currently we provide Snapshot and PVC, the default case allows the provisioner to still create a volume
// so that an external controller can act upon it. Additional DataSource types can be added here with
// an appropriate implementation function
func (p *csiProvisioner) getVolumeContentSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
return p.getSnapshotSource(options)
case pvcKind:
return p.getPVCSource(options)
default:
// For now we shouldn't pass other things to this function, but treat it as a noop and extend as needed
return nil, nil
}
}

// getSnapshotSource verifies DataSource.Kind of tyep VolumeSnapshot, making sure that the requested Snapshot is available/ready
// returns the VolumeContentSource for the requested snapshot
func (p *csiProvisioner) getSnapshotSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
snapshotObj, err := p.snapshotClient.VolumesnapshotV1alpha1().VolumeSnapshots(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting snapshot %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
Expand Down Expand Up @@ -720,13 +759,50 @@ func (p *csiProvisioner) getVolumeContentSource(options controller.VolumeOptions
return volumeContentSource, nil
}

// getPVCSource verifies DataSource.Kind of tyep PersistentVolumeClaim, making sure that the requested PVC is available/ready
// returns the VolumeContentSource for the requested PVC
func (p *csiProvisioner) getPVCSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
sourcePVC, err := p.client.CoreV1().PersistentVolumeClaims(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
fmt.Printf("JDG: Well that blows: %v\n", err)
return nil, fmt.Errorf("error getting PVC %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
}
if string(sourcePVC.Status.Phase) != "Bound" {
return nil, fmt.Errorf("the PVC DataSource %s must have a status of Bound. Got %v", options.PVC.Spec.DataSource.Name, sourcePVC.Status)
}
if sourcePVC.ObjectMeta.DeletionTimestamp != nil {
return nil, fmt.Errorf("the PVC DataSource %s is currently being deleted", options.PVC.Spec.DataSource.Name)
}
if sourcePVC.Spec.StorageClassName != options.PVC.Spec.StorageClassName {
return nil, fmt.Errorf("the source PVC and destination PVCs must be in the same storage class for cloning. Source is in %v, but new PVC is in %v",
sourcePVC.Spec.StorageClassName, options.PVC.Spec.StorageClassName)
}
capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedSize := capacity.Value()
if requestedSize < int64(sourcePVC.Spec.Size()) {
return nil, fmt.Errorf("error, new PVC request must be greater than or equal in size to the specified PVC data source, requested %v but source is %v", requestedSize, sourcePVC.Spec.Size())
}

volumeSource := csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: sourcePVC.Spec.VolumeName,
},
}
glog.V(5).Infof("VolumeContentSource_Volume %+v", volumeSource)

volumeContentSource := &csi.VolumeContentSource{
Type: &volumeSource,
}
return volumeContentSource, nil
}

func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
if volume == nil || volume.Spec.CSI == nil {
return fmt.Errorf("invalid CSI PV")
}
volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

_, err := checkDriverCapabilities(p.grpcClient, p.timeout, false)
_, err := checkDriverCapabilities(p.grpcClient, p.timeout, &requiredCapabilities{})
if err != nil {
return err
}
Expand Down
168 changes: 168 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,40 @@ func provisionWithTopologyMockServerSetupExpectations(identityServer *driver.Moc
}, nil).Times(1)
}

// provisionFromPVCMockServerSetupExpectations mocks plugin and controller capabilities reported
// by a CSI plugin that supports the snapshot feature
func provisionFromPVCMockServerSetupExpectations(identityServer *driver.MockIdentityServer, controllerServer *driver.MockControllerServer) {
identityServer.EXPECT().GetPluginCapabilities(gomock.Any(), gomock.Any()).Return(&csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil).Times(1)
controllerServer.EXPECT().ControllerGetCapabilities(gomock.Any(), gomock.Any()).Return(&csi.ControllerGetCapabilitiesResponse{
Capabilities: []*csi.ControllerServiceCapability{
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
},
},
},
},
}, nil).Times(1)
}

// Minimal PVC required for tests to function
func createFakePVC(requestBytes int64) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
Expand Down Expand Up @@ -1891,3 +1925,137 @@ func TestProvisionWithMountOptions(t *testing.T) {
t.Errorf("expected mount options %v; got: %v", expectedOptions, pv.Spec.MountOptions)
}
}

// TestProvisionFromPVC tests create volume from snapshot
func TestProvisionFromPVC(t *testing.T) {
var requestedBytes int64 = 1000
var pvcName = "src-pvc"
//var timeNow = time.Now().UnixNano()
//var metaTimeNowUnix = &metav1.Time{
// Time: time.Unix(0, timeNow),
// }

type pvSpec struct {
Name string
ReclaimPolicy v1.PersistentVolumeReclaimPolicy
AccessModes []v1.PersistentVolumeAccessMode
Capacity v1.ResourceList
CSIPVS *v1.CSIPersistentVolumeSource
}

testcases := map[string]struct {
volOpts controller.VolumeOptions
restoredVolSizeSmall bool
wrongDataSource bool
pvcStatusReady bool
expectedPVSpec *pvSpec
expectErr bool
}{
"provision with pvc data source": {
volOpts: controller.VolumeOptions{
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
PVName: "test-name",
PVC: &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
UID: "testid",
},
Spec: v1.PersistentVolumeClaimSpec{
Selector: nil,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)),
},
},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
DataSource: &v1.TypedLocalObjectReference{
Name: pvcName,
Kind: "PersistentVolumeClaim",
},
},
},
Parameters: map[string]string{},
},
pvcStatusReady: true,
expectedPVSpec: &pvSpec{
Name: "test-testi",
ReclaimPolicy: v1.PersistentVolumeReclaimDelete,
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(requestedBytes),
},
CSIPVS: &v1.CSIPersistentVolumeSource{
Driver: "test-driver",
VolumeHandle: "test-volume-id",
FSType: "ext4",
VolumeAttributes: map[string]string{
"storage.kubernetes.io/csiProvisionerIdentity": "test-provisioner",
},
},
},
},
}
mockController, driver, identityServer, controllerServer, csiConn, err := createMockServer(t)
if err != nil {
t.Fatal(err)
}
defer mockController.Finish()
defer driver.Stop()

for k, tc := range testcases {
var clientSet kubernetes.Interface
clientSet = fakeclientset.NewSimpleClientset()
client := &fake.Clientset{}

client.AddReactor("get", "persistentvolumeclaim", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
pvc := createFakePVC(requestedBytes)
pvc.Name = pvcName
return true, pvc, nil
})

csiProvisioner := NewCSIProvisioner(clientSet, nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: requestedBytes,
VolumeId: "test-volume-id",
},
}

if tc.wrongDataSource == false {
provisionFromPVCMockServerSetupExpectations(identityServer, controllerServer)
}
// If tc.restoredVolSizeSmall is true, or tc.wrongDataSource is true, or
// tc.snapshotStatusReady is false, create volume from snapshot operation will fail
// early and therefore CreateVolume is not expected to be called.
// When the following if condition is met, it is a valid create volume from snapshot
// operation and CreateVolume is expected to be called.
if tc.restoredVolSizeSmall == false && tc.wrongDataSource == false && tc.pvcStatusReady {
controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1)
}

pv, err := csiProvisioner.Provision(tc.volOpts)
if tc.expectErr && err == nil {
t.Errorf("test %q: Expected error, got none", k)
}

if !tc.expectErr && err != nil {
t.Errorf("test %q: got error: %v", k, err)
}

if tc.expectedPVSpec != nil {
if pv.Name != tc.expectedPVSpec.Name {
t.Errorf("test %q: expected PV name: %q, got: %q", k, tc.expectedPVSpec.Name, pv.Name)
}

if !reflect.DeepEqual(pv.Spec.Capacity, tc.expectedPVSpec.Capacity) {
t.Errorf("test %q: expected capacity: %v, got: %v", k, tc.expectedPVSpec.Capacity, pv.Spec.Capacity)
}

if tc.expectedPVSpec.CSIPVS != nil {
if !reflect.DeepEqual(pv.Spec.PersistentVolumeSource.CSI, tc.expectedPVSpec.CSIPVS) {
t.Errorf("test %q: expected PV: %v, got: %v", k, tc.expectedPVSpec.CSIPVS, pv.Spec.PersistentVolumeSource.CSI)
}
}
}
}
}

0 comments on commit b5e553f

Please sign in to comment.