Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Ensure that kill when prepare occurs preempts work
Browse files Browse the repository at this point in the history
  • Loading branch information
sargun committed May 15, 2018
1 parent b3c8a8f commit 8cd092e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 3 deletions.
17 changes: 14 additions & 3 deletions executor/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,25 +260,36 @@ no_launchguard:
}
r.updateStatus(ctx, titusdriver.Starting, "creating")

prepareCtx, prepareCancel := context.WithCancel(ctx)
defer prepareCancel()
go func() {
select {
case <-r.killChan:
prepareCancel()
case <-prepareCtx.Done():
}
}()
// When Create() returns the host may have been modified to create storage and pull the image.
// These steps may or may not have completed depending on if/where a failure occurred.
bindMounts := []string{}
err = r.runtime.Prepare(ctx, r.container, bindMounts)
err = r.runtime.Prepare(prepareCtx, r.container, bindMounts)
if err != nil {
r.metrics.Counter("titus.executor.launchTaskFailed", 1, nil)
r.logger.Error("task failed to create container: ", err)
// Treat registry pull errors as LOST and non-existent images as FAILED.
switch err.(type) {
case *runtimeTypes.RegistryImageNotFoundError, *runtimeTypes.InvalidSecurityGroupError, *runtimeTypes.BadEntryPointError:
r.logger.Error("Returning TASK_FAILED for task: ", err)
r.updateStatus(ctx, titusdriver.Failed, err.Error())
r.updateStatus(prepareCtx, titusdriver.Failed, err.Error())
default:
r.logger.Error("Returning TASK_LOST for task: ", err)
r.updateStatus(ctx, titusdriver.Lost, err.Error())
r.updateStatus(prepareCtx, titusdriver.Lost, err.Error())
}
return
}

// By this point, we should have no more dependence on the prepare context
prepareCancel()
r.updateStatus(ctx, titusdriver.Starting, "starting")
logDir, details, statusChan, err := r.runtime.Start(ctx, r.container)
if err != nil { // nolint: vetshadow
Expand Down
69 changes: 69 additions & 0 deletions executor/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@ type runtimeMock struct {
startCalled chan<- struct{}

statusChan chan runtimeTypes.StatusMessage

prepareCallback func(context.Context) error
}

// Prepare is the mock's implementation of the runtime prepare step. It logs
// the call along with the container's task ID. When no prepareCallback has
// been installed it reports success; otherwise it hands ctx to the callback
// and returns the callback's result unchanged.
func (r *runtimeMock) Prepare(ctx context.Context, c *runtimeTypes.Container, bindMounts []string) error {
	r.t.Log("runtimeMock.Prepare", c.TaskID)

	// Default behaviour: nothing to prepare, nothing can fail.
	if r.prepareCallback == nil {
		return nil
	}

	// A test supplied its own behaviour (e.g. blocking until ctx is done).
	return r.prepareCallback(ctx)
}

Expand Down Expand Up @@ -101,6 +106,70 @@ func (r *runtimeMock) Cleanup(c *runtimeTypes.Container) error {
return nil
}

// TestCancelDuringPrepare checks that a kill issued while the runtime is still
// inside Prepare interrupts the prepare step instead of waiting for it.
//
// The mock's Prepare is made to block until the context it is handed is done
// and then return that context's error. The test starts a task, kills it on
// the first Starting update, and passes once a Lost update is observed. Any
// other state, a five second timeout, or cancellation of the test context
// fails the test.
func TestCancelDuringPrepare(t *testing.T) { // nolint: gocyclo
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	taskID := "Titus-123-worker-0-2"
	image := "titusops/alpine"
	taskInfo := &titus.ContainerInfo{
		ImageName:         &image,
		IgnoreLaunchGuard: proto.Bool(true),
	}
	kills := make(chan chan<- struct{}, 1)

	statusChan := make(chan runtimeTypes.StatusMessage, 10)
	r := &runtimeMock{
		t:           t,
		startCalled: make(chan<- struct{}),
		kills:       kills,
		ctx:         ctx,
		statusChan:  statusChan,
	}

	// Prepare never finishes on its own: it only returns once the context
	// passed to it is cancelled, and then reports the cancellation as its error.
	r.prepareCallback = func(c context.Context) error {
		<-c.Done()
		return c.Err()
	}

	l := uploader.NewUploadersFromUploaderArray([]uploader.Uploader{&uploader.NoopUploader{}})
	cfg := config.Config{}

	executor, err := WithRuntime(ctx, metrics.Discard, func(ctx context.Context, _cfg config.Config) (runtimeTypes.Runtime, error) {
		return r, nil
	}, l, cfg)
	require.NoError(t, err)
	require.NoError(t, executor.StartTask(taskID, taskInfo, 1, 1, 1))

	// Several Starting updates may arrive; the kill must be sent only once.
	var once sync.Once

	// Watchdog for the whole test. Stopping it on return releases the timer
	// as soon as the test finishes rather than leaving it pending.
	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()

	for {
		select {
		case update := <-executor.UpdatesChan:
			switch update.State {
			case titusdriver.Starting:
				once.Do(func() {
					executor.Kill()
					logrus.Debug("Killing task, now that it's entered starting")
				})
			case titusdriver.Lost:
				// The blocked Prepare was interrupted and the task reported Lost.
				return
			default:
				t.Fatal("Unknown state: ", update)
			}
		case <-deadline.C:
			t.Fatal("Test Failed, executor didn't yield when killed in prepare")
		case <-ctx.Done():
			t.Fatal("Context complete?")
		}
	}
}

func TestSendRedundantStatusMessage(t *testing.T) { // nolint: gocyclo
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 8cd092e

Please sign in to comment.