This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Lazily initialize gpuinfo
sargun committed Apr 14, 2018
1 parent 4f581eb commit 4e5e02a
Showing 3 changed files with 65 additions and 59 deletions.
47 changes: 22 additions & 25 deletions executor/runtime/docker/docker.go
@@ -34,6 +34,7 @@ import (
docker "github.com/docker/docker/client"
"github.com/docker/go-units"
"github.com/ftrvxmtrx/fd"
"github.com/hashicorp/go-multierror"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/sys/unix"
@@ -182,7 +183,6 @@ type DockerRuntime struct { // nolint: golint
metrics metrics.Reporter
registryAuthCfg *types.AuthConfig
client *docker.Client
- gpuInfo *nvidia.PluginInfo
awsRegion string
tiniSocketDir string
tiniEnabled bool
@@ -228,11 +228,6 @@ func NewDockerRuntime(executorCtx context.Context, m metrics.Reporter, cfg confi

dockerRuntime.awsRegion = os.Getenv("EC2_REGION")

- dockerRuntime.gpuInfo, err = nvidia.NewNvidiaInfo(client)
- if err != nil {
- return nil, err
- }

err = setupLoggingInfra(dockerRuntime)
if err != nil {
return nil, err
@@ -1451,17 +1446,22 @@ func (r *DockerRuntime) setupGPU(c *runtimeTypes.Container, dockerCfg *container
if c.TitusInfo.GetNumGpus() <= 0 {
return nil
}

+ gpuInfo, err := nvidia.NewNvidiaInfo(r.client)
+ if err != nil {
+ return err
+ }
// Use nvidia volume plugin that will mount the appropriate
// libraries/binaries into the container based on host nvidia driver.

- for _, volume := range r.gpuInfo.Volumes {
+ for _, volume := range gpuInfo.Volumes {
parts := strings.Split(volume, ":")
dockerCfg.Volumes[parts[1]] = struct{}{}
hostCfg.Binds = append(hostCfg.Binds, volume)
}

// Add control devices to container.
- for _, ctrlDevice := range r.gpuInfo.GetCtrlDevices() {
+ for _, ctrlDevice := range gpuInfo.GetCtrlDevices() {
hostCfg.Devices = append(hostCfg.Devices, container.DeviceMapping{
PathOnHost: ctrlDevice,
PathInContainer: ctrlDevice,
@@ -1470,12 +1470,13 @@
}

// Allocate a specific GPU to add to the container
- gpuDevicePaths, err := r.gpuInfo.AllocDevices(c.TaskID, int(c.TitusInfo.GetNumGpus()))
+ c.GPUInfo, err = gpuInfo.AllocDevices(int(c.TitusInfo.GetNumGpus()))
if err != nil {
return fmt.Errorf("Cannot allocate %d requested GPU device: %v", c.TitusInfo.GetNumGpus(), err)
}
log.Printf("Allocated %d GPU devices %s for task %s", c.TitusInfo.GetNumGpus(), gpuDevicePaths, c.TaskID)
for _, gpuDevicePath := range gpuDevicePaths {

log.Printf("Allocated %d GPU devices %s for task %s", c.TitusInfo.GetNumGpus(), c.GPUInfo, c.TaskID)
for _, gpuDevicePath := range c.GPUInfo.Devices() {
hostCfg.Devices = append(hostCfg.Devices, container.DeviceMapping{
PathOnHost: gpuDevicePath,
PathInContainer: gpuDevicePath,
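With this hunk, the NVIDIA plugin info is created inside setupGPU only when a task actually requests GPUs, and every allocated device path is exposed to the container as a DeviceMapping on the Docker HostConfig. Below is a minimal, runnable sketch of that last step in isolation; the device paths and the "rwm" cgroup permission string are assumptions, since the diff truncates that field.

package main

import (
	"fmt"

	"github.com/docker/docker/api/types/container"
)

// addGPUDevices mirrors the device wiring done in setupGPU above: every
// allocated GPU device path is exposed to the container as a DeviceMapping
// on the Docker HostConfig. The paths and the "rwm" cgroup permission string
// are illustrative assumptions, not values taken from this diff.
func addGPUDevices(hostCfg *container.HostConfig, devicePaths []string) {
	for _, p := range devicePaths {
		hostCfg.Devices = append(hostCfg.Devices, container.DeviceMapping{
			PathOnHost:        p,
			PathInContainer:   p,
			CgroupPermissions: "rwm",
		})
	}
}

func main() {
	hostCfg := &container.HostConfig{}
	addGPUDevices(hostCfg, []string{"/dev/nvidiactl", "/dev/nvidia0"})
	fmt.Printf("%+v\n", hostCfg.Devices)
}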
@@ -1568,7 +1569,8 @@ func (r *DockerRuntime) dockerStatus(c *runtimeTypes.Container) (runtimeTypes.St
func (r *DockerRuntime) Kill(c *runtimeTypes.Container) error {
log.Infof("Killing %s", c.TaskID)

- var errs []error
+ var errs *multierror.Error
+
containerStopTimeout := time.Second * time.Duration(c.TitusInfo.GetKillWaitSeconds())
if containerStopTimeout == 0 {
containerStopTimeout = defaultKillWait
@@ -1578,7 +1580,7 @@ func (r *DockerRuntime) Kill(c *runtimeTypes.Container) error {
goto stopped
} else if err != nil {
log.Error("Failed to inspect container: ", err)
- errs = append(errs, err)
+ errs = multierror.Append(errs, err)
// There could be a race condition here, where if the container is killed before it is started, it could go into a wonky state
} else if !containerJSON.State.Running {
goto stopped
@@ -1587,15 +1589,15 @@ func (r *DockerRuntime) Kill(c *runtimeTypes.Container) error {
if err := r.client.ContainerStop(context.TODO(), c.ID, &containerStopTimeout); err != nil {
r.metrics.Counter("titus.executor.dockerStopContainerError", 1, nil)
log.Errorf("container %s : stop %v", c.TaskID, err)
- errs = append(errs, err)
+ errs = multierror.Append(errs, err)
} else {
goto stopped
}

if err := r.client.ContainerKill(context.TODO(), c.ID, "SIGKILL"); err != nil {
r.metrics.Counter("titus.executor.dockerKillContainerError", 1, nil)
log.Errorf("container %s : kill %v", c.TaskID, err)
- errs = append(errs, err)
+ errs = multierror.Append(errs, err)
}

stopped:
@@ -1615,17 +1617,12 @@ stopped:
log.WithField("taskId", c.TaskID).Info("No need to deallocate, no allocation command")
}

- // Deallocate any GPU device assigned to the container
- // Note: Since the executor doesn't persist task->GPU device mappings
- // we expect the executor to remove existing containers on startup
- // to make sure the allocated state is correct.
- numDealloc := r.gpuInfo.DeallocDevice(c.TaskID)
- log.Infof("Deallocated %d GPU devices for task %s", numDealloc, c.TaskID)

- if len(errs) > 0 {
- return &compositeError{errs}
+ if c.TitusInfo.GetNumGpus() > 0 {
+ numDealloc := c.GPUInfo.Deallocate()
+ log.Infof("Deallocated %d GPU devices for task %s", numDealloc, c.TaskID)
}
- return nil
+
+ return errs.ErrorOrNil()
}

// Cleanup runs the registered callbacks for a container
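The Kill changes above replace the hand-rolled []error slice and compositeError with hashicorp/go-multierror. In isolation the pattern looks like the runnable sketch below, in which the step functions are hypothetical: a nil *multierror.Error is a valid target for Append, and ErrorOrNil collapses the empty case back to a plain nil.

package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// stopEverything mirrors the accumulate-then-collapse pattern Kill now uses:
// append every failure into a *multierror.Error and let ErrorOrNil turn the
// "nothing failed" case back into nil.
func stopEverything(steps []func() error) error {
	var errs *multierror.Error // nil is a valid starting value for Append
	for _, step := range steps {
		if err := step(); err != nil {
			errs = multierror.Append(errs, err)
		}
	}
	return errs.ErrorOrNil() // nil when nothing failed, otherwise a combined error
}

func main() {
	err := stopEverything([]func() error{
		func() error { return nil },
		func() error { return errors.New("container stop failed") },
		func() error { return errors.New("kill failed") },
	})
	fmt.Println(err) // prints the combined, multi-line error
}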
9 changes: 9 additions & 0 deletions executor/runtime/types/types.go
@@ -55,6 +55,12 @@ func (e *InvalidSecurityGroupError) Error() string {
// CleanupFunc can be registered to be called on container teardown, errors are reported, but not acted upon
type CleanupFunc func() error

+ // GPUContainer manages the GPUs for a container, and frees them
+ type GPUContainer interface {
+ Devices() []string
+ Deallocate() int
+ }
+
// Container contains config state for a container.
// It is not safe to be used concurrently, synchronization and locking needs to be handled externally.
type Container struct { // nolint: maligned
@@ -81,6 +87,9 @@ type Container struct { // nolint: maligned
NormalizedENIIndex int
BandwidthLimitMbps uint32

+ // GPU devices
+ GPUInfo GPUContainer
+
AllocationCommand *exec.Cmd
SetupCommand *exec.Cmd

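The GPUContainer interface added above is what lets docker.go hold the allocation on the Container without importing the nvidia package: anything that can report its device paths and free them can be stored in Container.GPUInfo. The runnable sketch below uses a hypothetical in-memory fake of the same two-method interface, of the kind a test might plug in; the fake type is not part of this commit.

package main

import "fmt"

// GPUContainer matches the interface added in executor/runtime/types:
// it exposes the allocated device paths and frees them on Deallocate.
type GPUContainer interface {
	Devices() []string
	Deallocate() int
}

// fakeGPUContainer is a hypothetical stand-in that a test could assign to
// Container.GPUInfo wherever the real NVIDIA allocation is not needed.
type fakeGPUContainer struct {
	devices []string
}

func (f *fakeGPUContainer) Devices() []string { return f.devices }

func (f *fakeGPUContainer) Deallocate() int {
	n := len(f.devices)
	f.devices = nil // "free" the devices
	return n
}

func main() {
	var g GPUContainer = &fakeGPUContainer{devices: []string{"/dev/nvidia0", "/dev/nvidia1"}}
	fmt.Println(g.Devices())    // [/dev/nvidia0 /dev/nvidia1]
	fmt.Println(g.Deallocate()) // 2
}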
68 changes: 34 additions & 34 deletions nvidia/nvidia.go
@@ -14,6 +14,7 @@ import (

"math/rand"

"github.com/Netflix/titus-executor/executor/runtime/types"
"github.com/Netflix/titus-executor/fslocker"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
@@ -66,13 +67,12 @@ type gpuInfo struct {

// PluginInfo represents host NVIDIA GPU info
type PluginInfo struct {
- ctrlDevices []string
- nvidiaDevices []string
- dockerClient *docker.Client
- mutex sync.Mutex
- fsLocker *fslocker.FSLocker
- Volumes []string
- perTaskAllocatedDevices map[string]map[string]*fslocker.ExclusiveLock
+ ctrlDevices []string
+ nvidiaDevices []string
+ dockerClient *docker.Client
+ mutex sync.Mutex
+ fsLocker *fslocker.FSLocker
+ Volumes []string
}

type nvidiaDockerCli struct {
@@ -87,7 +87,6 @@ func NewNvidiaInfo(client *docker.Client) (*PluginInfo, error) {
n.ctrlDevices = make([]string, 0)
n.nvidiaDevices = make([]string, 0)
n.dockerClient = client
- n.perTaskAllocatedDevices = make(map[string]map[string]*fslocker.ExclusiveLock)

return n, n.initHostGpuInfo()
}
@@ -275,56 +274,37 @@ func iterOverDevices(devices []string) chan string {
// AllocDevices allocates GPU device names from the free device list for the given task ID.
// Returns an error if no devices are available. If an error is returned,
// the allocation change must not be applied.
- func (n *PluginInfo) AllocDevices(taskID string, numDevs int) ([]string, error) {
+ func (n *PluginInfo) AllocDevices(numDevs int) (types.GPUContainer, error) {
var lock *fslocker.ExclusiveLock
var err error
zeroTimeout := time.Duration(0)
- devices := make([]string, numDevs)
- i := 0

n.mutex.Lock()
defer n.mutex.Unlock()

- n.perTaskAllocatedDevices[taskID] = make(map[string]*fslocker.ExclusiveLock)
+ allocatedDevices := make(map[string]*fslocker.ExclusiveLock, numDevs)
for device := range iterOverDevices(n.nvidiaDevices) {
lock, err = n.fsLocker.ExclusiveLock(device, &zeroTimeout)
if err == nil && lock != nil {
- n.perTaskAllocatedDevices[taskID][device] = lock
+ allocatedDevices[device] = lock
}
- if len(n.perTaskAllocatedDevices[taskID]) == numDevs {
+ if len(allocatedDevices) == numDevs {
goto success
}
}
err = fmt.Errorf("Unable able to allocate %d GPU devices. Not enough free GPU devices available", numDevs)
goto fail

success:
- for dev := range n.perTaskAllocatedDevices[taskID] {
- devices[i] = dev
- i++
- }
- return devices, nil
+ return &nvidiaGPUContainer{allocatedDevices: allocatedDevices}, nil

fail:
// Deallocate devices
- for _, lock := range n.perTaskAllocatedDevices[taskID] {
- lock.Unlock()
- }
- delete(n.perTaskAllocatedDevices, taskID)
-
- return []string{}, err
- }
-
- // DeallocDevice deallocate a task's device.
- func (n *PluginInfo) DeallocDevice(taskID string) int {
- i := 0
- for _, lock := range n.perTaskAllocatedDevices[taskID] {
+ for _, lock := range allocatedDevices {
lock.Unlock()
- i++
}
- delete(n.perTaskAllocatedDevices, taskID)

- return i
+ return nil, err
}

// GetCtrlDevices returns the control devices.
@@ -334,3 +314,23 @@ func (n *PluginInfo) GetCtrlDevices() []string {

return n.ctrlDevices
}

+ type nvidiaGPUContainer struct {
+ allocatedDevices map[string]*fslocker.ExclusiveLock
+ }
+
+ func (c *nvidiaGPUContainer) Devices() []string {
+ devices := make([]string, 0, len(c.allocatedDevices))
+ for key := range c.allocatedDevices {
+ devices = append(devices, key)
+ }
+ return devices
+ }
+
+ func (c *nvidiaGPUContainer) Deallocate() int {
+ allocatedCount := len(c.allocatedDevices)
+ for _, lock := range c.allocatedDevices {
+ lock.Unlock()
+ }
+ return allocatedCount
+ }
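AllocDevices now takes a zero-timeout exclusive lock per device and rolls back everything it grabbed when it cannot reach the requested count, handing the held locks back as a nvidiaGPUContainer. The sketch below is a self-contained model of that allocate-or-roll-back shape, with sync.Mutex.TryLock standing in for fslocker's ExclusiveLock; the device names and pool type are assumptions for illustration, and Go's randomized map iteration loosely plays the role of iterOverDevices.

package main

import (
	"fmt"
	"sync"
)

// devicePool models one lock per GPU device path, as the fslocker does on disk.
type devicePool struct {
	locks map[string]*sync.Mutex
}

// allocation models the nvidiaGPUContainer: it remembers which locks it holds.
type allocation struct {
	held map[string]*sync.Mutex
}

func (p *devicePool) alloc(numDevs int) (*allocation, error) {
	held := make(map[string]*sync.Mutex, numDevs)
	for dev, l := range p.locks {
		if len(held) == numDevs {
			break
		}
		if l.TryLock() { // non-blocking, like a zero-timeout exclusive lock
			held[dev] = l
		}
	}
	if len(held) == numDevs {
		return &allocation{held: held}, nil
	}
	// Roll back partial progress, mirroring the fail: path above.
	for _, l := range held {
		l.Unlock()
	}
	return nil, fmt.Errorf("unable to allocate %d GPU devices", numDevs)
}

func (a *allocation) devices() []string {
	out := make([]string, 0, len(a.held))
	for dev := range a.held {
		out = append(out, dev)
	}
	return out
}

func (a *allocation) deallocate() int {
	n := len(a.held)
	for _, l := range a.held {
		l.Unlock()
	}
	a.held = nil
	return n
}

func main() {
	pool := &devicePool{locks: map[string]*sync.Mutex{
		"/dev/nvidia0": {}, "/dev/nvidia1": {}, "/dev/nvidia2": {},
	}}
	a, err := pool.alloc(2)
	if err != nil {
		panic(err)
	}
	fmt.Println(a.devices())    // two of the three devices, in random order
	fmt.Println(a.deallocate()) // 2
}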
