Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [do not merge] Resize ext4 filesystem on NodeStage #595

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,54 +92,54 @@ func newNodeService(driverOptions *DriverOptions) nodeService {
}
}

func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
func (d *nodeService) doNodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (string, *csi.NodeStageVolumeResponse, error) {
klog.V(4).Infof("NodeStageVolume: called with args %+v", *req)

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
return "", nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}

target := req.GetStagingTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
return "", nil, status.Error(codes.InvalidArgument, "Staging target not provided")
}

volCap := req.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
return "", nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
}

if !isValidVolumeCapabilities([]*csi.VolumeCapability{volCap}) {
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
return "", nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}

// If the access type is block, do nothing for stage
switch volCap.GetAccessType().(type) {
case *csi.VolumeCapability_Block:
return &csi.NodeStageVolumeResponse{}, nil
return "", &csi.NodeStageVolumeResponse{}, nil
}

mount := volCap.GetMount()
if mount == nil {
return nil, status.Error(codes.InvalidArgument, "NodeStageVolume: mount is nil within volume capability")
mnt := volCap.GetMount()
if mnt == nil {
return "", nil, status.Error(codes.InvalidArgument, "NodeStageVolume: mnt is nil within volume capability")
}

fsType := mount.GetFsType()
fsType := mnt.GetFsType()
if len(fsType) == 0 {
fsType = defaultFsType
}

var mountOptions []string
for _, f := range mount.MountFlags {
for _, f := range mnt.MountFlags {
if !hasMountOption(mountOptions, f) {
mountOptions = append(mountOptions, f)
}
}

if ok := d.inFlight.Insert(req); !ok {
msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID)
return nil, status.Error(codes.Internal, msg)
return "", nil, status.Error(codes.Internal, msg)
}
defer func() {
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId())
Expand All @@ -149,20 +149,20 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol

devicePath, ok := req.PublishContext[DevicePathKey]
if !ok {
return nil, status.Error(codes.InvalidArgument, "Device path not provided")
return "", nil, status.Error(codes.InvalidArgument, "Device path not provided")
}

source, err := d.findDevicePath(devicePath, volumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Failed to find device path %s. %v", devicePath, err)
return "", nil, status.Errorf(codes.Internal, "Failed to find device path %s. %v", devicePath, err)
}

klog.V(4).Infof("NodeStageVolume: find device path %s -> %s", devicePath, source)

exists, err := d.mounter.ExistsPath(target)
if err != nil {
msg := fmt.Sprintf("failed to check if target %q exists: %v", target, err)
return nil, status.Error(codes.Internal, msg)
return "", nil, status.Error(codes.Internal, msg)
}
// When exists is true it means target path was created but device isn't mounted.
// We don't want to do anything in that case and let the operation proceed.
Expand All @@ -172,31 +172,59 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
klog.V(4).Infof("NodeStageVolume: creating target dir %q", target)
if err = d.mounter.MakeDir(target); err != nil {
msg := fmt.Sprintf("could not create target dir %q: %v", target, err)
return nil, status.Error(codes.Internal, msg)
return "", nil, status.Error(codes.Internal, msg)
}
}

// Check if a device is mounted in target directory
device, _, err := d.mounter.GetDeviceName(target)
if err != nil {
msg := fmt.Sprintf("failed to check if volume is already mounted: %v", err)
return nil, status.Error(codes.Internal, msg)
return "", nil, status.Error(codes.Internal, msg)
}

// This operation (NodeStageVolume) MUST be idempotent.
// If the volume corresponding to the volume_id is already staged to the staging_target_path,
// and is identical to the specified volume_capability the Plugin MUST reply 0 OK.
if device == source {
klog.V(4).Infof("NodeStageVolume: volume=%q already staged", volumeID)
return &csi.NodeStageVolumeResponse{}, nil
return device, &csi.NodeStageVolumeResponse{}, nil
}

// FormatAndMount will format only if needed

klog.V(5).Infof("NodeStageVolume: formatting %s and mounting at %s with fstype %s", source, target, fsType)
err = d.mounter.FormatAndMount(source, target, fsType, mountOptions)
if err != nil {
msg := fmt.Sprintf("could not format %q and mount it at %q", source, target)
return nil, status.Error(codes.Internal, msg)
msg := fmt.Sprintf("could not format %q and mnt it at %q", source, target)
return "", nil, status.Error(codes.Internal, msg)
}
return source, &csi.NodeStageVolumeResponse{}, nil
}

func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
device, rsp, err := d.doNodeStageVolume(ctx, req)
if err != nil {
return rsp, err
}

// Expand the filesystem to full volume size, if needed.
volumeCap := req.GetVolumeCapability()
if volumeCap != nil {
switch volumeCap.GetAccessType().(type) {
case *csi.VolumeCapability_Block:
return rsp, nil
}
}

r := resizefs.NewResizeFs(&mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: exec.New(),
})

// TODO: lock per volume ID to have some idempotency
if _, err := r.ResizeIfNecessary(device, req.StagingTargetPath); err != nil {
return nil, status.Errorf(codes.Internal, "Could not resize volume %q (%q): %v", req.GetVolumeId(), device, err)
}

return &csi.NodeStageVolumeResponse{}, nil
Expand Down
118 changes: 118 additions & 0 deletions vendor/k8s.io/kubernetes/pkg/util/resizefs/resizefs_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.