From f573c7ff744a350970df241a4f30d0f422dc12c6 Mon Sep 17 00:00:00 2001 From: Pratik Meher Date: Fri, 15 Feb 2019 12:07:47 +0000 Subject: [PATCH] cleared job with all running tasks from root tasks map --- src/scheduling/event_driven_scheduler.cc | 2 + src/scheduling/event_driven_scheduler.h | 1 + src/scheduling/firmament_scheduler_service.cc | 1 + src/scheduling/flow/flow_scheduler.cc | 67 ++++++++++--------- src/scheduling/flow/flow_scheduler.h | 2 + src/scheduling/scheduler_interface.h | 5 ++ 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/src/scheduling/event_driven_scheduler.cc b/src/scheduling/event_driven_scheduler.cc index 2b001683d..62b543c47 100644 --- a/src/scheduling/event_driven_scheduler.cc +++ b/src/scheduling/event_driven_scheduler.cc @@ -121,6 +121,8 @@ void EventDrivenScheduler::AddPodAffinityAntiAffinityJobData( } if (no_conflict_within) { no_conflict_root_tasks_.insert(rtd.uid()); + unordered_set children_set; + InsertIfNotPresent(&root_to_children_tasks_, rtd.uid(), children_set); } else { if (jd_ptr->is_gang_scheduling_job()) { vector delta_v; diff --git a/src/scheduling/event_driven_scheduler.h b/src/scheduling/event_driven_scheduler.h index 1acc41523..5d5ca712d 100644 --- a/src/scheduling/event_driven_scheduler.h +++ b/src/scheduling/event_driven_scheduler.h @@ -212,6 +212,7 @@ class EventDrivenScheduler : public SchedulerInterface { bool affinity_batch_schedule; unordered_map> no_conflict_tasks_map_; unordered_map no_conflict_task_mapped_; + unordered_map> root_to_children_tasks_; }; } // namespace scheduler diff --git a/src/scheduling/firmament_scheduler_service.cc b/src/scheduling/firmament_scheduler_service.cc index f5b7e4d11..21ca9e695 100644 --- a/src/scheduling/firmament_scheduler_service.cc +++ b/src/scheduling/firmament_scheduler_service.cc @@ -574,6 +574,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { td_ptr->CopyFrom(task_desc_ptr->task_descriptor()); 
CHECK(InsertIfNotPresent(task_map_.get(), td_ptr->uid(), td_ptr)); td_ptr->set_submit_time(wall_time_.GetCurrentTimestamp()); + scheduler_->UpdateSpawnedToRootTaskMap(td_ptr); } uint64_t* num_incomplete_tasks = FindOrNull(job_num_incomplete_tasks_, job_id); diff --git a/src/scheduling/flow/flow_scheduler.cc b/src/scheduling/flow/flow_scheduler.cc index 68ce48279..95e4dc0d8 100644 --- a/src/scheduling/flow/flow_scheduler.cc +++ b/src/scheduling/flow/flow_scheduler.cc @@ -358,37 +358,7 @@ void FlowScheduler::RemoveAffinityAntiAffinityJobData(JobID_t job_id) { JobDescriptor* jdp = FindOrNull(*job_map_, job_id); if (jdp) { no_conflict_root_tasks_.erase(jdp->root_task().uid()); - vector* tasks_vec = - FindOrNull(no_conflict_tasks_map_, jdp->root_task().uid()); - if (tasks_vec) { - if (tasks_vec->size()) { - vector::iterator it_first = tasks_vec->begin(); - TaskID_t key_task = *it_first; - tasks_vec->erase(it_first); - InsertIfNotPresent(&no_conflict_tasks_map_, key_task, *tasks_vec); - no_conflict_task_mapped_.erase(key_task); - for (auto t : *tasks_vec) { - no_conflict_task_mapped_.erase(t); - InsertIfNotPresent(&no_conflict_task_mapped_, t, key_task); - } - } - no_conflict_tasks_map_.erase(jdp->root_task().uid()); - } else { - TaskID_t* key_rtask = - FindOrNull(no_conflict_task_mapped_, jdp->root_task().uid()); - if (key_rtask) { - vector* t_vec = - FindOrNull(no_conflict_tasks_map_, *key_rtask); - if (t_vec) { - vector::iterator it_vec = - find(t_vec->begin(), t_vec->end(), jdp->root_task().uid()); - if (it_vec != t_vec->end()) { - t_vec->erase(it_vec); - } - } - no_conflict_task_mapped_.erase(jdp->root_task().uid()); - } - } + root_to_children_tasks_.erase(jdp->root_task().uid()); } } @@ -527,6 +497,15 @@ void FlowScheduler::HandleTaskPlacement(TaskDescriptor* td_ptr, if (FLAGS_pod_affinity_antiaffinity_symmetry) { cost_model_->UpdateResourceToTaskSymmetryMap(res_id, td_ptr->uid()); } + if (!td_ptr->spawned_size()) { + JobDescriptor* jd_ptr = + FindOrNull(*job_map_, 
JobIDFromString(td_ptr->job_id())); + unordered_set* task_set_ptr = + FindOrNull(root_to_children_tasks_, jd_ptr->root_task().uid()); + if (task_set_ptr) { + task_set_ptr->erase(td_ptr->uid()); + } + } } cost_model_->UpdateResourceToNamespacesMap(res_id, td_ptr->task_namespace(), true); @@ -627,6 +606,8 @@ vector* FlowScheduler::ScheduleAllAffinityBatchJobs( } affinity_batch_job_schedule_.clear(); } + no_conflict_tasks_map_.clear(); + no_conflict_task_mapped_.clear(); affinity_batch_schedule = false; return unscheduled_tasks; } @@ -653,10 +634,22 @@ uint64_t FlowScheduler::ScheduleAllQueueJobs(SchedulerStats* scheduler_stats, return num_scheduled_tasks; } +bool FlowScheduler::CheckAllTasksInJobRunning(TaskDescriptor* rtd) { + if (rtd && (rtd->state() == TaskDescriptor::RUNNING)) { + unordered_set* spawned_tasks_set = + FindOrNull(root_to_children_tasks_, rtd->uid()); + if (spawned_tasks_set && !spawned_tasks_set->size()) { + return true; + } + } + return false; +} + void FlowScheduler::UpdateBatchAffinityTasksMap() { for (auto task : no_conflict_root_tasks_) { bool matched = false; TaskDescriptor* rtd = FindPtrOrNull(*task_map_, task); + if (CheckAllTasksInJobRunning(rtd)) continue; for (auto it = no_conflict_tasks_map_.begin(); it != no_conflict_tasks_map_.end(); it++) { TaskDescriptor* other_rtd = FindPtrOrNull(*task_map_, it->first); @@ -1156,5 +1149,17 @@ void FlowScheduler::UpdateGangSchedulingDeltas( affinity_delta_tasks.clear(); } +void FlowScheduler::UpdateSpawnedToRootTaskMap(TaskDescriptor* td_ptr) { + if (td_ptr) { + JobDescriptor* jd_ptr = + FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id())); + unordered_set* task_set_ptr = + FindOrNull(root_to_children_tasks_, jd_ptr->root_task().uid()); + if (task_set_ptr) { + task_set_ptr->insert(td_ptr->uid()); + } + } +} + } // namespace scheduler } // namespace firmament diff --git a/src/scheduling/flow/flow_scheduler.h b/src/scheduling/flow/flow_scheduler.h index 02d8bb468..c0e9ac71e 100644 --- 
a/src/scheduling/flow/flow_scheduler.h +++ b/src/scheduling/flow/flow_scheduler.h @@ -102,6 +102,7 @@ class FlowScheduler : public EventDrivenScheduler { vector* unscheduled_normal_tasks, unordered_set* unscheduled_affinity_tasks_set, vector* unscheduled_affinity_tasks); + virtual void UpdateSpawnedToRootTaskMap(TaskDescriptor* td_ptr); virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats, vector* deltas); virtual uint64_t ScheduleJob(JobDescriptor* jd_ptr, @@ -150,6 +151,7 @@ class FlowScheduler : public EventDrivenScheduler { ResourceStatus* rs); void UpdateBatchAffinityTasksMap(); void RemoveAffinityAntiAffinityJobData(JobID_t job_id); + bool CheckAllTasksInJobRunning(TaskDescriptor* rtd); // Pointer to the coordinator's topology manager shared_ptr topology_manager_; diff --git a/src/scheduling/scheduler_interface.h b/src/scheduling/scheduler_interface.h index 55a1ea49d..8feb15503 100644 --- a/src/scheduling/scheduler_interface.h +++ b/src/scheduling/scheduler_interface.h @@ -305,6 +305,11 @@ class SchedulerInterface : public PrintableInterface { return NULL; } + /** + * Adds a task to its job's root-task child set; a no-op by default. + */ + virtual void UpdateSpawnedToRootTaskMap(TaskDescriptor* td_ptr) {} + protected: /** * Handles the migration of a task.