
Commit b775a73

CSI: make gRPC client creation more robust (#12057)
Nomad communicates with CSI plugin tasks via gRPC. The plugin supervisor hook uses this to ping the plugin for health checks, which it emits as task events. After the first successful health check, the plugin supervisor registers the plugin in the client's dynamic plugin registry, which in turn creates a CSI plugin manager instance with its own gRPC client for fingerprinting the plugin and sending mount requests.

If the plugin manager instance fails to connect to the plugin on its first attempt, it exits. The plugin supervisor hook is unaware that the connection failed so long as its own pings continue to work, so a transient failure during plugin startup can mislead the plugin supervisor hook into thinking the plugin is up (and that there's no need to restart the allocation) even though no fingerprinter is running.

* Refactor the gRPC client to connect on first use, so the plugin manager instance can retry the gRPC connection until it succeeds (a simplified sketch of this pattern appears below).
* Add a 30s timeout to the plugin supervisor so that we don't poll forever waiting for a plugin that will never come back up.

Minor improvements:

* The plugin supervisor hook created a new gRPC client for every probe and then threw it away. Instead, reuse the client as we do for the plugin manager.
* The gRPC client constructor has a 1 second timeout. Clarify that this timeout applies to establishing the connection, not to the rest of the client lifetime.
1 parent 07f4227 commit b775a73
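The "connect on first use" idea from the first bullet above is sketched below. This is a hypothetical, simplified illustration rather than Nomad's actual plugins/csi client: the package, type, and method names (csisketch, Client, ensureConnected, Probe) are made up for the example, and it assumes a plain unix-socket gRPC endpoint with no credentials.

```go
// Hypothetical sketch only -- not Nomad's plugins/csi package. Illustrates a
// gRPC client that is constructed without dialing and connects lazily on
// first use, so callers can simply retry their calls until the plugin's
// socket is ready.
package csisketch

import (
	"context"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

type Client struct {
	socketPath string

	mu   sync.Mutex
	conn *grpc.ClientConn
}

// NewClient never dials, so construction cannot fail; a bad socket path only
// surfaces as an error when an RPC is attempted.
func NewClient(socketPath string) *Client {
	return &Client{socketPath: socketPath}
}

// ensureConnected dials the plugin's unix socket on first use. The short
// timeout applies only to establishing the connection, not to later RPCs.
func (c *Client) ensureConnected(ctx context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.conn != nil {
		return nil
	}
	dialCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	conn, err := grpc.DialContext(dialCtx, "unix://"+c.socketPath,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock())
	if err != nil {
		return fmt.Errorf("failed to open grpc connection to %q: %w", c.socketPath, err)
	}
	c.conn = conn
	return nil
}

// Probe shows the calling pattern: every exported method retries the
// connection implicitly by going through ensureConnected first.
func (c *Client) Probe(ctx context.Context) (bool, error) {
	if err := c.ensureConnected(ctx); err != nil {
		return false, err
	}
	// ... issue the plugin's Probe RPC over c.conn here ...
	return true, nil
}

// Close releases the connection if one was ever established.
func (c *Client) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.conn == nil {
		return nil
	}
	err := c.conn.Close()
	c.conn = nil
	return err
}
```

Because the constructor can no longer fail, retrying reduces to calling the client's RPCs again on the next probe or fingerprint interval, which is what the instance.go change below relies on.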

File tree

7 files changed: +201 -146 lines changed


.changelog/12057.txt

+7

@@ -0,0 +1,7 @@
+```release-note:bug
+csi: Fixed a bug where the plugin instance manager would not retry the initial gRPC connection to plugins
+```
+
+```release-note:bug
+csi: Fixed a bug where the plugin supervisor would not restart the task if it failed to connect to the plugin
+```

client/allocrunner/taskrunner/plugin_supervisor_hook.go

+53 -37

@@ -38,6 +38,7 @@ type csiPluginSupervisorHook struct {
 
     // eventEmitter is used to emit events to the task
     eventEmitter ti.EventEmitter
+    lifecycle    ti.TaskLifecycle
 
     shutdownCtx      context.Context
     shutdownCancelFn context.CancelFunc
@@ -54,6 +55,7 @@ type csiPluginSupervisorHookConfig struct {
     clientStateDirPath string
     events             ti.EventEmitter
     runner             *TaskRunner
+    lifecycle          ti.TaskLifecycle
     capabilities       *drivers.Capabilities
     logger             hclog.Logger
 }
@@ -90,6 +92,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
     hook := &csiPluginSupervisorHook{
         alloc:      config.runner.Alloc(),
         runner:     config.runner,
+        lifecycle:  config.lifecycle,
         logger:     config.logger,
         task:       task,
         mountPoint: pluginRoot,
@@ -201,27 +204,41 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
     }()
 
     socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)
+
+    client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
+        "plugin.name", h.task.CSIPluginConfig.ID,
+        "plugin.type", h.task.CSIPluginConfig.Type))
+    defer client.Close()
+
     t := time.NewTimer(0)
 
+    // We're in Poststart at this point, so if we can't connect within
+    // this deadline, assume it's broken so we can restart the task
+    startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second)
+    defer startCancelFn()
+
+    var err error
+    var pluginHealthy bool
+
     // Step 1: Wait for the plugin to initially become available.
 WAITFORREADY:
     for {
         select {
-        case <-ctx.Done():
+        case <-startCtx.Done():
+            h.kill(ctx, fmt.Errorf("CSI plugin failed probe: %v", err))
             return
         case <-t.C:
-            pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
+            pluginHealthy, err = h.supervisorLoopOnce(startCtx, client)
             if err != nil || !pluginHealthy {
-                h.logger.Debug("CSI Plugin not ready", "error", err)
-
-                // Plugin is not yet returning healthy, because we want to optimise for
-                // quickly bringing a plugin online, we use a short timeout here.
-                // TODO(dani): Test with more plugins and adjust.
+                h.logger.Debug("CSI plugin not ready", "error", err)
+                // Use only a short delay here to optimize for quickly
+                // bringing up a plugin
                 t.Reset(5 * time.Second)
                 continue
             }
 
             // Mark the plugin as healthy in a task event
+            h.logger.Debug("CSI plugin is ready")
             h.previousHealthState = pluginHealthy
             event := structs.NewTaskEvent(structs.TaskPluginHealthy)
             event.SetMessage(fmt.Sprintf("plugin: %s", h.task.CSIPluginConfig.ID))
@@ -232,15 +249,14 @@ WAITFORREADY:
     }
 
     // Step 2: Register the plugin with the catalog.
-    deregisterPluginFn, err := h.registerPlugin(socketPath)
+    deregisterPluginFn, err := h.registerPlugin(client, socketPath)
     if err != nil {
-        h.logger.Error("CSI Plugin registration failed", "error", err)
-        event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
-        event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err))
-        h.eventEmitter.EmitEvent(event)
+        h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
+        return
     }
 
-    // Step 3: Start the lightweight supervisor loop.
+    // Step 3: Start the lightweight supervisor loop. At this point, failures
+    // don't cause the task to restart
     t.Reset(0)
     for {
         select {
@@ -249,9 +265,9 @@ WAITFORREADY:
             deregisterPluginFn()
             return
         case <-t.C:
-            pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
+            pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
             if err != nil {
-                h.logger.Error("CSI Plugin fingerprinting failed", "error", err)
+                h.logger.Error("CSI plugin fingerprinting failed", "error", err)
             }
 
             // The plugin has transitioned to a healthy state. Emit an event.
@@ -265,7 +281,7 @@ WAITFORREADY:
             if h.previousHealthState && !pluginHealthy {
                 event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
                 if err != nil {
-                    event.SetMessage(fmt.Sprintf("error: %v", err))
+                    event.SetMessage(fmt.Sprintf("Error: %v", err))
                 } else {
                     event.SetMessage("Unknown Reason")
                 }
@@ -281,16 +297,9 @@ WAITFORREADY:
     }
 }
 
-func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) {
-
+func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) {
     // At this point we know the plugin is ready and we can fingerprint it
     // to get its vendor name and version
-    client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
-    if err != nil {
-        return nil, fmt.Errorf("failed to create csi client: %v", err)
-    }
-    defer client.Close()
-
     info, err := client.PluginInfo()
     if err != nil {
         return nil, fmt.Errorf("failed to probe plugin: %v", err)
@@ -354,21 +363,13 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err
     }, nil
 }
 
-func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) {
-    _, err := os.Stat(socketPath)
-    if err != nil {
-        return false, fmt.Errorf("failed to stat socket: %v", err)
-    }
+func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) {
+    probeCtx, probeCancelFn := context.WithTimeout(ctx, 5*time.Second)
+    defer probeCancelFn()
 
-    client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
+    healthy, err := client.PluginProbe(probeCtx)
     if err != nil {
-        return false, fmt.Errorf("failed to create csi client: %v", err)
-    }
-    defer client.Close()
-
-    healthy, err := client.PluginProbe(ctx)
-    if err != nil {
-        return false, fmt.Errorf("failed to probe plugin: %v", err)
+        return false, err
     }
 
     return healthy, nil
@@ -387,6 +388,21 @@ func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskSt
     return nil
 }
 
+func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) {
+    h.logger.Error("killing task because plugin failed", "error", reason)
+    event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
+    event.SetMessage(fmt.Sprintf("Error: %v", reason.Error()))
+    h.eventEmitter.EmitEvent(event)
+
+    if err := h.lifecycle.Kill(ctx,
+        structs.NewTaskEvent(structs.TaskKilling).
+            SetFailsTask().
+            SetDisplayMessage("CSI plugin did not become healthy before timeout"),
+    ); err != nil {
+        h.logger.Error("failed to kill task", "kill_reason", reason, "error", err)
+    }
+}
+
 func ensureMountpointInserted(mounts []*drivers.MountConfig, mount *drivers.MountConfig) []*drivers.MountConfig {
     for _, mnt := range mounts {
         if mnt.IsEqual(mount) {

client/allocrunner/taskrunner/task_runner_hooks.go

+1

@@ -76,6 +76,7 @@ func (tr *TaskRunner) initHooks() {
         clientStateDirPath: tr.clientConfig.StateDir,
         events:             tr,
         runner:             tr,
+        lifecycle:          tr,
         capabilities:       tr.driverCapabilities,
         logger:             hookLogger,
     }))

client/client.go

+3 -3

@@ -390,11 +390,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
     c.dynamicRegistry =
         dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{
             dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
-                return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller"))
+                return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")), nil
             },
             dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
-                return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client"))
-            }, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
+                return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")), nil
+            },
         })
 
     // Setup the clients RPC server

client/pluginmanager/csimanager/instance.go

+1 -6

@@ -73,12 +73,7 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U
 }
 
 func (i *instanceManager) run() {
-    c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
-    if err != nil {
-        i.logger.Error("failed to setup instance manager client", "error", err)
-        close(i.shutdownCh)
-        return
-    }
+    c := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
     i.client = c
     i.fp.client = c
 