From de1d89e165ebc9d5a652f231d1587a63fe3d2cca Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 7 Aug 2020 09:10:35 -0400 Subject: [PATCH] csi: client RPCs should return wrapped errors for checking When the client-side actions of a CSI client RPC succeed but we get disconnected during the RPC or we fail to checkpoint the claim state, we want to be able to retry the client RPC without getting blocked by the client-side state (ex. mount points) already having been cleaned up in previous calls. --- client/csi_endpoint.go | 26 ++++++++++++++++++----- client/csi_endpoint_test.go | 6 +++--- client/pluginmanager/csimanager/volume.go | 12 +++++------ nomad/client_csi_endpoint.go | 11 ++++------ nomad/csi_endpoint.go | 8 ++----- nomad/structs/errors.go | 5 +++++ nomad/util.go | 2 +- plugins/csi/client.go | 15 +++++++++---- 8 files changed, 53 insertions(+), 32 deletions(-) diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index 6f9bf6e28fc..8f86dfe7f8c 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -3,6 +3,7 @@ package client import ( "context" "errors" + "fmt" "time" metrics "github.com/armon/go-metrics" @@ -10,6 +11,7 @@ import ( "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/structs" + nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/csi" ) @@ -46,7 +48,9 @@ func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateV plugin, err := c.findControllerPlugin(req.PluginID) if err != nil { - return err + // the server's view of the plugin health is stale, so let it know it + // should retry with another controller instance + return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err) } defer plugin.Close() @@ -78,7 +82,9 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now()) plugin, err := c.findControllerPlugin(req.PluginID) if err != nil { - return err + // the server's view of the plugin health is stale, so let it know it + // should retry with another controller instance + return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err) } defer plugin.Close() @@ -123,7 +129,9 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now()) plugin, err := c.findControllerPlugin(req.PluginID) if err != nil { - return err + // the server's view of the plugin health is stale, so let it know it + // should retry with another controller instance + return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err) } defer plugin.Close() @@ -152,9 +160,14 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum grpc_retry.WithMax(3), grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond))) if err != nil { + if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) { + // if the controller detach previously happened but the server failed to + // checkpoint, we'll get an error from the plugin but can safely ignore it. + c.c.logger.Debug("could not unpublish volume: %v", err) + return nil + } return err } - return nil } @@ -191,7 +204,10 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re } err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts) - if err != nil { + if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) { + // if the unmounting previously happened but the server failed to + // checkpoint, we'll get an error from Unmount but can safely + // ignore it. return err } return nil diff --git a/client/csi_endpoint_test.go b/client/csi_endpoint_test.go index 35fbb3e4ead..766d6a800d4 100644 --- a/client/csi_endpoint_test.go +++ b/client/csi_endpoint_test.go @@ -41,7 +41,7 @@ func TestCSIController_AttachVolume(t *testing.T) { PluginID: "some-garbage", }, }, - ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"), + ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"), }, { Name: "validates volumeid is not empty", @@ -198,7 +198,7 @@ func TestCSIController_ValidateVolume(t *testing.T) { }, VolumeID: "foo", }, - ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"), + ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"), }, { Name: "validates attachmentmode", @@ -287,7 +287,7 @@ func TestCSIController_DetachVolume(t *testing.T) { PluginID: "some-garbage", }, }, - ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"), + ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"), }, { Name: "validates volumeid is not empty", diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 9f6a5cf2e15..9dccc049853 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -2,6 +2,7 @@ package csimanager import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -317,7 +318,7 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al // host target path was already destroyed, nothing to do here. // this helps us in the case that a previous GC attempt cleaned // up the volume on the node but the controller RPCs failed - return nil + rpcErr = fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr) } return rpcErr } @@ -332,9 +333,8 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al // We successfully removed the directory, return any rpcErrors that were // encountered, but because we got here, they were probably flaky or was - // cleaned up externally. We might want to just return `nil` here in the - // future. - return rpcErr + // cleaned up externally. + return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr) } func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) { @@ -343,7 +343,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage) - if err == nil { + if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { canRelease := v.usageTracker.Free(allocID, volID, usage) if v.requiresStaging && canRelease { err = v.unstageVolume(ctx, volID, remoteID, usage) @@ -354,7 +354,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo SetSubsystem(structs.NodeEventSubsystemStorage). SetMessage("Unmount volume"). AddDetail("volume_id", volID) - if err == nil { + if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { event.AddDetail("success", "true") } else { event.AddDetail("success", "false") diff --git a/nomad/client_csi_endpoint.go b/nomad/client_csi_endpoint.go index d8208da4e1c..e0e24bf2a07 100644 --- a/nomad/client_csi_endpoint.go +++ b/nomad/client_csi_endpoint.go @@ -1,15 +1,16 @@ package nomad import ( + "errors" "fmt" "math/rand" - "strings" "time" metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" ) // ClientCSI is used to forward RPC requests to the targed Nomad client's @@ -110,12 +111,8 @@ func (a *ClientCSI) ControllerDetachVolume(args *cstructs.ClientCSIControllerDet // client has stopped and been GC'd, or where the controller has stopped but // we don't have the fingerprint update yet func (a *ClientCSI) isRetryable(err error, clientID, pluginID string) bool { - // TODO(tgross): it would be nicer to use errors.Is here but we - // need to make sure we're using error wrapping to make that work - errMsg := err.Error() - return strings.Contains(errMsg, fmt.Sprintf("Unknown node: %s", clientID)) || - strings.Contains(errMsg, "no plugins registered for type: csi-controller") || - strings.Contains(errMsg, fmt.Sprintf("plugin %s for type controller not found", pluginID)) + return errors.Is(err, structs.ErrUnknownNode) || + errors.Is(err, structs.ErrCSIClientRPCRetryable) } func (a *ClientCSI) NodeDetachVolume(args *cstructs.ClientCSINodeDetachVolumeRequest, reply *cstructs.ClientCSINodeDetachVolumeResponse) error { diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 5d12a4ee3dc..d232d1de7a4 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -620,9 +620,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C // we should only get this error if the Nomad node disconnects and // is garbage-collected, so at this point we don't have any reason // to operate as though the volume is attached to it. - if !errors.Is(err, fmt.Errorf("Unknown node: %s", claim.NodeID)) { - // TODO(tgross): need to capture case where NodeUnpublish previously - // happened but we failed to checkpoint for some reason + if !errors.Is(err, structs.ErrUnknownNode) { return fmt.Errorf("could not detach from node: %w", err) } } @@ -658,8 +656,6 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str err := v.srv.RPC("ClientCSI.ControllerDetachVolume", req, &cstructs.ClientCSIControllerDetachVolumeResponse{}) if err != nil { - // TODO(tgross): need to capture case where ControllerUnpublish previously - // happened but we failed to checkpoint for some reason return fmt.Errorf("could not detach from controller: %v", err) } claim.State = structs.CSIVolumeClaimStateReadyToFree @@ -700,7 +696,7 @@ func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs. // get the the storage provider's ID for the client node (not // Nomad's ID for the node) targetCSIInfo, ok := targetNode.CSINodePlugins[vol.PluginID] - if !ok { + if !ok || targetCSIInfo.NodeInfo == nil { return "", fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", targetNode.ID, vol.PluginID) } return targetCSIInfo.NodeInfo.ID, nil diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 7dbfbad8dbe..f20e3ae1f8d 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -52,6 +52,8 @@ var ( ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ErrMissingAllocID = errors.New(errMissingAllocID) + ErrUnknownNode = errors.New(ErrUnknownNodePrefix) + ErrDeploymentTerminalNoCancel = errors.New(errDeploymentTerminalNoCancel) ErrDeploymentTerminalNoFail = errors.New(errDeploymentTerminalNoFail) ErrDeploymentTerminalNoPause = errors.New(errDeploymentTerminalNoPause) @@ -61,6 +63,9 @@ var ( ErrDeploymentTerminalNoRun = errors.New(errDeploymentTerminalNoRun) ErrDeploymentTerminalNoSetHealth = errors.New(errDeploymentTerminalNoSetHealth) ErrDeploymentRunningNoUnblock = errors.New(errDeploymentRunningNoUnblock) + + ErrCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)") + ErrCSIClientRPCRetryable = errors.New("CSI client error (retryable)") ) // IsErrNoLeader returns whether the error is due to there being no leader. diff --git a/nomad/util.go b/nomad/util.go index e2772a73c70..2a5f5b0daf3 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -246,7 +246,7 @@ func getNodeForRpc(snap *state.StateSnapshot, nodeID string) (*structs.Node, err } if node == nil { - return nil, fmt.Errorf("Unknown node %q", nodeID) + return nil, fmt.Errorf("%w %s", structs.ErrUnknownNode, nodeID) } if err := nodeSupportsRpc(node); err != nil { diff --git a/plugins/csi/client.go b/plugins/csi/client.go index a44e5b1611f..bfff2848d50 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -12,6 +12,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/grpc-middleware/logging" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/shared/hclspec" "google.golang.org/grpc" @@ -314,8 +315,12 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU code := status.Code(err) switch code { case codes.NotFound: - err = fmt.Errorf("volume %q or node %q could not be found: %v", - req.ExternalID, req.NodeID, err) + // we'll have validated the volume and node *should* exist at the + // server, so if we get a not-found here it's because we've previously + // checkpointed. we'll return an error so the caller can log it for + // diagnostic purposes. + err = fmt.Errorf("%w: volume %q or node %q could not be found: %v", + structs.ErrCSIClientRPCIgnorable, req.ExternalID, req.NodeID, err) case codes.Internal: err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err) } @@ -558,7 +563,8 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging code := status.Code(err) switch code { case codes.NotFound: - err = fmt.Errorf("volume %q could not be found: %v", volumeID, err) + err = fmt.Errorf("%w: volume %q could not be found: %v", + structs.ErrCSIClientRPCIgnorable, volumeID, err) case codes.Internal: err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err) } @@ -630,7 +636,8 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s code := status.Code(err) switch code { case codes.NotFound: - err = fmt.Errorf("volume %q could not be found: %v", volumeID, err) + err = fmt.Errorf("%w: volume %q could not be found: %v", + structs.ErrCSIClientRPCIgnorable, volumeID, err) case codes.Internal: err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err) }