Skip to content

Commit

Permalink
Merge pull request #27 from shivramsrivastava/with_max_pods_v07
Browse files Browse the repository at this point in the history
Max-Pods feature support in firmament
  • Loading branch information
shivramsrivastava authored Jan 24, 2019
2 parents fdb122d + 7356256 commit faef9e6
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/base/resource_desc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ message ResourceDescriptor {
repeated Taint taints = 33;
// Avoid pods annotations
repeated AvoidPodsAnnotation avoids = 34;
// Max pods allowed per node
uint64 max_pods = 35;
}
3 changes: 3 additions & 0 deletions src/base/task_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ message TaskStats {
int64 net_tx_errors = 22;
double net_tx_errors_rate = 23;
double net_tx_rate = 24;
int64 ephemeral_storage_limit = 25;
int64 ephemeral_storage_request = 26;
int64 ephemeral_storage_usage = 27;
}
27 changes: 27 additions & 0 deletions src/scheduling/firmament_scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ service FirmamentScheduler {
rpc AddNodeStats (ResourceStats) returns (ResourceStatsResponse) {}

rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc AddTaskInfo (TaskInfo) returns (TaskInfoResponse);
}

message ScheduleRequest {}
Expand Down Expand Up @@ -133,3 +134,29 @@ message HealthCheckResponse {
ServingStatus status = 1;
}

// TaskInfo is the stats(including CPU, Memory and ephemeral) of a task.
// TaskInfo is the stats(including CPU, Memory and ephemeral) of a task.
// Whether an AddTaskInfo request registers (ADD) or unregisters (REMOVE)
// the task's resource usage on its node.
enum TaskInfoType {
  TASKINFO_ADD = 0;
  TASKINFO_REMOVE = 1;
}

// Resource usage of a task not scheduled by Firmament; the scheduler service
// subtracts (ADD) or restores (REMOVE) these amounts from the node's
// allocatable resources.
message TaskInfo {
  string task_name = 1; // podname/namespace
  string resource_id = 2; // UUID of the node (machine) the task runs on
  // NOTE(review): despite the "_utilization" names, these values are applied
  // directly to the node's allocatable amounts — presumably they carry the
  // task's resource *requests*; confirm with the caller.
  int64 cpu_utilization = 3;
  int64 mem_utilization = 4;
  int64 ephemeral_storage_utilization = 5;
  TaskInfoType type = 6; // ADD on task start, REMOVE on task completion
}

// Outcome of an AddTaskInfo RPC.
enum TaskInfoReplyType {
  TASKINFO_SUBMITTED_OK = 0;
  // NOTE(review): value 1 is skipped — confirm whether it is intentionally
  // reserved or an oversight (cannot be renumbered once clients ship).
  TASKINFO_REMOVED_OK = 2;
  TASKINFO_SUBMIT_FAILED = 3;
  TASKINFO_REMOVE_FAILED = 4;
}

// Reply for the AddTaskInfo RPC; carries only the outcome code.
message TaskInfoResponse {
  TaskInfoReplyType type = 1;
}

83 changes: 83 additions & 0 deletions src/scheduling/firmament_scheduler_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,88 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
}
}

// Update Non-Firmament related node information.
// Recomputes CPU, memory and ephemeral-storage utilization from the
// capacity/allocatable gap in the given stats and records the sample in the
// knowledge base. Utilization = (capacity - allocatable) / capacity.
// Bug fix: each division is now guarded against a zero capacity, which
// previously produced inf/NaN utilization values in the recorded sample.
void UpdateStatsToKnowledgeBase(ResourceStats* resource_stats,
                                CpuStats* cpu_stats) {
  if (cpu_stats->cpu_capacity() > 0) {
    double cpu_utilization =
        (cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) /
        (double)cpu_stats->cpu_capacity();
    cpu_stats->set_cpu_utilization(cpu_utilization);
  }
  if (resource_stats->mem_capacity() > 0) {
    double mem_utilization = (resource_stats->mem_capacity() -
                              resource_stats->mem_allocatable()) /
                             (double)resource_stats->mem_capacity();
    resource_stats->set_mem_utilization(mem_utilization);
  }
  if (resource_stats->ephemeral_storage_capacity() > 0) {
    double ephemeral_storage_utilization =
        (resource_stats->ephemeral_storage_capacity() -
         resource_stats->ephemeral_storage_allocatable()) /
        (double)resource_stats->ephemeral_storage_capacity();
    resource_stats->set_ephemeral_storage_utilization(
        ephemeral_storage_utilization);
  }
  knowledge_base_->AddMachineSample(*resource_stats);
}

// Registers (TASKINFO_ADD) or unregisters (TASKINFO_REMOVE) a
// non-Firmament task on the node named by request->resource_id(), adjusting
// the node's allocatable CPU/memory/ephemeral storage by the task's usage
// and pushing an updated sample into the knowledge base.
// Always returns Status::OK; the outcome is reported via response->type().
Status AddTaskInfo (ServerContext* context, const TaskInfo* request,
                    TaskInfoResponse* response) override {
  // NOTE(review): the scheduling lock is commented out — confirm this RPC
  // cannot race with Schedule(); if it can, re-enable the lock.
  //boost::lock_guard<boost::recursive_mutex> lock(
  //    scheduler_->scheduling_lock_);
  ResourceID_t res_id = ResourceIDFromString(request->resource_id());
  ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id);
  if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) {
    response->set_type(TaskInfoReplyType::TASKINFO_SUBMIT_FAILED);
    return Status::OK;
  }
  ResourceStats resource_stats;
  bool have_sample = knowledge_base_->GetLatestStatsForMachine(
      res_id, &resource_stats);
  if (!have_sample) {
    // Bug fix: previously returned with the response type unset, which
    // decodes as TASKINFO_SUBMITTED_OK (0) and falsely signals success.
    response->set_type(request->type() == TaskInfoType::TASKINFO_REMOVE
                           ? TaskInfoReplyType::TASKINFO_REMOVE_FAILED
                           : TaskInfoReplyType::TASKINFO_SUBMIT_FAILED);
    return Status::OK;
  }
  // Bug fix: add the per-CPU entry only AFTER the sample has been fetched.
  // Taking the pointer before GetLatestStatsForMachine() risks it dangling
  // if the lookup overwrites the message's repeated fields.
  CpuStats* cpu_stats = resource_stats.add_cpus_stats();
  switch (request->type()) {
    case TaskInfoType::TASKINFO_ADD: {
      if (!InsertIfNotPresent(&task_resource_map_,
                              request->task_name(), res_id)) {
        // Task name already registered; refuse the duplicate.
        response->set_type(TaskInfoReplyType::TASKINFO_SUBMIT_FAILED);
        return Status::OK;
      }
      cpu_stats->set_cpu_allocatable(
          cpu_stats->cpu_allocatable() - request->cpu_utilization());
      resource_stats.set_mem_allocatable(
          resource_stats.mem_allocatable() - request->mem_utilization());
      resource_stats.set_ephemeral_storage_allocatable(
          resource_stats.ephemeral_storage_allocatable() -
          request->ephemeral_storage_utilization());
      knowledge_base_->UpdateResourceNonFirmamentTaskCount(res_id, true);
      UpdateStatsToKnowledgeBase(&resource_stats, cpu_stats);
      response->set_type(TaskInfoReplyType::TASKINFO_SUBMITTED_OK);
      return Status::OK;
    }
    case TaskInfoType::TASKINFO_REMOVE: {
      ResourceID_t* rid = FindOrNull(task_resource_map_,
                                     request->task_name());
      if (rid == NULL) {
        // Task was never registered (or already removed).
        response->set_type(TaskInfoReplyType::TASKINFO_REMOVE_FAILED);
        return Status::OK;
      }
      cpu_stats->set_cpu_allocatable(
          cpu_stats->cpu_allocatable() + request->cpu_utilization());
      resource_stats.set_mem_allocatable(
          resource_stats.mem_allocatable() + request->mem_utilization());
      resource_stats.set_ephemeral_storage_allocatable(
          resource_stats.ephemeral_storage_allocatable() +
          request->ephemeral_storage_utilization());
      knowledge_base_->UpdateResourceNonFirmamentTaskCount(res_id, false);
      UpdateStatsToKnowledgeBase(&resource_stats, cpu_stats);
      // Bug fix: erase the bookkeeping entry so the same task name can be
      // re-added later; previously it was never removed, so a subsequent
      // TASKINFO_ADD for this task would always fail.
      task_resource_map_.erase(request->task_name());
      response->set_type(TaskInfoReplyType::TASKINFO_REMOVED_OK);
      return Status::OK;
    }
    default:
      LOG(FATAL) << "Unsupported request type: " << request->type();
  }
  return Status::OK;
}

Status Schedule(ServerContext* context, const ScheduleRequest* request,
SchedulingDeltas* reply) override {
boost::lock_guard<boost::recursive_mutex> lock(
Expand Down Expand Up @@ -748,6 +830,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
// Pod affinity/anti-affinity
unordered_map<string, unordered_map<string, vector<TaskID_t>>> labels_map_;
vector<TaskID_t> affinity_antiaffinity_tasks_;
unordered_map<string, ResourceID_t> task_resource_map_;

ResourceStatus* CreateTopLevelResource() {
ResourceID_t res_id = GenerateResourceID();
Expand Down
15 changes: 10 additions & 5 deletions src/scheduling/flow/cpu_cost_model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ ArcDescriptor CpuCostModel::ResourceNodeToResourceNode(
}

// Returns the arc from a leaf (PU) resource node to the sink. The arc
// capacity is the machine's slot count (num_slots_below, i.e. max pods),
// read from the resource topology instead of a global flag.
ArcDescriptor CpuCostModel::LeafResourceNodeToSink(ResourceID_t resource_id) {
  ResourceStatus* rs = FindPtrOrNull(*resource_map_, resource_id);
  // Bug fix: rs was dereferenced unchecked; fail loudly on an unknown
  // resource instead of crashing on a null pointer.
  CHECK_NOTNULL(rs);
  ResourceTopologyNodeDescriptor* rtnd = rs->mutable_topology_node();
  // (also removed a stray double semicolon)
  return ArcDescriptor(0LL, rtnd->resource_desc().num_slots_below(), 0ULL);
}

ArcDescriptor CpuCostModel::TaskContinuation(TaskID_t task_id) {
Expand Down Expand Up @@ -1523,14 +1525,15 @@ vector<EquivClass_t>* CpuCostModel::GetEquivClassToEquivClassesArcs(
CHECK_NOTNULL(ecs_for_machine);
uint64_t index = 0;
CpuMemResVector_t cur_resource;
uint64_t task_count = rd.num_running_tasks_below();
uint64_t task_count = rd.num_running_tasks_below() +
knowledge_base_->GetResourceNonFirmamentTaskCount(res_id);
//TODO(Pratik) : FLAGS_max_tasks_per_pu is treated as equivalent to max-pods,
// as max-pods functionality is not yet merged at this point.
for (cur_resource = *task_resource_request;
cur_resource.cpu_cores_ < available_resources.cpu_cores_ &&
cur_resource.ram_cap_ < available_resources.ram_cap_ &&
cur_resource.ephemeral_storage_ < available_resources.ephemeral_storage_ &&
index < ecs_for_machine->size() && task_count < FLAGS_max_tasks_per_pu;
index < ecs_for_machine->size() && task_count < rd.max_pods();
cur_resource.cpu_cores_ += task_resource_request->cpu_cores_,
cur_resource.ram_cap_ += task_resource_request->ram_cap_,
cur_resource.ephemeral_storage_ += task_resource_request->ephemeral_storage_,
Expand Down Expand Up @@ -1562,7 +1565,7 @@ void CpuCostModel::AddMachine(ResourceTopologyNodeDescriptor* rtnd_ptr) {
CHECK(rd.type() == ResourceDescriptor::RESOURCE_MACHINE);
ResourceID_t res_id = ResourceIDFromString(rd.uuid());
vector<EquivClass_t> machine_ecs;
for (uint64_t index = 0; index < FLAGS_max_multi_arcs_for_cpu; ++index) {
for (uint64_t index = 0; index < rd.max_pods(); ++index) {
EquivClass_t multi_machine_ec = GetMachineEC(rd.friendly_name(), index);
machine_ecs.push_back(multi_machine_ec);
CHECK(InsertIfNotPresent(&ec_to_index_, multi_machine_ec, index));
Expand Down Expand Up @@ -1641,7 +1644,9 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator,
}
// Running/idle task count
rd_ptr->set_num_running_tasks_below(rd_ptr->current_running_tasks_size());
rd_ptr->set_num_slots_below(FLAGS_max_tasks_per_pu);
ResourceStatus* m_rs = FindPtrOrNull(*resource_map_, machine_res_id);
ResourceTopologyNodeDescriptor* m_rtnd = m_rs->mutable_topology_node();
rd_ptr->set_num_slots_below(m_rtnd->resource_desc().max_pods());
return accumulator;
}
} else if (accumulator->type_ == FlowNodeType::MACHINE) {
Expand Down
26 changes: 26 additions & 0 deletions src/scheduling/knowledge_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,4 +343,30 @@ void KnowledgeBase::ProcessTaskFinalReport(
}
}

// Adjusts the per-resource count of tasks not scheduled by Firmament.
// When add is true the count is incremented (creating the entry on first
// use); otherwise it is decremented, clamped at zero. A remove for a
// resource with no entry is a no-op.
void KnowledgeBase::UpdateResourceNonFirmamentTaskCount(ResourceID_t res_id, bool add) {
  uint64_t* count_ptr = FindOrNull(resource_tasks_count_, res_id);
  if (count_ptr == NULL) {
    // First sighting of this resource: only an add creates an entry.
    if (add) {
      CHECK(InsertOrUpdate(&resource_tasks_count_, res_id, 1));
    }
    return;
  }
  if (add) {
    ++(*count_ptr);
  } else if (*count_ptr > 0) {
    --(*count_ptr);
  }
}

// Returns the recorded number of non-Firmament tasks for res_id, or zero
// when the resource has no entry.
uint64_t KnowledgeBase::GetResourceNonFirmamentTaskCount(ResourceID_t res_id) {
  uint64_t* count_ptr = FindOrNull(resource_tasks_count_, res_id);
  return count_ptr != NULL ? *count_ptr : 0;
}

} // namespace firmament
4 changes: 4 additions & 0 deletions src/scheduling/knowledge_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class KnowledgeBase {
void LoadKnowledgeBaseFromFile();
void ProcessTaskFinalReport(const vector<EquivClass_t>& equiv_classes,
const TaskFinalReport& report);
void UpdateResourceNonFirmamentTaskCount(ResourceID_t res_id, bool add);
uint64_t GetResourceNonFirmamentTaskCount(ResourceID_t res_id);
inline const DataLayerManagerInterface& data_layer_manager() {
CHECK_NOTNULL(data_layer_manager_);
return *data_layer_manager_;
Expand All @@ -82,6 +84,8 @@ class KnowledgeBase {
unordered_map<TaskID_t, deque<TaskStats> > task_map_;
unordered_map<TaskID_t, deque<TaskFinalReport> > task_exec_reports_;
boost::upgrade_mutex kb_lock_;
unordered_map<ResourceID_t, uint64_t,
boost::hash<boost::uuids::uuid>> resource_tasks_count_;

private:
fstream serial_machine_samples_;
Expand Down

0 comments on commit faef9e6

Please sign in to comment.