Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Start refactoring the executor to further split up Docker and runtime…
Browse files Browse the repository at this point in the history
… bits
  • Loading branch information
sargun committed Feb 6, 2018
1 parent 365728f commit 3b2fba8
Show file tree
Hide file tree
Showing 16 changed files with 367 additions and 414 deletions.
4 changes: 2 additions & 2 deletions executor/drivers/mesos/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/Netflix/metrics-client-go/metrics"
titusproto "github.com/Netflix/titus-executor/api/netflix/titus"
"github.com/Netflix/titus-executor/executor/drivers"
"github.com/Netflix/titus-executor/executor/runtime"
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"
protobuf "github.com/golang/protobuf/proto"
"github.com/mesos/mesos-go/executor"
"github.com/mesos/mesos-go/mesosproto"
Expand Down Expand Up @@ -116,7 +116,7 @@ func (e *titusMesosExecutor) Error(exec executor.ExecutorDriver, err string) {
}

// ReportTitusTaskStatus notifies Mesos of a change in task state.
func (e *titusMesosExecutor) ReportTitusTaskStatus(taskID string, msg string, state titusdriver.TitusTaskState, details *runtime.Details) {
func (e *titusMesosExecutor) ReportTitusTaskStatus(taskID string, msg string, state titusdriver.TitusTaskState, details *runtimeTypes.Details) {
e.mu.Lock()
defer e.mu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions executor/drivers/testdriver/testdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"time"

"github.com/Netflix/titus-executor/executor/drivers"
"github.com/Netflix/titus-executor/executor/runtime"
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"
log "github.com/sirupsen/logrus"
)

Expand All @@ -13,7 +13,7 @@ type TaskStatus struct {
TaskID string
Status string
Msg string
Details *runtime.Details
Details *runtimeTypes.Details
Timestamp time.Time
}

Expand All @@ -33,7 +33,7 @@ func New(executor titusdriver.TitusExecutor) (*TitusTestDriver, error) {
}

// ReportTitusTaskStatus notifies a test via a channel about a task's state
func (driver *TitusTestDriver) ReportTitusTaskStatus(taskID string, msg string, state titusdriver.TitusTaskState, details *runtime.Details) {
func (driver *TitusTestDriver) ReportTitusTaskStatus(taskID string, msg string, state titusdriver.TitusTaskState, details *runtimeTypes.Details) {
log.Printf("Sending task status for task %s, state %s, and message %s", taskID, state.String(), msg)
driver.StatusChannel <- TaskStatus{TaskID: taskID, Status: state.String(), Msg: msg, Details: details, Timestamp: time.Now()}
}
4 changes: 2 additions & 2 deletions executor/drivers/titusdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"strconv"

"github.com/Netflix/titus-executor/api/netflix/titus"
"github.com/Netflix/titus-executor/executor/runtime"
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"
)

// TitusTaskState represents the current state of a task
Expand Down Expand Up @@ -43,7 +43,7 @@ func (s TitusTaskState) String() string {
type TitusDriver interface {
// ReportTitusTaskStatus is a callback function to notify the driver
// of a change in task state.
ReportTitusTaskStatus(taskID string, msg string, state TitusTaskState, details *runtime.Details)
ReportTitusTaskStatus(taskID string, msg string, state TitusTaskState, details *runtimeTypes.Details)
}

// TitusExecutor is the interface implemented by a generic Titus executor.
Expand Down
30 changes: 16 additions & 14 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/Netflix/titus-executor/executor/launchguard"
"github.com/Netflix/titus-executor/executor/metatron"
"github.com/Netflix/titus-executor/executor/runtime"
"github.com/Netflix/titus-executor/executor/runtime/docker"
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"
"github.com/Netflix/titus-executor/filesystems"
"github.com/Netflix/titus-executor/models"
"github.com/Netflix/titus-executor/uploader"
Expand All @@ -31,11 +33,11 @@ type update struct {
TaskID string
State titusdriver.TitusTaskState
Mesg string
Details *runtime.Details
Details *runtimeTypes.Details
ce launchguard.CleanUpEvent
}

func (u update) withDetails(details *runtime.Details) update {
func (u update) withDetails(details *runtimeTypes.Details) update {
u.Details = details
return u
}
Expand All @@ -53,7 +55,7 @@ func newUpdate(taskID string, state titusdriver.TitusTaskState, mesg string) upd

type containerState struct {
sync.Mutex
*runtime.Container
*runtimeTypes.Container
isKilled bool
watcher *filesystems.Watcher

Expand All @@ -75,7 +77,7 @@ func (c *containerState) cancel() {
type Executor struct {
metrics metrics.Reporter
titusDriver titusdriver.TitusDriver
runtime runtime.Runtime
runtime runtimeTypes.Runtime
logUploaders *uploader.Uploaders

sync.RWMutex
Expand All @@ -94,12 +96,12 @@ type Executor struct {
}

// RuntimeProvider is a factory function for runtime implementations. It is called only once by WithRuntime
type RuntimeProvider func(context.Context) (runtime.Runtime, error)
type RuntimeProvider func(context.Context) (runtimeTypes.Runtime, error)

// New constructs a new Executor object with the default (docker) runtime
func New(m metrics.Reporter, logUploaders *uploader.Uploaders) (*Executor, error) {
dockerRuntime := func(ctx context.Context) (runtime.Runtime, error) {
return runtime.NewDockerRuntime(ctx, m)
dockerRuntime := func(ctx context.Context) (runtimeTypes.Runtime, error) {
return docker.NewDockerRuntime(ctx, m)
}
return WithRuntime(m, dockerRuntime, logUploaders)
}
Expand Down Expand Up @@ -168,15 +170,15 @@ func (e *Executor) runCheck(taskID string) bool {
}

switch status {
case runtime.StatusRunning:
case runtimeTypes.StatusRunning:
// no need to update the status if task is running
c.logEntry.Debug("running")
return false
case runtime.StatusFinished:
case runtimeTypes.StatusFinished:
c.logEntry.Info("finished")
e.update <- newUpdate(taskID, titusdriver.Finished, "finished")
return true
case runtime.StatusFailed:
case runtimeTypes.StatusFailed:
c.logEntry.Info("failed")
e.update <- newUpdate(taskID, titusdriver.Failed, err.Error())
return true
Expand Down Expand Up @@ -249,7 +251,7 @@ func (e *Executor) Start() {

// setupMetatron returns a Docker formatted string bind mount for a container for a directory that will contain
// TODO(fabio): create a type for Binds
func (e *Executor) setupMetatron(c *runtime.Container) (*metatron.CredentialsConfig, error) {
func (e *Executor) setupMetatron(c *runtimeTypes.Container) (*metatron.CredentialsConfig, error) {
if config.DevWorkspace().MockMetatronCreds {
// Make up some creds for local testing
testAppMetadata := "type=titus&version=1&app=myApp&stack=myStack&imageName=myImage&imageVersion=latest&entry=myEntryPoint&t=1481328000"
Expand Down Expand Up @@ -401,7 +403,7 @@ func (e *Executor) StartTask(taskID string, titusInfo *titus.ContainerInfo, mem

c := &containerState{
Container: runtime.NewContainer(taskID, titusInfo,
&runtime.Resources{
&runtimeTypes.Resources{
Mem: mem,
CPU: cpu,
Disk: disk,
Expand Down Expand Up @@ -488,7 +490,7 @@ func (e *Executor) startContainer(c *containerState, startTime time.Time) { // n
c.logEntry.Errorf("task %s: failed to create container %s", c.Container.TaskID, err)
// Treat registry pull errors as LOST and non-existent images as FAILED.
switch err.(type) {
case *runtime.RegistryImageNotFoundError, *runtime.InvalidSecurityGroupError, *runtime.BadEntryPointError:
case *runtimeTypes.RegistryImageNotFoundError, *runtimeTypes.InvalidSecurityGroupError, *runtimeTypes.BadEntryPointError:
c.logEntry.Errorf("Returning TASK_FAILED for task %s : %v", c.Container.TaskID, err)
e.update <- newUpdate(c.Container.TaskID, titusdriver.Failed, err.Error())
default:
Expand All @@ -504,7 +506,7 @@ func (e *Executor) startContainer(c *containerState, startTime time.Time) { // n
c.logEntry.Printf("task %s : start container %s", c.Container.TaskID, err)

switch err.(type) {
case *runtime.BadEntryPointError:
case *runtimeTypes.BadEntryPointError:
c.logEntry.Printf("Returning TaskState_TASK_FAILED for task %s : %v", c.Container.TaskID, err)
e.update <- newUpdate(c.Container.TaskID, titusdriver.Failed, err.Error())
default:
Expand Down
25 changes: 13 additions & 12 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
"github.com/Netflix/titus-executor/api/netflix/titus"
"github.com/Netflix/titus-executor/config"
"github.com/Netflix/titus-executor/executor/drivers/testdriver"
titusruntime "github.com/Netflix/titus-executor/executor/runtime"
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"

"github.com/Netflix/titus-executor/uploader"
)

var (
_ titusruntime.Runtime = (*runtimeMock)(nil)
_ runtimeTypes.Runtime = (*runtimeMock)(nil)
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -216,7 +217,7 @@ func mocks(t *testing.T, killRequests chan<- chan<- struct{}, sub <-chan subscri
kills: killRequests,
}
l := uploader.NewUploadersFromUploaderArray([]uploader.Uploader{&uploader.NoopUploader{}})
e, err := WithRuntime(metrics.Discard, func(ctx context.Context) (titusruntime.Runtime, error) {
e, err := WithRuntime(metrics.Discard, func(ctx context.Context) (runtimeTypes.Runtime, error) {
r.ctx = ctx
return r, nil
}, l)
Expand Down Expand Up @@ -272,12 +273,12 @@ func notifyAll(subscriptions map[string][]chan<- struct{}, taskID string) {
delete(subscriptions, taskID)
}

func (r *runtimeMock) Prepare(ctx context.Context, c *titusruntime.Container, bindMounts []string) error {
func (r *runtimeMock) Prepare(ctx context.Context, c *runtimeTypes.Container, bindMounts []string) error {
r.t.Log("runtimeMock.Prepare", c.TaskID)
return nil
}

func (r *runtimeMock) Start(ctx context.Context, c *titusruntime.Container) (string, error) {
func (r *runtimeMock) Start(ctx context.Context, c *runtimeTypes.Container) (string, error) {
r.t.Log("runtimeMock.Start", c.TaskID)
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -292,7 +293,7 @@ func (r *runtimeMock) notifyStartCalled(notify chan struct{}) {
r.startCalled = notify
}

func (r *runtimeMock) Kill(c *titusruntime.Container) error {
func (r *runtimeMock) Kill(c *runtimeTypes.Container) error {
r.t.Log("runtimeMock.Kill", c.TaskID)
// send a kill request and wait for a grant
req := make(chan struct{}, 1)
Expand All @@ -309,23 +310,23 @@ func (r *runtimeMock) Kill(c *titusruntime.Container) error {
return nil
}

func (r *runtimeMock) Cleanup(c *titusruntime.Container) error {
func (r *runtimeMock) Cleanup(c *runtimeTypes.Container) error {
r.t.Log("runtimeMock.Cleanup", c.TaskID)
return nil
}

func (r *runtimeMock) Details(c *titusruntime.Container) (*titusruntime.Details, error) {
func (r *runtimeMock) Details(c *runtimeTypes.Container) (*runtimeTypes.Details, error) {
r.t.Log("runtimeMock.Details", c.TaskID)
return &titusruntime.Details{
return &runtimeTypes.Details{
IPAddresses: make(map[string]string),
NetworkConfiguration: &titusruntime.NetworkConfigurationDetails{
NetworkConfiguration: &runtimeTypes.NetworkConfigurationDetails{
IsRoutableIP: false,
},
}, nil
}

func (r *runtimeMock) Status(c *titusruntime.Container) (titusruntime.Status, error) {
func (r *runtimeMock) Status(c *runtimeTypes.Container) (runtimeTypes.Status, error) {
r.t.Log("runtimeMock.Status", c.TaskID)
// always running is fine for these tests
return titusruntime.StatusRunning, nil
return runtimeTypes.StatusRunning, nil
}
Loading

0 comments on commit 3b2fba8

Please sign in to comment.