Skip to content

Commit

Permalink
Extend allowed DataSources
Browse files Browse the repository at this point in the history
This PR extends the supported DataSources handled by the
external-provisioner to include PVCs (Clone) and Populators (external
CRDs).

It removes the restriction of of the provisioner erroring when a
DataSource.Kind of anything other than VolumeSnapshot is supplied (the
Kubernetes API currently enforces this anyway via it's verifictaion
checks) and adds handling for a DataSource.Kind of
PersistentVolumeClaim.

In addition for things like external populators, we provide a default
case that just logs that the proviserion doesn't take any action on the
received type, but if the type is supported/allowed via the Kubernetes
API then the DataSource information is stll in the spec for external
controllers to act upon. This allows external controllers to work (again
requires Kubernetes API support), and can be formalized/added easily
when an official populator controller is merged.

Addresses: Issue kubernetes-csi#172
  • Loading branch information
j-griffith committed Jan 29, 2019
1 parent 64844fb commit 0627c2b
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 16 deletions.
99 changes: 83 additions & 16 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const (

snapshotKind = "VolumeSnapshot"
snapshotAPIGroup = snapapi.GroupName // "snapshot.storage.k8s.io"

pvcKind = "PersistentVolumeclaim"
)

var (
Expand Down Expand Up @@ -147,6 +149,11 @@ var (
}
)

type requiredCapabilities struct {
snapshot bool
clone bool
}

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
Expand All @@ -167,6 +174,7 @@ const (
PluginCapability_ACCESSIBILITY_CONSTRAINTS
ControllerCapability_CREATE_DELETE_VOLUME
ControllerCapability_CREATE_DELETE_SNAPSHOT
ControllerCapability_CLONE_VOLUME
)

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -280,6 +288,8 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME)
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
capabilities.Insert(ControllerCapability_CREATE_DELETE_SNAPSHOT)
case csi.ControllerServiceCapability_RPC_CLONE_VOLUME:
capabilities.Insert(ControllerCapability_CLONE_VOLUME)
}
}
return capabilities, nil
Expand Down Expand Up @@ -344,7 +354,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
// This function get called before any attempt to communicate with the driver.
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration, needSnapshotSupport bool) (sets.Int, error) {
func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration, caps *requiredCapabilities) (sets.Int, error) {
capabilities, err := getDriverCapabilities(grpcClient, timeout)
if err != nil {
return nil, fmt.Errorf("failed to get capabilities: %v", err)
Expand All @@ -358,17 +368,22 @@ func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration,
return nil, fmt.Errorf("no create/delete volume support detected")
}

// If PVC.Spec.DataSource is not nil, it indicates the request is to create volume
// from snapshot and therefore we should check for snapshot support;
// otherwise we don't need to check for snapshot support.
if needSnapshotSupport {
if caps.snapshot {
// Check whether plugin supports create snapshot
// If not, create volume from snapshot cannot proceed
if !capabilities.Has(ControllerCapability_CREATE_DELETE_SNAPSHOT) {
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
}
}

if caps.clone {
// Check whether plugin supports create clone
// If not, return error
if !capabilities.Has(ControllerCapability_CLONE_VOLUME) {
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
}
}

return capabilities, nil
}

Expand Down Expand Up @@ -447,21 +462,26 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
return nil, fmt.Errorf("claim Selector is not supported")
}

var needSnapshotSupport bool
rc := &requiredCapabilities{}
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
if options.PVC.Spec.DataSource.Name == "" {
return nil, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
}
if options.PVC.Spec.DataSource.Kind != snapshotKind {
return nil, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind)
}
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))

switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
}
rc.snapshot = true
case pvcKind:
rc.clone = true
default:
glog.Infof("non csi DataSource specified (%s), the provisioner won't act on this request", options.PVC.Spec.DataSource.Kind)
}
needSnapshotSupport = true
}
capabilities, err := checkDriverCapabilities(p.grpcClient, p.timeout, needSnapshotSupport)
capabilities, err := checkDriverCapabilities(p.grpcClient, p.timeout, rc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -509,10 +529,10 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
},
}

if needSnapshotSupport {
if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) {
volumeContentSource, err := p.getVolumeContentSource(options)
if err != nil {
return nil, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err)
return nil, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
}
req.VolumeContentSource = volumeContentSource
}
Expand Down Expand Up @@ -687,6 +707,18 @@ func removePrefixedParameters(param map[string]string) (map[string]string, error
}

func (p *csiProvisioner) getVolumeContentSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
switch options.PVC.Spec.DataSource.Kind {
case snapshotKind:
return p.getSnapshotSource(options)
case pvcKind:
return p.getPVCSource(options)
default:
// For now we shouldn't pass other things to this function, but treat it as a noop and extend as needed
return nil, nil
}
}

func (p *csiProvisioner) getSnapshotSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
snapshotObj, err := p.snapshotClient.VolumesnapshotV1alpha1().VolumeSnapshots(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting snapshot %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
Expand Down Expand Up @@ -741,13 +773,48 @@ func (p *csiProvisioner) getVolumeContentSource(options controller.VolumeOptions
return volumeContentSource, nil
}

func (p *csiProvisioner) getPVCSource(options controller.VolumeOptions) (*csi.VolumeContentSource, error) {
sourcePVC, err := p.client.CoreV1().PersistentVolumeClaims(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name, metav1.GetOptions{})
if err != nil {
fmt.Printf("JDG: Well that blows: %v\n", err)
return nil, fmt.Errorf("error getting PVC %s from api server: %v", options.PVC.Spec.DataSource.Name, err)
}
if string(sourcePVC.Status.Phase) != "Bound" {
return nil, fmt.Errorf("the PVC DataSource %s must have a status of Bound. Got %v", options.PVC.Spec.DataSource.Name, sourcePVC.Status)
}
if sourcePVC.ObjectMeta.DeletionTimestamp != nil {
return nil, fmt.Errorf("the PVC DataSource %s is currently being deleted", options.PVC.Spec.DataSource.Name)
}
if sourcePVC.Spec.StorageClassName != options.PVC.Spec.StorageClassName {
return nil, fmt.Errorf("the source PVC and destination PVCs must be in the same storage class for cloning. Source is in %v, but new PVC is in %v",
sourcePVC.Spec.StorageClassName, options.PVC.Spec.StorageClassName)
}
capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedSize := capacity.Value()
if requestedSize < int64(sourcePVC.Spec.Size()) {
return nil, fmt.Errorf("error, new PVC request must be greater than or equal in size to the specified PVC data source, requested %v but source is %v", requestedSize, sourcePVC.Spec.Size())
}

volumeSource := csi.VolumeContentSource_Volume{
Volume: &csi.VolumeContentSource_VolumeSource{
VolumeId: sourcePVC.Spec.VolumeName,
},
}
glog.V(5).Infof("VolumeContentSource_Volume %+v", volumeSource)

volumeContentSource := &csi.VolumeContentSource{
Type: &volumeSource,
}
return volumeContentSource, nil
}

func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
if volume == nil || volume.Spec.CSI == nil {
return fmt.Errorf("invalid CSI PV")
}
volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

_, err := checkDriverCapabilities(p.grpcClient, p.timeout, false)
_, err := checkDriverCapabilities(p.grpcClient, p.timeout, &requiredCapabilities{})
if err != nil {
return err
}
Expand Down
168 changes: 168 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,40 @@ func provisionWithTopologyMockServerSetupExpectations(identityServer *driver.Moc
}, nil).Times(1)
}

// provisionFromPVCMockServerSetupExpectations mocks plugin and controller capabilities reported
// by a CSI plugin that supports the snapshot feature
func provisionFromPVCMockServerSetupExpectations(identityServer *driver.MockIdentityServer, controllerServer *driver.MockControllerServer) {
identityServer.EXPECT().GetPluginCapabilities(gomock.Any(), gomock.Any()).Return(&csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil).Times(1)
controllerServer.EXPECT().ControllerGetCapabilities(gomock.Any(), gomock.Any()).Return(&csi.ControllerGetCapabilitiesResponse{
Capabilities: []*csi.ControllerServiceCapability{
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
},
},
},
{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
},
},
},
},
}, nil).Times(1)
}

// Minimal PVC required for tests to function
func createFakePVC(requestBytes int64) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
Expand Down Expand Up @@ -1891,3 +1925,137 @@ func TestProvisionWithMountOptions(t *testing.T) {
t.Errorf("expected mount options %v; got: %v", expectedOptions, pv.Spec.MountOptions)
}
}

// TestProvisionFromPVC tests create volume from snapshot
func TestProvisionFromPVC(t *testing.T) {
var requestedBytes int64 = 1000
var pvcName = "src-pvc"
//var timeNow = time.Now().UnixNano()
//var metaTimeNowUnix = &metav1.Time{
// Time: time.Unix(0, timeNow),
// }

type pvSpec struct {
Name string
ReclaimPolicy v1.PersistentVolumeReclaimPolicy
AccessModes []v1.PersistentVolumeAccessMode
Capacity v1.ResourceList
CSIPVS *v1.CSIPersistentVolumeSource
}

testcases := map[string]struct {
volOpts controller.VolumeOptions
restoredVolSizeSmall bool
wrongDataSource bool
pvcStatusReady bool
expectedPVSpec *pvSpec
expectErr bool
}{
"provision with pvc data source": {
volOpts: controller.VolumeOptions{
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
PVName: "test-name",
PVC: &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
UID: "testid",
},
Spec: v1.PersistentVolumeClaimSpec{
Selector: nil,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)),
},
},
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
DataSource: &v1.TypedLocalObjectReference{
Name: pvcName,
Kind: "PersistentVolumeClaim",
},
},
},
Parameters: map[string]string{},
},
pvcStatusReady: true,
expectedPVSpec: &pvSpec{
Name: "test-testi",
ReclaimPolicy: v1.PersistentVolumeReclaimDelete,
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(requestedBytes),
},
CSIPVS: &v1.CSIPersistentVolumeSource{
Driver: "test-driver",
VolumeHandle: "test-volume-id",
FSType: "ext4",
VolumeAttributes: map[string]string{
"storage.kubernetes.io/csiProvisionerIdentity": "test-provisioner",
},
},
},
},
}
mockController, driver, identityServer, controllerServer, csiConn, err := createMockServer(t)
if err != nil {
t.Fatal(err)
}
defer mockController.Finish()
defer driver.Stop()

for k, tc := range testcases {
var clientSet kubernetes.Interface
clientSet = fakeclientset.NewSimpleClientset()
client := &fake.Clientset{}

client.AddReactor("get", "persistentvolumeclaim", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
pvc := createFakePVC(requestedBytes)
pvc.Name = pvcName
return true, pvc, nil
})

csiProvisioner := NewCSIProvisioner(clientSet, nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: requestedBytes,
VolumeId: "test-volume-id",
},
}

if tc.wrongDataSource == false {
provisionFromPVCMockServerSetupExpectations(identityServer, controllerServer)
}
// If tc.restoredVolSizeSmall is true, or tc.wrongDataSource is true, or
// tc.snapshotStatusReady is false, create volume from snapshot operation will fail
// early and therefore CreateVolume is not expected to be called.
// When the following if condition is met, it is a valid create volume from snapshot
// operation and CreateVolume is expected to be called.
if tc.restoredVolSizeSmall == false && tc.wrongDataSource == false && tc.pvcStatusReady {
controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1)
}

pv, err := csiProvisioner.Provision(tc.volOpts)
if tc.expectErr && err == nil {
t.Errorf("test %q: Expected error, got none", k)
}

if !tc.expectErr && err != nil {
t.Errorf("test %q: got error: %v", k, err)
}

if tc.expectedPVSpec != nil {
if pv.Name != tc.expectedPVSpec.Name {
t.Errorf("test %q: expected PV name: %q, got: %q", k, tc.expectedPVSpec.Name, pv.Name)
}

if !reflect.DeepEqual(pv.Spec.Capacity, tc.expectedPVSpec.Capacity) {
t.Errorf("test %q: expected capacity: %v, got: %v", k, tc.expectedPVSpec.Capacity, pv.Spec.Capacity)
}

if tc.expectedPVSpec.CSIPVS != nil {
if !reflect.DeepEqual(pv.Spec.PersistentVolumeSource.CSI, tc.expectedPVSpec.CSIPVS) {
t.Errorf("test %q: expected PV: %v, got: %v", k, tc.expectedPVSpec.CSIPVS, pv.Spec.PersistentVolumeSource.CSI)
}
}
}
}
}

0 comments on commit 0627c2b

Please sign in to comment.