Skip to content

Commit

Permalink
Modify CephFs provisioner to use the ceph mgr commands
Browse files Browse the repository at this point in the history
Currently CephFs provisioner mounts the ceph filesystem
and creates a subdirectory as a part of provisioning the
volume. Ceph now supports commands to provision fs subvolumes,
hence modify the provisioner to use ceph mgr commands to
(de)provision fs subvolumes.

Signed-off-by: Poornima G <[email protected]>
  • Loading branch information
Poornima G authored and mergify[bot] committed Jul 12, 2019
1 parent fa68c35 commit 32ea550
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 126 deletions.
6 changes: 3 additions & 3 deletions e2e/deploy-rook.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ func deployOperator(c kubernetes.Interface) {

_, err := framework.RunKubectl("create", "-f", opPath)
Expect(err).Should(BeNil())
err = waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
err = waitForDaemonSets("rook-discover", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
err = waitForDeploymentComplete("rook-ceph-operator", rookNS, c, deployTimeout)
Expand All @@ -80,10 +78,12 @@ func deployOperator(c kubernetes.Interface) {
func deployCluster(c kubernetes.Interface) {
opPath := fmt.Sprintf("%s/%s", rookURL, "cluster-test.yaml")
framework.RunKubectlOrDie("create", "-f", opPath)
err := waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
opt := &metav1.ListOptions{
LabelSelector: "app=rook-ceph-mon",
}
err := checkCephPods(rookNS, c, 1, deployTimeout, opt)
err = checkCephPods(rookNS, c, 1, deployTimeout, opt)
Expect(err).Should(BeNil())
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/cephfs/cephuser.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func getCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID vol

func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) (*cephEntity, error) {
adminID, userID := genUserIDs(adminCr, volID)
volRootPath, err := getVolumeRootPathCeph(volOptions, adminCr, volID)
if err != nil {
return nil, err
}

return getSingleCephEntity(
"-m", volOptions.Monitors,
Expand All @@ -91,7 +95,7 @@ func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID
"-f", "json",
"auth", "get-or-create", userID,
// User capabilities
"mds", fmt.Sprintf("allow rw path=%s", getVolumeRootPathCeph(volID)),
"mds", fmt.Sprintf("allow rw path=%s", volRootPath),
"mon", "allow r",
"osd", fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volID)),
)
Expand Down
7 changes: 6 additions & 1 deletion pkg/cephfs/mountcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,16 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
volID := vid.VolumeID

if volOptions.ProvisionVolume {
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
cr, err = util.GetAdminCredentials(decodeCredentials(me.Secrets))
if err != nil {
return err
}

volOptions.RootPath, err = getVolumeRootPathCeph(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return err
}

var entity *cephEntity
entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
Expand Down
6 changes: 0 additions & 6 deletions pkg/cephfs/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -91,11 +90,6 @@ func isMountPoint(p string) (bool, error) {
return !notMnt, nil
}

func pathExists(p string) bool {
_, err := os.Stat(p)
return err == nil
}

// Controller service request validation
func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
Expand Down
192 changes: 84 additions & 108 deletions pkg/cephfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,142 +17,118 @@ limitations under the License.
package cephfs

import (
"fmt"
"os"
"path"
"strconv"
"strings"

"github.com/ceph/ceph-csi/pkg/util"

"k8s.io/klog"
)

const (
cephVolumesRoot = "csi-volumes"

namespacePrefix = "ns-"
namespacePrefix = "fsvolumens_"
csiSubvolumeGroup = "csi"
)

func getCephRootPathLocal(volID volumeID) string {
return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID))
}
var (
// cephfsInit is used to create "csi" subvolume group for the first time the csi plugin loads.
// Subvolume group create gets called every time the plugin loads; though it doesn't result in an error,
// it's unnecessary
cephfsInit = false
)

func getCephRootVolumePathLocal(volID volumeID) string {
return path.Join(getCephRootPathLocal(volID), cephVolumesRoot, string(volID))
}
func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
stdout, _, err := util.ExecCommand(
"ceph",
"fs",
"subvolume",
"getpath",
volOptions.FsName,
string(volID),
"--group_name",
csiSubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)

func getVolumeRootPathCeph(volID volumeID) string {
return path.Join("/", cephVolumesRoot, string(volID))
if err != nil {
klog.Errorf("failed to get the rootpath for the vol %s(%s)", string(volID), err)
return "", err
}
return strings.TrimSuffix(string(stdout), "\n"), nil
}

func getVolumeNamespace(volID volumeID) string {
return namespacePrefix + string(volID)
}

func setVolumeAttribute(root, attrName, attrValue string) error {
return execCommandErr("setfattr", "-n", attrName, "-v", attrValue, root)
}

func createVolume(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID, bytesQuota int64) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
return err
}
defer unmountCephRoot(volID)

var (
volRoot = getCephRootVolumePathLocal(volID)
volRootCreating = volRoot + "-creating"
)

if pathExists(volRoot) {
klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID)
return nil
}

if err := createMountPoint(volRootCreating); err != nil {
return err
}

if bytesQuota > 0 {
if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
//TODO: When we support multiple fs, need to handle subvolume group create for all fs's
if !cephfsInit {
err := execCommandErr(
"ceph",
"fs",
"subvolumegroup",
"create",
volOptions.FsName,
csiSubvolumeGroup,
"--mode",
"777",
"--pool_layout",
volOptions.Pool,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
if err != nil {
klog.Errorf("failed to create subvolume group csi, for the vol %s(%s)", string(volID), err)
return err
}
klog.V(4).Infof("cephfs: created subvolume group csi")
cephfsInit = true
}

if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
}

if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
return err
}

if err := os.Rename(volRootCreating, volRoot); err != nil {
return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err)
}

return nil
}

func purgeVolume(volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
err := execCommandErr(
"ceph",
"fs",
"subvolume",
"create",
volOptions.FsName,
string(volID),
strconv.FormatInt(bytesQuota, 10),
"--group_name",
csiSubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
if err != nil {
klog.Errorf("failed to create subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
return err
}
defer unmountCephRoot(volID)

var (
volRoot = getCephRootVolumePathLocal(volID)
volRootDeleting = volRoot + "-deleting"
)

if pathExists(volRoot) {
if err := os.Rename(volRoot, volRootDeleting); err != nil {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
}
} else {
if !pathExists(volRootDeleting) {
klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
return nil
}
}

if err := os.RemoveAll(volRootDeleting); err != nil {
return fmt.Errorf("failed to delete volume %s: %v", volID, err)
}

return nil
}

func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error {
cephRoot := getCephRootPathLocal(volID)

// Root path is not set for dynamically provisioned volumes
// Access to cephfs's / is required
volOptions.RootPath = "/"

if err := createMountPoint(cephRoot); err != nil {
return err
}

m, err := newMounter(volOptions)
func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error {
err := execCommandErr(
"ceph",
"fs",
"subvolume",
"rm",
volOptions.FsName,
string(volID),
"--group_name",
csiSubvolumeGroup,
"--force",
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
if err != nil {
return fmt.Errorf("failed to create mounter: %v", err)
}

if err = m.mount(cephRoot, adminCr, volOptions); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
klog.Errorf("failed to purge subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
return err
}

return nil
}

func unmountCephRoot(volID volumeID) {
cephRoot := getCephRootPathLocal(volID)

if err := unmountVolume(cephRoot); err != nil {
klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
} else {
if err := os.Remove(cephRoot); err != nil {
klog.Errorf("failed to remove %s with error %s", cephRoot, err)
}
}
}
9 changes: 7 additions & 2 deletions pkg/cephfs/volumeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cephfs

import (
"fmt"
"path"
"strconv"

"github.com/pkg/errors"
Expand Down Expand Up @@ -225,7 +226,11 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string)
}
}

volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
volOptions.RootPath, err = getVolumeRootPathCeph(&volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return nil, nil, err
}

volOptions.ProvisionVolume = true

return &volOptions, &vid, nil
Expand Down Expand Up @@ -267,7 +272,7 @@ func newVolumeOptionsFromVersion1Context(volID string, options, secrets map[stri
return nil, nil, err
}

opts.RootPath = getVolumeRootPathCeph(volumeID(volID))
opts.RootPath = path.Join("/csi-volumes", string(volumeID(volID)))
} else {
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
Expand Down
9 changes: 5 additions & 4 deletions pkg/util/cephcmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ import (
// ExecCommand executes passed in program with args and returns separate stdout and stderr streams
func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) {
var (
cmd = exec.Command(program, args...) // nolint: gosec
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
cmd = exec.Command(program, args...) // nolint: gosec
sanitizedArgs = StripSecretInArgs(args)
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
)

cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf

if err := cmd.Run(); err != nil {
return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("an error (%v)"+
" occurred while running %s", err, program)
" occurred while running %s args: %v", err, program, sanitizedArgs)
}

return stdoutBuf.Bytes(), nil, nil
Expand Down
2 changes: 1 addition & 1 deletion scripts/travis-functest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ sudo scripts/minikube.sh k8s-sidecar
sudo chown -R travis: "$HOME"/.minikube /usr/local/bin/kubectl
# functional tests

go test github.com/ceph/ceph-csi/e2e --rook-version=v1.0.1 --deploy-rook=true --deploy-timeout=10 -timeout=30m -v
go test github.com/ceph/ceph-csi/e2e --rook-version=master --deploy-rook=true --deploy-timeout=10 -timeout=30m -v

sudo scripts/minikube.sh clean

0 comments on commit 32ea550

Please sign in to comment.