From a4b89b6445256fa9512081bd72caee5eee06711a Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Sun, 20 Mar 2022 23:22:35 +0800 Subject: [PATCH] LL: support schedule workers on multi-nodes worker's spec supports nodeName and nodeSelector Signed-off-by: JimmyYang20 --- .../lifelonglearning/downstream.go | 97 ++- .../lifelonglearning/lifelonglearningjob.go | 86 +- .../lifelonglearning/lifelonglearningjob.go | 769 ++++++++++-------- 3 files changed, 585 insertions(+), 367 deletions(-) diff --git a/pkg/globalmanager/controllers/lifelonglearning/downstream.go b/pkg/globalmanager/controllers/lifelonglearning/downstream.go index 652bc79b1..eee7a306c 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/downstream.go +++ b/pkg/globalmanager/controllers/lifelonglearning/downstream.go @@ -17,9 +17,12 @@ limitations under the License. package lifelonglearning import ( - "fmt" + "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/globalmanager/runtime" @@ -36,17 +39,95 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro // more details at https://github.com/kubernetes/kubernetes/issues/3030 job.Kind = KindName - // Here only propagate to the nodes with non empty name + dataName := job.Spec.Dataset.Name + // LC has dataset object on this node that may call dataset node + var dsNodeName string + ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err) + } else { + dsNodeName = ds.Spec.NodeName + } + + var trainNodeName string + var evalNodeName string + var deployNodeName string - // FIXME(llhuii): only the case that all workers having the same nodeName are support, - // will support Spec.NodeSelector and different 
nodeName. - nodeName := job.Spec.TrainSpec.Template.Spec.NodeName - if len(nodeName) == 0 { - return fmt.Errorf("empty node name") + getAnnotationsNodeName := func(nodeName sednav1.LLJobStage) string { + return runtime.AnnotationsKeyPrefix + string(nodeName) + } + ann := job.GetAnnotations() + if ann != nil { + trainNodeName = ann[getAnnotationsNodeName(sednav1.LLJobTrain)] + evalNodeName = ann[getAnnotationsNodeName(sednav1.LLJobEval)] + deployNodeName = ann[getAnnotationsNodeName(sednav1.LLJobDeploy)] + } + + if eventType == watch.Deleted { + // delete jobs from all LCs + nodes := sets.NewString(dsNodeName, trainNodeName, evalNodeName, deployNodeName) + + for node := range nodes { + c.sendToEdgeFunc(node, eventType, job) + } + + return nil + } + + if dsNodeName == "" { + return nil + } + + jobConditions := job.Status.Conditions + if len(jobConditions) == 0 { + return nil + } + + latestCondition := jobConditions[len(jobConditions)-1] + currentType := latestCondition.Type + jobStage := latestCondition.Stage + + syncJobWithNodeName := func(nodeName string) { + if err := c.sendToEdgeFunc(nodeName, eventType, job); err != nil { + klog.Warningf("Error to sync lifelong learning job %s to node %s in stage %s: %v", + job.Name, nodeName, jobStage, err) + } } runtime.InjectSecretAnnotations(c.kubeClient, job, job.Spec.CredentialName) - return c.sendToEdgeFunc(nodeName, eventType, job) + + // isJobResidentNode checks whether nodeName is a job resident node + isJobResidentNode := func(nodeName string) bool { + // the node where LC monitors dataset and the node where inference worker is running are job resident node + if nodeName == dsNodeName || nodeName == deployNodeName { + return true + } + return false + } + + doJobStageEvent := func(nodeName string) { + if currentType == sednav1.LLJobStageCondWaiting { + syncJobWithNodeName(dsNodeName) + } else if currentType == sednav1.LLJobStageCondRunning { + syncJobWithNodeName(nodeName) + } else if currentType == 
sednav1.LLJobStageCondCompleted || currentType == sednav1.LLJobStageCondFailed { + if !isJobResidentNode(nodeName) { + // delete LC's job from nodeName that's different from dataset node when worker's status is completed or failed. + c.sendToEdgeFunc(nodeName, watch.Deleted, job) + } + } + } + + switch jobStage { + case sednav1.LLJobTrain: + doJobStageEvent(trainNodeName) + case sednav1.LLJobEval: + doJobStageEvent(evalNodeName) + case sednav1.LLJobDeploy: + doJobStageEvent(deployNodeName) + } + + return nil } func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error { diff --git a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go index 4c3a2f690..b2ed1e1c8 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go @@ -18,7 +18,9 @@ package lifelonglearning import ( "context" + "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/types" "strings" "time" @@ -294,10 +296,51 @@ func (c *Controller) sync(key string) (bool, error) { return forget, err } +// setWorkerNodeNameOfJob sets the worker nodeName of the specified job +// which is used for downstream to sync job info to the specified LC located in nodeName. 
+func (c *Controller) setWorkerNodeNameOfJob(job *sednav1.LifelongLearningJob, jobStage string, nodeName string) error { + key := runtime.AnnotationsKeyPrefix + jobStage + + return c.addJobAnnotations(job, key, nodeName) +} + +// addJobAnnotations adds info in job annotations +func (c *Controller) addJobAnnotations(job *sednav1.LifelongLearningJob, key string, value string) error { + ann := job.GetAnnotations() + if ann[key] == value { + // already set + return nil + } + + patchData := metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{key: value}}} + + patchDataBytes, err := json.Marshal(&patchData) + if err != nil { + return err + } + + jobClient := c.client.LifelongLearningJobs(job.Namespace) + return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error { + newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + annotations := newJob.GetAnnotations() + if annotations[key] == value { + return nil + } + + _, err = jobClient.Patch(context.TODO(), job.Name, types.MergePatchType, patchDataBytes, metav1.PatchOptions{}) + return err + }) +} + // transitJobState transit job to next state func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, error) { var initialType sednav1.LLJobStageConditionType - var latestCondition = sednav1.LLJobCondition{ + var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{ Stage: sednav1.LLJobTrain, Type: initialType, } @@ -305,14 +348,16 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er var newConditionType sednav1.LLJobStageConditionType var needUpdated = false - var podStatus = v1.PodUnknown + var podStatus v1.PodPhase = v1.PodUnknown + var pod *v1.Pod + jobConditions := job.Status.Conditions if len(jobConditions) > 0 { // get latest pod and pod status latestCondition = (jobConditions)[len(jobConditions)-1] klog.V(2).Infof("lifelonglearning job %v/%v latest 
stage %v:", job.Namespace, job.Name, latestCondition.Stage) - pod := c.getSpecifiedPods(job, string(latestCondition.Stage)) + pod = c.getSpecifiedPods(job, string(latestCondition.Stage)) if pod != nil { podStatus = pod.Status.Phase @@ -337,25 +382,30 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er err = c.restartInferPod(job) if err != nil { klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) - } else { - klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) + return needUpdated, err } - } else if podStatus != v1.PodPending && podStatus != v1.PodRunning { - err = c.createPod(job, jobStage) - } - if err != nil { - return needUpdated, err + + klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) + newConditionType = sednav1.LLJobStageCondCompleted + } else { + if podStatus != v1.PodPending && podStatus != v1.PodRunning { + err = c.createPod(job, jobStage) + if err != nil { + return needUpdated, err + } + } + newConditionType = sednav1.LLJobStageCondStarting } - newConditionType = sednav1.LLJobStageCondStarting case sednav1.LLJobStageCondStarting, sednav1.LLJobStageCondRunning: if podStatus == v1.PodRunning { - if jobStage == sednav1.LLJobDeploy { - newConditionType = sednav1.LLJobStageCondCompleted - } else { - // watch pod status, if pod running, set type running - newConditionType = sednav1.LLJobStageCondRunning + // add nodeName to job + if err := c.setWorkerNodeNameOfJob(job, string(jobStage), pod.Spec.NodeName); err != nil { + return needUpdated, err } + + // watch pod status, if pod running, set type running + newConditionType = sednav1.LLJobStageCondRunning } else if podStatus == v1.PodSucceeded { // watch pod status, if pod completed, set type completed newConditionType = sednav1.LLJobStageCondCompleted @@ -541,7 +591,7 @@ func (c *Controller) createPod(job 
*sednav1.LifelongLearningJob, podtype sednav1 originalDataURLOrIndex = dataset.Spec.URL } - var workerParam = new(runtime.WorkerParam) + var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) if podtype == sednav1.LLJobTrain { workerParam.WorkerType = "Train" @@ -672,7 +722,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error { return err } - var workerParam = new(runtime.WorkerParam) + var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) workerParam.Mounts = append(workerParam.Mounts, runtime.WorkerMount{ URL: &runtime.MountURL{ diff --git a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go index 114ccff0e..60b7b7f2e 100644 --- a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go @@ -27,10 +27,12 @@ import ( "sync" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/kubeedge/sedna/cmd/sedna-lc/app/options" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" + gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/lifelonglearning" "github.com/kubeedge/sedna/pkg/globalmanager/runtime" "github.com/kubeedge/sedna/pkg/localcontroller/db" clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" @@ -39,23 +41,27 @@ import ( "github.com/kubeedge/sedna/pkg/localcontroller/trigger" "github.com/kubeedge/sedna/pkg/localcontroller/util" workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" + "github.com/microcosm-cc/bluemonday" ) const ( + // JobIterationIntervalSeconds is interval time of each iteration of job + JobIterationIntervalSeconds = 10 + // DatasetHandlerIntervalSeconds is interval time of handling dataset + DatasetHandlerIntervalSeconds = 10 + // EvalSamplesCapacity is capacity of eval samples + EvalSamplesCapacity = 5 //KindName is kind of lifelong-learning-job 
resource KindName = "lifelonglearningjob" - // TrainPhase is the train phase - TrainPhase = "train" - // EvalPhase is the eval phase - EvalPhase = "eval" - // DeployPhase is the deploy phase - DeployPhase = "deploy" - // TriggerReadyStatus is the ready status about trigger TriggerReadyStatus = "ready" // TriggerCompletedStatus is the completed status about trigger TriggerCompletedStatus = "completed" + + AnnotationsRoundsKey = "sedna.io/rounds" + AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples" + AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval" ) // LifelongLearningJobManager defines lifelong-learning-job Manager @@ -70,58 +76,47 @@ type Manager struct { // LifelongLearningJob defines config for lifelong-learning-job type Job struct { sednav1.LifelongLearningJob - Dataset *dataset.Dataset - Done chan struct{} - Storage storage.Storage - JobConfig *LLJobConfig -} - -// LLJobConfig defines config for lifelong-learning-job -type LLJobConfig struct { - UniqueIdentifier string - Version int - Phase string - WorkerStatus string - TrainTrigger trigger.Base - TriggerStatus string - TriggerTime time.Time - TrainDataURL string - EvalDataURL string - OutputDir string - OutputConfig *LLOutputConfig - DataSamples *LLDataSamples - TrainModel *Model - DeployModel *Model - EvalResult *Model - Lock sync.Mutex + JobConfig *JobConfig +} + +// JobConfig defines config for lifelong-learning-job +type JobConfig struct { + UniqueIdentifier string + Rounds int + TrainTrigger trigger.Base + TriggerTime time.Time + TrainTriggerStatus string + EvalTriggerStatus string + DeployTriggerStatus string + TrainDataURL string + EvalDataURL string + OutputDir string + OutputConfig *OutputConfig + DataSamples *DataSamples + DeployModel *Model + Lock sync.Mutex + Dataset *dataset.Dataset + Storage storage.Storage + Done chan struct{} } type Model = clienttypes.Model -// LLOutputConfig defines config for job output -type LLOutputConfig struct { - SamplesOutput map[string]string - 
TrainOutput string - EvalOutput string +// OutputConfig defines config for job output +type OutputConfig struct { + SamplesOutput map[string]string `json:"trainData"` + TrainOutput string `json:"trainOutput"` + EvalOutput string `json:"evalOutput"` } -// LLDataSamples defines samples information -type LLDataSamples struct { - Numbers int +// DataSamples defines samples information +type DataSamples struct { + PreviousNumbers int TrainSamples []string EvalVersionSamples [][]string EvalSamples []string } -const ( - // LLJobIterationIntervalSeconds is interval time of each iteration of job - LLJobIterationIntervalSeconds = 10 - // LLHandlerDataIntervalSeconds is interval time of handling dataset - LLHandlerDataIntervalSeconds = 10 - // LLLLEvalSamplesCapacity is capacity of eval samples - LLEvalSamplesCapacity = 5 -) - // New creates a lifelong-learning-job manager func New(client clienttypes.ClientI, datasetManager *dataset.Manager, options *options.LocalControllerOptions) *Manager { lm := Manager{ @@ -137,14 +132,14 @@ func New(client clienttypes.ClientI, datasetManager *dataset.Manager, options *o // Insert inserts lifelong-learning-job config to db func (lm *Manager) Insert(message *clienttypes.Message) error { - name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) + p := bluemonday.NewPolicy() + name := p.Sanitize(util.GetUniqueIdentifier(message.Header.Namespace, + message.Header.ResourceName, message.Header.ResourceKind)) first := false job, ok := lm.LifelongLearningJobMap[name] if !ok { job = &Job{} - job.Storage = storage.Storage{IsLocalStorage: false} - job.Done = make(chan struct{}) lm.LifelongLearningJobMap[name] = job first = true } @@ -153,21 +148,14 @@ func (lm *Manager) Insert(message *clienttypes.Message) error { return err } - credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] - if credential != "" { - if err := job.Storage.SetCredential(credential); err != nil { - 
return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err) - } + if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil { + return err } if first { go lm.startJob(name) } - if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil { - return err - } - return nil } @@ -179,51 +167,43 @@ func (lm *Manager) startJob(name string) { return } - job.JobConfig = new(LLJobConfig) - jobConfig := job.JobConfig - jobConfig.UniqueIdentifier = name - - err = lm.initJob(job) - if err != nil { - klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier) + if err = lm.initJob(job, name); err != nil { + klog.Errorf("failed to init job(%s): %+v", name, err) return } - klog.Infof("lifelong learning job(name=%s) is started", name) - defer klog.Infof("lifelong learning job(name=%s) is stopped", name) + klog.Infof("lifelong learning job(%s) is started", name) + defer klog.Infof("lifelong learning job(%s) is stopped", name) + + // handle data from dataset go lm.handleData(job) - tick := time.NewTicker(LLJobIterationIntervalSeconds * time.Second) + tick := time.NewTicker(JobIterationIntervalSeconds * time.Second) for { select { - case <-job.Done: + case <-job.JobConfig.Done: return default: } - if job.Dataset == nil { - klog.V(3).Infof("job(name=%s) dataset not ready", - jobConfig.UniqueIdentifier) - - <-tick.C - continue - } + cond := lm.getLatestCondition(job) + jobStage := cond.Stage - switch jobConfig.Phase { - case TrainPhase: + switch jobStage { + case sednav1.LLJobTrain: err = lm.trainTask(job) - case EvalPhase: + case sednav1.LLJobEval: err = lm.evalTask(job) - case DeployPhase: + + case sednav1.LLJobDeploy: err = lm.deployTask(job) default: - klog.Errorf("invalid phase: %s", jobConfig.Phase) + klog.Errorf("invalid phase: %s", jobStage) continue } if err != nil { - klog.Errorf("job(name=%s) complete the %s task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err)
+ klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err) } <-tick.C @@ -234,40 +214,56 @@ func (lm *Manager) startJob(name string) { func (lm *Manager) trainTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - payload, ok, err := lm.triggerTrainTask(job) - if !ok { - return nil - } + latestCond := lm.getLatestCondition(job) + jobStage := latestCond.Stage + currentType := latestCond.Type - if err != nil { - klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) - return err + if currentType == sednav1.LLJobStageCondWaiting { + err := lm.loadDataset(job) + if err != nil { + return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w", + jobConfig.UniqueIdentifier, err) } - err = lm.Client.WriteMessage(payload, job.getHeader()) - if err != nil { - klog.Errorf("job(name=%s) failed to write message: %v", - jobConfig.UniqueIdentifier, err) - return err + if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier) } - jobConfig.TriggerStatus = TriggerCompletedStatus + initTriggerStatus(jobConfig) - klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", - jobConfig.UniqueIdentifier, jobConfig.Phase) - } + if jobConfig.TrainTriggerStatus == TriggerReadyStatus { + payload, ok, err := lm.triggerTrainTask(job) + if !ok { + return nil + } - if jobConfig.WorkerStatus == workertypes.FailedStatus { - klog.Warningf("found the %sing phase worker that ran failed, "+ - "back the training phase triggering task", jobConfig.Phase) - backLLTaskStatus(jobConfig) - } + if err != nil { + klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v", + jobConfig.UniqueIdentifier,
jobStage, err) + job.JobConfig.Rounds-- + return err + } + + err = lm.Client.WriteMessage(payload, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err) + job.JobConfig.Rounds-- + return err + } + + forwardSamples(jobConfig, jobStage) + + err = lm.saveJobToDB(job) + if err != nil { + klog.Errorf("job(%s) failed to save job to db: %v", + jobConfig.UniqueIdentifier, err) + // continue anyway + } - if jobConfig.WorkerStatus == workertypes.CompletedStatus { - klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase) - nextLLTask(jobConfig) + jobConfig.TrainTriggerStatus = TriggerCompletedStatus + klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", + jobConfig.UniqueIdentifier, jobStage) + } } return nil @@ -277,35 +273,37 @@ func (lm *Manager) trainTask(job *Job) error { func (lm *Manager) evalTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - payload, err := lm.triggerEvalTask(job) - if err != nil { - klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) - return err - } + latestCond := lm.getLatestCondition(job) + jobStage := latestCond.Stage + currentType := latestCond.Type - err = lm.Client.WriteMessage(payload, job.getHeader()) - if err != nil { - return err + if currentType == sednav1.LLJobStageCondWaiting { + err := lm.loadDataset(job) + if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w", + jobConfig.UniqueIdentifier, err) } - jobConfig.TriggerStatus = TriggerCompletedStatus + if jobConfig.EvalTriggerStatus == TriggerReadyStatus { + payload, err := lm.triggerEvalTask(job) + if err != nil { + klog.Errorf("job(%s) completed the 
%sing phase triggering task failed: %v", + jobConfig.UniqueIdentifier, jobStage, err) + return err + } - klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", - jobConfig.UniqueIdentifier, jobConfig.Phase) - } + err = lm.Client.WriteMessage(payload, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err) + return err + } - if jobConfig.WorkerStatus == workertypes.FailedStatus { - msg := fmt.Sprintf("job(name=%s) found the %sing phase worker that ran failed, "+ - "back the training phase triggering task", jobConfig.UniqueIdentifier, jobConfig.Phase) - klog.Errorf(msg) - return fmt.Errorf(msg) - } + forwardSamples(jobConfig, jobStage) - if jobConfig.WorkerStatus == workertypes.CompletedStatus { - klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase) - nextLLTask(jobConfig) + jobConfig.EvalTriggerStatus = TriggerCompletedStatus + klog.Infof("job(%s) completed the %sing phase triggering task successfully", + jobConfig.UniqueIdentifier, jobStage) + } } return nil @@ -313,39 +311,37 @@ func (lm *Manager) evalTask(job *Job) error { // deployTask starts deploy task func (lm *Manager) deployTask(job *Job) error { - jobConfig := job.JobConfig + if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus { + jobConfig := job.JobConfig + var err error + + status := clienttypes.UpstreamMessage{Phase: string(sednav1.LLJobDeploy)} + models := lm.getJobStageModel(job, sednav1.LLJobDeploy) + if models != nil { + err = lm.updateDeployModelFile(job, models[0].URL, jobConfig.DeployModel.URL) + if err != nil { + status.Status = string(sednav1.LLJobStageCondFailed) + klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err) + return err + } - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - status := clienttypes.UpstreamMessage{} - status.Phase = DeployPhase - 
deployModel, err := lm.deployModel(job) - if err != nil { - klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err) - } else { - klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier) - } - if err != nil || deployModel == nil { - status.Status = workertypes.FailedStatus + status.Status = string(sednav1.LLJobStageCondReady) + status.Input = &clienttypes.Input{Models: []Model{{Format: models[0].Format, URL: models[0].URL}}} } else { - status.Status = workertypes.ReadyStatus - status.Input = &clienttypes.Input{ - Models: []Model{ - *deployModel, - }, - } + klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier) + status.Status = string(sednav1.LLJobStageCondCompleted) } - if err = lm.Client.WriteMessage(status, job.getHeader()); err != nil { + err = lm.Client.WriteMessage(status, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) completed the %s task failed: %v", + jobConfig.UniqueIdentifier, sednav1.LLJobDeploy, err) return err } - jobConfig.TriggerStatus = TriggerCompletedStatus + job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus + klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.LLJobDeploy) } - - nextLLTask(jobConfig) - - klog.Infof("job(name=%s) complete the deploy task successfully", jobConfig.UniqueIdentifier) - return nil } @@ -365,19 +361,22 @@ func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { return nil, false, nil } - jobConfig.Version++ + job.JobConfig.Rounds++ + rounds := jobConfig.Rounds var dataIndexURL string - jobConfig.TrainDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.TrainSamples, - jobConfig.OutputConfig.SamplesOutput["train"]) + jobConfig.TrainDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.TrainSamples, + jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err 
!= nil { - klog.Errorf("train phase: write samples to the file(%s) is failed, error: %v", jobConfig.TrainDataURL, err) + job.JobConfig.Rounds-- + klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v", + jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err) return nil, false, err } dataURL := jobConfig.TrainDataURL - outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version)}, "/") - if job.Storage.IsLocalStorage { + outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/") + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL) outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) @@ -389,10 +388,11 @@ func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { OutputDir: outputDir, } msg := clienttypes.UpstreamMessage{ - Phase: TrainPhase, - Status: workertypes.ReadyStatus, + Phase: string(sednav1.LLJobTrain), + Status: string(sednav1.LLJobStageCondReady), Input: &input, } + jobConfig.TriggerTime = time.Now() return &msg, true, nil } @@ -402,89 +402,81 @@ func (lm *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro jobConfig := job.JobConfig var err error + latestCondition := lm.getLatestCondition(job) + + ms := lm.getJobStageModel(job, latestCondition.Stage) + if ms == nil { + return nil, fmt.Errorf("job(%s) has no model for the %s stage", jobConfig.UniqueIdentifier, latestCondition.Stage) + } + var dataIndexURL string - jobConfig.EvalDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"]) + jobConfig.EvalDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"], + job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err != nil { - klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error:
%v", + klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v", jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err) return nil, err } - var models []Model - models = append(models, Model{ - Format: jobConfig.TrainModel.Format, - URL: jobConfig.TrainModel.URL, - }) - dataURL := jobConfig.EvalDataURL - outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version)}, "/") - if job.Storage.IsLocalStorage { + outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds)}, "/") + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL) outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) } input := clienttypes.Input{ - Models: models, + Models: ms, DataURL: dataURL, DataIndexURL: dataIndexURL, OutputDir: outputDir, } msg := &clienttypes.UpstreamMessage{ - Phase: EvalPhase, - Status: workertypes.ReadyStatus, + Phase: string(sednav1.LLJobEval), + Status: string(sednav1.LLJobStageCondReady), Input: &input, } return msg, nil } -// deployModel deploys model -func (lm *Manager) deployModel(job *Job) (*Model, error) { - jobConfig := job.JobConfig - - model := &Model{} - model = jobConfig.EvalResult - - if job.Storage.IsLocalStorage { - model.URL = util.AddPrefixPath(lm.VolumeMountPrefix, model.URL) +// updateDeployModelFile updates deploy model file +func (lm *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error { + if job.JobConfig.Storage.IsLocalStorage { + trainedModel = util.AddPrefixPath(lm.VolumeMountPrefix, trainedModel) } - deployModelURL := jobConfig.DeployModel.URL - if err := job.Storage.CopyFile(model.URL, deployModelURL); err != nil { - return nil, fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v", - model.URL, deployModelURL, err) + if err := 
job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil { + return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w", + trainedModel, deployModel, err) } - klog.V(4).Infof("copy model(url=%s) to the deploy model(url=%s) successfully", model.URL, deployModelURL) - klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, model.URL) + klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel) - return model, nil + return nil } // createOutputDir creates the job output dir -func (job *Job) createOutputDir(jobConfig *LLJobConfig) error { +func (job *Job) createOutputDir(jobConfig *JobConfig) error { outputDir := jobConfig.OutputDir dirNames := []string{"data/train", "data/eval", "train", "eval"} - // lifelong_kb_index.pkl - if job.Storage.IsLocalStorage { + if job.JobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(outputDir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir) - return err + return fmt.Errorf("failed to create folder %s: %v", outputDir, err) } for _, v := range dirNames { dir := path.Join(outputDir, v) if err := util.CreateFolder(dir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir) - return err + return fmt.Errorf("failed to create folder %s: %v", dir, err) } } } - outputConfig := LLOutputConfig{ + outputConfig := OutputConfig{ SamplesOutput: map[string]string{ "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"), "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"), @@ -497,8 +489,18 @@ func (job *Job) createOutputDir(jobConfig *LLJobConfig) error { return nil } +func (lm *Manager) getLatestCondition(job *Job) sednav1.LLJobCondition { + jobConditions := job.Status.Conditions + var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{} 
+ if len(jobConditions) > 0 { + // the last entry is the most recent condition + latestCondition = jobConditions[len(jobConditions)-1] + } + return latestCondition +} + // createFile creates data file and data index file -func (job *Job) createFile(dir string, format string, isLocalStorage bool) (string, string) { +func createFile(dir string, format string, isLocalStorage bool) (string, string) { switch strings.ToLower(format) { case dataset.TXTFormat: if isLocalStorage { @@ -512,16 +514,17 @@ func (job *Job) createFile(dir string, format string, isLocalStorage bool) (stri return "", "" } -// writeLLJSamples writes samples information to a file -func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, error) { - version := job.JobConfig.Version - format := job.Dataset.Spec.Format - urlPrefix := job.Dataset.URLPrefix +// writeSamples writes samples information to a file +func (lm *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) { + if samples == nil { + return "", "", fmt.Errorf("not samples") + } - subDir := strings.Join([]string{dir, strconv.Itoa(version)}, "/") - fileURL, absURLFile := job.createFile(subDir, format, job.Dataset.Storage.IsLocalStorage) + jobConfig := job.JobConfig + subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/") + fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage) - if job.Storage.IsLocalStorage { + if jobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(subDir); err != nil { return "", "", err } @@ -529,7 +532,7 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - if !job.Dataset.Storage.IsLocalStorage && absURLFile != "" { + if !jobConfig.Dataset.Storage.IsLocalStorage && absURLFile != "" { tempSamples := util.ParsingDatasetIndex(samples, urlPrefix) if err := job.writeByLine(tempSamples, absURLFile, format); err != nil { return "", "",
err @@ -544,13 +547,13 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - localFileURL, localAbsURLFile := job.createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage) + localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage) if err := job.writeByLine(samples, localFileURL, format); err != nil { return "", "", err } - if err := job.Storage.Upload(localFileURL, fileURL); err != nil { + if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil { return "", "", err } @@ -561,7 +564,7 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil { + if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil { return "", "", err } @@ -584,7 +587,7 @@ func (job *Job) writeByLine(samples []string, fileURL string, format string) err w := bufio.NewWriter(file) if format == "csv" { - _, _ = fmt.Fprintln(w, job.Dataset.DataSource.Header) + _, _ = fmt.Fprintln(w, job.JobConfig.Dataset.DataSource.Header) } for _, line := range samples { @@ -605,62 +608,63 @@ func (job *Job) writeByLine(samples []string, fileURL string, format string) err // handleData updates samples information func (lm *Manager) handleData(job *Job) { - tick := time.NewTicker(LLHandlerDataIntervalSeconds * time.Second) + tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second) + // stop the ticker on return, otherwise it is never released after the job is done + defer tick.Stop() jobConfig := job.JobConfig iterCount := 0 for { select { - case <-job.Done: + case <-jobConfig.Done: return default: } - // in case dataset is not synced to LC before job synced to LC - // here call loadDataset in each period - err := lm.loadDataset(job) if iterCount%100 == 0 { - klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier) + klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier) } iterCount++ - if err != nil { -
klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v", - jobConfig.UniqueIdentifier, - err) + + if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + // dataset is not loaded yet, wait for the next tick <-tick.C continue } - dataset := job.Dataset + dataset := jobConfig.Dataset + currentNumberOfSamples := dataset.DataSource.NumberOfSamples + previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers - if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers { + if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples { samples := dataset.DataSource.TrainSamples - trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers)) + newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples + trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples)) jobConfig.Lock.Lock() jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples, - samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...) - klog.Infof("job(name=%s) current train samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples)) + samples[previousNumberOfSamples:previousNumberOfSamples+trainNum]...) + klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples)) jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples, - samples[(jobConfig.DataSamples.Numbers+trainNum+1):]) + samples[previousNumberOfSamples+trainNum:]) jobConfig.Lock.Unlock() for _, v := range jobConfig.DataSamples.EvalVersionSamples { jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
} - klog.Infof("job(name=%s) current eval samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples)) + klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples)) - jobConfig.DataSamples.Numbers = len(samples) + jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples } + <-tick.C } } +// loadDataset loads dataset information func (lm *Manager) loadDataset(job *Job) error { - if job.Dataset != nil { + if job.JobConfig.Dataset != nil { // already loaded return nil } @@ -671,30 +673,38 @@ func (lm *Manager) loadDataset(job *Job) error { return fmt.Errorf("not exists dataset(name=%s)", datasetName) } - jobConfig := job.JobConfig - jobConfig.DataSamples = &LLDataSamples{ - Numbers: 0, - TrainSamples: make([]string, 0), - EvalVersionSamples: make([][]string, 0), - EvalSamples: make([]string, 0), - } - - job.Dataset = dataset + job.JobConfig.Dataset = dataset return nil } // initJob inits the job object -func (lm *Manager) initJob(job *Job) error { +func (lm *Manager) initJob(job *Job, name string) error { + var err error + job.JobConfig = new(JobConfig) + jobConfig := job.JobConfig - jobConfig.TrainModel = new(Model) - jobConfig.EvalResult = new(Model) + jobConfig.UniqueIdentifier = name + + jobConfig.Storage = storage.Storage{IsLocalStorage: false} + credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] + if credential != "" { + if err := job.JobConfig.Storage.SetCredential(credential); err != nil { + return fmt.Errorf("failed to set storage credential: %w", err) + } + } + + jobConfig.Done = make(chan struct{}) jobConfig.Lock = sync.Mutex{} + jobConfig.Rounds = 0 - jobConfig.Version = 0 - jobConfig.Phase = TrainPhase - jobConfig.WorkerStatus = workertypes.ReadyStatus - jobConfig.TriggerStatus = TriggerReadyStatus - trainTrigger, err := newLLTrigger(job.Spec.TrainSpec.Trigger) + jobConfig.DataSamples = &DataSamples{ + PreviousNumbers: 0, + TrainSamples:
make([]string, 0), + EvalVersionSamples: make([][]string, 0), + EvalSamples: make([]string, 0), + } + + trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger) if err != nil { return fmt.Errorf("failed to init train trigger: %+w", err) } @@ -702,19 +712,19 @@ func (lm *Manager) initJob(job *Job) error { outputDir := job.Spec.OutputDir - isLocalURL, err := job.Storage.IsLocalURL(outputDir) + isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir) if err != nil { - return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir) + return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", name, outputDir, err) } if isLocalURL { - job.Storage.IsLocalStorage = true + jobConfig.Storage.IsLocalStorage = true outputDir = util.AddPrefixPath(lm.VolumeMountPrefix, outputDir) } jobConfig.OutputDir = outputDir - if err := job.createOutputDir(jobConfig); err != nil { + if err = job.createOutputDir(jobConfig); err != nil { return err } @@ -723,10 +733,22 @@ func (lm *Manager) initJob(job *Job) error { URL: strings.Join([]string{strings.TrimRight(outputDir, "/"), "deploy/index.pkl"}, "/"), } + if err = lm.updateJobFromDB(job); err != nil { + return fmt.Errorf("failed to update job from db, error: %w", err) + } + + initTriggerStatus(jobConfig) + return nil } -func newLLTrigger(t sednav1.LLTrigger) (trigger.Base, error) { +func initTriggerStatus(jobConfig *JobConfig) { + jobConfig.TrainTriggerStatus = TriggerReadyStatus + jobConfig.EvalTriggerStatus = TriggerReadyStatus + jobConfig.DeployTriggerStatus = TriggerReadyStatus +} + +func newTrigger(t sednav1.LLTrigger) (trigger.Base, error) { // convert trigger to map triggerMap := make(map[string]interface{}) c, err := json.Marshal(t) @@ -741,55 +763,83 @@ func newLLTrigger(t sednav1.LLTrigger) (trigger.Base, error) { return trigger.NewTrigger(triggerMap) } -// forwardSamplesLL deletes the samples information in the memory -func forwardSamplesLL(jobConfig *LLJobConfig) { - switch jobConfig.Phase { -
case TrainPhase: - { - jobConfig.Lock.Lock() - jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0] - jobConfig.Lock.Unlock() - } - case EvalPhase: - { - if len(jobConfig.DataSamples.EvalVersionSamples) > LLEvalSamplesCapacity { - jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:] +// getModelsFromJobConditions scans jobConditions from last to first and returns the "input" or "output" models (selected by dataType) of the first condition that matches stage and currentType and carries such data; it returns nil if none is found +func (lm *Manager) getModelsFromJobConditions(jobConditions []sednav1.LLJobCondition, stage sednav1.LLJobStage, currentType sednav1.LLJobStageConditionType, dataType string) []Model { + // TODO: runtime.type changes to common.type for gm and lc + for i := len(jobConditions) - 1; i >= 0; i-- { + var cond gmtypes.ConditionData + jobCond := jobConditions[i] + if jobCond.Stage == stage && jobCond.Type == currentType { + if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil { + continue + } + + if dataType == "input" { + if cond.Input == nil { + continue + } + + return cond.Input.Models + } else if dataType == "output" { + if cond.Output == nil { + continue + } + + return cond.Output.Models } } } -} -// backLLTaskStatus backs train task status -func backLLTaskStatus(jobConfig *LLJobConfig) { - jobConfig.Phase = TrainPhase - initLLTaskStatus(jobConfig) + return nil } -// initLLTaskStatus inits task status -func initLLTaskStatus(jobConfig *LLJobConfig) { - jobConfig.WorkerStatus = workertypes.ReadyStatus - jobConfig.TriggerStatus = TriggerReadyStatus -} +// getEvalResult decodes the metrics of the models output by the completed eval stage into one metric map per model +func (lm *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) { + jobConditions := job.Status.Conditions + models := lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output") -// nextLLTask converts next task status -func nextLLTask(jobConfig *LLJobConfig) { - switch jobConfig.Phase { - case TrainPhase: - { - forwardSamplesLL(jobConfig) - initLLTaskStatus(jobConfig) - jobConfig.Phase = EvalPhase + var
result []map[string][]float64 + // round-trip each model's metrics through JSON to decode them as map[string][]float64 + for _, m := range models { + bytes, err := json.Marshal(m.Metrics) + if err != nil { + return nil, err } - case EvalPhase: - { - forwardSamplesLL(jobConfig) - initLLTaskStatus(jobConfig) - jobConfig.Phase = DeployPhase + data := make(map[string][]float64) + if err = json.Unmarshal(bytes, &data); err != nil { + return nil, err } - case DeployPhase: - { - backLLTaskStatus(jobConfig) + + result = append(result, data) + } + return result, nil +} + +// getJobStageModel gets model from job conditions for eval/deploy +func (lm *Manager) getJobStageModel(job *Job, jobStage sednav1.LLJobStage) (models []Model) { + jobConditions := job.Status.Conditions + + switch jobStage { + case sednav1.LLJobEval: + models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobTrain, sednav1.LLJobStageCondCompleted, "output") + case sednav1.LLJobDeploy: + models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output") + } + + return models +} + +// forwardSamples drops the in-memory samples consumed by jobStage; it holds jobConfig.Lock for both stages because handleData appends to the same slices under that lock +func forwardSamples(jobConfig *JobConfig, jobStage sednav1.LLJobStage) { + jobConfig.Lock.Lock() + defer jobConfig.Lock.Unlock() + switch jobStage { + case sednav1.LLJobTrain: + jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0] + case sednav1.LLJobEval: + if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity { + jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:] } } } @@ -798,8 +848,8 @@ func nextLLTask(jobConfig *LLJobConfig) { func (lm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) - if job, ok := lm.LifelongLearningJobMap[name]; ok && job.Done != nil { - close(job.Done) + if job, ok := lm.LifelongLearningJobMap[name]; ok && job.JobConfig.Done != nil { + close(job.JobConfig.Done) }
delete(lm.LifelongLearningJobMap, name) @@ -811,7 +861,79 @@ func (lm *Manager) Delete(message *clienttypes.Message) error { return nil } -// Start starts LifelongLearningJob manager +// updateJobFromDB restores the rounds, the number of handled samples and the eval samples from the annotations that saveJobToDB stored with the job in db +func (lm *Manager) updateJobFromDB(job *Job) error { + var err error + + previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier) + if err != nil { + return err + } + + m := metav1.ObjectMeta{} + if err = json.Unmarshal([]byte(previousJob.ObjectMeta), &m); err != nil { + return err + } + + rounds, ok := m.Annotations[AnnotationsRoundsKey] + if !ok { + return nil + } + + if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil { + return err + } + + numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey] + if !ok { + return nil + } + + if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil { + return err + } + + dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey] + if !ok || dataFileOfEval == "" { + return nil + } + + localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "") + + if !job.JobConfig.Storage.IsLocalStorage { + defer os.RemoveAll(localURL) + } + + if err != nil { + return err + } + + samples, err := dataset.GetSamples(localURL) + if err != nil { + klog.Errorf("read file %s failed: %v", localURL, err) + return err + } + + job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples) + + return nil +} + +// saveJobToDB records the rounds, the number of handled samples and the eval data url in the job annotations and saves the job to db +func (lm *Manager) saveJobToDB(job *Job) error { + if job.ObjectMeta.Annotations == nil { + job.ObjectMeta.Annotations = make(map[string]string) + } + ann := job.ObjectMeta.Annotations + + ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds) + ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers) + ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL + + return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec) +} + +// Start starts
lifelong-learning-job manager func (lm *Manager) Start() error { go lm.monitorWorker() @@ -845,44 +967,9 @@ func (lm *Manager) monitorWorker() { Status: workerMessage.Status, Output: &wo, } - lm.Client.WriteMessage(msg, job.getHeader()) - - lm.handleWorkerMessage(job, workerMessage) - } -} - -// handleWorkerMessage handles message from worker -func (lm *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) { - jobPhase := job.JobConfig.Phase - workerKind := workerMessage.Kind - if jobPhase != workerKind { - klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier, - jobPhase, workerKind) - return - } - - var models []*Model - for _, result := range workerMessage.Results { - model := Model{ - Format: result["format"].(string), - URL: result["url"].(string)} - models = append(models, &model) - } - - model := &Model{} - if len(models) != 1 { - return - } - model = models[0] - - job.JobConfig.WorkerStatus = workerMessage.Status - - if job.JobConfig.WorkerStatus == workertypes.CompletedStatus { - switch job.JobConfig.Phase { - case TrainPhase: - job.JobConfig.TrainModel = model - case EvalPhase: - job.JobConfig.EvalResult = model + if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { + klog.Errorf("job(%s) failed to write message: %v", name, err) + continue } } }