Skip to content

Commit

Permalink
Merge pull request #527 from saikat-royc/issue-526
Browse files Browse the repository at this point in the history
Return success in CreateVolume when disk is READY
  • Loading branch information
k8s-ci-robot authored Jun 18, 2020
2 parents 7e03d6d + 6962e03 commit c16e7d1
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/gce-cloud-provider/compute/cloud-disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ func (d *CloudDisk) GetKind() string {
}
}

func (d *CloudDisk) GetStatus() string {
switch d.Type() {
case Zonal:
return d.ZonalDisk.Status
case Regional:
return d.RegionalDisk.Status
default:
return "Unknown"
}
}

// GetPDType returns the type of the PD as either 'pd-standard' or 'pd-ssd' The
// "Type" field on the compute disk is stored as a url like
// projects/project/zones/zone/diskTypes/pd-standard
Expand Down
11 changes: 11 additions & 0 deletions pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type FakeCloudProvider struct {
pageTokens map[string]sets.String
instances map[string]*computev1.Instance
snapshots map[string]*computev1.Snapshot

// marker to set disk status during InsertDisk operation.
mockDiskStatus string
}

var _ GCECompute = &FakeCloudProvider{}
Expand All @@ -60,6 +63,8 @@ func CreateFakeCloudProvider(project, zone string, cloudDisks []*CloudDisk) (*Fa
instances: map[string]*computev1.Instance{},
snapshots: map[string]*computev1.Snapshot{},
pageTokens: map[string]sets.String{},
// A newly created disk is marked READY by default.
mockDiskStatus: "READY",
}
for _, d := range cloudDisks {
fcp.disks[d.GetName()] = d
Expand Down Expand Up @@ -250,6 +255,7 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, volKey *meta.Key
Type: cloud.GetDiskTypeURI(volKey, params.DiskType),
SelfLink: fmt.Sprintf("projects/%s/zones/%s/disks/%s", cloud.project, volKey.Zone, volKey.Name),
SourceSnapshotId: snapshotID,
Status: cloud.mockDiskStatus,
}
if params.DiskEncryptionKMSKey != "" {
diskToCreateGA.DiskEncryptionKey = &computev1.CustomerEncryptionKey{
Expand All @@ -265,6 +271,7 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, volKey *meta.Key
Type: cloud.GetDiskTypeURI(volKey, params.DiskType),
SelfLink: fmt.Sprintf("projects/%s/regions/%s/disks/%s", cloud.project, volKey.Region, volKey.Name),
SourceSnapshotId: snapshotID,
Status: cloud.mockDiskStatus,
}
if params.DiskEncryptionKMSKey != "" {
diskToCreateV1.DiskEncryptionKey = &computev1.CustomerEncryptionKey{
Expand Down Expand Up @@ -466,6 +473,10 @@ func (cloud *FakeCloudProvider) getGlobalSnapshotURI(snapshotName string) string
snapshotName)
}

func (cloud *FakeCloudProvider) UpdateDiskStatus(s string) {
cloud.mockDiskStatus = s
}

type FakeBlockingCloudProvider struct {
*FakeCloudProvider
ReadyToExecute chan chan struct{}
Expand Down
42 changes: 42 additions & 0 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ const (
replicationTypeRegionalPD = "regional-pd"
)

func isDiskReady(disk *gce.CloudDisk) (bool, error) {
status := disk.GetStatus()
switch status {
case "READY":
return true, nil
case "FAILED":
return false, fmt.Errorf("Disk %s status is FAILED", disk.GetName())
case "CREATING":
klog.V(4).Infof("Disk %s status is CREATING", disk.GetName())
return false, nil
case "DELETING":
klog.V(4).Infof("Disk %s status is DELETING", disk.GetName())
return false, nil
case "RESTORING":
klog.V(4).Infof("Disk %s status is RESTORING", disk.GetName())
return false, nil
default:
klog.V(4).Infof("Disk %s status is: %s", disk.GetName(), status)
}

return false, nil
}

func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
var err error
// Validate arguments
Expand Down Expand Up @@ -143,6 +166,16 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
if err != nil {
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("CreateVolume disk already exists with same name and is incompatible: %v", err))
}

ready, err := isDiskReady(existingDisk)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk %v had error checking ready status: %v", volKey, err))
}

if !ready {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk %v is not ready", volKey))
}

// If there is no validation error, immediately return success
klog.V(4).Infof("CreateVolume succeeded for disk %v, it already exists and was compatible", volKey)
return generateCreateVolumeResponse(existingDisk, capBytes, zones), nil
Expand Down Expand Up @@ -187,6 +220,15 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
default:
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("CreateVolume replication type '%s' is not supported", params.ReplicationType))
}

ready, err := isDiskReady(disk)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk %v had error checking ready status: %v", volKey, err))
}
if !ready {
return nil, status.Error(codes.Internal, fmt.Sprintf("CreateVolume disk %v is not ready", volKey))
}

klog.V(4).Infof("CreateVolume succeeded for disk %v", volKey)
return generateCreateVolumeResponse(disk, capBytes, zones), nil

Expand Down
108 changes: 108 additions & 0 deletions pkg/gce-pd-csi-driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1549,3 +1549,111 @@ func TestVolumeOperationConcurrency(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}

func TestCreateVolumeDiskReady(t *testing.T) {
// Define test cases
testCases := []struct {
name string
diskStatus string
req *csi.CreateVolumeRequest
expVol *csi.Volume
expErrCode codes.Code
}{
{
name: "disk status RESTORING",
diskStatus: "RESTORING",
req: &csi.CreateVolumeRequest{
Name: "test-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCaps,
Parameters: stdParams,
},
expErrCode: codes.Internal,
},
{
name: "disk status CREATING",
diskStatus: "CREATING",
req: &csi.CreateVolumeRequest{
Name: "test-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCaps,
Parameters: stdParams,
},
expErrCode: codes.Internal,
},
{
name: "disk status DELETING",
diskStatus: "DELETING",
req: &csi.CreateVolumeRequest{
Name: "test-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCaps,
Parameters: stdParams,
},
expErrCode: codes.Internal,
},
{
name: "disk status FAILED",
diskStatus: "FAILED",
req: &csi.CreateVolumeRequest{
Name: "test-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCaps,
Parameters: stdParams,
},
expErrCode: codes.Internal,
},
{
name: "success default",
diskStatus: "READY",
req: &csi.CreateVolumeRequest{
Name: "test-name",
CapacityRange: stdCapRange,
VolumeCapabilities: stdVolCaps,
Parameters: stdParams,
},
expVol: &csi.Volume{
CapacityBytes: common.GbToBytes(20),
VolumeId: testVolumeID,
VolumeContext: nil,
AccessibleTopology: stdTopology,
},
},
}

// Run test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fcp, err := gce.CreateFakeCloudProvider(project, zone, nil)
if err != nil {
t.Fatalf("Failed to create fake cloud provider: %v", err)
}

// Setup hook to create new disks with given status.
fcp.UpdateDiskStatus(tc.diskStatus)
// Setup new driver each time so no interference
gceDriver := initGCEDriverWithCloudProvider(t, fcp)
// Start Test
resp, err := gceDriver.cs.CreateVolume(context.Background(), tc.req)
//check response
if err != nil {
serverError, ok := status.FromError(err)
if !ok {
t.Fatalf("Could not get error status code from err: %v", serverError)
}
if serverError.Code() != tc.expErrCode {
t.Fatalf("Expected error code: %v, got: %v. err : %v", tc.expErrCode, serverError.Code(), err)
}
return
}
if tc.expErrCode != codes.OK {
t.Fatalf("Expected error: %v, got no error", tc.expErrCode)
}

vol := resp.GetVolume()
if !reflect.DeepEqual(vol, tc.expVol) {
t.Fatalf("Mismatch in expected vol %v, current volume: %v\n", tc.expVol, vol)
}
})
}
}

0 comments on commit c16e7d1

Please sign in to comment.