Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache driver capabilities #241

Merged
merged 1 commit into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"strings"
"time"

"k8s.io/klog"

flag "github.com/spf13/pflag"

ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
Expand All @@ -39,6 +37,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflag "k8s.io/apiserver/pkg/util/flag"
Expand Down Expand Up @@ -149,13 +148,18 @@ func init() {
}
klog.V(2).Infof("Detected CSI driver %s", provisionerName)

pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
if err != nil {
klog.Fatalf("Error getting CSI driver capabilities: %s", err)
}

// Generate a unique ID for this provisioner
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand Down
155 changes: 62 additions & 93 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,26 +144,21 @@ var (

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
}

const (
PluginCapability_CONTROLLER_SERVICE = iota
PluginCapability_ACCESSIBILITY_CONSTRAINTS
ControllerCapability_CREATE_DELETE_VOLUME
ControllerCapability_CREATE_DELETE_SNAPSHOT
)

var _ controller.Provisioner = &csiProvisioner{}
var _ controller.BlockProvisioner = &csiProvisioner{}

Expand Down Expand Up @@ -198,47 +193,23 @@ func GetDriverName(conn *grpc.ClientConn, timeout time.Duration) (string, error)
return connection.GetDriverName(ctx, conn)
}

func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.Int, error) {
pluginCaps, err := getPluginCapabilities(conn, timeout)
func GetDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.PluginCapabilitySet, connection.ControllerCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
pluginCapabilities, err := connection.GetPluginCapabilities(ctx, conn)
if err != nil {
return nil, err
return nil, nil, err
}

controllerCaps, err := getControllerCapabilities(conn, timeout)
/* Each CSI operation gets its own timeout / context */
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
controllerCapabilities, err := connection.GetControllerCapabilities(ctx, conn)
if err != nil {
return nil, err
}

capabilities := make(sets.Int)
for cap := range pluginCaps {
switch cap {
case csi.PluginCapability_Service_CONTROLLER_SERVICE:
capabilities.Insert(PluginCapability_CONTROLLER_SERVICE)
case csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS:
capabilities.Insert(PluginCapability_ACCESSIBILITY_CONSTRAINTS)
}
return nil, nil, err
}
for cap := range controllerCaps {
switch cap {
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME)
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
capabilities.Insert(ControllerCapability_CREATE_DELETE_SNAPSHOT)
}
}
return capabilities, nil
}

func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.PluginCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return connection.GetPluginCapabilities(ctx, conn)
}

func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.ControllerCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return connection.GetControllerCapabilities(ctx, conn)
return pluginCapabilities, controllerCapabilities, nil
}

// NewCSIProvisioner creates new CSI provisioner
Expand All @@ -250,53 +221,49 @@ func NewCSIProvisioner(client kubernetes.Interface,
volumeNameUUIDLength int,
grpcClient *grpc.ClientConn,
snapshotClient snapclientset.Interface,
driverName string) controller.Provisioner {
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
}
return provisioner
}

// This function get called before any attempt to communicate with the driver.
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration, needSnapshotSupport bool) (sets.Int, error) {
capabilities, err := getDriverCapabilities(grpcClient, timeout)
if err != nil {
return nil, fmt.Errorf("failed to get capabilities: %v", err)
}

if !capabilities.Has(PluginCapability_CONTROLLER_SERVICE) {
return nil, fmt.Errorf("no plugin controller service support detected")
func (p *csiProvisioner) checkDriverCapabilities(needSnapshotSupport bool) error {
if !p.pluginCapabilities[csi.PluginCapability_Service_CONTROLLER_SERVICE] {
return fmt.Errorf("CSI driver does not support dynamic provisioning: plugin CONTROLLER_SERVICE capability is not reported")
}

if !capabilities.Has(ControllerCapability_CREATE_DELETE_VOLUME) {
return nil, fmt.Errorf("no create/delete volume support detected")
if !p.controllerCapabilities[csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME] {
return fmt.Errorf("CSI driver does not support dynamic provisioning: controller CREATE_DELETE_VOLUME capability is not reported")
}

// If PVC.Spec.DataSource is not nil, it indicates the request is to create volume
// from snapshot and therefore we should check for snapshot support;
// otherwise we don't need to check for snapshot support.
if needSnapshotSupport {
// Check whether plugin supports create snapshot
// If not, create volume from snapshot cannot proceed
if !capabilities.Has(ControllerCapability_CREATE_DELETE_SNAPSHOT) {
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
if !p.controllerCapabilities[csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT] {
return fmt.Errorf("CSI driver does not support snapshot restore: controller CREATE_DELETE_SNAPSHOT capability is not reported")
}
}

return capabilities, nil
return nil
}

func makeVolumeName(prefix, pvcUID string, volumeNameUUIDLength int) (string, error) {
Expand Down Expand Up @@ -370,10 +337,6 @@ func getVolumeCapability(
}

func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
if options.PVC.Spec.Selector != nil {
return nil, fmt.Errorf("claim Selector is not supported")
}

var needSnapshotSupport bool
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
Expand All @@ -388,11 +351,15 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
}
needSnapshotSupport = true
}
capabilities, err := checkDriverCapabilities(p.grpcClient, p.timeout, needSnapshotSupport)
if err != nil {

if err := p.checkDriverCapabilities(needSnapshotSupport); err != nil {
return nil, err
}

if options.PVC.Spec.Selector != nil {
return nil, fmt.Errorf("claim Selector is not supported")
}

pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
if err != nil {
return nil, err
Expand Down Expand Up @@ -443,8 +410,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
req.VolumeContentSource = volumeContentSource
}

if capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology) {
if p.supportsTopology() {
requirements, err := GenerateAccessibilityRequirements(
p.client,
p.csiAPIClient,
Expand Down Expand Up @@ -549,8 +515,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
},
}

if capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology) {
if p.supportsTopology() {
pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
}

Expand All @@ -568,6 +533,11 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
return pv, nil
}

func (p *csiProvisioner) supportsTopology() bool {
return p.pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
}

func removePrefixedParameters(param map[string]string) (map[string]string, error) {
newParam := map[string]string{}
for k, v := range param {
Expand Down Expand Up @@ -656,8 +626,7 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
}
volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

_, err := checkDriverCapabilities(p.grpcClient, p.timeout, false)
if err != nil {
if err := p.checkDriverCapabilities(false); err != nil {
return err
}

Expand Down Expand Up @@ -685,7 +654,7 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

_, err = p.csiClient.DeleteVolume(ctx, &req)
_, err := p.csiClient.DeleteVolume(ctx, &req)

return err
}
Expand Down
Loading