From c89e97084d0517aa4e045b8e52a4d71b11c4a38b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 4 Feb 2016 13:09:53 -0800 Subject: [PATCH 1/3] Fix AllocRunner not capturing destroy signal and tests --- client/alloc_runner.go | 32 ++++-- client/alloc_runner_test.go | 212 +++++++++++++++++++++++++++++++++++- 2 files changed, 231 insertions(+), 13 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 584a031e146..b32dc9299ad 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -326,15 +326,8 @@ func (r *AllocRunner) Run() { defer close(r.waitCh) go r.dirtySyncState() - // Check if the allocation is in a terminal status - alloc := r.alloc - if alloc.TerminalStatus() { - r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID) - return - } - r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID) - // Find the task group to run in the allocation + alloc := r.alloc tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) @@ -353,7 +346,17 @@ func (r *AllocRunner) Run() { r.ctx = driver.NewExecContext(allocDir, r.alloc.ID) } + // Check if the allocation is in a terminal status. In this case, we don't + // start any of the task runners and directly wait for the destroy signal to + // clean up the allocation. + if alloc.TerminalStatus() { + r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID) + r.handleDestroy() + r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) + } + // Start the task runners + r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID) r.taskLock.Lock() for _, task := range tg.Tasks { if _, ok := r.restored[task.Name]; ok { @@ -428,8 +431,16 @@ OUTER: // Final state sync r.retrySyncState(nil) - // Check if we should destroy our state - if r.destroy { + // Block until we should destroy the state of the alloc + r.handleDestroy() + r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) +} + +// handleDestroy blocks till the AllocRunner should be destroyed and does the +// necessary cleanup. +func (r *AllocRunner) handleDestroy() { + select { + case <-r.destroyCh: if err := r.DestroyContext(); err != nil { r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v", r.alloc.ID, err) @@ -439,7 +450,6 @@ OUTER: r.alloc.ID, err) } } - r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) } // Update is used to update the allocation of the context diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 5d68e013cc5..a97544d4142 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -62,6 +62,95 @@ func TestAllocRunner_SimpleRun(t *testing.T) { }) } +func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { + ctestutil.ExecCompatible(t) + upd, ar := testAllocRunner(false) + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = []string{"10"} + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus == structs.AllocClientStatusRunning { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Update the alloc to be terminal which should cause the alloc runner to + // stop the tasks and wait for a destroy. + update := ar.alloc.Copy() + update.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(update) + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state still exists + if _, err := os.Stat(ar.stateFilePath()); err != nil { + return false, fmt.Errorf("state file destroyed: %v", err) + } + + // Check the alloc directory still exists + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil { + return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) + + // Send the destroy signal and ensure the AllocRunner cleans up. + ar.Destroy() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state was cleaned + if _, err := os.Stat(ar.stateFilePath()); err == nil { + return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + // Check the alloc directory was cleaned + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) +} + func TestAllocRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) upd, ar := testAllocRunner(false) @@ -83,8 +172,28 @@ func TestAllocRunner_Destroy(t *testing.T) { if upd.Count == 0 { return false, nil } + + // Check the status has changed. last := upd.Allocs[upd.Count-1] - return last.ClientStatus == structs.AllocClientStatusDead, nil + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state was cleaned + if _, err := os.Stat(ar.stateFilePath()); err == nil { + return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + // Check the alloc directory was cleaned + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + return true, nil }, func(err error) { t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) @@ -129,7 +238,6 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { task.Config["command"] = "/bin/sleep" task.Config["args"] = []string{"10"} go ar.Run() - defer ar.Destroy() // Snapshot state testutil.WaitForResult(func() (bool, error) { @@ -171,3 +279,103 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { t.Fatalf("took too long to terminate") } } + +func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { + ctestutil.ExecCompatible(t) + upd, ar := testAllocRunner(false) + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = []string{"10"} + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus == structs.AllocClientStatusRunning { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Update the alloc to be terminal which should cause the alloc runner to + // stop the tasks and wait for a destroy. + update := ar.alloc.Copy() + update.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(update) + + testutil.WaitForResult(func() (bool, error) { + return ar.alloc.DesiredStatus == structs.AllocDesiredStatusStop, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + err := ar.SaveState() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a new alloc runner + consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) + ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, + &structs.Allocation{ID: ar.alloc.ID}, consulClient) + err = ar2.RestoreState() + if err != nil { + t.Fatalf("err: %v", err) + } + go ar2.Run() + + testutil.WaitForResult(func() (bool, error) { + // Check the state still exists + if _, err := os.Stat(ar.stateFilePath()); err != nil { + return false, fmt.Errorf("state file destroyed: %v", err) + } + + // Check the alloc directory still exists + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil { + return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) + + // Send the destroy signal and ensure the AllocRunner cleans up. + ar2.Destroy() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state was cleaned + if _, err := os.Stat(ar.stateFilePath()); err == nil { + return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + // Check the alloc directory was cleaned + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) +} From fb010e2c5d7c8d3265e039ddea20507170c5a078 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 4 Feb 2016 14:19:27 -0800 Subject: [PATCH 2/3] Small fixes --- client/alloc_runner.go | 5 +++-- client/alloc_runner_test.go | 11 +++++++---- testutil/wait.go | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index b32dc9299ad..928b0105be9 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -353,10 +353,11 @@ func (r *AllocRunner) Run() { r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID) r.handleDestroy() r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) + return } // Start the task runners - r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID) + r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID) r.taskLock.Lock() for _, task := range tg.Tasks { if _, ok := r.restored[task.Name]; ok { @@ -418,7 +419,6 @@ OUTER: // Destroy each sub-task r.taskLock.Lock() - defer r.taskLock.Unlock() for _, tr := range r.tasks { tr.Destroy() } @@ -427,6 +427,7 @@ OUTER: for _, tr := range r.tasks { <-tr.WaitCh() } + r.taskLock.Unlock() // Final state sync r.retrySyncState(nil) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index a97544d4142..f082ba08e4f 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -114,7 +114,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { return true, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + t.Fatalf("err: %v", err) }) // Send the destroy signal and ensure the AllocRunner cleans up. @@ -147,7 +147,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { return true, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + t.Fatalf("err: %v", err) }) } @@ -195,7 +195,7 @@ func TestAllocRunner_Destroy(t *testing.T) { return true, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + t.Fatalf("err: %v", err) }) if time.Since(start) > 15*time.Second { @@ -320,6 +320,9 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { t.Fatalf("err: %v", err) } + // Ensure both alloc runners don't destroy + ar.destroy = true + // Create a new alloc runner consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, @@ -376,6 +379,6 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { return true, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + t.Fatalf("err: %v", err) }) } diff --git a/testutil/wait.go b/testutil/wait.go index c1d5321bc0a..43ad02acadb 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -18,7 +18,7 @@ type testFn func() (bool, error) type errorFn func(error) func WaitForResult(test testFn, error errorFn) { - WaitForResultRetries(1000*TestMultiplier(), test, error) + WaitForResultRetries(2000*TestMultiplier(), test, error) } func WaitForResultRetries(retries int64, test testFn, error errorFn) { From 33088410986a104a0f6efe03900825df588981c4 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 4 Feb 2016 14:30:32 -0800 Subject: [PATCH 3/3] Serialize the list of mounted entries in the alloc dir --- client/allocdir/alloc_dir.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index dd2ac6afed3..e5200be41c7 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -36,7 +36,7 @@ type AllocDir struct { TaskDirs map[string]string // A list of locations the shared alloc has been mounted to. - mounted []string + Mounted []string } // AllocFileInfo holds information about a file inside the AllocDir @@ -64,7 +64,7 @@ func NewAllocDir(allocDir string) *AllocDir { // Tears down previously build directory structure. func (d *AllocDir) Destroy() error { // Unmount all mounted shared alloc dirs. - for _, m := range d.mounted { + for _, m := range d.Mounted { if err := d.unmountSharedDir(m); err != nil { return fmt.Errorf("Failed to unmount shared directory: %v", err) } @@ -233,7 +233,7 @@ func (d *AllocDir) MountSharedDir(task string) error { return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err) } - d.mounted = append(d.mounted, taskLoc) + d.Mounted = append(d.Mounted, taskLoc) return nil }