Skip to content

Commit

Permalink
Merge pull request #197 from erikh/fix-unmap
Browse files Browse the repository at this point in the history
Fix unmap
  • Loading branch information
Erik Hollensbe committed Mar 19, 2016
2 parents 9428dc1 + 31008a4 commit 79f80b5
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 51 deletions.
8 changes: 0 additions & 8 deletions storage/backend/ceph/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,6 @@ func (c *Driver) Mounted(timeout time.Duration) ([]*storage.Mount, error) {
return nil, err
}

if len(hostMounts) != len(mapped) {
return nil, errored.Errorf("Mounted and mapped volumes do not align.")
}

mounts := []*storage.Mount{}

for _, hostMount := range hostMounts {
Expand All @@ -415,9 +411,5 @@ func (c *Driver) Mounted(timeout time.Duration) ([]*storage.Mount, error) {
}
}

if len(mounts) != len(hostMounts) || len(mounts) != len(mapped) {
return nil, errored.Errorf("Did not align all mounts between mapped and mounted volumes.")
}

return mounts, nil
}
54 changes: 32 additions & 22 deletions storage/backend/ceph/internals.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (c *Driver) mapImage(do storage.DriverOptions) (string, error) {
return "", err
}

for i := range rbdmap {
if rbdmap[i].Name == do.Volume.Name && rbdmap[i].Pool == do.Volume.Params["pool"] {
device = rbdmap[i].Device
for _, rbd := range rbdmap {
if rbd.Name == do.Volume.Name && rbd.Pool == do.Volume.Params["pool"] {
device = rbd.Device
break
}
}
Expand All @@ -52,7 +52,6 @@ func (c *Driver) mapImage(do storage.DriverOptions) (string, error) {
}

func (c *Driver) mkfsVolume(fscmd, devicePath string, timeout time.Duration) error {
// Create ext4 filesystem on the device. this will take a while
cmd := exec.Command("/bin/sh", "-c", templateFSCmd(fscmd, devicePath))
er, err := executor.NewWithTimeout(cmd, timeout).Run()
if err != nil || er.ExitStatus != 0 {
Expand All @@ -70,24 +69,35 @@ func (c *Driver) unmapImage(do storage.DriverOptions) error {
return err
}

for i := range rbdmap {
if rbdmap[i].Name == do.Volume.Name && rbdmap[i].Pool == do.Volume.Params["pool"] {
for {
log.Debugf("Unmapping volume %s/%s at device %q", poolName, do.Volume.Name, strings.TrimSpace(rbdmap[i].Device))
er, err := executor.New(exec.Command("rbd", "unmap", rbdmap[i].Device)).Run()
if err != nil || er.ExitStatus != 0 {
log.Errorf("Could not unmap volume %q (device %q): %v (%v) (%v)", do.Volume.Name, rbdmap[i].Device, er, err, er.Stderr)
if er.ExitStatus == 4096 {
log.Errorf("Retrying to unmap volume %q (device %q)...", do.Volume.Name, rbdmap[i].Device)
time.Sleep(100 * time.Millisecond)
continue
}
return err
for _, rbd := range rbdmap {
if rbd.Name == do.Volume.Name && rbd.Pool == do.Volume.Params["pool"] {
var retried bool

retry:
log.Debugf("Unmapping volume %s/%s at device %q", poolName, do.Volume.Name, strings.TrimSpace(rbd.Device))
er, err := executor.New(exec.Command("rbd", "unmap", rbd.Device)).Run()
if !retried && (err != nil || er.ExitStatus != 0) {
log.Errorf("Could not unmap volume %q (device %q): %v (%v) (%v)", do.Volume.Name, rbd.Device, er, err, er.Stderr)
if er.ExitStatus == 4096 {
log.Errorf("Retrying to unmap volume %q (device %q)...", do.Volume.Name, rbd.Device)
time.Sleep(100 * time.Millisecond)
retried = true
goto retry
}
return err
}

break
rbdmap2, err := c.showMapped(do.Timeout)
if err != nil {
return err
}

for _, rbd2 := range rbdmap2 {
if rbd.Name == rbd2.Name && rbd.Pool == rbd2.Pool {
retried = true
goto retry
}
}
break
}
}
Expand Down Expand Up @@ -129,13 +139,13 @@ func (c *Driver) getMapped(timeout time.Duration) ([]*storage.Mount, error) {

mounts := []*storage.Mount{}

for i := range rbdmap {
for _, rbd := range rbdmap {
mounts = append(mounts, &storage.Mount{
Device: rbdmap[i].Device,
Device: rbd.Device,
Volume: storage.Volume{
Name: strings.Replace(rbdmap[i].Name, ".", "/", -1),
Name: strings.Replace(rbd.Name, ".", "/", -1),
Params: map[string]string{
"pool": rbdmap[i].Pool,
"pool": rbd.Pool,
},
},
})
Expand Down
20 changes: 20 additions & 0 deletions systemtests/battery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,26 @@ import (
"github.com/contiv/vagrantssh"
)

func (s *systemtestSuite) TestBatteryMultiMountSameHost(c *C) {
c.Skip("Can't be run until docker fixes this bug")
count := 25
errChan := make(chan error, count)

c.Assert(s.createVolume("mon0", "policy1", "test", nil), IsNil)
dockerCmd := "docker run -d -v policy1/test:/mnt alpine sleep 10m"
c.Assert(s.vagrant.GetNode("mon0").RunCommand(dockerCmd), IsNil)

for x := 0; x < count; x++ {
go func() {
dockerCmd := "docker run -d -v policy1/test:/mnt alpine sleep 10m"
errChan <- s.vagrant.GetNode("mon0").RunCommand(dockerCmd)
}()
}

for x := 0; x < count; x++ {
c.Assert(<-errChan, NotNil)
}
}
func (s *systemtestSuite) TestBatteryParallelMount(c *C) {
nodes := s.vagrant.GetNodes()
count := 10
Expand Down
56 changes: 35 additions & 21 deletions volplugin/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ func (dc *DaemonConfig) mount(w http.ResponseWriter, r *http.Request) {
return
}

// FIXME check if we're holding the mount already
log.Infof("Mounting volume %q", uc.Request.Name)

volConfig, err := dc.requestVolume(uc.Policy, uc.Name)
Expand All @@ -245,32 +244,18 @@ func (dc *DaemonConfig) mount(w http.ResponseWriter, r *http.Request) {
return
}

ut := &config.UseMount{
Volume: volConfig,
Hostname: dc.Host,
}

if err := dc.Client.ReportMount(ut); err != nil {
httpError(w, "Reporting mount to master", err)
intName, err := driver.InternalName(uc.Request.Name)
if err != nil {
httpError(w, fmt.Sprintf("Volume %q does not satisfy name requirements", uc.Request.Name), err)
return
}

stopChan := dc.Client.AddStopChan(uc.Request.Name)
go dc.Client.HeartbeatMount(dc.Global.TTL, ut, stopChan)

actualSize, err := volConfig.Options.ActualSize()
if err != nil {
dc.Client.RemoveStopChan(uc.Request.Name)
httpError(w, "Computing size of volume", err)
return
}

intName, err := driver.InternalName(uc.Request.Name)
if err != nil {
httpError(w, fmt.Sprintf("Volume %q does not satisfy name requirements", uc.Request.Name), err)
return
}

driverOpts := storage.DriverOptions{
Volume: storage.Volume{
Name: intName,
Expand All @@ -284,6 +269,35 @@ func (dc *DaemonConfig) mount(w http.ResponseWriter, r *http.Request) {
},
}

// if we're mounted already on this host, the mount publish will succeed and
// we will have two mounts, which will cause trouble at unmount time.

mounts, err := driver.Mounted(dc.Global.Timeout)
if err != nil {
httpError(w, "System failure retrieving mounts", err)
return
}

for _, mount := range mounts {
if mount.Path == driver.MountPath(driverOpts) {
httpError(w, "Mount already exists on this host", nil)
return
}
}

ut := &config.UseMount{
Volume: volConfig,
Hostname: dc.Host,
}

if err := dc.Client.ReportMount(ut); err != nil {
httpError(w, "Reporting mount to master", err)
return
}

stopChan := dc.Client.AddStopChan(uc.Request.Name)
go dc.Client.HeartbeatMount(dc.Global.TTL, ut, stopChan)

mc, err := driver.Mount(driverOpts)
if err != nil {
dc.Client.RemoveStopChan(uc.Request.Name)
Expand Down Expand Up @@ -340,15 +354,15 @@ func (dc *DaemonConfig) unmount(w http.ResponseWriter, r *http.Request) {
Hostname: dc.Host,
}

dc.Client.RemoveStopChan(uc.Request.Name)

if err := driver.Unmount(driverOpts); err != nil {
dc.Client.AddStopChan(uc.Request.Name)
httpError(w, "Could not unmount image", err)
return
}

dc.Client.RemoveStopChan(uc.Request.Name)

if err := dc.Client.ReportUnmount(ut); err != nil {
dc.Client.AddStopChan(uc.Request.Name)
httpError(w, "Reporting unmount to master", err)
return
}
Expand Down

0 comments on commit 79f80b5

Please sign in to comment.