Skip to content

Commit

Permalink
cleared job with all running tasks from root tasks map
Browse files Browse the repository at this point in the history
  • Loading branch information
pratikmeher44 committed Mar 12, 2019
1 parent 4497d27 commit f573c7f
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 31 deletions.
2 changes: 2 additions & 0 deletions src/scheduling/event_driven_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ void EventDrivenScheduler::AddPodAffinityAntiAffinityJobData(
}
if (no_conflict_within) {
no_conflict_root_tasks_.insert(rtd.uid());
unordered_set<TaskID_t> children_set;
InsertIfNotPresent(&root_to_children_tasks_, rtd.uid(), children_set);
} else {
if (jd_ptr->is_gang_scheduling_job()) {
vector<SchedulingDelta> delta_v;
Expand Down
1 change: 1 addition & 0 deletions src/scheduling/event_driven_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ class EventDrivenScheduler : public SchedulerInterface {
bool affinity_batch_schedule;
unordered_map<TaskID_t, vector<TaskID_t>> no_conflict_tasks_map_;
unordered_map<TaskID_t, TaskID_t> no_conflict_task_mapped_;
unordered_map<TaskID_t, unordered_set<TaskID_t>> root_to_children_tasks_;
};

} // namespace scheduler
Expand Down
1 change: 1 addition & 0 deletions src/scheduling/firmament_scheduler_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
td_ptr->CopyFrom(task_desc_ptr->task_descriptor());
CHECK(InsertIfNotPresent(task_map_.get(), td_ptr->uid(), td_ptr));
td_ptr->set_submit_time(wall_time_.GetCurrentTimestamp());
scheduler_->UpdateSpawnedToRootTaskMap(td_ptr);
}
uint64_t* num_incomplete_tasks =
FindOrNull(job_num_incomplete_tasks_, job_id);
Expand Down
67 changes: 36 additions & 31 deletions src/scheduling/flow/flow_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,37 +358,7 @@ void FlowScheduler::RemoveAffinityAntiAffinityJobData(JobID_t job_id) {
JobDescriptor* jdp = FindOrNull(*job_map_, job_id);
if (jdp) {
no_conflict_root_tasks_.erase(jdp->root_task().uid());
vector<TaskID_t>* tasks_vec =
FindOrNull(no_conflict_tasks_map_, jdp->root_task().uid());
if (tasks_vec) {
if (tasks_vec->size()) {
vector<TaskID_t>::iterator it_first = tasks_vec->begin();
TaskID_t key_task = *it_first;
tasks_vec->erase(it_first);
InsertIfNotPresent(&no_conflict_tasks_map_, key_task, *tasks_vec);
no_conflict_task_mapped_.erase(key_task);
for (auto t : *tasks_vec) {
no_conflict_task_mapped_.erase(t);
InsertIfNotPresent(&no_conflict_task_mapped_, t, key_task);
}
}
no_conflict_tasks_map_.erase(jdp->root_task().uid());
} else {
TaskID_t* key_rtask =
FindOrNull(no_conflict_task_mapped_, jdp->root_task().uid());
if (key_rtask) {
vector<TaskID_t>* t_vec =
FindOrNull(no_conflict_tasks_map_, *key_rtask);
if (t_vec) {
vector<TaskID_t>::iterator it_vec =
find(t_vec->begin(), t_vec->end(), jdp->root_task().uid());
if (it_vec != t_vec->end()) {
t_vec->erase(it_vec);
}
}
no_conflict_task_mapped_.erase(jdp->root_task().uid());
}
}
root_to_children_tasks_.erase(jdp->root_task().uid());
}
}

Expand Down Expand Up @@ -527,6 +497,15 @@ void FlowScheduler::HandleTaskPlacement(TaskDescriptor* td_ptr,
if (FLAGS_pod_affinity_antiaffinity_symmetry) {
cost_model_->UpdateResourceToTaskSymmetryMap(res_id, td_ptr->uid());
}
if (!td_ptr->spawned_size()) {
JobDescriptor* jd_ptr =
FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id()));
unordered_set<TaskID_t>* task_set_ptr =
FindOrNull(root_to_children_tasks_, jd_ptr->root_task().uid());
if (task_set_ptr) {
task_set_ptr->erase(td_ptr->uid());
}
}
}
cost_model_->UpdateResourceToNamespacesMap(res_id,
td_ptr->task_namespace(), true);
Expand Down Expand Up @@ -627,6 +606,8 @@ vector<TaskID_t>* FlowScheduler::ScheduleAllAffinityBatchJobs(
}
affinity_batch_job_schedule_.clear();
}
no_conflict_tasks_map_.clear();
no_conflict_task_mapped_.clear();
affinity_batch_schedule = false;
return unscheduled_tasks;
}
Expand All @@ -653,10 +634,22 @@ uint64_t FlowScheduler::ScheduleAllQueueJobs(SchedulerStats* scheduler_stats,
return num_scheduled_tasks;
}

// Returns true only when |rtd| is non-null, is in the RUNNING state, and has
// an entry in root_to_children_tasks_ whose set of task ids is empty.
// Returns false in every other case, including when the root task has no
// entry in the map at all.
bool FlowScheduler::CheckAllTasksInJobRunning(TaskDescriptor* rtd) {
  if (!rtd || rtd->state() != TaskDescriptor::RUNNING) {
    return false;
  }
  unordered_set<TaskID_t>* pending_children =
      FindOrNull(root_to_children_tasks_, rtd->uid());
  return pending_children && pending_children->empty();
}

void FlowScheduler::UpdateBatchAffinityTasksMap() {
for (auto task : no_conflict_root_tasks_) {
bool matched = false;
TaskDescriptor* rtd = FindPtrOrNull(*task_map_, task);
if (CheckAllTasksInJobRunning(rtd)) continue;
for (auto it = no_conflict_tasks_map_.begin();
it != no_conflict_tasks_map_.end(); it++) {
TaskDescriptor* other_rtd = FindPtrOrNull(*task_map_, it->first);
Expand Down Expand Up @@ -1156,5 +1149,17 @@ void FlowScheduler::UpdateGangSchedulingDeltas(
affinity_delta_tasks.clear();
}

// Adds the uid of |td_ptr| to the set stored in root_to_children_tasks_ under
// the uid of the root task of |td_ptr|'s job.
//
// Does nothing when |td_ptr| is null, when the task's job id is not present
// in job_map_, or when the job's root task has no entry in
// root_to_children_tasks_.
void FlowScheduler::UpdateSpawnedToRootTaskMap(TaskDescriptor* td_ptr) {
  if (!td_ptr) {
    return;
  }
  JobDescriptor* jd_ptr =
      FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id()));
  if (!jd_ptr) {
    // The lookup returns NULL for a job that is not in job_map_; bail out
    // instead of dereferencing it to reach root_task().
    return;
  }
  unordered_set<TaskID_t>* task_set_ptr =
      FindOrNull(root_to_children_tasks_, jd_ptr->root_task().uid());
  if (task_set_ptr) {
    task_set_ptr->insert(td_ptr->uid());
  }
}

} // namespace scheduler
} // namespace firmament
2 changes: 2 additions & 0 deletions src/scheduling/flow/flow_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class FlowScheduler : public EventDrivenScheduler {
vector<uint64_t>* unscheduled_normal_tasks,
unordered_set<uint64_t>* unscheduled_affinity_tasks_set,
vector<uint64_t>* unscheduled_affinity_tasks);
virtual void UpdateSpawnedToRootTaskMap(TaskDescriptor* td_ptr);
virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats,
vector<SchedulingDelta>* deltas);
virtual uint64_t ScheduleJob(JobDescriptor* jd_ptr,
Expand Down Expand Up @@ -150,6 +151,7 @@ class FlowScheduler : public EventDrivenScheduler {
ResourceStatus* rs);
void UpdateBatchAffinityTasksMap();
void RemoveAffinityAntiAffinityJobData(JobID_t job_id);
bool CheckAllTasksInJobRunning(TaskDescriptor* rtd);

// Pointer to the coordinator's topology manager
shared_ptr<TopologyManager> topology_manager_;
Expand Down
5 changes: 5 additions & 0 deletions src/scheduling/scheduler_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ class SchedulerInterface : public PrintableInterface {
return NULL;
}

/**
 * Hook for recording a task against the root task of its job.
 * The default implementation here is empty; FlowScheduler declares an
 * override of this method.
 * @param td_ptr descriptor of the task to record
 */
virtual void UpdateSpawnedToRootTaskMap(TaskDescriptor* td_ptr) {}

protected:
/**
* Handles the migration of a task.
Expand Down

0 comments on commit f573c7f

Please sign in to comment.