diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index 6f9bf6e28fc..8f86dfe7f8c 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -3,6 +3,7 @@ package client import ( "context" "errors" + "fmt" "time" metrics "github.com/armon/go-metrics" @@ -10,6 +11,7 @@ import ( "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/structs" + nstructs "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/csi" ) @@ -46,7 +48,9 @@ func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateV plugin, err := c.findControllerPlugin(req.PluginID) if err != nil { - return err + // the server's view of the plugin health is stale, so let it know it + // should retry with another controller instance + return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err) } defer plugin.Close() @@ -78,7 +82,9 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now()) plugin, err := c.findControllerPlugin(req.PluginID) if err != nil { - return err + // the server's view of the plugin health is stale, so let it know it + // should retry with another controller instance + return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err) } defer plugin.Close() @@ -123,7 +129,9 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now()) plugin, err := c.findControllerPlugin(req.PluginID) if err != nil { - return err + // the server's view of the plugin health is stale, so let it know it + // should retry with another controller instance + return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err) } defer plugin.Close() @@ -152,9 +160,14 @@ func (c *CSI) ControllerDetachVolume(req 
*structs.ClientCSIControllerDetachVolum grpc_retry.WithMax(3), grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond))) if err != nil { + if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) { + // if the controller detach previously happened but the server failed to + // checkpoint, we'll get an error from the plugin but can safely ignore it. + c.c.logger.Debug("could not unpublish volume", "error", err) + return nil + } return err } - return nil } @@ -191,7 +204,10 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re } err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts) - if err != nil { + if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) { + // if the unmounting previously happened but the server failed to + // checkpoint, we'll get an error from Unmount but can safely + // ignore it. return err } return nil diff --git a/client/csi_endpoint_test.go b/client/csi_endpoint_test.go index 35fbb3e4ead..766d6a800d4 100644 --- a/client/csi_endpoint_test.go +++ b/client/csi_endpoint_test.go @@ -41,7 +41,7 @@ func TestCSIController_AttachVolume(t *testing.T) { PluginID: "some-garbage", }, }, - ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"), + ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"), }, { Name: "validates volumeid is not empty", @@ -198,7 +198,7 @@ func TestCSIController_ValidateVolume(t *testing.T) { }, VolumeID: "foo", }, - ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"), + ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"), }, { Name: "validates attachmentmode", @@ -287,7 +287,7 @@ func TestCSIController_DetachVolume(t *testing.T) { PluginID: "some-garbage", }, }, - ExpectedErr: errors.New("CSI 
client error (retryable): plugin some-garbage for type csi-controller not found"), }, { Name: "validates volumeid is not empty", diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 9f6a5cf2e15..9dccc049853 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -2,6 +2,7 @@ package csimanager import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -317,7 +318,7 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al // host target path was already destroyed, nothing to do here. // this helps us in the case that a previous GC attempt cleaned // up the volume on the node but the controller RPCs failed - return nil + rpcErr = fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr) } return rpcErr } @@ -332,9 +333,11 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al // We successfully removed the directory, return any rpcErrors that were // encountered, but because we got here, they were probably flaky or was - // cleaned up externally. We might want to just return `nil` here in the - // future. - return rpcErr + // cleaned up externally. only wrap when there actually was an error, + // otherwise a fully successful unpublish would return a non-nil error. + if rpcErr != nil { + return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr) + } + return nil } func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) { @@ -343,7 +343,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage) - if err == nil { + if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { canRelease := v.usageTracker.Free(allocID, volID, usage) if v.requiresStaging && canRelease { err = v.unstageVolume(ctx, volID, remoteID, usage) @@ -354,7 +354,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo SetSubsystem(structs.NodeEventSubsystemStorage). 
SetMessage("Unmount volume"). AddDetail("volume_id", volID) - if err == nil { + if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { event.AddDetail("success", "true") } else { event.AddDetail("success", "false") diff --git a/nomad/client_csi_endpoint.go b/nomad/client_csi_endpoint.go index d8208da4e1c..e0e24bf2a07 100644 --- a/nomad/client_csi_endpoint.go +++ b/nomad/client_csi_endpoint.go @@ -1,15 +1,17 @@ package nomad import ( + "errors" "fmt" "math/rand" "strings" "time" metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" ) // ClientCSI is used to forward RPC requests to the targed Nomad client's @@ -110,12 +111,14 @@ func (a *ClientCSI) ControllerDetachVolume(args *cstructs.ClientCSIControllerDet // client has stopped and been GC'd, or where the controller has stopped but // we don't have the fingerprint update yet func (a *ClientCSI) isRetryable(err error, clientID, pluginID string) bool { - // TODO(tgross): it would be nicer to use errors.Is here but we - // need to make sure we're using error wrapping to make that work - errMsg := err.Error() - return strings.Contains(errMsg, fmt.Sprintf("Unknown node: %s", clientID)) || - strings.Contains(errMsg, "no plugins registered for type: csi-controller") || - strings.Contains(errMsg, fmt.Sprintf("plugin %s for type controller not found", pluginID)) + // NOTE(review): errors.Is only matches while the error chain is intact; + // wrapped errors returned from a client RPC are flattened to plain strings + // on the wire, so also fall back to matching the error text. + errMsg := err.Error() + return errors.Is(err, structs.ErrUnknownNode) || + errors.Is(err, structs.ErrCSIClientRPCRetryable) || + strings.Contains(errMsg, structs.ErrUnknownNodePrefix) || + strings.Contains(errMsg, structs.ErrCSIClientRPCRetryable.Error()) } func (a *ClientCSI) NodeDetachVolume(args *cstructs.ClientCSINodeDetachVolumeRequest, reply *cstructs.ClientCSINodeDetachVolumeResponse) error { diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 5d12a4ee3dc..d232d1de7a4 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -620,9 +620,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C 
// we should only get this error if the Nomad node disconnects and // is garbage-collected, so at this point we don't have any reason // to operate as though the volume is attached to it. - if !errors.Is(err, fmt.Errorf("Unknown node: %s", claim.NodeID)) { - // TODO(tgross): need to capture case where NodeUnpublish previously - // happened but we failed to checkpoint for some reason + if !errors.Is(err, structs.ErrUnknownNode) { return fmt.Errorf("could not detach from node: %w", err) } } @@ -658,8 +656,6 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str err := v.srv.RPC("ClientCSI.ControllerDetachVolume", req, &cstructs.ClientCSIControllerDetachVolumeResponse{}) if err != nil { - // TODO(tgross): need to capture case where ControllerUnpublish previously - // happened but we failed to checkpoint for some reason return fmt.Errorf("could not detach from controller: %v", err) } claim.State = structs.CSIVolumeClaimStateReadyToFree @@ -700,7 +696,7 @@ func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs. 
// get the the storage provider's ID for the client node (not // Nomad's ID for the node) targetCSIInfo, ok := targetNode.CSINodePlugins[vol.PluginID] - if !ok { + if !ok || targetCSIInfo.NodeInfo == nil { return "", fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", targetNode.ID, vol.PluginID) } return targetCSIInfo.NodeInfo.ID, nil diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 7dbfbad8dbe..f20e3ae1f8d 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -52,6 +52,8 @@ var ( ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ErrMissingAllocID = errors.New(errMissingAllocID) + ErrUnknownNode = errors.New(ErrUnknownNodePrefix) + ErrDeploymentTerminalNoCancel = errors.New(errDeploymentTerminalNoCancel) ErrDeploymentTerminalNoFail = errors.New(errDeploymentTerminalNoFail) ErrDeploymentTerminalNoPause = errors.New(errDeploymentTerminalNoPause) @@ -61,6 +63,9 @@ var ( ErrDeploymentTerminalNoRun = errors.New(errDeploymentTerminalNoRun) ErrDeploymentTerminalNoSetHealth = errors.New(errDeploymentTerminalNoSetHealth) ErrDeploymentRunningNoUnblock = errors.New(errDeploymentRunningNoUnblock) + + ErrCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)") + ErrCSIClientRPCRetryable = errors.New("CSI client error (retryable)") ) // IsErrNoLeader returns whether the error is due to there being no leader. 
diff --git a/nomad/util.go b/nomad/util.go index e2772a73c70..2a5f5b0daf3 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -246,7 +246,7 @@ func getNodeForRpc(snap *state.StateSnapshot, nodeID string) (*structs.Node, err } if node == nil { - return nil, fmt.Errorf("Unknown node %q", nodeID) + return nil, fmt.Errorf("%w %s", structs.ErrUnknownNode, nodeID) } if err := nodeSupportsRpc(node); err != nil { diff --git a/plugins/csi/client.go b/plugins/csi/client.go index a44e5b1611f..bfff2848d50 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -12,6 +12,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/grpc-middleware/logging" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/shared/hclspec" "google.golang.org/grpc" @@ -314,8 +315,12 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU code := status.Code(err) switch code { case codes.NotFound: - err = fmt.Errorf("volume %q or node %q could not be found: %v", - req.ExternalID, req.NodeID, err) + // we'll have validated the volume and node *should* exist at the + // server, so if we get a not-found here it's because we've previously + // checkpointed. we'll return an error so the caller can log it for + // diagnostic purposes. 
+ err = fmt.Errorf("%w: volume %q or node %q could not be found: %v", + structs.ErrCSIClientRPCIgnorable, req.ExternalID, req.NodeID, err) case codes.Internal: err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err) } @@ -558,7 +563,8 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging code := status.Code(err) switch code { case codes.NotFound: - err = fmt.Errorf("volume %q could not be found: %v", volumeID, err) + err = fmt.Errorf("%w: volume %q could not be found: %v", + structs.ErrCSIClientRPCIgnorable, volumeID, err) case codes.Internal: err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err) } @@ -630,7 +636,8 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s code := status.Code(err) switch code { case codes.NotFound: - err = fmt.Errorf("volume %q could not be found: %v", volumeID, err) + err = fmt.Errorf("%w: volume %q could not be found: %v", + structs.ErrCSIClientRPCIgnorable, volumeID, err) case codes.Internal: err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err) }