Skip to content

Commit

Permalink
Merge pull request #39 from shivramsrivastava/pod_affinity_antiaffini…
Browse files Browse the repository at this point in the history
…ty_batch_schedule

Pod Affinity/Anti-Affinity optimization
  • Loading branch information
shivramsrivastava authored Mar 15, 2019
2 parents 83a2ff6 + f573c7f commit 9eb2dc3
Show file tree
Hide file tree
Showing 9 changed files with 553 additions and 107 deletions.
216 changes: 187 additions & 29 deletions src/scheduling/event_driven_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ EventDrivenScheduler::EventDrivenScheduler(
VLOG(1) << "EventDrivenScheduler initiated.";
one_task_runnable = false;
queue_based_schedule = false;
affinity_batch_schedule = false;
}

EventDrivenScheduler::~EventDrivenScheduler() {
Expand All @@ -101,14 +102,185 @@ EventDrivenScheduler::~EventDrivenScheduler() {
void EventDrivenScheduler::AddJob(JobDescriptor* jd_ptr) {
boost::lock_guard<boost::recursive_mutex> lock(scheduling_lock_);
InsertOrUpdate(&jobs_to_schedule_, JobIDFromString(jd_ptr->uuid()), jd_ptr);
if (jd_ptr->is_gang_scheduling_job()) {
TaskDescriptor rtd = jd_ptr->root_task();
if (rtd.has_affinity() && (rtd.affinity().has_pod_affinity()
|| rtd.affinity().has_pod_anti_affinity())) {
vector<SchedulingDelta> delta_v;
InsertIfNotPresent(&affinity_job_to_deltas_, jd_ptr, delta_v);
AddPodAffinityAntiAffinityJobData(jd_ptr);
}

// Records bookkeeping for a job whose root task declares pod affinity or
// pod anti-affinity rules. Jobs without such rules are ignored.
//
// The root task is checked against itself with both conflict checks. When
// both pass, its uid is added to no_conflict_root_tasks_ and an empty
// children set is registered for it in root_to_children_tasks_. Otherwise,
// gang-scheduling jobs get an empty delta vector in affinity_job_to_deltas_.
void EventDrivenScheduler::AddPodAffinityAntiAffinityJobData(
    JobDescriptor* jd_ptr) {
  // Work on a copy: the conflict checks take non-const pointers.
  TaskDescriptor root_td = jd_ptr->root_task();
  const bool has_pod_rules =
      root_td.has_affinity() &&
      (root_td.affinity().has_pod_affinity() ||
       root_td.affinity().has_pod_anti_affinity());
  if (!has_pod_rules) {
    return;
  }

  // Short-circuit evaluation: the anti-affinity check only runs when the
  // affinity check reported no conflict.
  const bool conflict_free =
      CheckPodAffinityNoConflictWithin(&root_td, NULL) &&
      CheckPodAntiAffinityNoConflictWithin(&root_td, NULL);

  if (conflict_free) {
    no_conflict_root_tasks_.insert(root_td.uid());
    unordered_set<TaskID_t> no_children;
    InsertIfNotPresent(&root_to_children_tasks_, root_td.uid(), no_children);
    return;
  }

  if (jd_ptr->is_gang_scheduling_job()) {
    vector<SchedulingDelta> no_deltas;
    InsertIfNotPresent(&affinity_job_to_deltas_, jd_ptr, no_deltas);
  }
}

// Returns true when `labels` contains the expression's key and the label's
// value equals one of the expression's values; false otherwise (including
// when the key is missing).
bool EventDrivenScheduler::MatchExpressionWithPodLabels(
    unordered_map<string, string>& labels,
    const LabelSelectorRequirement& expression) {
  const string* label_value = FindOrNull(labels, expression.key());
  if (label_value == NULL) {
    return false;
  }
  for (const auto& candidate : expression.values()) {
    if (*label_value == candidate) {
      return true;
    }
  }
  return false;
}

// Returns false when `labels` contains the expression's key and the label's
// value equals one of the expression's values; true otherwise (including
// when the key is missing).
bool EventDrivenScheduler::NotMatchExpressionWithPodLabels(
    unordered_map<string, string>& labels,
    const LabelSelectorRequirement& expression) {
  const string* label_value = FindOrNull(labels, expression.key());
  if (label_value == NULL) {
    return true;
  }
  for (const auto& candidate : expression.values()) {
    if (*label_value == candidate) {
      return false;
    }
  }
  return true;
}

// Returns true when `labels` has an entry for the expression's key,
// regardless of the label's value.
bool EventDrivenScheduler::MatchExpressionKeyWithPodLabels(
    unordered_map<string, string>& labels,
    const LabelSelectorRequirement& expression) {
  return FindOrNull(labels, expression.key()) != NULL;
}

// Returns true when `labels` has NO entry for the expression's key, i.e. the
// "DoesNotExist" selector is satisfied; returns false when the key is
// present. This is the logical inverse of MatchExpressionKeyWithPodLabels.
//
// The previous implementation returned false on both paths, so a
// "DoesNotExist" expression could never be reported as satisfied.
bool EventDrivenScheduler::NotMatchExpressionKeyWithPodLabels(
    unordered_map<string, string>& labels,
    const LabelSelectorRequirement& expression) {
  return FindOrNull(labels, expression.key()) == NULL;
}

// Evaluates the required pod-affinity terms of `rtd` against the labels of
// `other_rtd` (or against `rtd`'s own labels when `other_rtd` is null).
//
// Returns false as soon as any match expression's operator predicate holds
// for those labels; returns true when `rtd` has no pod affinity, no required
// terms, or no expression predicate holds. An operator other than "In",
// "NotIn", "Exists" or "DoesNotExist" is logged with LOG(FATAL).
bool EventDrivenScheduler::CheckPodAffinityNoConflictWithin(
    TaskDescriptor* rtd,
    TaskDescriptor* other_rtd) {
  CHECK_NOTNULL(rtd);
  if (!rtd->affinity().has_pod_affinity()) {
    return true;
  }
  const auto& pod_affinity = rtd->affinity().pod_affinity();
  if (!pod_affinity.requiredduringschedulingignoredduringexecution_size()) {
    return true;
  }

  // Collect the labels to test against; fall back to the task's own labels.
  const TaskDescriptor* label_source = other_rtd ? other_rtd : rtd;
  unordered_map<string, string> labels;
  for (const auto& label : label_source->labels()) {
    InsertIfNotPresent(&labels, label.key(), label.value());
  }

  for (const auto& term :
       pod_affinity.requiredduringschedulingignoredduringexecution()) {
    if (!term.has_labelselector()) {
      continue;
    }
    for (const auto& expression : term.labelselector().matchexpressions()) {
      const string& op = expression.operator_();
      bool predicate_holds = false;
      if (op == std::string("In")) {
        predicate_holds = MatchExpressionWithPodLabels(labels, expression);
      } else if (op == std::string("NotIn")) {
        predicate_holds = NotMatchExpressionWithPodLabels(labels, expression);
      } else if (op == std::string("Exists")) {
        predicate_holds = MatchExpressionKeyWithPodLabels(labels, expression);
      } else if (op == std::string("DoesNotExist")) {
        predicate_holds =
            NotMatchExpressionKeyWithPodLabels(labels, expression);
      } else {
        LOG(FATAL) << "Unsupported selector type: " << expression.operator_();
        return false;
      }
      if (predicate_holds) {
        return false;
      }
    }
  }
  return true;
}

// Evaluates the required pod-anti-affinity terms of `rtd` against the labels
// of `other_rtd` (or against `rtd`'s own labels when `other_rtd` is null).
//
// Returns true when `rtd` has no pod anti-affinity or no required terms.
// For each term, returns false if the term's namespace set (the term's
// namespaces, or `rtd`'s namespace when the term lists none) contains
// `rtd`'s namespace, or if any match expression's operator predicate holds
// for the labels. An operator other than "In", "NotIn", "Exists" or
// "DoesNotExist" is logged with LOG(FATAL).
bool EventDrivenScheduler::CheckPodAntiAffinityNoConflictWithin(
    TaskDescriptor* rtd,
    TaskDescriptor* other_rtd) {
  CHECK_NOTNULL(rtd);
  if (!rtd->affinity().has_pod_anti_affinity()) {
    return true;
  }
  const auto& anti_affinity = rtd->affinity().pod_anti_affinity();
  if (!anti_affinity.requiredduringschedulingignoredduringexecution_size()) {
    return true;
  }

  // Collect the labels to test against; fall back to the task's own labels.
  const TaskDescriptor* label_source = other_rtd ? other_rtd : rtd;
  unordered_map<string, string> labels;
  for (const auto& label : label_source->labels()) {
    InsertIfNotPresent(&labels, label.key(), label.value());
  }

  for (const auto& term :
       anti_affinity.requiredduringschedulingignoredduringexecution()) {
    unordered_set<string> namespaces;
    if (!term.namespaces_size()) {
      namespaces.insert(rtd->task_namespace());
    } else {
      for (const auto& name : term.namespaces()) {
        namespaces.insert(name);
      }
    }
    // NOTE(review): a term with no namespaces always hits this return, so its
    // label selector is never examined -- confirm this is intended.
    if (namespaces.count(rtd->task_namespace()) != 0) {
      return false;
    }
    if (!term.has_labelselector()) {
      continue;
    }
    for (const auto& selector : term.labelselector().matchexpressions()) {
      // Rebuild the selector field-by-field as a LabelSelectorRequirement,
      // the type the matching helpers accept.
      LabelSelectorRequirement expression;
      expression.set_key(selector.key());
      expression.set_operator_(selector.operator_());
      for (const auto& value : selector.values()) {
        expression.add_values(value);
      }

      const string& op = expression.operator_();
      bool predicate_holds = false;
      if (op == std::string("In")) {
        predicate_holds = MatchExpressionWithPodLabels(labels, expression);
      } else if (op == std::string("NotIn")) {
        predicate_holds = NotMatchExpressionWithPodLabels(labels, expression);
      } else if (op == std::string("Exists")) {
        predicate_holds = MatchExpressionKeyWithPodLabels(labels, expression);
      } else if (op == std::string("DoesNotExist")) {
        predicate_holds =
            NotMatchExpressionKeyWithPodLabels(labels, expression);
      } else {
        LOG(FATAL) << "Unsupported selector type: " << expression.operator_();
        return false;
      }
      if (predicate_holds) {
        return false;
      }
    }
  }
  return true;
}

// Returns a pointer to the scheduler's own set of root-task ids that passed
// both pod affinity and pod anti-affinity self-conflict checks. The pointer
// refers to the member itself (no copy is made), so callers see later
// insertions and must not delete it.
unordered_set<TaskID_t>* EventDrivenScheduler::GetNoConflictTasksSet() {
return &no_conflict_root_tasks_;
}

void EventDrivenScheduler::BindTaskToResource(TaskDescriptor* td_ptr,
Expand Down Expand Up @@ -659,33 +831,19 @@ void EventDrivenScheduler::LazyGraphReduction(
current_task->state() == TaskDescriptor::BLOCKING) {
if (!will_block || (current_task->dependencies_size() == 0
&& current_task->outputs_size() == 0)) {
// Pod affinity/anti-affinity
if (current_task->has_affinity() &&
if (!affinity_batch_schedule && current_task->has_affinity() &&
(current_task->affinity().has_pod_affinity() ||
current_task->affinity().has_pod_anti_affinity())) {
if (queue_based_schedule == false || one_task_runnable == true)
continue;
for (auto task_itr = affinity_antiaffinity_tasks_->begin();
task_itr != affinity_antiaffinity_tasks_->end(); ++task_itr) {
TaskDescriptor* tdp = FindPtrOrNull(*task_map_, *task_itr);
if (tdp) {
if ((tdp->state() == TaskDescriptor::RUNNABLE) &&
(one_task_runnable == false)) {
TaskID_t task_id = *task_itr;
tdp->set_state(TaskDescriptor::CREATED);
task_itr = affinity_antiaffinity_tasks_->erase(task_itr);
affinity_antiaffinity_tasks_->push_back(task_id);
JobID_t tdp_job_id = JobIDFromString(tdp->job_id());
runnable_tasks_[tdp_job_id].erase(task_id);
continue;
}
if (tdp->state() == TaskDescriptor::CREATED) {
tdp->set_state(TaskDescriptor::RUNNABLE);
InsertTaskIntoRunnables(JobIDFromString(tdp->job_id()),
tdp->uid());
one_task_runnable = true;
break;
}
auto task_itr = affinity_antiaffinity_tasks_->begin();
TaskDescriptor* tdp = FindPtrOrNull(*task_map_, *task_itr);
if (tdp) {
if (tdp->state() == TaskDescriptor::CREATED) {
tdp->set_state(TaskDescriptor::RUNNABLE);
InsertTaskIntoRunnables(JobIDFromString(tdp->job_id()),
tdp->uid());
one_task_runnable = true;
}
}
} else {
Expand Down
19 changes: 19 additions & 0 deletions src/scheduling/event_driven_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ class EventDrivenScheduler : public SchedulerInterface {
bool local,
bool simulated);
bool UnbindTaskFromResource(TaskDescriptor* td_ptr, ResourceID_t res_id);
// Records pod affinity/anti-affinity bookkeeping for a job based on its
// root task (no-conflict set, or gang-scheduling deltas entry).
void AddPodAffinityAntiAffinityJobData(JobDescriptor* jd_ptr);
// "In" operator: true when the label for expression.key() exists and equals
// one of expression.values().
bool MatchExpressionWithPodLabels(unordered_map<string, string>& labels,
const LabelSelectorRequirement& expression);
// "NotIn" operator: false when the label for expression.key() exists and
// equals one of expression.values(); true otherwise.
bool NotMatchExpressionWithPodLabels(unordered_map<string, string>& labels,
const LabelSelectorRequirement& expression);
// "Exists" operator: true when labels has an entry for expression.key().
bool MatchExpressionKeyWithPodLabels(unordered_map<string, string>& labels,
const LabelSelectorRequirement& expression);
// Used for the "DoesNotExist" operator; see the definition for the exact
// result.
bool NotMatchExpressionKeyWithPodLabels(unordered_map<string, string>& labels,
const LabelSelectorRequirement& expression);
// Checks the first task's required pod-affinity terms against other_rtd's
// labels (the first task's own labels when other_rtd is null). Returns
// false when any selector predicate holds.
bool CheckPodAffinityNoConflictWithin(TaskDescriptor* td_ptr,
TaskDescriptor* other_rtd);
// Checks rtd's required pod-anti-affinity terms against other_rtd's labels
// (rtd's own labels when other_rtd is null). Returns false on a namespace
// hit or when any selector predicate holds.
bool CheckPodAntiAffinityNoConflictWithin(TaskDescriptor* rtd,
TaskDescriptor* other_rtd);
// Returns a pointer to the set of root-task ids recorded as conflict-free.
unordered_set<TaskID_t>* GetNoConflictTasksSet();

// Cached sets of runnable and blocked tasks; these are updated on each
// execution of LazyGraphReduction. Note that this set includes tasks from all
Expand Down Expand Up @@ -194,6 +208,11 @@ class EventDrivenScheduler : public SchedulerInterface {
// Pod affinity/anti-affinity gang schedule tasks deltas
// Keyed by job descriptor pointer; AddPodAffinityAntiAffinityJobData inserts
// an empty delta vector for gang-scheduling jobs whose root task failed a
// self-conflict check.
unordered_map<JobDescriptor*, vector<SchedulingDelta>> affinity_job_to_deltas_;
unordered_set<uint64_t> affinity_delta_tasks;
// Uids of root tasks that passed both pod affinity and pod anti-affinity
// self-conflict checks; exposed through GetNoConflictTasksSet().
unordered_set<TaskID_t> no_conflict_root_tasks_;
// Initialised to false in the constructor; while false, LazyGraphReduction
// takes its pod affinity/anti-affinity branch for tasks with such rules.
bool affinity_batch_schedule;
// NOTE(review): the next two maps are not used in the code visible here;
// presumably they relate conflict-free tasks to one another -- confirm.
unordered_map<TaskID_t, vector<TaskID_t>> no_conflict_tasks_map_;
unordered_map<TaskID_t, TaskID_t> no_conflict_task_mapped_;
// Keyed by conflict-free root task uid; an empty set is registered when the
// root task is recorded as conflict-free.
unordered_map<TaskID_t, unordered_set<TaskID_t>> root_to_children_tasks_;
};

} // namespace scheduler
Expand Down
56 changes: 35 additions & 21 deletions src/scheduling/firmament_scheduler_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,22 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
// Schedule tasks that do not have pod affinity/anti-affinity
// requirements.
scheduler_->ScheduleAllJobs(&sstat, &deltas);

uint64_t total_unsched_tasks_size = 0;
vector<uint64_t> unscheduled_normal_tasks;
vector<uint64_t> unscheduled_batch_tasks;
if (FLAGS_gather_unscheduled_tasks) {
// Get unscheduled tasks of above scheduling round.
cost_model_->GetUnscheduledTasks(&unscheduled_normal_tasks);
cost_model_->GetUnscheduledTasks(&unscheduled_batch_tasks);
}
// [pod affinity/anti-affinity batch schedule]
vector<TaskID_t>* unsched_batch_affinity_tasks =
scheduler_->ScheduleAllAffinityBatchJobs(&sstat, &deltas);
for (auto unsched_batch_affinity_task : *unsched_batch_affinity_tasks) {
unscheduled_batch_tasks.push_back(unsched_batch_affinity_task);
}
delete unsched_batch_affinity_tasks;

// Schedule tasks having pod affinity/anti-affinity.
// Queue schedule tasks having pod affinity/anti-affinity.
clock_t start = clock();
uint64_t elapsed = 0;
unordered_set<uint64_t> unscheduled_affinity_tasks_set;
Expand All @@ -292,37 +300,36 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
->GetSingleTaskTobeScheduled();
if (FLAGS_gather_unscheduled_tasks) {
TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, task_id);
CHECK_NOTNULL(td_ptr);
JobDescriptor* jd =
if (td_ptr) {
JobDescriptor* jd =
FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id()));
if (!(jd->is_gang_scheduling_job())) {
if (!task_scheduled) {
if (unscheduled_affinity_tasks_set.find(task_id) ==
unscheduled_affinity_tasks_set.end()) {
unscheduled_affinity_tasks_set.insert(task_id);
unscheduled_affinity_tasks.push_back(task_id);
if (!(jd->is_gang_scheduling_job())) {
if (!task_scheduled) {
if (unscheduled_affinity_tasks_set.find(task_id) ==
unscheduled_affinity_tasks_set.end()) {
unscheduled_affinity_tasks_set.insert(task_id);
unscheduled_affinity_tasks.push_back(task_id);
}
} else {
unscheduled_affinity_tasks_set.erase(task_id);
}
} else {
unscheduled_affinity_tasks_set.erase(task_id);
}
}
}
clock_t stop = clock();
elapsed = (double)(stop - start) * 1000.0 / CLOCKS_PER_SEC;
}
//For pod affinity/anti-affinity gang scheduling tasks
scheduler_->UpdateGangSchedulingDeltas(&sstat, &deltas,
&unscheduled_normal_tasks,
&unscheduled_batch_tasks,
&unscheduled_affinity_tasks_set,
&unscheduled_affinity_tasks);

// Get unscheduled tasks of above scheduling round which tried scheduling
// tasks having pod affinity/anti-affinity. And populate the same into
// reply.
if (FLAGS_gather_unscheduled_tasks) {
auto unscheduled_normal_tasks_ret = reply->mutable_unscheduled_tasks();
for (auto& unsched_task : unscheduled_normal_tasks) {
uint64_t* unsched_task_ret = unscheduled_normal_tasks_ret->Add();
auto unscheduled_batch_tasks_ret = reply->mutable_unscheduled_tasks();
for (auto& unsched_task : unscheduled_batch_tasks) {
uint64_t* unsched_task_ret = unscheduled_batch_tasks_ret->Add();
*unsched_task_ret = unsched_task;
total_unsched_tasks_size++;
}
Expand Down Expand Up @@ -523,7 +530,13 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
}
if (td.has_affinity() && (td.affinity().has_pod_affinity() ||
td.affinity().has_pod_anti_affinity())) {
affinity_antiaffinity_tasks_.push_back(task_id);
unordered_set<TaskID_t>* no_conflict_tasks =
scheduler_->GetNoConflictTasksSet();
JobID_t job_id = JobIDFromString(td.job_id());
JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id);
if (no_conflict_tasks->find(jd_ptr->root_task().uid()) == no_conflict_tasks->end()) {
affinity_antiaffinity_tasks_.push_back(task_id);
}
}
}

Expand All @@ -541,7 +554,6 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
reply->set_type(TaskReplyType::TASK_STATE_NOT_CREATED);
return Status::OK;
}
AddTaskToLabelsMap(task_desc_ptr->task_descriptor());
JobID_t job_id = JobIDFromString(task_desc_ptr->task_descriptor().job_id());
JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id);
if (jd_ptr == NULL) {
Expand All @@ -562,13 +574,15 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
td_ptr->CopyFrom(task_desc_ptr->task_descriptor());
CHECK(InsertIfNotPresent(task_map_.get(), td_ptr->uid(), td_ptr));
td_ptr->set_submit_time(wall_time_.GetCurrentTimestamp());
scheduler_->UpdateSpawnedToRootTaskMap(td_ptr);
}
uint64_t* num_incomplete_tasks =
FindOrNull(job_num_incomplete_tasks_, job_id);
CHECK_NOTNULL(num_incomplete_tasks);
if (*num_incomplete_tasks == 0) {
scheduler_->AddJob(jd_ptr);
}
AddTaskToLabelsMap(task_desc_ptr->task_descriptor());
(*num_incomplete_tasks)++;
uint64_t* num_tasks_to_remove =
FindOrNull(job_num_tasks_to_remove_, job_id);
Expand Down
Loading

0 comments on commit 9eb2dc3

Please sign in to comment.