Skip to content

Commit 324702a

Browse files
committed
backport of commit c2dc1c5
plus MinJob() from nomad/mock/job.go
1 parent 6961e68 commit 324702a

File tree

11 files changed

+366
-34
lines changed

11 files changed

+366
-34
lines changed

.changelog/17104.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
client: clean up resources upon failure to restore task during client restart
3+
```

client/allocrunner/alloc_runner.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
log "github.com/hashicorp/go-hclog"
1010
multierror "github.com/hashicorp/go-multierror"
11+
1112
"github.com/hashicorp/nomad/client/allocdir"
1213
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
1314
"github.com/hashicorp/nomad/client/allocrunner/state"
@@ -344,17 +345,15 @@ func (ar *allocRunner) Run() {
344345
ar.logger.Error("prerun failed", "error", err)
345346

346347
for _, tr := range ar.tasks {
347-
tr.MarkFailedDead(fmt.Sprintf("failed to setup alloc: %v", err))
348+
// emit event and mark task to be cleaned up during runTasks()
349+
tr.MarkFailedKill(fmt.Sprintf("failed to setup alloc: %v", err))
348350
}
349-
350-
goto POST
351351
}
352352
}
353353

354354
// Run the runners (blocks until they exit)
355355
ar.runTasks()
356356

357-
POST:
358357
if ar.isShuttingDown() {
359358
return
360359
}

client/allocrunner/alloc_runner_hooks.go

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
multierror "github.com/hashicorp/go-multierror"
8+
89
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
910
clientconfig "github.com/hashicorp/nomad/client/config"
1011
"github.com/hashicorp/nomad/client/taskenv"
@@ -135,6 +136,9 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
135136
newCSIHook(alloc, hookLogger, ar.csiManager, ar.rpcClient, ar, ar.hookResources, ar.clientConfig.Node.SecretID),
136137
newChecksHook(hookLogger, alloc, ar.checkStore, ar),
137138
}
139+
if config.ExtraAllocHooks != nil {
140+
ar.runnerHooks = append(ar.runnerHooks, config.ExtraAllocHooks...)
141+
}
138142

139143
return nil
140144
}

client/allocrunner/fail_hook.go

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright (c) HashiCorp, Inc.
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
// FailHook is designed to fail for testing purposes,
5+
// so should never be included in a release.
6+
//go:build !release
7+
8+
package allocrunner
9+
10+
import (
11+
"errors"
12+
"fmt"
13+
"os"
14+
15+
"github.com/hashicorp/go-hclog"
16+
"github.com/hashicorp/hcl/v2/hclsimple"
17+
18+
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
19+
)
20+
21+
var ErrFailHookError = errors.New("failed successfully")
22+
23+
func NewFailHook(l hclog.Logger, name string) *FailHook {
24+
return &FailHook{
25+
name: name,
26+
logger: l.Named(name),
27+
}
28+
}
29+
30+
type FailHook struct {
31+
name string
32+
logger hclog.Logger
33+
Fail struct {
34+
Prerun bool `hcl:"prerun,optional"`
35+
PreKill bool `hcl:"prekill,optional"`
36+
Postrun bool `hcl:"postrun,optional"`
37+
Destroy bool `hcl:"destroy,optional"`
38+
Update bool `hcl:"update,optional"`
39+
PreTaskRestart bool `hcl:"pretaskrestart,optional"`
40+
Shutdown bool `hcl:"shutdown,optional"`
41+
}
42+
}
43+
44+
func (h *FailHook) Name() string {
45+
return h.name
46+
}
47+
48+
func (h *FailHook) LoadConfig(path string) *FailHook {
49+
if _, err := os.Stat(path); os.IsNotExist(err) {
50+
h.logger.Error("couldn't load config", "error", err)
51+
return h
52+
}
53+
if err := hclsimple.DecodeFile(path, nil, &h.Fail); err != nil {
54+
h.logger.Error("error parsing config", "path", path, "error", err)
55+
}
56+
return h
57+
}
58+
59+
var _ interfaces.RunnerPrerunHook = &FailHook{}
60+
61+
func (h *FailHook) Prerun() error {
62+
if h.Fail.Prerun {
63+
return fmt.Errorf("prerun %w", ErrFailHookError)
64+
}
65+
return nil
66+
}
67+
68+
var _ interfaces.RunnerPreKillHook = &FailHook{}
69+
70+
func (h *FailHook) PreKill() {
71+
if h.Fail.PreKill {
72+
h.logger.Error("prekill", "error", ErrFailHookError)
73+
}
74+
}
75+
76+
var _ interfaces.RunnerPostrunHook = &FailHook{}
77+
78+
func (h *FailHook) Postrun() error {
79+
if h.Fail.Postrun {
80+
return fmt.Errorf("postrun %w", ErrFailHookError)
81+
}
82+
return nil
83+
}
84+
85+
var _ interfaces.RunnerDestroyHook = &FailHook{}
86+
87+
func (h *FailHook) Destroy() error {
88+
if h.Fail.Destroy {
89+
return fmt.Errorf("destroy %w", ErrFailHookError)
90+
}
91+
return nil
92+
}
93+
94+
var _ interfaces.RunnerUpdateHook = &FailHook{}
95+
96+
func (h *FailHook) Update(request *interfaces.RunnerUpdateRequest) error {
97+
if h.Fail.Update {
98+
return fmt.Errorf("update %w", ErrFailHookError)
99+
}
100+
return nil
101+
}
102+
103+
var _ interfaces.RunnerTaskRestartHook = &FailHook{}
104+
105+
func (h *FailHook) PreTaskRestart() error {
106+
if h.Fail.PreTaskRestart {
107+
return fmt.Errorf("destroy %w", ErrFailHookError)
108+
}
109+
return nil
110+
}
111+
112+
var _ interfaces.ShutdownHook = &FailHook{}
113+
114+
func (h *FailHook) Shutdown() {
115+
if h.Fail.Shutdown {
116+
h.logger.Error("shutdown", "error", ErrFailHookError)
117+
}
118+
}

client/allocrunner/taskrunner/task_runner.go

+12-21
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ import (
88
"sync"
99
"time"
1010

11-
"github.com/hashicorp/nomad/client/lib/cgutil"
1211
"golang.org/x/exp/slices"
1312

1413
metrics "github.com/armon/go-metrics"
1514
log "github.com/hashicorp/go-hclog"
1615
multierror "github.com/hashicorp/go-multierror"
1716
"github.com/hashicorp/hcl/v2/hcldec"
17+
1818
"github.com/hashicorp/nomad/client/allocdir"
1919
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
2020
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/restarts"
@@ -24,6 +24,7 @@ import (
2424
"github.com/hashicorp/nomad/client/devicemanager"
2525
"github.com/hashicorp/nomad/client/dynamicplugins"
2626
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
27+
"github.com/hashicorp/nomad/client/lib/cgutil"
2728
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
2829
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
2930
"github.com/hashicorp/nomad/client/serviceregistration"
@@ -492,30 +493,20 @@ func (tr *TaskRunner) initLabels() {
492493
}
493494
}
494495

495-
// MarkFailedDead marks a task as failed and not to run. Aimed to be invoked
496-
// when alloc runner prestart hooks failed. Should never be called with Run().
497-
func (tr *TaskRunner) MarkFailedDead(reason string) {
498-
defer close(tr.waitCh)
499-
500-
tr.stateLock.Lock()
501-
if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
502-
//TODO Nomad will be unable to restore this task; try to kill
503-
// it now and fail? In general we prefer to leave running
504-
// tasks running even if the agent encounters an error.
505-
tr.logger.Warn("error persisting local failed task state; may be unable to restore after a Nomad restart",
506-
"error", err)
507-
}
508-
tr.stateLock.Unlock()
509-
496+
// MarkFailedKill marks a task as failed and should be killed.
497+
// It should be invoked when alloc runner prestart hooks fail.
498+
// Afterwards, Run() will perform any necessary cleanup.
499+
func (tr *TaskRunner) MarkFailedKill(reason string) {
500+
// Emit an event that fails the task and gives reasons for humans.
510501
event := structs.NewTaskEvent(structs.TaskSetupFailure).
502+
SetKillReason(structs.TaskRestoreFailed).
511503
SetDisplayMessage(reason).
512504
SetFailsTask()
513-
tr.UpdateState(structs.TaskStateDead, event)
505+
tr.EmitEvent(event)
514506

515-
// Run the stop hooks in case task was a restored task that failed prestart
516-
if err := tr.stop(); err != nil {
517-
tr.logger.Error("stop failed while marking task dead", "error", err)
518-
}
507+
// Cancel kill context, so later when allocRunner runs tr.Run(),
508+
// we'll follow the usual kill path and do all the appropriate cleanup steps.
509+
tr.killCtxCancel()
519510
}
520511

521512
// Run the TaskRunner. Starts the user's task or reattaches to a restored task.

client/allocrunner/taskrunner/task_runner_test.go

+61-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ import (
1313
"time"
1414

1515
"github.com/golang/snappy"
16+
"github.com/kr/pretty"
17+
"github.com/shoenig/test"
18+
"github.com/shoenig/test/must"
19+
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
21+
1622
"github.com/hashicorp/nomad/ci"
1723
"github.com/hashicorp/nomad/client/allocdir"
1824
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -38,9 +44,6 @@ import (
3844
"github.com/hashicorp/nomad/plugins/device"
3945
"github.com/hashicorp/nomad/plugins/drivers"
4046
"github.com/hashicorp/nomad/testutil"
41-
"github.com/kr/pretty"
42-
"github.com/stretchr/testify/assert"
43-
"github.com/stretchr/testify/require"
4447
)
4548

4649
type MockTaskStateUpdater struct {
@@ -658,6 +661,61 @@ func TestTaskRunner_Restore_System(t *testing.T) {
658661
})
659662
}
660663

664+
// TestTaskRunner_MarkFailedKill asserts that MarkFailedKill marks the task as failed
665+
// and cancels the killCtx so a subsequent Run() will do any necessary task cleanup.
666+
func TestTaskRunner_MarkFailedKill(t *testing.T) {
667+
ci.Parallel(t)
668+
669+
// set up some taskrunner
670+
alloc := mock.MinAlloc()
671+
task := alloc.Job.TaskGroups[0].Tasks[0]
672+
conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
673+
t.Cleanup(cleanup)
674+
tr, err := NewTaskRunner(conf)
675+
must.NoError(t, err)
676+
677+
// side quest: set this lifecycle coordination channel,
678+
// so early in tr MAIN, it doesn't randomly follow that route.
679+
// test config creates this already closed, but not so in real life.
680+
startCh := make(chan struct{})
681+
t.Cleanup(func() { close(startCh) })
682+
tr.startConditionMetCh = startCh
683+
684+
// function under test: should mark the task as failed and cancel kill context
685+
reason := "because i said so"
686+
tr.MarkFailedKill(reason)
687+
688+
// explicitly check kill context.
689+
select {
690+
case <-tr.killCtx.Done():
691+
default:
692+
t.Fatal("kill context should be done")
693+
}
694+
695+
// Run() should now follow the kill path.
696+
go tr.Run()
697+
698+
select { // it should finish up very quickly
699+
case <-tr.WaitCh():
700+
case <-time.After(time.Second):
701+
t.Error("task not killed (or not as fast as expected)")
702+
}
703+
704+
// check state for expected values and events
705+
state := tr.TaskState()
706+
707+
// this gets set directly by MarkFailedKill()
708+
test.True(t, state.Failed, test.Sprint("task should have failed"))
709+
// this is set in Run()
710+
test.Eq(t, structs.TaskStateDead, state.State, test.Sprint("task should be dead"))
711+
// reason "because i said so" should be a task event message
712+
foundMessages := make(map[string]bool)
713+
for _, event := range state.Events {
714+
foundMessages[event.DisplayMessage] = true
715+
}
716+
test.True(t, foundMessages[reason], test.Sprintf("expected '%s' in events: %#v", reason, foundMessages))
717+
}
718+
661719
// TestTaskRunner_TaskEnv_Interpolated asserts driver configurations are
662720
// interpolated.
663721
func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) {

0 commit comments

Comments
 (0)