Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Lazily initialize gpuinfo #83

Merged
merged 2 commits on Apr 14, 2018
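For orientation, a minimal sketch of the lazy-initialization pattern this change adopts, assuming only the signatures visible in the diff below (parameter list abbreviated): the nvidia plugin is no longer probed in NewDockerRuntime, only when setupGPU runs for a container that actually requests GPUs.

// Condensed sketch, not the verbatim implementation.
func (r *DockerRuntime) setupGPU(c *runtimeTypes.Container /* docker configs omitted */) error {
	if c.TitusInfo.GetNumGpus() <= 0 {
		return nil // no GPUs requested: the nvidia plugin is never initialized
	}
	// Lazily query host GPU info only when it is actually needed.
	gpuInfo, err := nvidia.NewNvidiaInfo(r.client)
	if err != nil {
		return err
	}
	// The allocation result now lives on the container itself.
	c.GPUInfo, err = gpuInfo.AllocDevices(int(c.TitusInfo.GetNumGpus()))
	return err
}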
47 changes: 22 additions & 25 deletions executor/runtime/docker/docker.go
@@ -34,6 +34,7 @@ import (
docker "github.com/docker/docker/client"
"github.com/docker/go-units"
"github.com/ftrvxmtrx/fd"
"github.com/hashicorp/go-multierror"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/sys/unix"
@@ -182,7 +183,6 @@ type DockerRuntime struct { // nolint: golint
metrics metrics.Reporter
registryAuthCfg *types.AuthConfig
client *docker.Client
gpuInfo *nvidia.PluginInfo
awsRegion string
tiniSocketDir string
tiniEnabled bool
@@ -228,11 +228,6 @@ func NewDockerRuntime(executorCtx context.Context, m metrics.Reporter, cfg confi

dockerRuntime.awsRegion = os.Getenv("EC2_REGION")

dockerRuntime.gpuInfo, err = nvidia.NewNvidiaInfo(client)
if err != nil {
return nil, err
}

err = setupLoggingInfra(dockerRuntime)
if err != nil {
return nil, err
@@ -1451,17 +1446,22 @@ func (r *DockerRuntime) setupGPU(c *runtimeTypes.Container, dockerCfg *container
if c.TitusInfo.GetNumGpus() <= 0 {
return nil
}

gpuInfo, err := nvidia.NewNvidiaInfo(r.client)
if err != nil {
return err
}
// Use nvidia volume plugin that will mount the appropriate
// libraries/binaries into the container based on host nvidia driver.

for _, volume := range r.gpuInfo.Volumes {
for _, volume := range gpuInfo.Volumes {
parts := strings.Split(volume, ":")
dockerCfg.Volumes[parts[1]] = struct{}{}
hostCfg.Binds = append(hostCfg.Binds, volume)
}

// Add control devices to container.
for _, ctrlDevice := range r.gpuInfo.GetCtrlDevices() {
for _, ctrlDevice := range gpuInfo.GetCtrlDevices() {
hostCfg.Devices = append(hostCfg.Devices, container.DeviceMapping{
PathOnHost: ctrlDevice,
PathInContainer: ctrlDevice,
@@ -1470,12 +1470,13 @@ }
}

// Allocate a specific GPU to add to the container
gpuDevicePaths, err := r.gpuInfo.AllocDevices(c.TaskID, int(c.TitusInfo.GetNumGpus()))
c.GPUInfo, err = gpuInfo.AllocDevices(int(c.TitusInfo.GetNumGpus()))
if err != nil {
return fmt.Errorf("Cannot allocate %d requested GPU device: %v", c.TitusInfo.GetNumGpus(), err)
}
log.Printf("Allocated %d GPU devices %s for task %s", c.TitusInfo.GetNumGpus(), gpuDevicePaths, c.TaskID)
for _, gpuDevicePath := range gpuDevicePaths {

log.Printf("Allocated %d GPU devices %s for task %s", c.TitusInfo.GetNumGpus(), c.GPUInfo, c.TaskID)
for _, gpuDevicePath := range c.GPUInfo.Devices() {
hostCfg.Devices = append(hostCfg.Devices, container.DeviceMapping{
PathOnHost: gpuDevicePath,
PathInContainer: gpuDevicePath,
@@ -1568,7 +1569,8 @@ func (r *DockerRuntime) dockerStatus(c *runtimeTypes.Container) (runtimeTypes.St
func (r *DockerRuntime) Kill(c *runtimeTypes.Container) error {
log.Infof("Killing %s", c.TaskID)

var errs []error
var errs *multierror.Error

containerStopTimeout := time.Second * time.Duration(c.TitusInfo.GetKillWaitSeconds())
if containerStopTimeout == 0 {
containerStopTimeout = defaultKillWait
@@ -1578,7 +1580,7 @@ func (r *DockerRuntime) Kill(c *runtimeTypes.Container) error {
goto stopped
} else if err != nil {
log.Error("Failed to inspect container: ", err)
errs = append(errs, err)
errs = multierror.Append(errs, err)
// There could be a race condition here, where if the container is killed before it is started, it could go into a wonky state
} else if !containerJSON.State.Running {
goto stopped
@@ -1587,15 +1589,15 @@ func (r *DockerRuntime) Kill(c *runtimeTypes.Container) error {
if err := r.client.ContainerStop(context.TODO(), c.ID, &containerStopTimeout); err != nil {
r.metrics.Counter("titus.executor.dockerStopContainerError", 1, nil)
log.Errorf("container %s : stop %v", c.TaskID, err)
errs = append(errs, err)
errs = multierror.Append(errs, err)
} else {
goto stopped
}

if err := r.client.ContainerKill(context.TODO(), c.ID, "SIGKILL"); err != nil {
r.metrics.Counter("titus.executor.dockerKillContainerError", 1, nil)
log.Errorf("container %s : kill %v", c.TaskID, err)
errs = append(errs, err)
errs = multierror.Append(errs, err)
}

stopped:
@@ -1615,17 +1617,12 @@ stopped:
log.WithField("taskId", c.TaskID).Info("No need to deallocate, no allocation command")
}

// Deallocate any GPU device assigned to the container
// Note: Since the executor doesn't persist task->GPU device mappings
// we expect the executor to remove existing containers on startup
// to make sure the allocated state is correct.
numDealloc := r.gpuInfo.DeallocDevice(c.TaskID)
log.Infof("Deallocated %d GPU devices for task %s", numDealloc, c.TaskID)

if len(errs) > 0 {
return &compositeError{errs}
if c.TitusInfo.GetNumGpus() > 0 {
numDealloc := c.GPUInfo.Deallocate()
log.Infof("Deallocated %d GPU devices for task %s", numDealloc, c.TaskID)
}
return nil

return errs.ErrorOrNil()
}

// Cleanup runs the registered callbacks for a container
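The Kill path above replaces the ad-hoc []error slice and compositeError with github.com/hashicorp/go-multierror. A minimal sketch of that idiom, assuming only the Append and ErrorOrNil calls already shown in the diff:

// Sketch: collect failures from several teardown steps without stopping early.
func runAll(steps []func() error) error {
	var errs *multierror.Error // nil is a valid starting value for Append
	for _, step := range steps {
		if err := step(); err != nil {
			errs = multierror.Append(errs, err) // record the failure, keep going
		}
	}
	return errs.ErrorOrNil() // nil if every step succeeded, a combined error otherwise
}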
9 changes: 9 additions & 0 deletions executor/runtime/types/types.go
@@ -55,6 +55,12 @@ func (e *InvalidSecurityGroupError) Error() string {
// CleanupFunc can be registered to be called on container teardown, errors are reported, but not acted upon
type CleanupFunc func() error

// GPUContainer manages the GPUs for a container, and frees them
type GPUContainer interface {
Devices() []string
Deallocate() int
}

// Container contains config state for a container.
// It is not safe to be used concurrently, synchronization and locking needs to be handled externally.
type Container struct { // nolint: maligned
@@ -81,6 +87,9 @@ type Container struct { // nolint: maligned
NormalizedENIIndex int
BandwidthLimitMbps uint32

// GPU devices
GPUInfo GPUContainer

AllocationCommand *exec.Cmd
SetupCommand *exec.Cmd

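The GPUContainer interface added in types.go above lets the runtime carry and release GPU allocations through the Container without importing the nvidia package. A hypothetical in-memory implementation (not part of this PR) illustrates the contract:

// fakeGPUContainer is a stand-in used only for illustration and testing.
type fakeGPUContainer struct {
	devs []string
}

func (f *fakeGPUContainer) Devices() []string { return f.devs }

func (f *fakeGPUContainer) Deallocate() int {
	n := len(f.devs)
	f.devs = nil // release everything; a real implementation unlocks each device
	return n
}

// Usage: c.GPUInfo = &fakeGPUContainer{devs: []string{"/dev/nvidia0"}}
// and on teardown: freed := c.GPUInfo.Deallocate()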
68 changes: 34 additions & 34 deletions nvidia/nvidia.go
@@ -14,6 +14,7 @@ import (

"math/rand"

"github.com/Netflix/titus-executor/executor/runtime/types"
"github.com/Netflix/titus-executor/fslocker"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
@@ -66,13 +67,12 @@ type gpuInfo struct {

// PluginInfo represents host NVIDIA GPU info
type PluginInfo struct {
ctrlDevices []string
nvidiaDevices []string
dockerClient *docker.Client
mutex sync.Mutex
fsLocker *fslocker.FSLocker
Volumes []string
perTaskAllocatedDevices map[string]map[string]*fslocker.ExclusiveLock
ctrlDevices []string
nvidiaDevices []string
dockerClient *docker.Client
mutex sync.Mutex
fsLocker *fslocker.FSLocker
Volumes []string
}

type nvidiaDockerCli struct {
@@ -87,7 +87,6 @@ func NewNvidiaInfo(client *docker.Client) (*PluginInfo, error) {
n.ctrlDevices = make([]string, 0)
n.nvidiaDevices = make([]string, 0)
n.dockerClient = client
n.perTaskAllocatedDevices = make(map[string]map[string]*fslocker.ExclusiveLock)

return n, n.initHostGpuInfo()
}
@@ -275,56 +274,37 @@ func iterOverDevices(devices []string) chan string {
// AllocDevices allocates GPU device names from the free device list for the given task ID.
// Returns an error if no devices are available. If an error is returned,
// the allocation change must not be applied.
func (n *PluginInfo) AllocDevices(taskID string, numDevs int) ([]string, error) {
func (n *PluginInfo) AllocDevices(numDevs int) (types.GPUContainer, error) {
var lock *fslocker.ExclusiveLock
var err error
zeroTimeout := time.Duration(0)
devices := make([]string, numDevs)
i := 0

n.mutex.Lock()
defer n.mutex.Unlock()

n.perTaskAllocatedDevices[taskID] = make(map[string]*fslocker.ExclusiveLock)
allocatedDevices := make(map[string]*fslocker.ExclusiveLock, numDevs)
for device := range iterOverDevices(n.nvidiaDevices) {
lock, err = n.fsLocker.ExclusiveLock(device, &zeroTimeout)
if err == nil && lock != nil {
n.perTaskAllocatedDevices[taskID][device] = lock
allocatedDevices[device] = lock
}
if len(n.perTaskAllocatedDevices[taskID]) == numDevs {
if len(allocatedDevices) == numDevs {
goto success
}
}
err = fmt.Errorf("Unable able to allocate %d GPU devices. Not enough free GPU devices available", numDevs)
goto fail

success:
for dev := range n.perTaskAllocatedDevices[taskID] {
devices[i] = dev
i++
}
return devices, nil
return &nvidiaGPUContainer{allocatedDevices: allocatedDevices}, nil

fail:
// Deallocate devices
for _, lock := range n.perTaskAllocatedDevices[taskID] {
lock.Unlock()
}
delete(n.perTaskAllocatedDevices, taskID)

return []string{}, err
}

// DeallocDevice deallocate a task's device.
func (n *PluginInfo) DeallocDevice(taskID string) int {
i := 0
for _, lock := range n.perTaskAllocatedDevices[taskID] {
for _, lock := range allocatedDevices {
lock.Unlock()
i++
}
delete(n.perTaskAllocatedDevices, taskID)

return i
return nil, err
}

// GetCtrlDevices returns the control devices.
@@ -334,3 +314,23 @@ func (n *PluginInfo) GetCtrlDevices() []string {

return n.ctrlDevices
}

type nvidiaGPUContainer struct {
allocatedDevices map[string]*fslocker.ExclusiveLock
}

func (c *nvidiaGPUContainer) Devices() []string {
devices := make([]string, 0, len(c.allocatedDevices))
for key := range c.allocatedDevices {
devices = append(devices, key)
}
return devices
}

func (c *nvidiaGPUContainer) Deallocate() int {
allocatedCount := len(c.allocatedDevices)
for _, lock := range c.allocatedDevices {
lock.Unlock()
}
return allocatedCount
}
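To tie the pieces together, a sketch of the end-to-end flow under the signatures above (the helper function is assumed for illustration, not part of this PR). AllocDevices claims each free device with a zero-timeout exclusive file lock, effectively a try-lock, and the returned GPUContainer owns those locks until Deallocate releases them:

// allocateGPUs is an assumed helper showing how the new API composes.
func allocateGPUs(client *docker.Client, numGPUs int) (types.GPUContainer, error) {
	// Lazily probe the host: driver volumes, control devices, /dev/nvidiaN paths.
	gpuInfo, err := nvidia.NewNvidiaInfo(client)
	if err != nil {
		return nil, err
	}
	// Try-lock numGPUs free devices; fails if not enough are available.
	gpuContainer, err := gpuInfo.AllocDevices(numGPUs)
	if err != nil {
		return nil, err
	}
	log.Printf("allocated GPU devices: %v", gpuContainer.Devices())
	return gpuContainer, nil
}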