From 2dfa31859b59fee92e34eec0f2548a34a4c3724a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 22 Aug 2019 11:59:12 -0400 Subject: [PATCH 1/4] client: move script checks into task runner In Nomad prior to Consul Connect, all Consul checks work the same except for Script checks. Because the Task being checked is running in its own container namespaces, the check is executed by Nomad in the Task's context. If the Script check passes, Nomad uses the TTL check feature of Consul to update the check status. This means in order to run a Script check, we need to know what Task to execute it in. To support Consul Connect, we need Group Services, and these need to be registered in Consul along with their checks. We could push the Service down into the Task, but this doesn't work if someone wants to associate a service with a task's ports, but do script checks in another task in the allocation. Because Nomad is handling the Script check and not Consul anyways, this moves the script check handling into the task runner so that the task runner can own the script check's configuration and lifecycle. This will allow us to pass the group service check configuration down into a task without associating the service itself with the task. 
--- .../taskrunner/script_check_hook.go | 347 ++++++++++++++++ .../taskrunner/script_check_hook_test.go | 213 ++++++++++ .../taskrunner/task_runner_hooks.go | 9 + client/allocrunner/taskrunner/tasklet.go | 161 ++++++++ client/allocrunner/taskrunner/tasklet_test.go | 270 ++++++++++++ client/consul/consul.go | 1 + client/consul/consul_testing.go | 11 +- command/agent/consul/client.go | 74 +--- command/agent/consul/script.go | 215 ---------- command/agent/consul/script_test.go | 309 -------------- command/agent/consul/unit_test.go | 386 ++++-------------- 11 files changed, 1109 insertions(+), 887 deletions(-) create mode 100644 client/allocrunner/taskrunner/script_check_hook.go create mode 100644 client/allocrunner/taskrunner/script_check_hook_test.go create mode 100644 client/allocrunner/taskrunner/tasklet.go create mode 100644 client/allocrunner/taskrunner/tasklet_test.go delete mode 100644 command/agent/consul/script.go delete mode 100644 command/agent/consul/script_test.go diff --git a/client/allocrunner/taskrunner/script_check_hook.go b/client/allocrunner/taskrunner/script_check_hook.go new file mode 100644 index 00000000000..dfe1d231e2d --- /dev/null +++ b/client/allocrunner/taskrunner/script_check_hook.go @@ -0,0 +1,347 @@ +package taskrunner + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/hashicorp/consul/api" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/taskenv" + agentconsul "github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/nomad/structs" +) + +var _ interfaces.TaskPoststartHook = &scriptCheckHook{} +var _ interfaces.TaskUpdateHook = &scriptCheckHook{} +var _ interfaces.TaskStopHook = &scriptCheckHook{} + +// default max amount of time to wait for all scripts on shutdown. 
+const defaultShutdownWait = time.Minute + +type scriptCheckHookConfig struct { + alloc *structs.Allocation + task *structs.Task + consul consul.ConsulServiceAPI + logger log.Logger + shutdownWait time.Duration +} + +// scriptCheckHook implements a task runner hook for running script +// checks in the context of a task +type scriptCheckHook struct { + consul consul.ConsulServiceAPI + allocID string + taskName string + logger log.Logger + shutdownWait time.Duration // max time to wait for scripts to shutdown + shutdownCh chan struct{} // closed when all scripts should shutdown + + // The following fields can be changed by Update() + driverExec tinterfaces.ScriptExecutor + taskEnv *taskenv.TaskEnv + + // These maintain state + scripts map[string]*scriptCheck + runningScripts map[string]*taskletHandle + + // Since Update() may be called concurrently with any other hook all + // hook methods must be fully serialized + mu sync.Mutex +} + +func newScriptCheckHook(c scriptCheckHookConfig) *scriptCheckHook { + scriptChecks := make(map[string]*scriptCheck) + for _, service := range c.task.Services { + for _, check := range service.Checks { + if check.Type != structs.ServiceCheckScript { + continue + } + sc := newScriptCheck(&scriptCheckConfig{ + allocID: c.alloc.ID, + taskName: c.task.Name, + check: check, + service: service, + agent: c.consul, + }) + scriptChecks[sc.id] = sc + } + } + + h := &scriptCheckHook{ + consul: c.consul, + allocID: c.alloc.ID, + taskName: c.task.Name, + scripts: scriptChecks, + runningScripts: make(map[string]*taskletHandle), + shutdownWait: defaultShutdownWait, + shutdownCh: make(chan struct{}), + } + + if c.shutdownWait != 0 { + h.shutdownWait = c.shutdownWait // override for testing + } + h.logger = c.logger.Named(h.Name()) + return h +} + +func (h *scriptCheckHook) Name() string { + return "script_checks" +} + +// PostStart implements interfaces.TaskPoststartHook. 
It adds the current +// task context (driver and env) to the script checks and starts up the +// scripts. +func (h *scriptCheckHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error { + h.mu.Lock() + defer h.mu.Unlock() + + if req.DriverExec == nil { + return fmt.Errorf("driver doesn't support script checks") + } + + // Store the TaskEnv for interpolating now and when Updating + h.driverExec = req.DriverExec + h.taskEnv = req.TaskEnv + h.scripts = h.getTaskScriptChecks() + + // Handle starting scripts + for checkID, script := range h.scripts { + // If it's already running, cancel and replace + if oldScript, running := h.runningScripts[checkID]; running { + oldScript.cancel() + } + // Start and store the handle + h.runningScripts[checkID] = script.run() + } + return nil +} + +// Update implements interfaces.TaskUpdateHook. It adds the current +// task context (driver and env) to the script checks and replaces any +// that have been changed. 
+func (h *scriptCheckHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error { + h.mu.Lock() + defer h.mu.Unlock() + + // Get current script checks with request's driver metadata as it + // can't change due to Updates + oldScriptChecks := h.getTaskScriptChecks() + + task := req.Alloc.LookupTask(h.taskName) + if task == nil { + return fmt.Errorf("task %q not found in updated alloc", h.taskName) + } + + // Update service hook fields + h.taskEnv = req.TaskEnv + + // Create new script checks struct with those new values + newScriptChecks := h.getTaskScriptChecks() + + // Handle starting scripts + for checkID, script := range newScriptChecks { + if _, ok := oldScriptChecks[checkID]; ok { + // If it's already running, cancel and replace + if oldScript, running := h.runningScripts[checkID]; running { + oldScript.cancel() + } + // Start and store the handle + h.runningScripts[checkID] = script.run() + } + } + + // Cancel scripts we no longer want + for checkID := range oldScriptChecks { + if _, ok := newScriptChecks[checkID]; !ok { + if oldScript, running := h.runningScripts[checkID]; running { + oldScript.cancel() + } + } + } + return nil +} + +// Stop implements interfaces.TaskStopHook and blocks waiting for running +// scripts to finish (or for the shutdownWait timeout to expire). +func (h *scriptCheckHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { + h.mu.Lock() + defer h.mu.Unlock() + close(h.shutdownCh) + deadline := time.After(h.shutdownWait) + err := fmt.Errorf("timed out waiting for script checks to exit") + for _, script := range h.runningScripts { + select { + case <-script.wait(): + case <-ctx.Done(): + // the caller is passing the background context, so + // we should never really see this outside of testing + case <-deadline: + // at this point the Consul client has been cleaned + // up so we don't want to hang onto this. 
+ return err + } + } + return nil +} + +// getTaskScriptChecks returns an interpolated copy of services and checks with +// values from the task's environment. +func (h *scriptCheckHook) getTaskScriptChecks() map[string]*scriptCheck { + // Guard against not having a valid taskEnv. This can be the case if the + // PreKilling or Exited hook is run before Poststart. + if h.taskEnv == nil || h.driverExec == nil { + return nil + } + newChecks := make(map[string]*scriptCheck) + for _, orig := range h.scripts { + sc := orig.Copy() + sc.exec = h.driverExec + sc.logger = h.logger + sc.shutdownCh = h.shutdownCh + sc.callback = newScriptCheckCallback(sc) + sc.Command = h.taskEnv.ReplaceEnv(orig.Command) + sc.Args = h.taskEnv.ParseAndReplace(orig.Args) + newChecks[sc.id] = sc + } + return newChecks +} + +// heartbeater is the subset of consul agent functionality needed by script +// checks to heartbeat +type heartbeater interface { + UpdateTTL(id, output, status string) error +} + +// scriptCheck runs script checks via a interfaces.ScriptExecutor and updates the +// appropriate check's TTL when the script succeeds. +type scriptCheck struct { + id string + agent heartbeater + lastCheckOk bool // true if the last check was ok; otherwise false + tasklet +} + +// scriptCheckConfig is a parameter struct for newScriptCheck +type scriptCheckConfig struct { + allocID string + taskName string + service *structs.Service + check *structs.ServiceCheck + agent heartbeater +} + +// newScriptCheck constructs a scriptCheck. 
We're only going to +// configure the immutable fields of scriptCheck here, with the +// rest being configured during the Poststart hook so that we have +// the rest of the task execution environment +func newScriptCheck(config *scriptCheckConfig) *scriptCheck { + serviceID := agentconsul.MakeTaskServiceID( + config.allocID, config.taskName, config.service) + checkID := agentconsul.MakeCheckID(serviceID, config.check) + + sc := &scriptCheck{ + id: checkID, + agent: config.agent, + lastCheckOk: true, // start logging on first failure + } + // we can't use the promoted fields of tasklet in the struct literal + sc.allocID = config.allocID + sc.taskName = config.taskName + sc.Command = config.check.Command + sc.Args = config.check.Args + sc.Interval = config.check.Interval + sc.Timeout = config.check.Timeout + return sc +} + +func (sc *scriptCheck) Copy() *scriptCheck { + newSc := *sc // shallow-copy the struct itself; "newSc := sc" only aliased the pointer, so callers mutated the original + return &newSc +} + +// closes over the script check and returns the taskletCallback for +// when the script check executes. 
+func newScriptCheckCallback(s *scriptCheck) taskletCallback { + + return func(ctx context.Context, params execResult) { + output := params.output + code := params.code + err := params.err + + state := api.HealthCritical + switch code { + case 0: + state = api.HealthPassing + case 1: + state = api.HealthWarning + } + + var outputMsg string + if err != nil { + state = api.HealthCritical + outputMsg = err.Error() + } else { + outputMsg = string(output) + } + + // heartbeat the check to Consul + err = s.updateTTL(ctx, s.id, outputMsg, state) + select { + case <-ctx.Done(): + // check has been removed; don't report errors + return + default: + } + + if err != nil { + if s.lastCheckOk { + s.lastCheckOk = false + s.logger.Warn("updating check failed", "error", err) + } else { + s.logger.Debug("updating check still failing", "error", err) + } + + } else if !s.lastCheckOk { + // Succeeded for the first time or after failing; log + s.lastCheckOk = true + s.logger.Info("updating check succeeded") + } + } +} + +const ( + updateTTLBackoffBaseline = 1 * time.Second + updateTTLBackoffLimit = 3 * time.Second +) + +// updateTTL updates the state to Consul, performing an exponential backoff +// in the case where the check isn't registered in Consul to avoid a race between +// service registration and the first check. 
+func (s *scriptCheck) updateTTL(ctx context.Context, id, msg, state string) error { + for attempts := 0; ; attempts++ { + err := s.agent.UpdateTTL(id, msg, state) + if err == nil || + !strings.Contains(err.Error(), "does not have associated TTL") { + return err + } + + // Handle the retry case + backoff := (1 << (2 * uint64(attempts))) * updateTTLBackoffBaseline + if backoff > updateTTLBackoffLimit { + return err + } + + // Wait till retrying + select { + case <-ctx.Done(): + return err + case <-time.After(backoff): + } + } +} diff --git a/client/allocrunner/taskrunner/script_check_hook_test.go b/client/allocrunner/taskrunner/script_check_hook_test.go new file mode 100644 index 00000000000..c40c588667c --- /dev/null +++ b/client/allocrunner/taskrunner/script_check_hook_test.go @@ -0,0 +1,213 @@ +package taskrunner + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/hashicorp/consul/api" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func newScriptMock(hb heartbeater, exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *scriptCheck { + script := newScriptCheck(&scriptCheckConfig{ + allocID: "allocid", + taskName: "testtask", + agent: hb, + service: &structs.Service{Name: "xx"}, + check: &structs.ServiceCheck{}, + }) + script.exec = exec + script.logger = logger + script.Interval = interval + script.Timeout = timeout + script.callback = newScriptCheckCallback(script) + script.lastCheckOk = true + return script +} + +// fakeHeartbeater implements the heartbeater interface to allow mocking out +// Consul in script executor tests. 
+type fakeHeartbeater struct { + heartbeats chan heartbeat +} + +func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error { + f.heartbeats <- heartbeat{checkID: checkID, output: output, status: status} + return nil +} + +func newFakeHeartbeater() *fakeHeartbeater { + return &fakeHeartbeater{heartbeats: make(chan heartbeat)} +} + +type heartbeat struct { + checkID string + output string + status string +} + +// TestScript_Exec_Cancel asserts cancelling a script check shortcircuits +// any running scripts. +func TestScript_Exec_Cancel(t *testing.T) { + exec, cancel := newBlockingScriptExec() + defer cancel() + + logger := testlog.HCLogger(t) + script := newScriptMock(nil, // heartbeater should never be called + exec, logger, time.Hour, time.Hour) + + handle := script.run() + <-exec.running // wait until Exec is called + handle.cancel() // cancel now that we're blocked in exec + + select { + case <-handle.wait(): + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + require.NotEqual(t, atomic.LoadInt32(&exec.exited), 1, + "expected script executor to still be running after timeout") +} + +// TestScript_Exec_TimeoutBasic asserts a script will be killed when the +// timeout is reached. 
+func TestScript_Exec_TimeoutBasic(t *testing.T) { + t.Parallel() + exec, cancel := newBlockingScriptExec() + defer cancel() + + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock(hb, exec, logger, time.Hour, time.Second) + + handle := script.run() + defer handle.cancel() // cleanup + <-exec.running // wait until Exec is called + + // Check for UpdateTTL call + select { + case update := <-hb.heartbeats: + require.Equal(t, update.output, context.DeadlineExceeded.Error()) + require.Equal(t, update.status, api.HealthCritical) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + require.NotEqual(t, atomic.LoadInt32(&exec.exited), 1, + "expected script executor to still be running after timeout") + + // Cancel and watch for exit + handle.cancel() + select { + case <-handle.wait(): // ok! + case update := <-hb.heartbeats: + t.Errorf("unexpected UpdateTTL call on exit with status=%q", update) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } +} + +// TestScript_Exec_TimeoutCritical asserts a script will be killed when +// the timeout is reached and always set a critical status regardless of what +// Exec returns. 
+func TestScript_Exec_TimeoutCritical(t *testing.T) { + t.Parallel() + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock(hb, sleeperExec{}, logger, time.Hour, time.Nanosecond) + + handle := script.run() + defer handle.cancel() // cleanup + + // Check for UpdateTTL call + select { + case update := <-hb.heartbeats: + require.Equal(t, update.output, context.DeadlineExceeded.Error()) + require.Equal(t, update.status, api.HealthCritical) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to timeout") + } +} + +// TestScript_Exec_Shutdown asserts a script will be executed once more +// when told to shutdown. +func TestScript_Exec_Shutdown(t *testing.T) { + shutdown := make(chan struct{}) + exec := newSimpleExec(0, nil) + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock(hb, exec, logger, time.Hour, 3*time.Second) + script.shutdownCh = shutdown + + handle := script.run() + defer handle.cancel() // cleanup + close(shutdown) // tell scriptCheck to exit + + select { + case update := <-hb.heartbeats: + require.Equal(t, update.output, "code=0 err=") + require.Equal(t, update.status, api.HealthPassing) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + select { + case <-handle.wait(): // ok! 
+ case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } +} + +func TestScript_Exec_Codes(t *testing.T) { + + exec := newScriptedExec([]execResult{ + {[]byte("output"), 1, nil}, + {[]byte("output"), 0, nil}, + {[]byte("output"), 0, context.DeadlineExceeded}, + {[]byte("output"), 0, nil}, + {[]byte(""), 2, fmt.Errorf("some error")}, + {[]byte("output"), 0, nil}, + {[]byte("error9000"), 9000, nil}, + }) + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock( + hb, exec, logger, time.Nanosecond, 3*time.Second) + + handle := script.run() + defer handle.cancel() // cleanup + deadline := time.After(3 * time.Second) + + expected := []heartbeat{ + {script.id, "output", api.HealthWarning}, + {script.id, "output", api.HealthPassing}, + {script.id, context.DeadlineExceeded.Error(), api.HealthCritical}, + {script.id, "output", api.HealthPassing}, + {script.id, "some error", api.HealthCritical}, + {script.id, "output", api.HealthPassing}, + {script.id, "error9000", api.HealthCritical}, + } + + for i := 0; i <= 6; i++ { + select { + case update := <-hb.heartbeats: + require.Equal(t, update, expected[i], + "expected update %d to be '%s' but received '%s'", + i, expected[i], update) + case <-deadline: + t.Fatalf("timed out waiting for all script checks to finish") + } + } +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 9083e9ecf69..25d1b59bccd 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -105,6 +105,15 @@ func (tr *TaskRunner) initHooks() { logger: hookLogger, })) } + + // If there are any script checks, add the hook + scriptCheckHook := newScriptCheckHook(scriptCheckHookConfig{ + alloc: tr.Alloc(), + task: tr.Task(), + consul: tr.consulClient, + logger: hookLogger, + }) + tr.runnerHooks = append(tr.runnerHooks, scriptCheckHook) } func (tr *TaskRunner) 
emitHookError(err error, hookName string) { diff --git a/client/allocrunner/taskrunner/tasklet.go b/client/allocrunner/taskrunner/tasklet.go new file mode 100644 index 00000000000..24a834a1064 --- /dev/null +++ b/client/allocrunner/taskrunner/tasklet.go @@ -0,0 +1,161 @@ +package taskrunner + +import ( + "context" + "time" + + metrics "github.com/armon/go-metrics" + log "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" +) + +// contextExec allows canceling a interfaces.ScriptExecutor with a context. +type contextExec struct { + // pctx is the parent context. A subcontext will be created with Exec's + // timeout. + pctx context.Context + + // exec to be wrapped in a context + exec interfaces.ScriptExecutor +} + +func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec { + return &contextExec{ + pctx: ctx, + exec: exec, + } +} + +// execResult are the outputs of an Exec +type execResult struct { + output []byte + code int + err error +} + +// Exec a command until the timeout expires, the context is canceled, or the +// underlying Exec returns. +func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { + resCh := make(chan execResult, 1) + + // Don't trust the underlying implementation to obey timeout + ctx, cancel := context.WithTimeout(c.pctx, timeout) + defer cancel() + + go func() { + output, code, err := c.exec.Exec(timeout, cmd, args) + select { + case resCh <- execResult{output, code, err}: + case <-ctx.Done(): + } + }() + + select { + case res := <-resCh: + return res.output, res.code, res.err + case <-ctx.Done(): + return nil, 0, ctx.Err() + } +} + +// tasklet is an abstraction around periodically running a script within +// the context of a Task. The interfaces.ScriptExecutor is fired at least +// once and on each interval, and fires a callback whenever the script +// is complete. 
+type tasklet struct { + allocID string + taskName string + Command string // Command is the command to run for tasklet + Args []string // Args is a list of arguments for tasklet + Interval time.Duration // Interval of the tasklet + Timeout time.Duration // Timeout of the tasklet + exec interfaces.ScriptExecutor + callback taskletCallback + logger log.Logger + shutdownCh <-chan struct{} +} + +// taskletHandle is returned by tasklet.run for cancelling a tasklet and +// waiting for it to shut down. +type taskletHandle struct { + // cancel the script + cancel func() + exitCh chan struct{} +} + +// wait returns a chan that's closed when the tasklet exits +func (t taskletHandle) wait() <-chan struct{} { + return t.exitCh +} + +// taskletCallback is called with a cancellation context and the output of a +// tasklet's Exec whenever it runs. +type taskletCallback func(context.Context, execResult) + +// run this tasklet check and return its cancel func. The tasklet's +// callback will be called each time it completes. If the shutdownCh is +// closed the check will be run once more before exiting. +func (t *tasklet) run() *taskletHandle { + ctx, cancel := context.WithCancel(context.Background()) + exitCh := make(chan struct{}) + + // Wrap the original interfaces.ScriptExecutor in one that obeys context + // cancelation. 
+ ctxExec := newContextExec(ctx, t.exec) + + go func() { + defer close(exitCh) + timer := time.NewTimer(0) + defer timer.Stop() + for { + // Block until tasklet is removed, Nomad is shutting + // down, or the tasklet interval is up + select { + case <-ctx.Done(): + // tasklet has been removed + return + case <-t.shutdownCh: + // unblock but don't exit until after we run once more + case <-timer.C: + timer.Reset(t.Interval) + } + + metrics.IncrCounter([]string{ + "client", "allocrunner", "taskrunner", "tasklet_runs"}, 1) + + // Execute check script with timeout + t.logger.Trace("tasklet executing", + "allocID", t.allocID, "task", t.taskName) + output, code, err := ctxExec.Exec(t.Timeout, t.Command, t.Args) + switch err { + case context.Canceled: + // check removed during execution; exit + return + case context.DeadlineExceeded: + metrics.IncrCounter([]string{ + "client", "allocrunner", "taskrunner", + "tasklet_timeouts"}, 1) + // If no error was returned, set one to make sure the tasklet + // is marked as failed + if err == nil { + err = context.DeadlineExceeded + } + + // Log deadline exceeded every time as it's a + // distinct issue from the tasklet returning failure + t.logger.Warn("tasklet timed out", "timeout", t.Timeout) + } + + t.callback(ctx, execResult{output, code, err}) + + select { + case <-t.shutdownCh: + // We've been told to exit and just ran so exit + return + default: + } + } + }() + return &taskletHandle{cancel: cancel, exitCh: exitCh} +} diff --git a/client/allocrunner/taskrunner/tasklet_test.go b/client/allocrunner/taskrunner/tasklet_test.go new file mode 100644 index 00000000000..93a217d8499 --- /dev/null +++ b/client/allocrunner/taskrunner/tasklet_test.go @@ -0,0 +1,270 @@ +package taskrunner + +import ( + "context" + "fmt" + "os" + "os/exec" + "sync/atomic" + "testing" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/helper/testlog" + 
"github.com/hashicorp/nomad/helper/testtask" + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + if !testtask.Run() { + os.Exit(m.Run()) + } +} + +func TestTasklet_Exec_HappyPath(t *testing.T) { + results := []execResult{ + {[]byte("output"), 0, nil}, + {[]byte("output"), 1, nil}, + {[]byte("output"), 0, context.DeadlineExceeded}, + {[]byte(""), 2, fmt.Errorf("some error")}, + {[]byte("error9000"), 9000, nil}, + } + exec := newScriptedExec(results) + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Nanosecond, 3*time.Second) + + handle := tm.run() + defer handle.cancel() // just-in-case cleanup + + deadline := time.After(3 * time.Second) + for i := 0; i <= 4; i++ { + select { + case result := <-tm.calls: + // for the happy path without cancelations or shutdowns, we expect + // to get the results passed to the callback in order and without + // modification + assert.Equal(t, result, results[i]) + case <-deadline: + t.Fatalf("timed out waiting for all script checks to finish") + } + } +} + +// TestTasklet_Exec_Cancel asserts cancelling a tasklet short-circuits +// any running executions the tasklet +func TestTasklet_Exec_Cancel(t *testing.T) { + exec, cancel := newBlockingScriptExec() + defer cancel() + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, time.Hour) + + handle := tm.run() + <-exec.running // wait until Exec is called + handle.cancel() // cancel now that we're blocked in exec + + select { + case <-handle.wait(): + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for tasklet check to exit") + } + + // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. 
+ if atomic.LoadInt32(&exec.exited) == 1 { + t.Errorf("expected script executor to still be running after timeout") + } + // No tasklets finished, so no callbacks should have gotten a + // chance to fire + select { + case call := <-tm.calls: + t.Errorf("expected 0 calls of tasklet, got %v", call) + default: + break + } +} + +// TestTasklet_Exec_Timeout asserts a tasklet script will be killed +// when the timeout is reached. +func TestTasklet_Exec_Timeout(t *testing.T) { + t.Parallel() + exec, cancel := newBlockingScriptExec() + defer cancel() + + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, time.Second) + + handle := tm.run() + defer handle.cancel() // just-in-case cleanup + <-exec.running // wait until Exec is called + + // We should get a timeout + select { + case update := <-tm.calls: + if update.err != context.DeadlineExceeded { + t.Errorf("expected context.DeadlineExceeed but received %+v", update) + } + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + if atomic.LoadInt32(&exec.exited) == 1 { + t.Errorf("expected executor to still be running after timeout") + } + + // Cancel and watch for exit + handle.cancel() + select { + case <-handle.wait(): // ok! + case update := <-tm.calls: + t.Errorf("unexpected extra callback on exit with status=%v", update) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for tasklet to exit") + } +} + +// TestTasklet_Exec_Shutdown asserts a script will be executed once more +// when told to shutdown. 
+func TestTasklet_Exec_Shutdown(t *testing.T) { + exec := newSimpleExec(0, nil) + shutdown := make(chan struct{}) + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, 3*time.Second) + tm.shutdownCh = shutdown + handle := tm.run() + + defer handle.cancel() // just-in-case cleanup + close(shutdown) // tell script to exit + + select { + case update := <-tm.calls: + if update.err != nil { + t.Errorf("expected clean shutdown but received %q", update.err) + } + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + select { + case <-handle.wait(): // ok + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } +} + +// test helpers + +type taskletMock struct { + tasklet + calls chan execResult +} + +func newTaskletMock(exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *taskletMock { + tm := &taskletMock{calls: make(chan execResult)} + tm.allocID = "allocid" + tm.taskName = "testtask" + tm.exec = exec + tm.logger = logger + tm.Interval = interval + tm.Timeout = timeout + tm.callback = func(ctx context.Context, params execResult) { + tm.calls <- params + } + return tm +} + +// blockingScriptExec implements ScriptExec by running a subcommand that never +// exits. +type blockingScriptExec struct { + // pctx is canceled *only* for test cleanup. Just like real + // ScriptExecutors its Exec method cannot be canceled directly -- only + // with a timeout. + pctx context.Context + + // running is ticked before blocking to allow synchronizing operations + running chan struct{} + + // set to 1 with atomics if Exec is called and has exited + exited int32 +} + +// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the +// caller recvs on the b.running chan. It also returns a CancelFunc for test +// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout +// expires. 
+func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + exec := &blockingScriptExec{ + pctx: ctx, + running: make(chan struct{}), + } + return exec, cancel +} + +func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) { + b.running <- struct{}{} + ctx, cancel := context.WithTimeout(b.pctx, dur) + defer cancel() + cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h") + testtask.SetCmdEnv(cmd) + err := cmd.Run() + code := 0 + if exitErr, ok := err.(*exec.ExitError); ok { + if !exitErr.Success() { + code = 1 + } + } + atomic.StoreInt32(&b.exited, 1) + return []byte{}, code, err +} + +// sleeperExec sleeps for 100ms but returns successfully to allow testing timeout conditions +type sleeperExec struct{} + +func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) { + time.Sleep(100 * time.Millisecond) + return []byte{}, 0, nil +} + +// simpleExec is a fake ScriptExecutor that returns whatever is specified. +type simpleExec struct { + code int + err error +} + +func (s simpleExec) Exec(time.Duration, string, []string) ([]byte, int, error) { + return []byte(fmt.Sprintf("code=%d err=%v", s.code, s.err)), s.code, s.err +} + +// newSimpleExec creates a new ScriptExecutor that returns the given code and err. +func newSimpleExec(code int, err error) simpleExec { + return simpleExec{code: code, err: err} +} + +// scriptedExec is a fake ScriptExecutor with a predetermined sequence +// of results. 
+type scriptedExec struct { + fn func() ([]byte, int, error) +} + +// For each call to Exec, scriptedExec returns the next result in its +// sequence of results +func (s scriptedExec) Exec(time.Duration, string, []string) ([]byte, int, error) { + return s.fn() +} + +func newScriptedExec(results []execResult) scriptedExec { + index := 0 + s := scriptedExec{} + // we have to close over the index because the interface we're + // mocking expects a value and not a pointer, which prevents + // us from updating the index + fn := func() ([]byte, int, error) { + result := results[index] + // prevents us from iterating off the end of the results + if index+1 < len(results) { + index = index + 1 + } + return result.output, result.code, result.err + } + s.fn = fn + return s +} diff --git a/client/consul/consul.go b/client/consul/consul.go index a789c4c34f5..df1c455cfe9 100644 --- a/client/consul/consul.go +++ b/client/consul/consul.go @@ -15,4 +15,5 @@ type ConsulServiceAPI interface { RemoveTask(*consul.TaskServices) UpdateTask(old, newTask *consul.TaskServices) error AllocRegistrations(allocID string) (*consul.AllocRegistration, error) + UpdateTTL(id, output, status string) error } diff --git a/client/consul/consul_testing.go b/client/consul/consul_testing.go index e38cfc6c17c..ce276b02a90 100644 --- a/client/consul/consul_testing.go +++ b/client/consul/consul_testing.go @@ -21,7 +21,7 @@ type MockConsulOp struct { func NewMockConsulOp(op, allocID, name string) MockConsulOp { switch op { case "add", "remove", "update", "alloc_registrations", - "add_group", "remove_group", "update_group": + "add_group", "remove_group", "update_group", "update_ttl": default: panic(fmt.Errorf("invalid consul op: %s", op)) } @@ -123,6 +123,15 @@ func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.Al return nil, nil } +func (m *MockConsulServiceClient) UpdateTTL(checkID, output, status string) error { + // TODO(tgross): this method is here so we can implement the + // 
interface but the locking we need for testing creates a lot + // of opportunities for deadlocks in testing that will never + // appear in live code. + m.logger.Trace("UpdateTTL", "check_id", checkID, "status", status) + return nil +} + func (m *MockConsulServiceClient) GetOps() []MockConsulOp { m.mu.Lock() defer m.mu.Unlock() diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 7c8ca34d939..fc34eacee26 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -105,10 +105,8 @@ func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.Agen // operations are submitted to the main loop via commit() for synchronizing // with Consul. type operations struct { - regServices []*api.AgentServiceRegistration - regChecks []*api.AgentCheckRegistration - scripts []*scriptCheck - + regServices []*api.AgentServiceRegistration + regChecks []*api.AgentCheckRegistration deregServices []string deregChecks []string } @@ -230,10 +228,8 @@ type ServiceClient struct { opCh chan *operations - services map[string]*api.AgentServiceRegistration - checks map[string]*api.AgentCheckRegistration - scripts map[string]*scriptCheck - runningScripts map[string]*scriptHandle + services map[string]*api.AgentServiceRegistration + checks map[string]*api.AgentCheckRegistration explicitlyDeregisteredServices map[string]bool explicitlyDeregisteredChecks map[string]bool @@ -284,8 +280,6 @@ func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bo opCh: make(chan *operations, 8), services: make(map[string]*api.AgentServiceRegistration), checks: make(map[string]*api.AgentCheckRegistration), - scripts: make(map[string]*scriptCheck), - runningScripts: make(map[string]*scriptHandle), explicitlyDeregisteredServices: make(map[string]bool), explicitlyDeregisteredChecks: make(map[string]bool), allocRegistrations: make(map[string]*AllocRegistration), @@ -439,25 +433,16 @@ func (c *ServiceClient) merge(ops *operations) { 
for _, check := range ops.regChecks { c.checks[check.ID] = check } - for _, s := range ops.scripts { - c.scripts[s.id] = s - } for _, sid := range ops.deregServices { delete(c.services, sid) c.explicitlyDeregisteredServices[sid] = true } for _, cid := range ops.deregChecks { - if script, ok := c.runningScripts[cid]; ok { - script.cancel() - delete(c.scripts, cid) - delete(c.runningScripts, cid) - } delete(c.checks, cid) c.explicitlyDeregisteredChecks[cid] = true } metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services))) metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks))) - metrics.SetGauge([]string{"client", "consul", "script_checks"}, float32(len(c.runningScripts))) } // sync enqueued operations. @@ -593,16 +578,6 @@ func (c *ServiceClient) sync() error { } creg++ metrics.IncrCounter([]string{"client", "consul", "check_registrations"}, 1) - - // Handle starting scripts - if script, ok := c.scripts[id]; ok { - // If it's already running, cancel and replace - if oldScript, running := c.runningScripts[id]; running { - oldScript.cancel() - } - // Start and store the handle - c.runningScripts[id] = script.run() - } } // Only log if something was actually synced @@ -649,7 +624,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) ops.regServices = append(ops.regServices, serviceReg) for _, check := range service.Checks { - checkID := makeCheckID(id, check) + checkID := MakeCheckID(id, check) if check.Type == structs.ServiceCheckScript { return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name) } @@ -782,17 +757,9 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st checkIDs := make([]string, 0, numChecks) for _, check := range service.Checks { - checkID := makeCheckID(serviceID, check) + checkID := MakeCheckID(serviceID, check) checkIDs = append(checkIDs, checkID) if check.Type == 
structs.ServiceCheckScript { - if task.DriverExec == nil { - return nil, fmt.Errorf("driver doesn't support script checks") - } - - sc := newScriptCheck(task.AllocID, task.Name, checkID, check, task.DriverExec, - c.client, c.logger, c.shutdownCh) - ops.scripts = append(ops.scripts, sc) - // Skip getAddress for script checks checkReg, err := createCheckReg(serviceID, checkID, check, "", 0) if err != nil { @@ -977,7 +944,7 @@ func (c *ServiceClient) RegisterTask(task *TaskServices) error { serviceID := MakeTaskServiceID(task.AllocID, task.Name, service) for _, check := range service.Checks { if check.TriggersRestarts() { - checkID := makeCheckID(serviceID, check) + checkID := MakeCheckID(serviceID, check) c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter) } } @@ -1012,7 +979,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { // Existing service entry removed ops.deregServices = append(ops.deregServices, existingID) for _, check := range existingSvc.Checks { - cid := makeCheckID(existingID, check) + cid := MakeCheckID(existingID, check) ops.deregChecks = append(ops.deregChecks, cid) // Unwatch watched checks @@ -1040,12 +1007,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { // See if any checks were updated existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks)) for _, check := range existingSvc.Checks { - existingChecks[makeCheckID(existingID, check)] = check + existingChecks[MakeCheckID(existingID, check)] = check } // Register new checks for _, check := range newSvc.Checks { - checkID := makeCheckID(existingID, check) + checkID := MakeCheckID(existingID, check) if _, exists := existingChecks[checkID]; exists { // Check is still required. Remove it from the map so it doesn't get // deleted later. 
@@ -1101,7 +1068,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service) for _, check := range service.Checks { if check.TriggersRestarts() { - checkID := makeCheckID(serviceID, check) + checkID := MakeCheckID(serviceID, check) c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter) } } @@ -1120,7 +1087,7 @@ func (c *ServiceClient) RemoveTask(task *TaskServices) { ops.deregServices = append(ops.deregServices, id) for _, check := range service.Checks { - cid := makeCheckID(id, check) + cid := MakeCheckID(id, check) ops.deregChecks = append(ops.deregChecks, cid) if check.TriggersRestarts() { @@ -1177,6 +1144,11 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, return reg, nil } +// TODO(tgross): make sure this is properly nil-checked, etc. +func (c *ServiceClient) UpdateTTL(id, output, status string) error { + return c.client.UpdateTTL(id, output, status) +} + // Shutdown the Consul client. Update running task registrations and deregister // agent from Consul. On first call blocks up to shutdownWait before giving up // on syncing operations. @@ -1220,14 +1192,6 @@ func (c *ServiceClient) Shutdown() error { } } - // Give script checks time to exit (no need to lock as Run() has exited) - for _, h := range c.runningScripts { - select { - case <-h.wait(): - case <-deadline: - return fmt.Errorf("timed out waiting for script checks to run") - } - } return nil } @@ -1285,10 +1249,10 @@ func MakeTaskServiceID(allocID, taskName string, service *structs.Service) strin return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel) } -// makeCheckID creates a unique ID for a check. +// MakeCheckID creates a unique ID for a check. 
// // Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d -func makeCheckID(serviceID string, check *structs.ServiceCheck) string { +func MakeCheckID(serviceID string, check *structs.ServiceCheck) string { return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID)) } diff --git a/command/agent/consul/script.go b/command/agent/consul/script.go deleted file mode 100644 index b4a99c84761..00000000000 --- a/command/agent/consul/script.go +++ /dev/null @@ -1,215 +0,0 @@ -package consul - -import ( - "context" - "time" - - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" - "github.com/hashicorp/nomad/nomad/structs" -) - -// heartbeater is the subset of consul agent functionality needed by script -// checks to heartbeat -type heartbeater interface { - UpdateTTL(id, output, status string) error -} - -// contextExec allows canceling a ScriptExecutor with a context. -type contextExec struct { - // pctx is the parent context. A subcontext will be created with Exec's - // timeout. - pctx context.Context - - // exec to be wrapped in a context - exec interfaces.ScriptExecutor -} - -func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec { - return &contextExec{ - pctx: ctx, - exec: exec, - } -} - -type execResult struct { - buf []byte - code int - err error -} - -// Exec a command until the timeout expires, the context is canceled, or the -// underlying Exec returns. 
-func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { - resCh := make(chan execResult, 1) - - // Don't trust the underlying implementation to obey timeout - ctx, cancel := context.WithTimeout(c.pctx, timeout) - defer cancel() - - go func() { - output, code, err := c.exec.Exec(timeout, cmd, args) - select { - case resCh <- execResult{output, code, err}: - case <-ctx.Done(): - } - }() - - select { - case res := <-resCh: - return res.buf, res.code, res.err - case <-ctx.Done(): - return nil, 0, ctx.Err() - } -} - -// scriptHandle is returned by scriptCheck.run by cancelling a scriptCheck and -// waiting for it to shutdown. -type scriptHandle struct { - // cancel the script - cancel func() - exitCh chan struct{} -} - -// wait returns a chan that's closed when the script exits -func (s *scriptHandle) wait() <-chan struct{} { - return s.exitCh -} - -// scriptCheck runs script checks via a ScriptExecutor and updates the -// appropriate check's TTL when the script succeeds. -type scriptCheck struct { - allocID string - taskName string - - id string - check *structs.ServiceCheck - exec interfaces.ScriptExecutor - agent heartbeater - - // lastCheckOk is true if the last check was ok; otherwise false - lastCheckOk bool - - logger log.Logger - shutdownCh <-chan struct{} -} - -// newScriptCheck creates a new scriptCheck. run() should be called once the -// initial check is registered with Consul. 
-func newScriptCheck(allocID, taskName, checkID string, check *structs.ServiceCheck, - exec interfaces.ScriptExecutor, agent heartbeater, logger log.Logger, - shutdownCh <-chan struct{}) *scriptCheck { - - logger = logger.ResetNamed("consul.checks").With("task", taskName, "alloc_id", allocID, "check", check.Name) - return &scriptCheck{ - allocID: allocID, - taskName: taskName, - id: checkID, - check: check, - exec: exec, - agent: agent, - lastCheckOk: true, // start logging on first failure - logger: logger, - shutdownCh: shutdownCh, - } -} - -// run this script check and return its cancel func. If the shutdownCh is -// closed the check will be run once more before exiting. -func (s *scriptCheck) run() *scriptHandle { - ctx, cancel := context.WithCancel(context.Background()) - exitCh := make(chan struct{}) - - // Wrap the original ScriptExecutor in one that obeys context - // cancelation. - ctxExec := newContextExec(ctx, s.exec) - - go func() { - defer close(exitCh) - timer := time.NewTimer(0) - defer timer.Stop() - for { - // Block until check is removed, Nomad is shutting - // down, or the check interval is up - select { - case <-ctx.Done(): - // check has been removed - return - case <-s.shutdownCh: - // unblock but don't exit until after we heartbeat once more - case <-timer.C: - timer.Reset(s.check.Interval) - } - metrics.IncrCounter([]string{"client", "consul", "script_runs"}, 1) - - // Execute check script with timeout - output, code, err := ctxExec.Exec(s.check.Timeout, s.check.Command, s.check.Args) - switch err { - case context.Canceled: - // check removed during execution; exit - return - case context.DeadlineExceeded: - metrics.IncrCounter([]string{"client", "consul", "script_timeouts"}, 1) - // If no error was returned, set one to make sure the task goes critical - if err == nil { - err = context.DeadlineExceeded - } - - // Log deadline exceeded every time as it's a - // distinct issue from checks returning - // failures - s.logger.Warn("check timed 
out", "timeout", s.check.Timeout) - } - - state := api.HealthCritical - switch code { - case 0: - state = api.HealthPassing - case 1: - state = api.HealthWarning - } - - var outputMsg string - if err != nil { - state = api.HealthCritical - outputMsg = err.Error() - } else { - outputMsg = string(output) - } - - // Actually heartbeat the check - err = s.agent.UpdateTTL(s.id, outputMsg, state) - select { - case <-ctx.Done(): - // check has been removed; don't report errors - return - default: - } - - if err != nil { - if s.lastCheckOk { - s.lastCheckOk = false - s.logger.Warn("updating check failed", "error", err) - } else { - s.logger.Debug("updating check still failing", "error", err) - } - - } else if !s.lastCheckOk { - // Succeeded for the first time or after failing; log - s.lastCheckOk = true - s.logger.Info("updating check succeeded") - } - - select { - case <-s.shutdownCh: - // We've been told to exit and just heartbeated so exit - return - default: - } - } - }() - return &scriptHandle{cancel: cancel, exitCh: exitCh} -} diff --git a/command/agent/consul/script_test.go b/command/agent/consul/script_test.go deleted file mode 100644 index 25b6329b308..00000000000 --- a/command/agent/consul/script_test.go +++ /dev/null @@ -1,309 +0,0 @@ -package consul - -import ( - "context" - "fmt" - "os" - "os/exec" - "sync/atomic" - "testing" - "time" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/nomad/helper/testlog" - "github.com/hashicorp/nomad/helper/testtask" - "github.com/hashicorp/nomad/nomad/structs" -) - -func TestMain(m *testing.M) { - if !testtask.Run() { - os.Exit(m.Run()) - } -} - -// blockingScriptExec implements ScriptExec by running a subcommand that never -// exits. -type blockingScriptExec struct { - // pctx is canceled *only* for test cleanup. Just like real - // ScriptExecutors its Exec method cannot be canceled directly -- only - // with a timeout. 
- pctx context.Context - - // running is ticked before blocking to allow synchronizing operations - running chan struct{} - - // set to 1 with atomics if Exec is called and has exited - exited int32 -} - -// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the -// caller recvs on the b.running chan. It also returns a CancelFunc for test -// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout -// expires. -func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) { - ctx, cancel := context.WithCancel(context.Background()) - exec := &blockingScriptExec{ - pctx: ctx, - running: make(chan struct{}), - } - return exec, cancel -} - -func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) { - b.running <- struct{}{} - ctx, cancel := context.WithTimeout(b.pctx, dur) - defer cancel() - cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h") - testtask.SetCmdEnv(cmd) - err := cmd.Run() - code := 0 - if exitErr, ok := err.(*exec.ExitError); ok { - if !exitErr.Success() { - code = 1 - } - } - atomic.StoreInt32(&b.exited, 1) - return []byte{}, code, err -} - -// TestConsulScript_Exec_Cancel asserts cancelling a script check shortcircuits -// any running scripts. 
-func TestConsulScript_Exec_Cancel(t *testing.T) { - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: time.Hour, - } - exec, cancel := newBlockingScriptExec() - defer cancel() - - // pass nil for heartbeater as it shouldn't be called - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.HCLogger(t), nil) - handle := check.run() - - // wait until Exec is called - <-exec.running - - // cancel now that we're blocked in exec - handle.cancel() - - select { - case <-handle.wait(): - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } - - // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be - // canceled. Only a wrapper around it obeys the context cancelation. - if atomic.LoadInt32(&exec.exited) == 1 { - t.Errorf("expected script executor to still be running after timeout") - } -} - -type execStatus struct { - checkID string - output string - status string -} - -// fakeHeartbeater implements the heartbeater interface to allow mocking out -// Consul in script executor tests. -type fakeHeartbeater struct { - updates chan execStatus -} - -func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error { - f.updates <- execStatus{checkID: checkID, output: output, status: status} - return nil -} - -func newFakeHeartbeater() *fakeHeartbeater { - return &fakeHeartbeater{updates: make(chan execStatus)} -} - -// TestConsulScript_Exec_TimeoutBasic asserts a script will be killed when the -// timeout is reached. 
-func TestConsulScript_Exec_TimeoutBasic(t *testing.T) { - t.Parallel() - - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: time.Second, - } - - exec, cancel := newBlockingScriptExec() - defer cancel() - - hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), nil) - handle := check.run() - defer handle.cancel() // just-in-case cleanup - <-exec.running - - // Check for UpdateTTL call - select { - case update := <-hb.updates: - if update.status != api.HealthCritical { - t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } - - // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be - // canceled. Only a wrapper around it obeys the context cancelation. - if atomic.LoadInt32(&exec.exited) == 1 { - t.Errorf("expected script executor to still be running after timeout") - } - - // Cancel and watch for exit - handle.cancel() - select { - case <-handle.wait(): - // ok! - case update := <-hb.updates: - t.Errorf("unexpected UpdateTTL call on exit with status=%q", update) - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } -} - -// sleeperExec sleeps for 100ms but returns successfully to allow testing timeout conditions -type sleeperExec struct{} - -func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) { - time.Sleep(100 * time.Millisecond) - return []byte{}, 0, nil -} - -// TestConsulScript_Exec_TimeoutCritical asserts a script will be killed when -// the timeout is reached and always set a critical status regardless of what -// Exec returns. 
-func TestConsulScript_Exec_TimeoutCritical(t *testing.T) { - t.Parallel() - - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: time.Nanosecond, - } - hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.HCLogger(t), nil) - handle := check.run() - defer handle.cancel() // just-in-case cleanup - - // Check for UpdateTTL call - select { - case update := <-hb.updates: - if update.status != api.HealthCritical { - t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update) - } - if update.output != context.DeadlineExceeded.Error() { - t.Errorf("expected output=%q but found: %q", context.DeadlineExceeded.Error(), update.output) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to timeout") - } -} - -// simpleExec is a fake ScriptExecutor that returns whatever is specified. -type simpleExec struct { - code int - err error -} - -func (s simpleExec) Exec(time.Duration, string, []string) ([]byte, int, error) { - return []byte(fmt.Sprintf("code=%d err=%v", s.code, s.err)), s.code, s.err -} - -// newSimpleExec creates a new ScriptExecutor that returns the given code and err. -func newSimpleExec(code int, err error) simpleExec { - return simpleExec{code: code, err: err} -} - -// TestConsulScript_Exec_Shutdown asserts a script will be executed once more -// when told to shutdown. 
-func TestConsulScript_Exec_Shutdown(t *testing.T) { - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: 3 * time.Second, - } - - hb := newFakeHeartbeater() - shutdown := make(chan struct{}) - exec := newSimpleExec(0, nil) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown) - handle := check.run() - defer handle.cancel() // just-in-case cleanup - - // Tell scriptCheck to exit - close(shutdown) - - select { - case update := <-hb.updates: - if update.status != api.HealthPassing { - t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } - - select { - case <-handle.wait(): - // ok! - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } -} - -func TestConsulScript_Exec_Codes(t *testing.T) { - run := func(code int, err error, expected string) func(t *testing.T) { - return func(t *testing.T) { - t.Parallel() - serviceCheck := structs.ServiceCheck{ - Name: "test", - Interval: time.Hour, - Timeout: 3 * time.Second, - } - - hb := newFakeHeartbeater() - shutdown := make(chan struct{}) - exec := newSimpleExec(code, err) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown) - handle := check.run() - defer handle.cancel() - - select { - case update := <-hb.updates: - if update.status != expected { - t.Errorf("expected %q but received %q", expected, update) - } - // assert output is being reported - expectedOutput := fmt.Sprintf("code=%d err=%v", code, err) - if err != nil { - expectedOutput = err.Error() - } - if update.output != expectedOutput { - t.Errorf("expected output=%q but found: %q", expectedOutput, update.output) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exec") - } - } - } - - // 
Test exit codes with errors - t.Run("Passing", run(0, nil, api.HealthPassing)) - t.Run("Warning", run(1, nil, api.HealthWarning)) - t.Run("Critical-2", run(2, nil, api.HealthCritical)) - t.Run("Critical-9000", run(9000, nil, api.HealthCritical)) - - // Errors should always cause Critical status - err := fmt.Errorf("test error") - t.Run("Error-0", run(0, err, api.HealthCritical)) - t.Run("Error-1", run(1, err, api.HealthCritical)) - t.Run("Error-2", run(2, err, api.HealthCritical)) - t.Run("Error-9000", run(9000, err, api.HealthCritical)) -} diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index d61dc30e881..5c3effeb3d1 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -14,7 +14,6 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -46,40 +45,9 @@ func testTask() *TaskServices { }, }, }, - DriverExec: newMockExec(), } } -// mockExec implements the ScriptExecutor interface and will use an alternate -// implementation t.ExecFunc if non-nil. 
-type mockExec struct { - // Ticked whenever a script is called - execs chan int - - // If non-nil will be called by script checks - ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error) -} - -func newMockExec() *mockExec { - return &mockExec{ - execs: make(chan int, 100), - } -} - -func (m *mockExec) Exec(dur time.Duration, cmd string, args []string) ([]byte, int, error) { - select { - case m.execs <- 1: - default: - } - if m.ExecFunc == nil { - // Default impl is just "ok" - return []byte("ok"), 0, nil - } - ctx, cancel := context.WithTimeout(context.Background(), dur) - defer cancel() - return m.ExecFunc(ctx, cmd, args) -} - // restartRecorder is a minimal TaskRestarter implementation that simply // counts how many restarts were triggered. type restartRecorder struct { @@ -96,7 +64,6 @@ type testFakeCtx struct { ServiceClient *ServiceClient FakeConsul *MockAgent Task *TaskServices - MockExec *mockExec } var errNoOps = fmt.Errorf("testing error: no pending operations") @@ -131,7 +98,6 @@ func setupFake(t *testing.T) *testFakeCtx { ServiceClient: sc, FakeConsul: fc, Task: tt, - MockExec: tt.DriverExec.(*mockExec), } } @@ -226,13 +192,6 @@ func TestConsul_ChangePorts(t *testing.T) { require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) case "c2": origScriptKey = k - select { - case <-ctx.MockExec.execs: - // Here we validate there is nothing left on the channel - require.Equal(0, len(ctx.MockExec.execs)) - case <-time.After(3 * time.Second): - t.Fatalf("script not called in time") - } case "c3": origHTTPKey = k require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) @@ -678,291 +637,104 @@ func TestConsul_RegServices(t *testing.T) { func TestConsul_ShutdownOK(t *testing.T) { require := require.New(t) ctx := setupFake(t) - - // Add a script check to make sure its TTL gets updated - ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "scriptcheck", - Type: "script", - Command: "true", - // Make check block until shutdown - 
Interval: 9000 * time.Hour, - Timeout: 10 * time.Second, - InitialStatus: "warning", - }, - } - go ctx.ServiceClient.Run() - // Register a task and agent - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - + // register the Nomad agent service and check agentServices := []*structs.Service{ { Name: "http", Tags: []string{"nomad"}, PortLabel: "localhost:2345", + Checks: []*structs.ServiceCheck{ + { + Name: "nomad-tcp", + Type: "tcp", + Interval: 9000 * time.Hour, // make check block + Timeout: 10 * time.Second, + InitialStatus: "warning", + }, + }, }, } - if err := ctx.ServiceClient.RegisterAgent("client", agentServices); err != nil { - t.Fatalf("unexpected error registering agent: %v", err) - } - - testutil.WaitForResult(func() (bool, error) { - return ctx.ServiceClient.hasSeen(), fmt.Errorf("error contacting Consul") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Shutdown should block until scripts finish - if err := ctx.ServiceClient.Shutdown(); err != nil { - t.Errorf("unexpected error shutting down client: %v", err) - } - - // UpdateTTL should have been called once for the script check and once - // for shutdown - if n := len(ctx.FakeConsul.checkTTLs); n != 1 { - t.Fatalf("expected 1 checkTTL entry but found: %d", n) - } - for _, v := range ctx.FakeConsul.checkTTLs { - require.Equalf(2, v, "expected 2 updates but found %d", v) - } - for _, v := range ctx.FakeConsul.checks { - if v.Status != "passing" { - t.Fatalf("expected check to be passing but found %q", v.Status) - } - } -} - -// TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in -// ServiceClient. 
-func TestConsul_ShutdownSlow(t *testing.T) { - t.Parallel() - ctx := setupFake(t) - - // Add a script check to make sure its TTL gets updated - ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "scriptcheck", - Type: "script", - Command: "true", - // Make check block until shutdown - Interval: 9000 * time.Hour, - Timeout: 5 * time.Second, - InitialStatus: "warning", - }, - } - - // Make Exec slow, but not too slow - waiter := make(chan struct{}) - ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) { - select { - case <-waiter: - default: - close(waiter) - } - time.Sleep(time.Second) - return []byte{}, 0, nil - } - - // Make shutdown wait time just a bit longer than ctx.Exec takes - ctx.ServiceClient.shutdownWait = 3 * time.Second - - go ctx.ServiceClient.Run() - - // Register a task and agent - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - // wait for Exec to get called before shutting down - <-waiter - - // Shutdown should block until all enqueued operations finish. 
- preShutdown := time.Now() - if err := ctx.ServiceClient.Shutdown(); err != nil { - t.Errorf("unexpected error shutting down client: %v", err) - } + require.NoError(ctx.ServiceClient.RegisterAgent("client", agentServices)) + require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond) - // Shutdown time should have taken: ~1s <= shutdown <= 3s - // actual timing might be less than 1s, to account for shutdown invocation overhead - shutdownTime := time.Now().Sub(preShutdown) - if shutdownTime < 900*time.Millisecond || shutdownTime > ctx.ServiceClient.shutdownWait { - t.Errorf("expected shutdown to take >1s and <%s but took: %s", ctx.ServiceClient.shutdownWait, shutdownTime) - } + // assert successful registration + require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered") + require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered") + require.Contains(ctx.FakeConsul.services, + makeAgentServiceID("client", agentServices[0])) - // UpdateTTL should have been called once for the script check - if n := len(ctx.FakeConsul.checkTTLs); n != 1 { - t.Fatalf("expected 1 checkTTL entry but found: %d", n) - } - for _, v := range ctx.FakeConsul.checkTTLs { - if v != 1 { - t.Fatalf("expected script check to be updated once but found %d", v) - } - } - for _, v := range ctx.FakeConsul.checks { - if v.Status != "passing" { - t.Fatalf("expected check to be passing but found %q", v.Status) - } - } + // Shutdown() should block until Nomad agent service/check is deregistered + require.NoError(ctx.ServiceClient.Shutdown()) + require.Len(ctx.FakeConsul.services, 0, "expected agent service to be deregistered") + require.Len(ctx.FakeConsul.checks, 0, "expected agent check to be deregistered") } // TestConsul_ShutdownBlocked tests the blocked past deadline path for the // shutdown logic in ServiceClient. 
func TestConsul_ShutdownBlocked(t *testing.T) { + require := require.New(t) t.Parallel() ctx := setupFake(t) + // can be short because we're intentionally blocking, but needs to + // be longer than the time we'll block Consul so we can be sure + // we're not delayed either. + ctx.ServiceClient.shutdownWait = time.Second + go ctx.ServiceClient.Run() - // Add a script check to make sure its TTL gets updated - ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ + // register the Nomad agent service and check + agentServices := []*structs.Service{ { - Name: "scriptcheck", - Type: "script", - Command: "true", - // Make check block until shutdown - Interval: 9000 * time.Hour, - Timeout: 9000 * time.Hour, - InitialStatus: "warning", + Name: "http", + Tags: []string{"nomad"}, + PortLabel: "localhost:2345", + Checks: []*structs.ServiceCheck{ + { + Name: "nomad-tcp", + Type: "tcp", + Interval: 9000 * time.Hour, // make check block + Timeout: 10 * time.Second, + InitialStatus: "warning", + }, + }, }, } + require.NoError(ctx.ServiceClient.RegisterAgent("client", agentServices)) + require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond) + require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered") + require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered") - block := make(chan struct{}) - defer close(block) // cleanup after test - - // Make Exec block forever + // prevent normal shutdown by blocking Consul. 
the shutdown should wait + // until agent deregistration has finished waiter := make(chan struct{}) - ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) { + result := make(chan error) + go func() { + ctx.FakeConsul.mu.Lock() close(waiter) - <-block - return []byte{}, 0, nil - } - - // Use a short shutdown deadline since we're intentionally blocking forever - ctx.ServiceClient.shutdownWait = time.Second - - go ctx.ServiceClient.Run() - - // Register a task and agent - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } + result <- ctx.ServiceClient.Shutdown() + }() - // Wait for exec to be called - <-waiter + <-waiter // wait for lock to be hit // Shutdown should block until all enqueued operations finish. preShutdown := time.Now() - err := ctx.ServiceClient.Shutdown() - if err == nil { - t.Errorf("expected a timed out error from shutdown") - } - - // Shutdown time should have taken shutdownWait; to avoid timing - // related errors simply test for wait <= shutdown <= wait+3s - shutdownTime := time.Now().Sub(preShutdown) - maxWait := ctx.ServiceClient.shutdownWait + (3 * time.Second) - if shutdownTime < ctx.ServiceClient.shutdownWait || shutdownTime > maxWait { - t.Errorf("expected shutdown to take >%s and <%s but took: %s", ctx.ServiceClient.shutdownWait, maxWait, shutdownTime) - } - - // UpdateTTL should not have been called for the script check - if n := len(ctx.FakeConsul.checkTTLs); n != 0 { - t.Fatalf("expected 0 checkTTL entry but found: %d", n) - } - for _, v := range ctx.FakeConsul.checks { - if expected := "warning"; v.Status != expected { - t.Fatalf("expected check to be %q but found %q", expected, v.Status) - } - } -} - -// TestConsul_RemoveScript assert removing a script check removes all objects -// related to that check. 
-func TestConsul_CancelScript(t *testing.T) { - ctx := setupFake(t) - ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "scriptcheckDel", - Type: "script", - Interval: 9000 * time.Hour, - Timeout: 9000 * time.Hour, - }, - { - Name: "scriptcheckKeep", - Type: "script", - Interval: 9000 * time.Hour, - Timeout: 9000 * time.Hour, - }, - } - - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if len(ctx.FakeConsul.checks) != 2 { - t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks)) - } - - if len(ctx.ServiceClient.scripts) != 2 && len(ctx.ServiceClient.runningScripts) != 2 { - t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d", - len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts)) - } - - for i := 0; i < 2; i++ { - select { - case <-ctx.MockExec.execs: - // Script ran as expected! 
- case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to run") - } - } - - // Remove a check and update the task - origTask := ctx.Task.Copy() - ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ - { - Name: "scriptcheckKeep", - Type: "script", - Interval: 9000 * time.Hour, - Timeout: 9000 * time.Hour, - }, - } - - if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if len(ctx.FakeConsul.checks) != 1 { - t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks)) - } - - if len(ctx.ServiceClient.scripts) != 1 && len(ctx.ServiceClient.runningScripts) != 1 { - t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d", - len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts)) - } - - // Make sure exec wasn't called again select { - case <-ctx.MockExec.execs: - t.Errorf("unexpected execution of script; was goroutine not cancelled?") - case <-time.After(100 * time.Millisecond): - // No unexpected script execs - } - - // Don't leak goroutines - for _, scriptHandle := range ctx.ServiceClient.runningScripts { - scriptHandle.cancel() - } + case <-time.After(200 * time.Millisecond): + ctx.FakeConsul.mu.Unlock() + require.NoError(<-result) + case <-result: + t.Fatal("should not have received result until Consul unblocked") + } + shutdownTime := time.Now().Sub(preShutdown).Seconds() + require.Less(shutdownTime, time.Second.Seconds(), + "expected shutdown to take >200ms and <1s") + require.Greater(shutdownTime, 200*time.Millisecond.Seconds(), + "expected shutdown to take >200ms and <1s") + require.Len(ctx.FakeConsul.services, 0, + "expected agent service to be deregistered") + require.Len(ctx.FakeConsul.checks, 0, + "expected agent check to be deregistered") } // TestConsul_DriverNetwork_AutoUse asserts that if a 
driver network has @@ -1771,7 +1543,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { require.Len(ctx.ServiceClient.checks, 3) delete(ctx.ServiceClient.services, outofbandTaskServiceID) - delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) require.Len(ctx.ServiceClient.services, 2) require.Len(ctx.ServiceClient.checks, 2) @@ -1788,9 +1560,9 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID) require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) - require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) - require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) - require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) + require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) } // TestConsul_ServiceDeregistration_InProbation asserts that during initialization @@ -1880,7 +1652,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { require.Len(ctx.ServiceClient.checks, 3) delete(ctx.ServiceClient.services, outofbandTaskServiceID) - delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) 
require.Len(ctx.ServiceClient.services, 2) require.Len(ctx.ServiceClient.checks, 2) @@ -1897,9 +1669,9 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID) require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) - require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) - require.Contains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) - require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) + require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) // after probation, outofband services and checks are removed ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour) @@ -1912,8 +1684,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID) require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID) - require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) - require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])) - require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) + require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, 
outofbandTask.Services[0].Checks[0])) + require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])) } From fb13c5baa6ba63c53445bb9bb3e64f82923c6f6a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 28 Aug 2019 16:46:42 -0400 Subject: [PATCH 2/4] client: add support for task group checks When tasks are checked for script checks, we walk back through their task group to see if there are script checks associated with the task. If so, we'll spin off script check tasklets for them. The group-level service and any restart behaviors it needs are entirely encapsulated within the group service hook. --- .../taskrunner/script_check_hook.go | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/client/allocrunner/taskrunner/script_check_hook.go b/client/allocrunner/taskrunner/script_check_hook.go index dfe1d231e2d..7d021062e9d 100644 --- a/client/allocrunner/taskrunner/script_check_hook.go +++ b/client/allocrunner/taskrunner/script_check_hook.go @@ -73,6 +73,32 @@ func newScriptCheckHook(c scriptCheckHookConfig) *scriptCheckHook { } } + // Walk back through the task group to see if there are script checks + // associated with the task. If so, we'll create scriptCheck tasklets + // for them. The group-level service and any check restart behaviors it + // needs are entirely encapsulated within the group service hook which + // watches Consul for status changes. 
+ tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup) + for _, service := range tg.Services { + for _, check := range service.Checks { + if check.Type != structs.ServiceCheckScript { + continue + } + if check.TaskName != c.task.Name { + continue + } + groupTaskName := "group-" + tg.Name + sc := newScriptCheck(&scriptCheckConfig{ + allocID: c.alloc.ID, + taskName: groupTaskName, + service: service, + check: check, + agent: c.consul, + }) + scriptChecks[sc.id] = sc + } + } + h := &scriptCheckHook{ consul: c.consul, allocID: c.alloc.ID, From 40c707b33a77201648ce59538fcaa5540cfae8bb Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 3 Sep 2019 13:53:00 -0400 Subject: [PATCH 3/4] remove redundant taskName, allocID fields from scriptCheck --- client/allocrunner/taskrunner/script_check_hook.go | 3 +-- client/allocrunner/taskrunner/script_check_hook_test.go | 2 ++ client/allocrunner/taskrunner/tasklet.go | 5 +---- client/allocrunner/taskrunner/tasklet_test.go | 2 -- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/client/allocrunner/taskrunner/script_check_hook.go b/client/allocrunner/taskrunner/script_check_hook.go index 7d021062e9d..4d9a4fd2cb0 100644 --- a/client/allocrunner/taskrunner/script_check_hook.go +++ b/client/allocrunner/taskrunner/script_check_hook.go @@ -277,8 +277,6 @@ func newScriptCheck(config *scriptCheckConfig) *scriptCheck { lastCheckOk: true, // start logging on first failure } // we can't use the promoted fields of tasklet in the struct literal - sc.allocID = config.allocID - sc.taskName = config.taskName sc.Command = config.check.Command sc.Args = config.check.Args sc.Interval = config.check.Interval @@ -286,6 +284,7 @@ func newScriptCheck(config *scriptCheckConfig) *scriptCheck { return sc } +// Copy returns the receiver pointer itself; no new scriptCheck is allocated, so the result aliases the original. 
func (sc *scriptCheck) Copy() *scriptCheck { newSc := sc return newSc diff --git a/client/allocrunner/taskrunner/script_check_hook_test.go b/client/allocrunner/taskrunner/script_check_hook_test.go index c40c588667c..4373674fbae 100644 --- a/client/allocrunner/taskrunner/script_check_hook_test.go +++ b/client/allocrunner/taskrunner/script_check_hook_test.go @@ -170,6 +170,8 @@ func TestScript_Exec_Shutdown(t *testing.T) { } } +// TestScript_Exec_Codes asserts script exit codes are translated to their +// corresponding Consul health check status. func TestScript_Exec_Codes(t *testing.T) { exec := newScriptedExec([]execResult{ diff --git a/client/allocrunner/taskrunner/tasklet.go b/client/allocrunner/taskrunner/tasklet.go index 24a834a1064..0f6d2e578c2 100644 --- a/client/allocrunner/taskrunner/tasklet.go +++ b/client/allocrunner/taskrunner/tasklet.go @@ -64,8 +64,6 @@ func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([] // once and on each interval, and fires a callback whenever the script // is complete. 
type tasklet struct { - allocID string - taskName string Command string // Command is the command to run for tasklet Args []string // Args is a list of arguments for tasklet Interval time.Duration // Interval of the tasklet @@ -125,8 +123,7 @@ func (t *tasklet) run() *taskletHandle { "client", "allocrunner", "taskrunner", "tasklet_runs"}, 1) // Execute check script with timeout - t.logger.Trace("tasklet executing", - "allocID", t.allocID, "task", t.taskName) + t.logger.Trace("tasklet executing") output, code, err := ctxExec.Exec(t.Timeout, t.Command, t.Args) switch err { case context.Canceled: diff --git a/client/allocrunner/taskrunner/tasklet_test.go b/client/allocrunner/taskrunner/tasklet_test.go index 93a217d8499..4dc8f36f2c3 100644 --- a/client/allocrunner/taskrunner/tasklet_test.go +++ b/client/allocrunner/taskrunner/tasklet_test.go @@ -159,8 +159,6 @@ type taskletMock struct { func newTaskletMock(exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *taskletMock { tm := &taskletMock{calls: make(chan execResult)} - tm.allocID = "allocid" - tm.taskName = "testtask" tm.exec = exec tm.logger = logger tm.Interval = interval From 363b923b443759e9f4f1be2e90f074e99138fd7b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 3 Sep 2019 13:54:03 -0400 Subject: [PATCH 4/4] retry updateTTL on all errors --- client/allocrunner/taskrunner/script_check_hook.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/allocrunner/taskrunner/script_check_hook.go b/client/allocrunner/taskrunner/script_check_hook.go index 4d9a4fd2cb0..a7d8935ba79 100644 --- a/client/allocrunner/taskrunner/script_check_hook.go +++ b/client/allocrunner/taskrunner/script_check_hook.go @@ -3,7 +3,6 @@ package taskrunner import ( "context" "fmt" - "strings" "sync" "time" @@ -351,9 +350,8 @@ const ( func (s *scriptCheck) updateTTL(ctx context.Context, id, msg, state string) error { for attempts := 0; ; attempts++ { err := s.agent.UpdateTTL(id, msg, 
state) - if err == nil || - !strings.Contains(err.Error(), "does not have associated TTL") { - return err + if err == nil { + return nil } // Handle the retry case