LL: support scheduling workers on multiple nodes
Worker's spec supports nodeName and nodeSelector; a usage sketch follows the change summary below.

Signed-off-by: JimmyYang20 <[email protected]>
JimmyYang20 committed May 24, 2022
1 parent 45e868b commit 104943d
Showing 3 changed files with 599 additions and 381 deletions.
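
For context, here is a minimal sketch of what the change enables, assuming the per-stage worker template in the job spec is a standard corev1.PodTemplateSpec (the controller below reads job.Spec.TrainSpec.Template.Spec.NodeName); the node name and label are hypothetical:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// Hypothetical train-worker template. Either pin the worker to one node
	// via NodeName, or leave NodeName empty and set NodeSelector so the
	// scheduler may place the worker on any matching node.
	trainTemplate := corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			NodeName:     "edge-node-1",
			NodeSelector: map[string]string{"node-role.kubernetes.io/edge": ""},
		},
	}
	fmt.Println(trainTemplate.Spec.NodeName, trainTemplate.Spec.NodeSelector)
}

With this commit the GM no longer requires every stage's worker to share a single nodeName; each stage can be scheduled independently, and the node actually chosen is tracked per stage, as shown in the diff below.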
99 changes: 91 additions & 8 deletions pkg/globalmanager/controllers/lifelonglearning/downstream.go
@@ -17,9 +17,12 @@ limitations under the License.
package lifelonglearning

import (
"fmt"
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"

sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
"github.com/kubeedge/sedna/pkg/globalmanager/runtime"
@@ -36,17 +39,97 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro
// more details at https://github.com/kubernetes/kubernetes/issues/3030
job.Kind = KindName

// Here only propagate to the nodes with a non-empty name
dataName := job.Spec.Dataset.Name
// the LC holds the dataset object on this node, so we call it the dataset node
var dsNodeName string
ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{})
if err != nil {
klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err)
} else {
dsNodeName = ds.Spec.NodeName
}

var trainNodeName string
var evalNodeName string
var deployNodeName string

// FIXME(llhuii): only the case where all workers have the same nodeName is supported;
// Spec.NodeSelector and different nodeNames will be supported later.
nodeName := job.Spec.TrainSpec.Template.Spec.NodeName
if len(nodeName) == 0 {
return fmt.Errorf("empty node name")
getAnnotationsNodeName := func(stage sednav1.LLJobStage) string {
return runtime.AnnotationsKeyPrefix + string(stage)
}
ann := job.GetAnnotations()
if ann != nil {
trainNodeName = ann[getAnnotationsNodeName(sednav1.LLJobTrain)]
evalNodeName = ann[getAnnotationsNodeName(sednav1.LLJobEval)]
deployNodeName = ann[getAnnotationsNodeName(sednav1.LLJobDeploy)]
}

if eventType == watch.Deleted {
// delete jobs from all LCs
nodes := sets.NewString(dsNodeName, trainNodeName, evalNodeName, deployNodeName)

for node := range nodes {
c.sendToEdgeFunc(node, eventType, job)
}

return nil
}

if dsNodeName == "" {
return nil
}

jobConditions := job.Status.Conditions
if len(jobConditions) == 0 {
return nil
}

latestCondition := jobConditions[len(jobConditions)-1]
currentType := latestCondition.Type
jobStage := latestCondition.Stage

syncJobWithNodeName := func(nodeName string) {
if err := c.sendToEdgeFunc(nodeName, eventType, job); err != nil {
klog.Warningf("Error to sync lifelong learning job %s to node %s in stage %s: %v",
job.Name, nodeName, jobStage, err)
}
}

runtime.InjectSecretAnnotations(c.kubeClient, job, job.Spec.CredentialName)
return c.sendToEdgeFunc(nodeName, eventType, job)

// isJobResidentNode checks whether nodeName is a job resident node
isJobResidentNode := func(nodeName string) bool {
// the node where the LC monitors the dataset and the node where the inference worker runs are job resident nodes
if nodeName == dsNodeName || nodeName == deployNodeName {
return true
}
return false
}

doJobStageEvent := func(nodeName string) {
switch currentType {
case sednav1.LLJobStageCondWaiting:
syncJobWithNodeName(dsNodeName)
case sednav1.LLJobStageCondRunning:
syncJobWithNodeName(nodeName)
case sednav1.LLJobStageCondCompleted, sednav1.LLJobStageCondFailed:
if !isJobResidentNode(nodeName) {
// delete the job from the LC on nodeName, which differs from the dataset node,
// when the worker has completed or failed.
c.sendToEdgeFunc(nodeName, watch.Deleted, job)
}
}
}

switch jobStage {
case sednav1.LLJobTrain:
doJobStageEvent(trainNodeName)
case sednav1.LLJobEval:
doJobStageEvent(evalNodeName)
case sednav1.LLJobDeploy:
doJobStageEvent(deployNodeName)
}

return nil
}

func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error {
Original file line number Diff line number Diff line change
@@ -18,13 +18,15 @@ package lifelonglearning

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -294,25 +296,68 @@ func (c *Controller) sync(key string) (bool, error) {
return forget, err
}

// setWorkerNodeNameOfJob records the worker's nodeName on the specified job,
// which the downstream controller uses to sync the job to the LC on that node.
func (c *Controller) setWorkerNodeNameOfJob(job *sednav1.LifelongLearningJob, jobStage string, nodeName string) error {
key := runtime.AnnotationsKeyPrefix + jobStage

return c.addJobAnnotations(job, key, nodeName)
}

// addJobAnnotations adds the given key/value pair to the job's annotations
func (c *Controller) addJobAnnotations(job *sednav1.LifelongLearningJob, key string, value string) error {
ann := job.GetAnnotations()
if ann[key] == value {
// already set
return nil
}

patchData := metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{key: value}}}

patchDataBytes, err := json.Marshal(&patchData)
if err != nil {
return err
}

jobClient := c.client.LifelongLearningJobs(job.Namespace)
return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error {
newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{})
if err != nil {
return err
}

annotations := newJob.GetAnnotations()
if annotations[key] == value {
return nil
}

_, err = jobClient.Patch(context.TODO(), job.Name, types.MergePatchType, patchDataBytes, metav1.PatchOptions{})
return err
})
}

// transitJobState transitions the job to the next state
func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, error) {
var initialType sednav1.LLJobStageConditionType
var latestCondition = sednav1.LLJobCondition{
var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{
Stage: sednav1.LLJobTrain,
Type: initialType,
}

var newConditionType sednav1.LLJobStageConditionType
var needUpdated = false

var podStatus = v1.PodUnknown
var podStatus v1.PodPhase = v1.PodUnknown
var pod *v1.Pod

jobConditions := job.Status.Conditions
if len(jobConditions) > 0 {
// get latest pod and pod status
latestCondition = (jobConditions)[len(jobConditions)-1]
klog.V(2).Infof("lifelonglearning job %v/%v latest stage %v:", job.Namespace, job.Name,
latestCondition.Stage)
pod := c.getSpecifiedPods(job, string(latestCondition.Stage))
pod = c.getSpecifiedPods(job, string(latestCondition.Stage))

if pod != nil {
podStatus = pod.Status.Phase
@@ -337,25 +382,30 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er
err = c.restartInferPod(job)
if err != nil {
klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err)
} else {
klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name)
return needUpdated, err
}
} else if podStatus != v1.PodPending && podStatus != v1.PodRunning {
err = c.createPod(job, jobStage)
}
if err != nil {
return needUpdated, err

klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name)
newConditionType = sednav1.LLJobStageCondCompleted
} else {
if podStatus != v1.PodPending && podStatus != v1.PodRunning {
err = c.createPod(job, jobStage)
if err != nil {
return needUpdated, err
}
}
newConditionType = sednav1.LLJobStageCondStarting
}
newConditionType = sednav1.LLJobStageCondStarting

case sednav1.LLJobStageCondStarting, sednav1.LLJobStageCondRunning:
if podStatus == v1.PodRunning {
if jobStage == sednav1.LLJobDeploy {
newConditionType = sednav1.LLJobStageCondCompleted
} else {
// watch pod status, if pod running, set type running
newConditionType = sednav1.LLJobStageCondRunning
// add nodeName to job
if err := c.setWorkerNodeNameOfJob(job, string(jobStage), pod.Spec.NodeName); err != nil {
return needUpdated, err
}

// watch pod status, if pod running, set type running
newConditionType = sednav1.LLJobStageCondRunning
} else if podStatus == v1.PodSucceeded {
// watch pod status, if pod completed, set type completed
newConditionType = sednav1.LLJobStageCondCompleted
@@ -541,7 +591,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1
originalDataURLOrIndex = dataset.Spec.URL
}

var workerParam = new(runtime.WorkerParam)
var workerParam *runtime.WorkerParam = new(runtime.WorkerParam)
if podtype == sednav1.LLJobTrain {
workerParam.WorkerType = "Train"

@@ -672,7 +722,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error {
return err
}

var workerParam = new(runtime.WorkerParam)
var workerParam *runtime.WorkerParam = new(runtime.WorkerParam)
workerParam.Mounts = append(workerParam.Mounts,
runtime.WorkerMount{
URL: &runtime.MountURL{
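
Taken together, the two files above implement a simple per-stage routing scheme: when a stage's worker pod reaches Running, the job controller records that pod's node in a job annotation keyed by the stage name, and the downstream sync reads those annotations (plus the dataset node) to decide which LCs should receive the job. A minimal sketch of the key scheme, reusing the identifiers from the diff; the node names are hypothetical:

package main

import (
	"fmt"

	sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
	"github.com/kubeedge/sedna/pkg/globalmanager/runtime"
)

// stageNodeKey mirrors the helper in downstream.go: the annotation key for a
// stage is the runtime annotation prefix plus the stage name.
func stageNodeKey(stage sednav1.LLJobStage) string {
	return runtime.AnnotationsKeyPrefix + string(stage)
}

func main() {
	// Hypothetical annotations as the job controller would record them once
	// the train and eval workers have been scheduled.
	ann := map[string]string{
		stageNodeKey(sednav1.LLJobTrain): "edge-node-1",
		stageNodeKey(sednav1.LLJobEval):  "edge-node-2",
	}
	fmt.Println("sync train stage to LC on:", ann[stageNodeKey(sednav1.LLJobTrain)])
}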