diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 2dc3dc28f2..6c22567b92 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -92,46 +92,46 @@ func newNodeService(driverOptions *DriverOptions) nodeService { } } -func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { +func (d *nodeService) doNodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (string, *csi.NodeStageVolumeResponse, error) { klog.V(4).Infof("NodeStageVolume: called with args %+v", *req) volumeID := req.GetVolumeId() if len(volumeID) == 0 { - return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") + return "", nil, status.Error(codes.InvalidArgument, "Volume ID not provided") } target := req.GetStagingTargetPath() if len(target) == 0 { - return nil, status.Error(codes.InvalidArgument, "Staging target not provided") + return "", nil, status.Error(codes.InvalidArgument, "Staging target not provided") } volCap := req.GetVolumeCapability() if volCap == nil { - return nil, status.Error(codes.InvalidArgument, "Volume capability not provided") + return "", nil, status.Error(codes.InvalidArgument, "Volume capability not provided") } if !isValidVolumeCapabilities([]*csi.VolumeCapability{volCap}) { - return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") + return "", nil, status.Error(codes.InvalidArgument, "Volume capability not supported") } // If the access type is block, do nothing for stage switch volCap.GetAccessType().(type) { case *csi.VolumeCapability_Block: - return &csi.NodeStageVolumeResponse{}, nil + return "", &csi.NodeStageVolumeResponse{}, nil } - mount := volCap.GetMount() - if mount == nil { - return nil, status.Error(codes.InvalidArgument, "NodeStageVolume: mount is nil within volume capability") + mnt := volCap.GetMount() + if mnt == nil { + return "", nil, status.Error(codes.InvalidArgument, "NodeStageVolume: mnt is nil within volume capability") } - fsType := mount.GetFsType() + fsType := mnt.GetFsType() if len(fsType) == 0 { fsType = defaultFsType } var mountOptions []string - for _, f := range mount.MountFlags { + for _, f := range mnt.MountFlags { if !hasMountOption(mountOptions, f) { mountOptions = append(mountOptions, f) } @@ -139,7 +139,7 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if ok := d.inFlight.Insert(req); !ok { msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID) - return nil, status.Error(codes.Internal, msg) + return "", nil, status.Error(codes.Internal, msg) } defer func() { klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId()) @@ -149,12 +149,12 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol devicePath, ok := req.PublishContext[DevicePathKey] if !ok { - return nil, status.Error(codes.InvalidArgument, "Device path not provided") + return "", nil, status.Error(codes.InvalidArgument, "Device path not provided") } source, err := d.findDevicePath(devicePath, volumeID) if err != nil { - return nil, status.Errorf(codes.Internal, "Failed to find device path %s. %v", devicePath, err) + return "", nil, status.Errorf(codes.Internal, "Failed to find device path %s. %v", devicePath, err) } klog.V(4).Infof("NodeStageVolume: find device path %s -> %s", devicePath, source) @@ -162,7 +162,7 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol exists, err := d.mounter.ExistsPath(target) if err != nil { msg := fmt.Sprintf("failed to check if target %q exists: %v", target, err) - return nil, status.Error(codes.Internal, msg) + return "", nil, status.Error(codes.Internal, msg) } // When exists is true it means target path was created but device isn't mounted. // We don't want to do anything in that case and let the operation proceed. @@ -172,7 +172,7 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol klog.V(4).Infof("NodeStageVolume: creating target dir %q", target) if err = d.mounter.MakeDir(target); err != nil { msg := fmt.Sprintf("could not create target dir %q: %v", target, err) - return nil, status.Error(codes.Internal, msg) + return "", nil, status.Error(codes.Internal, msg) } } @@ -180,7 +180,7 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol device, _, err := d.mounter.GetDeviceName(target) if err != nil { msg := fmt.Sprintf("failed to check if volume is already mounted: %v", err) - return nil, status.Error(codes.Internal, msg) + return "", nil, status.Error(codes.Internal, msg) } // This operation (NodeStageVolume) MUST be idempotent. @@ -188,15 +188,43 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol // and is identical to the specified volume_capability the Plugin MUST reply 0 OK. if device == source { klog.V(4).Infof("NodeStageVolume: volume=%q already staged", volumeID) - return &csi.NodeStageVolumeResponse{}, nil + return device, &csi.NodeStageVolumeResponse{}, nil } // FormatAndMount will format only if needed + klog.V(5).Infof("NodeStageVolume: formatting %s and mounting at %s with fstype %s", source, target, fsType) err = d.mounter.FormatAndMount(source, target, fsType, mountOptions) if err != nil { - msg := fmt.Sprintf("could not format %q and mount it at %q", source, target) - return nil, status.Error(codes.Internal, msg) + msg := fmt.Sprintf("could not format %q and mnt it at %q", source, target) + return "", nil, status.Error(codes.Internal, msg) + } + return source, &csi.NodeStageVolumeResponse{}, nil +} + +func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + device, rsp, err := d.doNodeStageVolume(ctx, req) + if err != nil { + return rsp, err + } + + // Expand the filesystem to full volume size, if needed. + volumeCap := req.GetVolumeCapability() + if volumeCap != nil { + switch volumeCap.GetAccessType().(type) { + case *csi.VolumeCapability_Block: + return rsp, nil + } + } + + r := resizefs.NewResizeFs(&mount.SafeFormatAndMount{ + Interface: mount.New(""), + Exec: exec.New(), + }) + + // TODO: lock per volume ID to have some idempotency + if _, err := r.ResizeIfNecessary(device, req.StagingTargetPath); err != nil { + return nil, status.Errorf(codes.Internal, "Could not resize volume %q (%q): %v", req.GetVolumeId(), device, err) } return &csi.NodeStageVolumeResponse{}, nil diff --git a/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_linux.go b/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_linux.go index 5ec8a38776..ee70351e4b 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_linux.go +++ b/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_linux.go @@ -20,6 +20,8 @@ package resizefs import ( "fmt" + "strconv" + "strings" "k8s.io/klog" "k8s.io/utils/mount" @@ -60,6 +62,122 @@ func (resizefs *ResizeFs) Resize(devicePath string, deviceMountPath string) (boo return false, fmt.Errorf("ResizeFS.Resize - resize of format %s is not supported for device %s mounted at %s", format, devicePath, deviceMountPath) } +func (resizefs *ResizeFs) ResizeIfNecessary(devicePath string, deviceMountPath string) (bool, error) { + resize, err := resizefs.needResize(devicePath, deviceMountPath) + if err != nil { + return false, err + } + if resize { + klog.V(2).Infof("Volume %s needs resizing", devicePath) + return resizefs.Resize(devicePath, deviceMountPath) + } + return false, nil +} + +func (resizefs *ResizeFs) getDeviceSize(devicePath string) (uint64, error) { + output, err := resizefs.mounter.Exec.Command("blockdev", "--getsize64", devicePath).CombinedOutput() + outStr := strings.TrimSpace(string(output)) + if err != nil { + return 0, fmt.Errorf("failed to read size of device %s: %s: %s", devicePath, err, outStr) + } + size, err := strconv.ParseUint(outStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse size of device %s %s: %s", devicePath, outStr, err) + } + return size, nil +} + +func (resizefs *ResizeFs) getExtSize(devicePath string) (uint64, uint64, error) { + output, err := resizefs.mounter.Exec.Command("dumpe2fs", "-h", devicePath).CombinedOutput() + if err != nil { + return 0, 0, fmt.Errorf("failed to read size of filesystem on %s: %s: %s", devicePath, err, string(output)) + } + + lines := strings.Split(string(output), "\n") + var blockSize, blockCount uint64 + + for _, line := range lines { + tokens := strings.Split(line, ":") + if len(tokens) != 2 { + continue + } + key, value := strings.TrimSpace(tokens[0]), strings.TrimSpace(tokens[1]) + if key == "Block count" { + blockCount, err = strconv.ParseUint(value, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse block count %s: %s", value, err) + } + } + if key == "Block size" { + blockSize, err = strconv.ParseUint(value, 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse block size %s: %s", value, err) + } + } + } + if blockSize == 0 { + return 0, 0, fmt.Errorf("could not find block size of device %s", devicePath) + } + if blockCount == 0 { + return 0, 0, fmt.Errorf("could not find block count of device %s", devicePath) + } + return blockSize, blockSize * blockCount, nil +} + +func (resizefs *ResizeFs) getXFSSize(devicePath string) (uint64, uint64, error) { + // TODO: implement parsing of xfs_info + // meta-data=/dev/loop0 isize=512 agcount=4, agsize=655424 blks + // = sectsz=512 attr=2, projid32bit=1 + // = crc=1 finobt=0 spinodes=0 + // data = bsize=4096 blocks=2621696, imaxpct=25 + // = sunit=0 swidth=0 blks + // naming =version 2 bsize=4096 ascii-ci=0 ftype=1 + // log =internal bsize=4096 blocks=2560, version=2 + // = sectsz=512 sunit=0 blks, lazy-count=1 + // realtime =none extsz=4096 blocks=0, rtextents=0 + // + // Need to parse "bsize=" and "blocks=" in "data =" segment. + // There is no machine-friendly output. + + return 0, 0, fmt.Errorf("unimplemented") +} + +func (resizefs *ResizeFs) needResize(devicePath string, deviceMountPath string) (bool, error) { + deviceSize, err := resizefs.getDeviceSize(devicePath) + if err != nil { + return false, err + } + var fsSize, blockSize uint64 + format, err := resizefs.mounter.GetDiskFormat(devicePath) + if err != nil { + formatErr := fmt.Errorf("ResizeFS.Resize - error checking format for device %s: %v", devicePath, err) + return false, formatErr + } + + // If disk has no format, there is no need to resize the disk because mkfs.* + // by default will use whole disk anyways. + if format == "" { + return false, nil + } + + klog.V(3).Infof("ResizeFS.needResize - checking mounted volume %s", devicePath) + switch format { + case "ext3", "ext4": + blockSize, fsSize, err = resizefs.getExtSize(devicePath) + case "xfs": + blockSize, fsSize, err = resizefs.getXFSSize(deviceMountPath) + } + if err != nil { + return false, err + } + // Tolerate one block difference, just in case of rounding errors somewhere. + klog.V(5).Infof("Volume %s: device size=%d, filesystem size=%d, block size=%d", devicePath, deviceSize, fsSize, blockSize) + if deviceSize <= fsSize+blockSize { + return false, nil + } + return true, nil +} + func (resizefs *ResizeFs) extResize(devicePath string) (bool, error) { output, err := resizefs.mounter.Exec.Command("resize2fs", devicePath).CombinedOutput() if err == nil { diff --git a/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_unsupported.go b/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_unsupported.go index cde1d4ca08..791130e643 100644 --- a/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_unsupported.go +++ b/vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_unsupported.go @@ -38,3 +38,7 @@ func NewResizeFs(mounter *mount.SafeFormatAndMount) *ResizeFs { func (resizefs *ResizeFs) Resize(devicePath string, deviceMountPath string) (bool, error) { return false, fmt.Errorf("Resize is not supported for this build") } + +func (resizefs *ResizeFs) ResizeIfNecessary(devicePath string, deviceMountPath string) (bool, error) { + return false, fmt.Errorf("Resize is not supported for this build") +}