Skip to content

Commit

Permalink
rbd: protect against concurrent gRPC calls
Browse files Browse the repository at this point in the history
The timeout value in external-provisioner is fairly low. It's not
uncommon that it times out and retries before the rbdplugin is done
with CreateVolume. rbdplugin has to serialize calls and ensure that
they are idempotent to deal with this.
  • Loading branch information
pohly committed Oct 26, 2018
1 parent 188cdd1 commit 720ad4a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
14 changes: 12 additions & 2 deletions pkg/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}

volumeNameMutex.LockKey(req.GetName())
defer volumeNameMutex.UnlockKey(req.GetName())

// Need to check for already existing volume name, and if found
// check for the requested capacity and already allocated capacity
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
Expand Down Expand Up @@ -156,6 +159,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
// For now the image get unconditionally deleted, but here retention policy can be checked
volumeID := req.GetVolumeId()
volumeIDMutex.LockKey(volumeID)
defer volumeIDMutex.UnlockKey(volumeID)
rbdVol := &rbdVolume{}
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
if os.IsNotExist(errors.Cause(err)) {
Expand All @@ -174,8 +179,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
// Removing persistent storage file for the unmapped volume
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
// TODO: we can theoretically end up here when two DeleteVolume calls
// get invoked concurrently. Serialize?
return nil, err
}

Expand Down Expand Up @@ -214,6 +217,9 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
}

snapshotNameMutex.LockKey(req.GetName())
defer snapshotNameMutex.UnlockKey(req.GetName())

// Need to check for already existing snapshot name, and if found
// check for the requested source volume id and already allocated source volume id
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
Expand Down Expand Up @@ -332,6 +338,9 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
if len(snapshotID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
}
snapshotIDMutex.LockKey(snapshotID)
defer snapshotIDMutex.UnlockKey(snapshotID)

rbdSnap := &rbdSnapshot{}
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
return nil, err
Expand Down Expand Up @@ -368,6 +377,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
sourceVolumeId := req.GetSourceVolumeId()

// TODO (sngchlko) list with token
// TODO (#94) protect concurrent access to global data structures

// list only a specific snapshot which has snapshot ID
if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
Expand Down
5 changes: 5 additions & 0 deletions pkg/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
volName := s[len(s)-1]

targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)

notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -97,6 +100,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)

notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,19 @@ type rbdSnapshot struct {
}

var (
// serializes operations based on "<rbd pool>/<rbd image>" as key
attachdetachMutex = keymutex.NewKeyMutex()
// serializes operations based on "volume name" as key
volumeNameMutex = keymutex.NewKeyMutex()
// serializes operations based on "volume id" as key
volumeIDMutex = keymutex.NewKeyMutex()
// serializes operations based on "snapshot name" as key
snapshotNameMutex = keymutex.NewKeyMutex()
// serializes operations based on "snapshot id" as key
snapshotIDMutex = keymutex.NewKeyMutex()
// serializes operations based on "mount target path" as key
targetPathMutex = keymutex.NewKeyMutex()

supportedFeatures = sets.NewString("layering")
)

Expand Down

0 comments on commit 720ad4a

Please sign in to comment.