
Commit 079f60c

csi: client RPCs should return wrapped errors for checking (#8605)
When the client-side actions of a CSI client RPC succeed but we get disconnected during the RPC or fail to checkpoint the claim state, we want to be able to retry the client RPC without being blocked by client-side state (e.g. mount points) that was already cleaned up in previous calls.
1 parent a264dd2 commit 079f60c
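
The change hinges on Go 1.13 error wrapping: the client-side RPC handlers wrap failures with sentinel errors via `fmt.Errorf("%w: ...", sentinel, err)`, and callers classify them with `errors.Is` instead of matching on message text. A minimal, standalone sketch of that pattern (illustrative sentinel values standing in for the real `nstructs.ErrCSIClientRPCRetryable` / `ErrCSIClientRPCIgnorable`):

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinels analogous to the ones this commit adds to nomad/structs.
var (
	errRetryable = errors.New("CSI client error (retryable)")
	errIgnorable = errors.New("CSI client error (ignorable)")
)

func main() {
	// A concrete failure gets wrapped with %w so callers can classify it later.
	cause := errors.New("plugin some-garbage for type csi-controller not found")
	err := fmt.Errorf("%w: %v", errRetryable, cause)

	// Callers branch on the error's class, not its exact message.
	switch {
	case errors.Is(err, errRetryable):
		fmt.Println("retry the RPC against another controller instance")
	case errors.Is(err, errIgnorable):
		fmt.Println("the work was already done; treat as success")
	default:
		fmt.Println("hard failure:", err)
	}
}
```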

File tree

8 files changed, +53 −32 lines changed

client/csi_endpoint.go (+21 −5)

```diff
@@ -3,13 +3,15 @@ package client
 import (
 	"context"
 	"errors"
+	"fmt"
 	"time"
 
 	metrics "github.com/armon/go-metrics"
 	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
 	"github.com/hashicorp/nomad/client/dynamicplugins"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
 	"github.com/hashicorp/nomad/client/structs"
+	nstructs "github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/csi"
 )
 
@@ -46,7 +48,9 @@ func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateV
 
 	plugin, err := c.findControllerPlugin(req.PluginID)
 	if err != nil {
-		return err
+		// the server's view of the plugin health is stale, so let it know it
+		// should retry with another controller instance
+		return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
 	}
 	defer plugin.Close()
 
@@ -78,7 +82,9 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum
 	defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
 	plugin, err := c.findControllerPlugin(req.PluginID)
 	if err != nil {
-		return err
+		// the server's view of the plugin health is stale, so let it know it
+		// should retry with another controller instance
+		return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
 	}
 	defer plugin.Close()
 
@@ -123,7 +129,9 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
 	defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
 	plugin, err := c.findControllerPlugin(req.PluginID)
 	if err != nil {
-		return err
+		// the server's view of the plugin health is stale, so let it know it
+		// should retry with another controller instance
+		return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
 	}
 	defer plugin.Close()
 
@@ -152,9 +160,14 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
 		grpc_retry.WithMax(3),
 		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
 	if err != nil {
+		if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
+			// if the controller detach previously happened but the server failed to
+			// checkpoint, we'll get an error from the plugin but can safely ignore it.
+			c.c.logger.Debug("could not unpublish volume: %v", err)
+			return nil
+		}
 		return err
 	}
-
 	return nil
 }
 
@@ -191,7 +204,10 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
 	}
 
 	err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
-	if err != nil {
+	if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
+		// if the unmounting previously happened but the server failed to
+		// checkpoint, we'll get an error from Unmount but can safely
+		// ignore it.
 		return err
 	}
 	return nil
```
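
The "should retry with another controller instance" comments describe the server-side reaction to the retryable sentinel. A hedged sketch of what such a caller loop could look like; `controllerIDs` and `attach` are hypothetical names for illustration, not Nomad's actual server API:

```go
package example

import (
	"errors"

	nstructs "github.com/hashicorp/nomad/nomad/structs"
)

// attachWithRetry tries each candidate controller until one succeeds or the
// failure is not classified as retryable (hypothetical helper).
func attachWithRetry(controllerIDs []string, attach func(id string) error) error {
	var err error
	for _, id := range controllerIDs {
		err = attach(id)
		if err == nil || !errors.Is(err, nstructs.ErrCSIClientRPCRetryable) {
			return err
		}
		// retryable: the server's view of this controller was stale,
		// so try the next candidate
	}
	return err
}
```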

client/csi_endpoint_test.go (+3 −3)

```diff
@@ -41,7 +41,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
 					PluginID: "some-garbage",
 				},
 			},
-			ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
+			ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
 		},
 		{
 			Name: "validates volumeid is not empty",
@@ -198,7 +198,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
 				},
 				VolumeID: "foo",
 			},
-			ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
+			ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
 		},
 		{
 			Name: "validates attachmentmode",
@@ -287,7 +287,7 @@ func TestCSIController_DetachVolume(t *testing.T) {
 					PluginID: "some-garbage",
 				},
 			},
-			ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
+			ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
 		},
 		{
 			Name: "validates volumeid is not empty",
```

client/pluginmanager/csimanager/volume.go (+6 −6)

```diff
@@ -2,6 +2,7 @@ package csimanager
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -317,7 +318,7 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al
 		// host target path was already destroyed, nothing to do here.
 		// this helps us in the case that a previous GC attempt cleaned
 		// up the volume on the node but the controller RPCs failed
-		return nil
+		rpcErr = fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
 	}
 	return rpcErr
 }
@@ -332,9 +333,8 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al
 
 	// We successfully removed the directory, return any rpcErrors that were
 	// encountered, but because we got here, they were probably flaky or was
-	// cleaned up externally. We might want to just return `nil` here in the
-	// future.
-	return rpcErr
+	// cleaned up externally.
+	return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
 }
 
 func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
@@ -343,7 +343,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
 
 	err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage)
 
-	if err == nil {
+	if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
 		canRelease := v.usageTracker.Free(allocID, volID, usage)
 		if v.requiresStaging && canRelease {
 			err = v.unstageVolume(ctx, volID, remoteID, usage)
@@ -354,7 +354,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
 		SetSubsystem(structs.NodeEventSubsystemStorage).
 		SetMessage("Unmount volume").
 		AddDetail("volume_id", volID)
-	if err == nil {
+	if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
 		event.AddDetail("success", "true")
 	} else {
 		event.AddDetail("success", "false")
```
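
Rather than swallowing the error when the mount point is already gone, the volume manager now tags it as ignorable, so `UnmountVolume` can still free the usage-tracker claim and report success while keeping the message for diagnostics. A hedged sketch of that caller-side shape (illustrative names, not the real `volumeManager` methods):

```go
package example

import (
	"errors"
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// finishUnmount treats an ignorable error as success for bookkeeping while
// still surfacing its text to the log (hypothetical helper).
func finishUnmount(err error, freeClaim func(), logDebug func(msg string)) error {
	if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
		freeClaim() // release the usage-tracker claim either way
		if err != nil {
			logDebug(fmt.Sprintf("unmount already completed previously: %v", err))
		}
		return nil
	}
	return err
}
```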

nomad/client_csi_endpoint.go (+4 −7)

```diff
@@ -1,15 +1,16 @@
 package nomad
 
 import (
+	"errors"
 	"fmt"
 	"math/rand"
-	"strings"
 	"time"
 
 	metrics "github.com/armon/go-metrics"
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/nomad/structs"
 )
 
 // ClientCSI is used to forward RPC requests to the targed Nomad client's
@@ -110,12 +111,8 @@ func (a *ClientCSI) ControllerDetachVolume(args *cstructs.ClientCSIControllerDet
 // client has stopped and been GC'd, or where the controller has stopped but
 // we don't have the fingerprint update yet
 func (a *ClientCSI) isRetryable(err error, clientID, pluginID string) bool {
-	// TODO(tgross): it would be nicer to use errors.Is here but we
-	// need to make sure we're using error wrapping to make that work
-	errMsg := err.Error()
-	return strings.Contains(errMsg, fmt.Sprintf("Unknown node: %s", clientID)) ||
-		strings.Contains(errMsg, "no plugins registered for type: csi-controller") ||
-		strings.Contains(errMsg, fmt.Sprintf("plugin %s for type controller not found", pluginID))
+	return errors.Is(err, structs.ErrUnknownNode) ||
+		errors.Is(err, structs.ErrCSIClientRPCRetryable)
 }
 
 func (a *ClientCSI) NodeDetachVolume(args *cstructs.ClientCSINodeDetachVolumeRequest, reply *cstructs.ClientCSINodeDetachVolumeResponse) error {
```
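
Dropping the `strings.Contains` checks removes the coupling between `isRetryable` and the exact wording of error messages. A standalone sketch contrasting the two approaches (illustrative sentinel, not the real constant):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

func main() {
	errUnknownNode := errors.New("Unknown node")
	err := fmt.Errorf("%w %s", errUnknownNode, "c3f9a8d1")

	// Fragile: substring matching silently breaks when the message changes
	// (here the old format's colon is gone, so this prints false).
	fmt.Println(strings.Contains(err.Error(), "Unknown node:"))

	// Robust: errors.Is walks the wrap chain and matches the sentinel
	// regardless of how the surrounding message is formatted (prints true).
	fmt.Println(errors.Is(err, errUnknownNode))
}
```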

nomad/csi_endpoint.go (+2 −6)

```diff
@@ -605,9 +605,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
 		// we should only get this error if the Nomad node disconnects and
 		// is garbage-collected, so at this point we don't have any reason
 		// to operate as though the volume is attached to it.
-		if !errors.Is(err, fmt.Errorf("Unknown node: %s", claim.NodeID)) {
-			// TODO(tgross): need to capture case where NodeUnpublish previously
-			// happened but we failed to checkpoint for some reason
+		if !errors.Is(err, structs.ErrUnknownNode) {
 			return fmt.Errorf("could not detach from node: %w", err)
 		}
 	}
@@ -662,8 +660,6 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
 	err = v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
 		&cstructs.ClientCSIControllerDetachVolumeResponse{})
 	if err != nil {
-		// TODO(tgross): need to capture case where ControllerUnpublish previously
-		// happened but we failed to checkpoint for some reason
 		return fmt.Errorf("could not detach from controller: %v", err)
 	}
 	claim.State = structs.CSIVolumeClaimStateReadyToFree
@@ -704,7 +700,7 @@ func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.
 	// get the the storage provider's ID for the client node (not
 	// Nomad's ID for the node)
 	targetCSIInfo, ok := targetNode.CSINodePlugins[vol.PluginID]
-	if !ok {
+	if !ok || targetCSIInfo.NodeInfo == nil {
 		return "", fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", targetNode.ID, vol.PluginID)
 	}
 	return targetCSIInfo.NodeInfo.ID, nil
```
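
The removed check compared against a freshly constructed `fmt.Errorf` value; because `errors.Is` matches by identity (or via an `Is` method), two separately built errors never compare equal even with identical text, so the old condition could not match. A minimal standalone sketch of why the shared sentinel is needed:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	err := fmt.Errorf("Unknown node: %s", "node-1")
	target := fmt.Errorf("Unknown node: %s", "node-1")

	// Distinct error values, identical messages: errors.Is reports false.
	fmt.Println(errors.Is(err, target)) // false

	// A shared sentinel wrapped with %w is what errors.Is is designed for.
	sentinel := errors.New("Unknown node")
	wrapped := fmt.Errorf("%w %s", sentinel, "node-1")
	fmt.Println(errors.Is(wrapped, sentinel)) // true
}
```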

nomad/structs/errors.go (+5)

```diff
@@ -52,6 +52,8 @@ var (
 	ErrNodeLacksRpc   = errors.New(errNodeLacksRpc)
 	ErrMissingAllocID = errors.New(errMissingAllocID)
 
+	ErrUnknownNode = errors.New(ErrUnknownNodePrefix)
+
 	ErrDeploymentTerminalNoCancel = errors.New(errDeploymentTerminalNoCancel)
 	ErrDeploymentTerminalNoFail   = errors.New(errDeploymentTerminalNoFail)
 	ErrDeploymentTerminalNoPause  = errors.New(errDeploymentTerminalNoPause)
@@ -61,6 +63,9 @@ var (
 	ErrDeploymentTerminalNoRun       = errors.New(errDeploymentTerminalNoRun)
 	ErrDeploymentTerminalNoSetHealth = errors.New(errDeploymentTerminalNoSetHealth)
 	ErrDeploymentRunningNoUnblock    = errors.New(errDeploymentRunningNoUnblock)
+
+	ErrCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)")
+	ErrCSIClientRPCRetryable = errors.New("CSI client error (retryable)")
 )
 
 // IsErrNoLeader returns whether the error is due to there being no leader.
```

nomad/util.go (+1 −1)

```diff
@@ -246,7 +246,7 @@ func getNodeForRpc(snap *state.StateSnapshot, nodeID string) (*structs.Node, err
 	}
 
 	if node == nil {
-		return nil, fmt.Errorf("Unknown node %q", nodeID)
+		return nil, fmt.Errorf("%w %s", structs.ErrUnknownNode, nodeID)
 	}
 
 	if err := nodeSupportsRpc(node); err != nil {
```

plugins/csi/client.go (+11 −4)

```diff
@@ -12,6 +12,7 @@ import (
 	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
+	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/base"
 	"github.com/hashicorp/nomad/plugins/shared/hclspec"
 	"google.golang.org/grpc"
@@ -314,8 +315,12 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU
 	code := status.Code(err)
 	switch code {
 	case codes.NotFound:
-		err = fmt.Errorf("volume %q or node %q could not be found: %v",
-			req.ExternalID, req.NodeID, err)
+		// we'll have validated the volume and node *should* exist at the
+		// server, so if we get a not-found here it's because we've previously
+		// checkpointed. we'll return an error so the caller can log it for
+		// diagnostic purposes.
+		err = fmt.Errorf("%w: volume %q or node %q could not be found: %v",
+			structs.ErrCSIClientRPCIgnorable, req.ExternalID, req.NodeID, err)
 	case codes.Internal:
 		err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
 	}
@@ -558,7 +563,8 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging
 	code := status.Code(err)
 	switch code {
 	case codes.NotFound:
-		err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
+		err = fmt.Errorf("%w: volume %q could not be found: %v",
+			structs.ErrCSIClientRPCIgnorable, volumeID, err)
 	case codes.Internal:
 		err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
 	}
@@ -630,7 +636,8 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s
 	code := status.Code(err)
 	switch code {
 	case codes.NotFound:
-		err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
+		err = fmt.Errorf("%w: volume %q could not be found: %v",
+			structs.ErrCSIClientRPCIgnorable, volumeID, err)
 	case codes.Internal:
 		err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
 	}
```
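
The plugin client is where the gRPC status code is still visible, so the NotFound-to-ignorable translation happens there, before the error crosses back into Nomad. A hedged sketch of that translation (hypothetical `wrapNotFound` helper, not the actual client methods):

```go
package example

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// wrapNotFound converts a gRPC NotFound into an ignorable error: the volume
// was already unpublished or unstaged, so callers may treat the operation as
// done while keeping the message for diagnostics (illustrative helper).
func wrapNotFound(err error, volumeID string) error {
	if err == nil {
		return nil
	}
	if status.Code(err) == codes.NotFound {
		return fmt.Errorf("%w: volume %q could not be found: %v",
			structs.ErrCSIClientRPCIgnorable, volumeID, err)
	}
	return err
}
```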
