Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Multiple Separate TPU Worker Groups per RayCluster #467

Merged
merged 5 commits into from
Apr 5, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Support for multiple separate TPU worker groups per RayCluster
  • Loading branch information
ryanaoleary committed Apr 3, 2024
commit 5b3411999423b0b4a84f17610550ae53ced752d4
98 changes: 53 additions & 45 deletions applications/ray/kuberay-tpu-webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func containerRequestingTPUs(containers ...corev1.Container) bool {
return false
}

func getNumTPUHostsFromTopology(clusterName string, namespace string, topology string, acceleratorType string) (int32, error) {
func getNumTPUHostsFromTopology(clusterName string, groupName string, namespace string, topology string, acceleratorType string) (int32, error) {
if topology == "" {
return 0, errors.New("TPU topology not specified")
}
Expand All @@ -86,7 +86,7 @@ func getNumTPUHostsFromTopology(clusterName string, namespace string, topology s
for i := 0; i < len(topologyVals); i++ {
dim, err := strconv.Atoi(topologyVals[i])
if err != nil {
klog.ErrorS(err, "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
klog.ErrorS(err, "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "gke-tpu-topology", topology)
return 0, err
}
chips *= dim
Expand All @@ -98,19 +98,19 @@ func getNumTPUHostsFromTopology(clusterName string, namespace string, topology s
// v5e TPU VMs can have 1, 4 or 8 chips
chipsPerHost, err := strconv.Atoi(acceleratorTypeValues[1])
if err != nil {
klog.ErrorS(err, "RayCluster", namespace+"/"+clusterName, "gke-tpu-accelerator", acceleratorType)
klog.ErrorS(err, "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "gke-tpu-accelerator", acceleratorType)
return 0, err
}
chipsPerHost = min(chipsPerHost, 8) // max of 8 chips per host
}
hosts := int32(max(chips/chipsPerHost, 1))
klog.V(1).InfoS("getNumTPUHostsFromTopology", "RayCluster", namespace+"/"+clusterName, "hosts", hosts)
klog.V(1).InfoS("getNumTPUHostsFromTopology", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "hosts", hosts)
return hosts, nil
}

// check if request is for TPU multi-host
func isTPUMultiHost(clusterName string, namespace string, topology string, acceleratorType string) (bool, error) {
vms, err := getNumTPUHostsFromTopology(clusterName, namespace, topology, acceleratorType)
func isTPUMultiHost(clusterName string, groupName string, namespace string, topology string, acceleratorType string) (bool, error) {
vms, err := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, acceleratorType)
if err != nil {
return false, err
}
Expand All @@ -133,7 +133,7 @@ func extractRayCluster(admissionReview *admissionv1.AdmissionReview) (*ray.RayCl
return &rayCluster, nil
}

func genDNSHostnames(workerGroupSpec ray.WorkerGroupSpec, replicaIndex int) (string, error) {
func genDNSHostnames(workerGroupSpec ray.WorkerGroupSpec, clusterName string, namespace string, replicaIndex int) (string, error) {
numHosts := workerGroupSpec.NumOfHosts
if numHosts == 0 {
return "", errors.New("workerGroupSpec NumOfHosts not set")
Expand All @@ -144,6 +144,7 @@ func genDNSHostnames(workerGroupSpec ray.WorkerGroupSpec, replicaIndex int) (str
for j := 0; j < int(numHosts); j++ {
hostNames[j] = fmt.Sprintf("%s-%d-%d.%s", workerGroupName, replicaIndex, j, headlessServiceName)
}
klog.V(1).InfoS("genDNSHostnames", "RayCluster", namespace+"/"+clusterName, "NumOfHosts", numHosts, "Replica Index", replicaIndex)
return strings.Join(hostNames, ","), nil
}

Expand Down Expand Up @@ -218,6 +219,7 @@ func checkWorkersMatchTopology(clusterName string, namespace string, workerGroup
if numHosts == 0 {
return false, errors.New("workerGroupSpec NumOfHosts not set")
}
groupName := workerGroupSpec.GroupName
containers := workerGroupSpec.Template.Spec.Containers
if containers == nil {
return false, errors.New("Container path not specified")
Expand All @@ -227,12 +229,12 @@ func checkWorkersMatchTopology(clusterName string, namespace string, workerGroup
acceleratorType := workerGroupSpec.Template.Spec.NodeSelector["cloud.google.com/gke-tpu-accelerator"]
klog.V(1).InfoS("checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "topology", topology, "AcceleratorType", acceleratorType, "NumOfHosts", numHosts)
if topology == "" {
klog.ErrorS(errors.New("TPU topology not specified"), "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
klog.ErrorS(errors.New("TPU topology not specified"), "checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
}
if acceleratorType == "" {
klog.ErrorS(errors.New("TPU accelerator not specified"), "RayCluster", namespace+"/"+clusterName, "gke-tpu-accelerator", acceleratorType)
klog.ErrorS(errors.New("TPU accelerator not specified"), "checkWorkersMatchTopology", "RayCluster", namespace+"/"+clusterName, "gke-tpu-accelerator", acceleratorType)
}
expectedHosts, err := getNumTPUHostsFromTopology(clusterName, namespace, topology, acceleratorType)
expectedHosts, err := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, acceleratorType)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -263,22 +265,24 @@ func validateRayCluster(admissionReview *admissionv1.AdmissionReview) (*admissio
}
for i := 0; i < len(workerGroupSpecs); i++ {
workerGroupSpec := workerGroupSpecs[i]
// create mapping for pod slices -> TPU_WORKER_HOSTNAMES in cluster
replicas := int(*workerGroupSpec.Replicas)
numOfHosts := workerGroupSpec.NumOfHosts
for replicaIndex := 0; replicaIndex < replicas; replicaIndex++ {
// reset past sliceToWorkers and sliceToHostnames entries for slice in ray cluster
groupName := workerGroupSpec.GroupName
podSlice := slice{clusterName, groupName, replicaIndex, numOfHosts}
sliceToWorkers[podSlice] = nil
sliceToHostnames[podSlice] = ""
// generate TPU_WORKER_HOSTNAMES
if numOfHosts > 1 {
joinedHostNames, err := genDNSHostnames(workerGroupSpec, replicaIndex)
if err != nil {
klog.Error("Failed to generate DNS Hostnames")
if containerRequestingTPUs(workerGroupSpec.Template.Spec.Containers...) {
// create mapping for pod slices -> TPU_WORKER_HOSTNAMES in cluster
replicas := int(*workerGroupSpec.Replicas)
numOfHosts := workerGroupSpec.NumOfHosts
for replicaIndex := 0; replicaIndex < replicas; replicaIndex++ {
// reset past sliceToWorkers and sliceToHostnames entries for slice in ray cluster
groupName := workerGroupSpec.GroupName
podSlice := slice{clusterName, groupName, replicaIndex, numOfHosts}
sliceToWorkers[podSlice] = nil
sliceToHostnames[podSlice] = ""
// generate TPU_WORKER_HOSTNAMES
if numOfHosts > 1 {
joinedHostNames, err := genDNSHostnames(workerGroupSpec, clusterName, namespace, replicaIndex)
if err != nil {
klog.Error("Failed to generate DNS Hostnames")
}
sliceToHostnames[podSlice] = joinedHostNames
}
sliceToHostnames[podSlice] = joinedHostNames
}
}
// validate NumOfHosts for worker group matches topology nodeSelector
Expand All @@ -291,8 +295,8 @@ func validateRayCluster(admissionReview *admissionv1.AdmissionReview) (*admissio
admit = false
status = "Failure"
message = "Number of workers in worker group not equal to specified topology"
break
}
break
}

// Create AdmissionResponse
Expand Down Expand Up @@ -320,13 +324,16 @@ func getEnvironmentVariable(varName string, container corev1.Container) string {

// get next lowest-index pod slice to assign a pod to in the RayCluster
// this will be the first pod slice with # created pods < NumOfHosts
func getReplicaIndex(clusterName string, namespace string) int {
func getReplicaIndex(clusterName string, groupName string, namespace string) int {
// first pod created in cluster
if sliceToWorkers == nil {
return 0
}
nextLowestId := math.MaxInt32
numSlices := 0 // tracks # of slices in worker group created so far
for slice, workerList := range sliceToWorkers {
if slice.clusterName == clusterName {
if slice.clusterName == clusterName && slice.groupName == groupName {
numSlices++
createdPods := 0
for _, worker := range workerList {
if worker.isCreated {
Expand All @@ -340,10 +347,11 @@ func getReplicaIndex(clusterName string, namespace string) int {
}
}
}
// first pod of new slice in cluster
if nextLowestId == math.MaxInt32 {
klog.ErrorS(errors.New("Replica Index never set"), "RayCluster", namespace+"/"+clusterName, "Replica Index", nextLowestId)
nextLowestId = numSlices
}
klog.V(0).InfoS("getReplicaIndex", "RayCluster", namespace+"/"+clusterName, "Replica Index", nextLowestId)
klog.V(0).InfoS("getReplicaIndex", "RayCluster", namespace+"/"+clusterName, "Worker Group", groupName, "Replica Index", nextLowestId)
return nextLowestId
}

Expand Down Expand Up @@ -379,7 +387,7 @@ func getNextWorkerID(podSlice slice, namespace string, replicaIndex int) int {
}
tpuWorkerID = nextLowestID
}
klog.V(0).InfoS("getNextWorkerID", "RayCluster", namespace+"/"+podSlice.clusterName, "TPU_WORKER_ID", tpuWorkerID)
klog.V(0).InfoS("getNextWorkerID", "RayCluster", namespace+"/"+podSlice.clusterName, "Worker Group", podSlice.groupName, "TPU_WORKER_ID", tpuWorkerID)
return tpuWorkerID
}

Expand Down Expand Up @@ -417,31 +425,31 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
if clusterName == "" {
return nil, errors.New("Kuberay Pod missing RayCluster label")
}
namespace := pod.Namespace
groupName := pod.Labels["ray.io/group"]
topology := pod.Spec.NodeSelector["cloud.google.com/gke-tpu-topology"]
acceleratorType := pod.Spec.NodeSelector["cloud.google.com/gke-tpu-accelerator"]
if topology == "" {
klog.ErrorS(errors.New("TPU topology not specified"), "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
}
if acceleratorType == "" {
klog.ErrorS(errors.New("TPU accelerator not specified"), "RayCluster", namespace+"/"+clusterName, "gke-tpu-accelerator", acceleratorType)
}
containers := pod.Spec.Containers
if containers == nil {
return nil, errors.New("Container path not specified")
}
if containerRequestingTPUs(containers...) {
namespace := pod.Namespace
groupName := pod.Labels["ray.io/group"]
topology := pod.Spec.NodeSelector["cloud.google.com/gke-tpu-topology"]
acceleratorType := pod.Spec.NodeSelector["cloud.google.com/gke-tpu-accelerator"]
if topology == "" {
klog.ErrorS(errors.New("TPU topology not specified"), "mutatePod", "RayCluster", namespace+"/"+clusterName, "gke-tpu-topology", topology)
}
if acceleratorType == "" {
klog.ErrorS(errors.New("TPU accelerator not specified"), "mutatePod", "RayCluster", namespace+"/"+clusterName, "gke-tpu-accelerator", acceleratorType)
}
// assign worker to the next unique ID in the pod slice and update map
numOfHosts, _ := getNumTPUHostsFromTopology(clusterName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
replicaIndex := getReplicaIndex(clusterName, namespace)
numOfHosts, _ := getNumTPUHostsFromTopology(clusterName, groupName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
replicaIndex := getReplicaIndex(clusterName, groupName, namespace)
podSlice := slice{clusterName, groupName, replicaIndex, numOfHosts}
tpuWorkerID := getNextWorkerID(podSlice, namespace, replicaIndex) // defaults to 0 for single-host

// inject replica index label
injectReplicaLabel(clusterName, namespace, replicaIndex, groupName, &patches)

isMultiHost, _ := isTPUMultiHost(clusterName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
isMultiHost, _ := isTPUMultiHost(clusterName, groupName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
if isMultiHost {
// inject hostname into pod spec for DNS records
hostname := fmt.Sprintf(groupName+"-%d-%d", replicaIndex, tpuWorkerID)
Expand Down Expand Up @@ -545,7 +553,7 @@ func deletePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
if replicaIndexLabel != "" {
replicaIndexLabelValues := strings.Split(replicaIndexLabel, "-")
replicaIndex, _ := strconv.Atoi(replicaIndexLabelValues[1]) // ignore error here since must be set

containers := pod.Spec.Containers
if containers == nil {
return nil, errors.New("Pod spec missing containers")
Expand Down