This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Terminate during prepare #119

Merged: 2 commits, merged May 15, 2018
15 changes: 13 additions & 2 deletions executor/runner/runner.go

@@ -260,10 +260,19 @@ no_launchguard:
 	}
 	r.updateStatus(ctx, titusdriver.Starting, "creating")
 
+	prepareCtx, prepareCancel := context.WithCancel(ctx)
+	defer prepareCancel()
Contributor:

minor: I think this can be moved to a single prepareCancel call right after Prepare(...) (and right before the if err != nil), since it's the only place it needs to be called, regardless of whether Prepare failed or not.

Contributor Author:

metalinter complains if I do this.

Contributor Author:

:(
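
For context on the linter objection: it is plausibly go vet's lostcancel check (one of the analyzers gometalinter runs), which wants the cancel function returned by context.WithCancel to be called on every path, conventionally via defer. A minimal, self-contained sketch of the pattern under review — hypothetical names, not the actual titus-executor code — showing both the defer and the reviewer's "single call right after Prepare":

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// prepare stands in for runtime.Prepare: it blocks until its context is
// done or a (deliberately long) fake workload finishes.
func prepare(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Second):
		return nil
	}
}

func main() {
	killChan := make(chan struct{}) // stand-in for r.killChan

	prepareCtx, prepareCancel := context.WithCancel(context.Background())
	// vet's lostcancel check wants cancel reachable on every return path,
	// so the defer stays even though we also cancel explicitly below.
	defer prepareCancel()

	// Watcher: a kill request cancels prepare; <-prepareCtx.Done() releases
	// the goroutine once prepare finishes normally and cancel is called.
	go func() {
		select {
		case <-killChan:
			prepareCancel()
		case <-prepareCtx.Done():
		}
	}()

	// Simulate Runner.Kill() arriving mid-prepare.
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(killChan)
	}()

	err := prepare(prepareCtx)
	prepareCancel() // the reviewer's suggested single-call spot
	if errors.Is(err, context.Canceled) {
		fmt.Println("prepare terminated early, as intended")
	} else {
		fmt.Println("unexpected:", err)
	}
}

Note that in the merged code the defer is not fully redundant anyway: the explicit prepareCancel() only runs once the error check has passed, so the error-return path relies on the defer to release the watcher goroutine.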

+	go func() {
+		select {
+		case <-r.killChan:
+			prepareCancel()
+		case <-prepareCtx.Done():
+		}
+	}()
 	// When Create() returns the host may have been modified to create storage and pull the image.
 	// These steps may or may not have completed depending on if/where a failure occurred.
 	bindMounts := []string{}
-	err = r.runtime.Prepare(ctx, r.container, bindMounts)
+	err = r.runtime.Prepare(prepareCtx, r.container, bindMounts)
 	if err != nil {
 		r.metrics.Counter("titus.executor.launchTaskFailed", 1, nil)
 		r.logger.Error("task failed to create container: ", err)
@@ -279,6 +288,8 @@ no_launchguard:
 		return
 	}
 
+	// By this point, we should have no more dependence on the prepare context
+	prepareCancel()
 	r.updateStatus(ctx, titusdriver.Starting, "starting")
 	logDir, details, statusChan, err := r.runtime.Start(ctx, r.container)
 	if err != nil { // nolint: vetshadow
@@ -551,7 +562,7 @@ func (r *Runner) updateStatusWithDetails(ctx context.Context, status titusdriver
 	}:
 		l.Info("Updating task status")
 	case <-ctx.Done():
-		l.Info("Not sending update")
+		l.Warn("Not sending update, because UpdatesChan Blocked, (or closed), and context completed")
 	}
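
The log-level bump reads correctly in context: updateStatusWithDetails selects between delivering the update and giving up once the context completes, and silently dropping a status update merits more than an Info line. A self-contained sketch of that send-or-give-up pattern (hypothetical types and names, not the titus-executor code):

package main

import (
	"context"
	"log"
	"time"
)

type update struct{ msg string }

// sendUpdate either delivers u on updates, or logs and drops it once ctx completes.
func sendUpdate(ctx context.Context, updates chan<- update, u update) {
	select {
	case updates <- u:
		log.Println("Updating task status")
	case <-ctx.Done():
		// The receiver is blocked (or gone) and the context finished.
		log.Println("WARN: not sending update: updates channel blocked (or closed) and context completed")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	updates := make(chan update) // unbuffered and never read, forcing the ctx branch
	sendUpdate(ctx, updates, update{msg: "starting"})
}

Run as-is, the unbuffered, never-read channel forces the ctx.Done() branch after the 50 ms timeout.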
}

180 changes: 125 additions & 55 deletions executor/runner/runner_test.go

@@ -44,6 +44,131 @@ type runtimeMock struct {
 	startCalled chan<- struct{}
 
 	statusChan chan runtimeTypes.StatusMessage
+
+	prepareCallback func(context.Context) error
 }
+
+func (r *runtimeMock) Prepare(ctx context.Context, c *runtimeTypes.Container, bindMounts []string) error {
+	r.t.Log("runtimeMock.Prepare", c.TaskID)
+	if r.prepareCallback != nil {
+		return r.prepareCallback(ctx)
+	}
+	return nil
+}
+
+func (r *runtimeMock) Start(ctx context.Context, c *runtimeTypes.Container) (string, *runtimeTypes.Details, <-chan runtimeTypes.StatusMessage, error) {
+	r.t.Log("runtimeMock.Start", c.TaskID)
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	close(r.startCalled)
+	r.startCalled = make(chan<- struct{}) // reset subscription
+	details := &runtimeTypes.Details{
+		IPAddresses: make(map[string]string),
+		NetworkConfiguration: &runtimeTypes.NetworkConfigurationDetails{
+			IsRoutableIP: false,
+		},
+	}
+
+	status := runtimeTypes.StatusMessage{
+		Status: runtimeTypes.StatusRunning,
+		Msg:    "running",
+	}
+
+	// We can do this because it's buffered.
+	r.statusChan <- status
+	return "", details, r.statusChan, nil
+}
+
+func (r *runtimeMock) Kill(c *runtimeTypes.Container) error {
+	logrus.Infof("runtimeMock.Kill (%v): %s", r.ctx, c.TaskID)
+	defer close(r.statusChan)
+	defer logrus.Info("runtimeMock.Killed: ", c.TaskID)
+	// send a kill request and wait for a grant
+	req := make(chan struct{}, 1)
+	select {
+	case r.kills <- req:
+	case <-r.ctx.Done():
+		logrus.Info("runtimeMock.Kill canceled")
+		return errors.New("runtimeMock.Kill canceled")
+	}
+	select {
+	case <-req:
+	case <-r.ctx.Done():
+		logrus.Info("runtimeMock.Kill canceled")
+
+		return errors.New("runtimeMock.Kill canceled")
+	}
+	return nil
+}
+
+func (r *runtimeMock) Cleanup(c *runtimeTypes.Container) error {
+	r.t.Log("runtimeMock.Cleanup", c.TaskID)
+	return nil
+}
+
+func TestCancelDuringPrepare(t *testing.T) { // nolint: gocyclo
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	taskID := "TestCancelDuringPrepare"
+	image := "titusops/alpine"
+	taskInfo := &titus.ContainerInfo{
+		ImageName:         &image,
+		IgnoreLaunchGuard: proto.Bool(true),
+	}
+	kills := make(chan chan<- struct{}, 1)
+
+	statusChan := make(chan runtimeTypes.StatusMessage, 10)
+	r := &runtimeMock{
+		t:           t,
+		startCalled: make(chan<- struct{}),
+		kills:       kills,
+		ctx:         ctx,
+		statusChan:  statusChan,
+	}
+
+	r.prepareCallback = func(c context.Context) error {
+		<-c.Done()
+		return c.Err()
+	}
+
+	l := uploader.NewUploadersFromUploaderArray([]uploader.Uploader{&uploader.NoopUploader{}})
+	cfg := config.Config{}
+
+	executor, err := WithRuntime(ctx, metrics.Discard, func(ctx context.Context, _cfg config.Config) (runtimeTypes.Runtime, error) {
+		return r, nil
+	}, l, cfg)
+	require.NoError(t, err)
+	require.NoError(t, executor.StartTask(taskID, taskInfo, 1, 1, 1))
+
+	once := sync.Once{}
+
+	testFailed := make(chan struct{})
+	time.AfterFunc(30*time.Second, func() {
+		close(testFailed)
+	})
+
+	for {
+		select {
+		case update := <-executor.UpdatesChan:
+			logrus.Debug("Got update: ", update)
+			switch update.State {
+			case titusdriver.Starting:
+				once.Do(func() {
+					executor.Kill()
+					logrus.Debug("Killing task, now that it's entered starting")
+				})
+			case titusdriver.Lost:
+				return
+			default:
+				t.Fatal("Unknown state: ", update)
+			}
+		case <-testFailed:
+			panic("Test Failed, executor didn't yield when killed in prepare")
+		case <-ctx.Done():
+			t.Fatal("Context complete?")
+		}
+	}
+}
 
 func TestSendRedundantStatusMessage(t *testing.T) { // nolint: gocyclo
@@ -276,58 +401,3 @@ func drain(t *testing.T, e *Runner, taskLaunched chan struct{}) {
 	}
 	t.Log("Drain complete")
 }
-
-func (r *runtimeMock) Prepare(ctx context.Context, c *runtimeTypes.Container, bindMounts []string) error {
-	r.t.Log("runtimeMock.Prepare", c.TaskID)
-	return nil
-}
-
-func (r *runtimeMock) Start(ctx context.Context, c *runtimeTypes.Container) (string, *runtimeTypes.Details, <-chan runtimeTypes.StatusMessage, error) {
-	r.t.Log("runtimeMock.Start", c.TaskID)
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	close(r.startCalled)
-	r.startCalled = make(chan<- struct{}) // reset subscription
-	details := &runtimeTypes.Details{
-		IPAddresses: make(map[string]string),
-		NetworkConfiguration: &runtimeTypes.NetworkConfigurationDetails{
-			IsRoutableIP: false,
-		},
-	}
-
-	status := runtimeTypes.StatusMessage{
-		Status: runtimeTypes.StatusRunning,
-		Msg:    "running",
-	}
-
-	// We can do this because it's buffered.
-	r.statusChan <- status
-	return "", details, r.statusChan, nil
-}
-
-func (r *runtimeMock) Kill(c *runtimeTypes.Container) error {
-	logrus.Infof("runtimeMock.Kill (%v): %s", r.ctx, c.TaskID)
-	defer close(r.statusChan)
-	defer logrus.Info("runtimeMock.Killed: ", c.TaskID)
-	// send a kill request and wait for a grant
-	req := make(chan struct{}, 1)
-	select {
-	case r.kills <- req:
-	case <-r.ctx.Done():
-		logrus.Info("runtimeMock.Kill canceled")
-		return errors.New("runtimeMock.Kill canceled")
-	}
-	select {
-	case <-req:
-	case <-r.ctx.Done():
-		logrus.Info("runtimeMock.Kill canceled")
-
-		return errors.New("runtimeMock.Kill canceled")
-	}
-	return nil
-}
-
-func (r *runtimeMock) Cleanup(c *runtimeTypes.Container) error {
-	r.t.Log("runtimeMock.Cleanup", c.TaskID)
-	return nil
-}