diff --git a/src/base/resource_desc.proto b/src/base/resource_desc.proto
index b3d463883..bde06f25a 100644
--- a/src/base/resource_desc.proto
+++ b/src/base/resource_desc.proto
@@ -68,4 +68,6 @@ message ResourceDescriptor {
   repeated Taint taints = 33;
   // Avoid pods annotations
   repeated AvoidPodsAnnotation avoids = 34;
+  // Max pods allowed per node
+  uint64 max_pods = 35;
 }
diff --git a/src/base/task_stats.proto b/src/base/task_stats.proto
index d08ecff7f..c0f43af25 100644
--- a/src/base/task_stats.proto
+++ b/src/base/task_stats.proto
@@ -33,4 +33,7 @@ message TaskStats {
   int64 net_tx_errors = 22;
   double net_tx_errors_rate = 23;
   double net_tx_rate = 24;
+  int64 ephemeral_storage_limit = 25;
+  int64 ephemeral_storage_request = 26;
+  int64 ephemeral_storage_usage = 27;
 }
diff --git a/src/scheduling/firmament_scheduler.proto b/src/scheduling/firmament_scheduler.proto
index 03f31ad30..4b19cad4d 100644
--- a/src/scheduling/firmament_scheduler.proto
+++ b/src/scheduling/firmament_scheduler.proto
@@ -30,6 +30,7 @@ service FirmamentScheduler {
   rpc AddNodeStats (ResourceStats) returns (ResourceStatsResponse) {}
   rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+  rpc AddTaskInfo (TaskInfo) returns (TaskInfoResponse);
 }

 message ScheduleRequest {}
@@ -133,3 +134,29 @@ message HealthCheckResponse {
   ServingStatus status = 1;
 }

+// TaskInfo carries the stats (CPU, memory, ephemeral storage) of a task.
+enum TaskInfoType {
+  TASKINFO_ADD = 0;
+  TASKINFO_REMOVE = 1;
+}
+
+message TaskInfo {
+  string task_name = 1;  // podname/namespace
+  string resource_id = 2;
+  int64 cpu_utilization = 3;
+  int64 mem_utilization = 4;
+  int64 ephemeral_storage_utilization = 5;
+  TaskInfoType type = 6;
+}
+
+enum TaskInfoReplyType {
+  TASKINFO_SUBMITTED_OK = 0;
+  TASKINFO_REMOVED_OK = 2;
+  TASKINFO_SUBMIT_FAILED = 3;
+  TASKINFO_REMOVE_FAILED = 4;
+}
+
+message TaskInfoResponse {
+  TaskInfoReplyType type = 1;
+}
+
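Note: the sketch below shows how a client might drive the new AddTaskInfo RPC.
It is illustrative, not part of the patch: it assumes a "package firmament;"
declaration in the proto, the standard generated C++ stub, and a generated
header named firmament_scheduler.grpc.pb.h; the numeric values and units are
made up, since the proto does not pin units down, and ReportTask is a
hypothetical helper.

    #include <grpcpp/grpcpp.h>
    #include "firmament_scheduler.grpc.pb.h"

    // Reports one non-Firmament pod to the scheduler service.
    bool ReportTask(firmament::FirmamentScheduler::Stub* stub,
                    const std::string& task_name,      // podname/namespace
                    const std::string& resource_id) {  // node's resource UUID
      firmament::TaskInfo request;
      request.set_task_name(task_name);
      request.set_resource_id(resource_id);
      request.set_cpu_utilization(500);                // assumed: millicores
      request.set_mem_utilization(128);                // assumed: MB
      request.set_ephemeral_storage_utilization(256);  // assumed: MB
      request.set_type(firmament::TASKINFO_ADD);

      firmament::TaskInfoResponse response;
      grpc::ClientContext context;
      grpc::Status status = stub->AddTaskInfo(&context, request, &response);
      return status.ok() &&
             response.type() == firmament::TASKINFO_SUBMITTED_OK;
    }

A TASKINFO_REMOVE request with the same task_name undoes the accounting, as
the service implementation below shows.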
diff --git a/src/scheduling/firmament_scheduler_service.cc b/src/scheduling/firmament_scheduler_service.cc
index c0d07884d..d0b8a2856 100644
--- a/src/scheduling/firmament_scheduler_service.cc
+++ b/src/scheduling/firmament_scheduler_service.cc
@@ -176,6 +176,96 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
     }
   }

+  // Updates node information for non-Firmament tasks: recomputes CPU, memory
+  // and ephemeral storage utilization from the adjusted allocatable values
+  // and records the refreshed sample in the knowledge base.
+  void UpdateStatsToKnowledgeBase(ResourceStats* resource_stats,
+                                  CpuStats* cpu_stats) {
+    double cpu_utilization =
+        (cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) /
+        (double)cpu_stats->cpu_capacity();
+    cpu_stats->set_cpu_utilization(cpu_utilization);
+    double mem_utilization = (resource_stats->mem_capacity() -
+                              resource_stats->mem_allocatable()) /
+                             (double)resource_stats->mem_capacity();
+    resource_stats->set_mem_utilization(mem_utilization);
+    double ephemeral_storage_utilization =
+        (resource_stats->ephemeral_storage_capacity() -
+         resource_stats->ephemeral_storage_allocatable()) /
+        (double)resource_stats->ephemeral_storage_capacity();
+    resource_stats->set_ephemeral_storage_utilization(
+        ephemeral_storage_utilization);
+    knowledge_base_->AddMachineSample(*resource_stats);
+  }
+
+  Status AddTaskInfo(ServerContext* context, const TaskInfo* request,
+                     TaskInfoResponse* response) override {
+    boost::lock_guard<boost::recursive_mutex> lock(
+        scheduler_->scheduling_lock_);
+    ResourceID_t res_id = ResourceIDFromString(request->resource_id());
+    ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id);
+    if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) {
+      response->set_type(TaskInfoReplyType::TASKINFO_SUBMIT_FAILED);
+      return Status::OK;
+    }
+    ResourceStats resource_stats;
+    CpuStats* cpu_stats = resource_stats.add_cpus_stats();
+    bool have_sample = knowledge_base_->GetLatestStatsForMachine(
+        res_id, &resource_stats);
+    if (have_sample) {
+      switch (request->type()) {
+        case TaskInfoType::TASKINFO_ADD: {
+          if (!InsertIfNotPresent(&task_resource_map_,
+                                  request->task_name(), res_id)) {
+            // The task is already tracked; do not double-count it.
+            response->set_type(TaskInfoReplyType::TASKINFO_SUBMIT_FAILED);
+            return Status::OK;
+          }
+          cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() -
+                                         request->cpu_utilization());
+          resource_stats.set_mem_allocatable(
+              resource_stats.mem_allocatable() - request->mem_utilization());
+          resource_stats.set_ephemeral_storage_allocatable(
+              resource_stats.ephemeral_storage_allocatable() -
+              request->ephemeral_storage_utilization());
+          knowledge_base_->UpdateResourceNonFirmamentTaskCount(res_id, true);
+          UpdateStatsToKnowledgeBase(&resource_stats, cpu_stats);
+          response->set_type(TaskInfoReplyType::TASKINFO_SUBMITTED_OK);
+          return Status::OK;
+        }
+        case TaskInfoType::TASKINFO_REMOVE: {
+          ResourceID_t* rid = FindOrNull(task_resource_map_,
+                                         request->task_name());
+          if (rid == NULL) {
+            response->set_type(TaskInfoReplyType::TASKINFO_REMOVE_FAILED);
+            return Status::OK;
+          }
+          // Stop tracking the task so that a later re-add succeeds.
+          task_resource_map_.erase(request->task_name());
+          cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() +
+                                         request->cpu_utilization());
+          resource_stats.set_mem_allocatable(
+              resource_stats.mem_allocatable() + request->mem_utilization());
+          resource_stats.set_ephemeral_storage_allocatable(
+              resource_stats.ephemeral_storage_allocatable() +
+              request->ephemeral_storage_utilization());
+          knowledge_base_->UpdateResourceNonFirmamentTaskCount(res_id, false);
+          UpdateStatsToKnowledgeBase(&resource_stats, cpu_stats);
+          response->set_type(TaskInfoReplyType::TASKINFO_REMOVED_OK);
+          return Status::OK;
+        }
+        default:
+          LOG(FATAL) << "Unsupported request type: " << request->type();
+      }
+    }
+    // No stats sample for this machine yet; reply with an explicit failure
+    // instead of the default (success) enum value.
+    response->set_type(request->type() == TaskInfoType::TASKINFO_ADD
+                           ? TaskInfoReplyType::TASKINFO_SUBMIT_FAILED
+                           : TaskInfoReplyType::TASKINFO_REMOVE_FAILED);
+    return Status::OK;
+  }
+
   Status Schedule(ServerContext* context, const ScheduleRequest* request,
                   SchedulingDeltas* reply) override {
     boost::lock_guard<boost::recursive_mutex> lock(
@@ -748,6 +830,9 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {

   // Pod affinity/anti-affinity
   unordered_map<string, unordered_map<string, vector<TaskID_t>>> labels_map_;
   vector<TaskID_t> affinity_antiaffinity_tasks_;
+  // Maps a non-Firmament task (podname/namespace) to the node it was
+  // reported on.
+  unordered_map<string, ResourceID_t> task_resource_map_;

   ResourceStatus* CreateTopLevelResource() {
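Note: to make the accounting concrete, here is a worked example of what a
TASKINFO_ADD does to one node, as a minimal self-contained sketch; the numbers
and units (millicores) are illustrative only.

    #include <cassert>

    // The handler deducts the task's usage from the node's allocatable, and
    // UpdateStatsToKnowledgeBase re-derives utilization from the result.
    int main() {
      double cpu_capacity = 8000.0;     // illustrative: 8 cores in millicores
      double cpu_allocatable = 6000.0;  // before the non-Firmament task lands
      double task_cpu = 1000.0;         // TaskInfo.cpu_utilization of the task

      cpu_allocatable -= task_cpu;      // TASKINFO_ADD deducts from allocatable
      double cpu_utilization = (cpu_capacity - cpu_allocatable) / cpu_capacity;

      assert(cpu_allocatable == 5000.0);
      assert(cpu_utilization == 0.375);  // up from 0.25 before the add
      return 0;
    }

A TASKINFO_REMOVE adds the same amounts back, so a matched add/remove pair
leaves the machine sample unchanged.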
diff --git a/src/scheduling/flow/cpu_cost_model.cc b/src/scheduling/flow/cpu_cost_model.cc
index a536cf3a7..76786a88d 100644
--- a/src/scheduling/flow/cpu_cost_model.cc
+++ b/src/scheduling/flow/cpu_cost_model.cc
@@ -130,7 +130,12 @@ ArcDescriptor CpuCostModel::ResourceNodeToResourceNode(
 }

 ArcDescriptor CpuCostModel::LeafResourceNodeToSink(ResourceID_t resource_id) {
-  return ArcDescriptor(0LL, FLAGS_max_tasks_per_pu, 0ULL);
+  // The arc capacity to the sink is now the node's slot count (derived from
+  // max_pods) rather than the global FLAGS_max_tasks_per_pu.
+  ResourceStatus* rs = FindPtrOrNull(*resource_map_, resource_id);
+  CHECK_NOTNULL(rs);
+  ResourceTopologyNodeDescriptor* rtnd = rs->mutable_topology_node();
+  return ArcDescriptor(0LL, rtnd->resource_desc().num_slots_below(), 0ULL);
 }

 ArcDescriptor CpuCostModel::TaskContinuation(TaskID_t task_id) {
@@ -1523,14 +1525,15 @@ vector<EquivClass_t>* CpuCostModel::GetEquivClassToEquivClassesArcs(
   CHECK_NOTNULL(ecs_for_machine);
   uint64_t index = 0;
   CpuMemResVector_t cur_resource;
-  uint64_t task_count = rd.num_running_tasks_below();
-  // TODO(Pratik): FLAGS_max_tasks_per_pu is treated as equivalent to
-  // max-pods, as max-pods functionality is not yet merged at this point.
+  // Tasks placed by other schedulers (reported via AddTaskInfo) count
+  // against the node's max_pods limit as well.
+  uint64_t task_count = rd.num_running_tasks_below() +
+      knowledge_base_->GetResourceNonFirmamentTaskCount(res_id);
   for (cur_resource = *task_resource_request;
        cur_resource.cpu_cores_ < available_resources.cpu_cores_ &&
        cur_resource.ram_cap_ < available_resources.ram_cap_ &&
        cur_resource.ephemeral_storage_ < available_resources.ephemeral_storage_ &&
-       index < ecs_for_machine->size() && task_count < FLAGS_max_tasks_per_pu;
+       index < ecs_for_machine->size() && task_count < rd.max_pods();
        cur_resource.cpu_cores_ += task_resource_request->cpu_cores_,
        cur_resource.ram_cap_ += task_resource_request->ram_cap_,
        cur_resource.ephemeral_storage_ += task_resource_request->ephemeral_storage_,
@@ -1562,7 +1565,7 @@ void CpuCostModel::AddMachine(ResourceTopologyNodeDescriptor* rtnd_ptr) {
   CHECK(rd.type() == ResourceDescriptor::RESOURCE_MACHINE);
   ResourceID_t res_id = ResourceIDFromString(rd.uuid());
   vector<EquivClass_t> machine_ecs;
-  for (uint64_t index = 0; index < FLAGS_max_multi_arcs_for_cpu; ++index) {
+  for (uint64_t index = 0; index < rd.max_pods(); ++index) {
     EquivClass_t multi_machine_ec = GetMachineEC(rd.friendly_name(), index);
     machine_ecs.push_back(multi_machine_ec);
     CHECK(InsertIfNotPresent(&ec_to_index_, multi_machine_ec, index));
@@ -1641,7 +1644,10 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator,
     }
     // Running/idle task count
     rd_ptr->set_num_running_tasks_below(rd_ptr->current_running_tasks_size());
-    rd_ptr->set_num_slots_below(FLAGS_max_tasks_per_pu);
+    ResourceStatus* m_rs = FindPtrOrNull(*resource_map_, machine_res_id);
+    CHECK_NOTNULL(m_rs);
+    ResourceTopologyNodeDescriptor* m_rtnd = m_rs->mutable_topology_node();
+    rd_ptr->set_num_slots_below(m_rtnd->resource_desc().max_pods());
     return accumulator;
   }
 } else if (accumulator->type_ == FlowNodeType::MACHINE) {
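Note: the net effect of the cpu_cost_model.cc changes is that pod capacity is
enforced per node from the ResourceDescriptor rather than from global flags.
A minimal sketch of the resulting admission condition follows; the function
name and parameters are illustrative, the real check is the loop condition in
CpuCostModel::GetEquivClassToEquivClassesArcs.

    #include <cstdint>

    // Sketch of the per-node admission check implied by this change.
    bool NodeHasRoomForAnotherPod(uint64_t firmament_tasks,
                                  uint64_t non_firmament_tasks,
                                  uint64_t max_pods) {
      // Previously: task_count < FLAGS_max_tasks_per_pu (a global flag).
      // Now: tasks from both schedulers count against the node's own
      // max_pods, reported by the node agent into ResourceDescriptor.
      return firmament_tasks + non_firmament_tasks < max_pods;
    }

The non-Firmament task count comes from the knowledge base additions below.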
diff --git a/src/scheduling/knowledge_base.cc b/src/scheduling/knowledge_base.cc
index 5a5c2c0e3..c13a353a1 100644
--- a/src/scheduling/knowledge_base.cc
+++ b/src/scheduling/knowledge_base.cc
@@ -343,4 +343,28 @@ void KnowledgeBase::ProcessTaskFinalReport(
   }
 }

+void KnowledgeBase::UpdateResourceNonFirmamentTaskCount(ResourceID_t res_id,
+                                                        bool add) {
+  // kb_lock_ guards resource_tasks_count_ against concurrent RPC handlers.
+  boost::lock_guard<boost::upgrade_mutex> lock(kb_lock_);
+  uint64_t* tasks_count = FindOrNull(resource_tasks_count_, res_id);
+  if (tasks_count) {
+    if (add) {
+      (*tasks_count)++;
+    } else if (*tasks_count > 0) {
+      // The guard keeps surplus removes from underflowing the counter.
+      (*tasks_count)--;
+    }
+  } else if (add) {
+    // First non-Firmament task observed on this resource.
+    CHECK(InsertOrUpdate(&resource_tasks_count_, res_id, 1));
+  }
+}
+
+uint64_t KnowledgeBase::GetResourceNonFirmamentTaskCount(ResourceID_t res_id) {
+  boost::lock_guard<boost::upgrade_mutex> lock(kb_lock_);
+  uint64_t* tasks_count = FindOrNull(resource_tasks_count_, res_id);
+  return tasks_count ? *tasks_count : 0;
+}
+
 }  // namespace firmament
diff --git a/src/scheduling/knowledge_base.h b/src/scheduling/knowledge_base.h
index 1cbe0517c..28e8ad9ca 100644
--- a/src/scheduling/knowledge_base.h
+++ b/src/scheduling/knowledge_base.h
@@ -65,6 +65,8 @@ class KnowledgeBase {
   void LoadKnowledgeBaseFromFile();
   void ProcessTaskFinalReport(const vector<EquivClass_t>& equiv_classes,
                               const TaskFinalReport& report);
+  void UpdateResourceNonFirmamentTaskCount(ResourceID_t res_id, bool add);
+  uint64_t GetResourceNonFirmamentTaskCount(ResourceID_t res_id);
   inline const DataLayerManagerInterface& data_layer_manager() {
     CHECK_NOTNULL(data_layer_manager_);
     return *data_layer_manager_;
@@ -82,6 +84,8 @@ class KnowledgeBase {
   unordered_map<TaskID_t, deque<TaskStats> > task_map_;
   unordered_map<TaskID_t, deque<TaskFinalReport> > task_exec_reports_;
   boost::upgrade_mutex kb_lock_;
+  unordered_map<ResourceID_t, uint64_t,
+                boost::hash<boost::uuids::uuid>> resource_tasks_count_;

  private:
   fstream serial_machine_samples_;
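Note: the expected semantics of the new counters, written as a hypothetical
test body. It assumes a KnowledgeBase instance is available and that
Firmament's GenerateResourceID() helper from misc/utils.h is usable here;
TestNonFirmamentTaskCount is an illustrative name, not part of the patch.

    #include <glog/logging.h>
    #include "base/types.h"
    #include "misc/utils.h"                  // assumed: GenerateResourceID()
    #include "scheduling/knowledge_base.h"

    // Exercises the new counter API end to end.
    void TestNonFirmamentTaskCount(firmament::KnowledgeBase* kb) {
      firmament::ResourceID_t res_id = firmament::GenerateResourceID();
      // An unknown resource reports zero rather than failing.
      CHECK_EQ(kb->GetResourceNonFirmamentTaskCount(res_id), 0ULL);
      kb->UpdateResourceNonFirmamentTaskCount(res_id, true);   // first add
      kb->UpdateResourceNonFirmamentTaskCount(res_id, true);   // increments
      CHECK_EQ(kb->GetResourceNonFirmamentTaskCount(res_id), 2ULL);
      kb->UpdateResourceNonFirmamentTaskCount(res_id, false);
      kb->UpdateResourceNonFirmamentTaskCount(res_id, false);
      kb->UpdateResourceNonFirmamentTaskCount(res_id, false);  // saturates at 0
      CHECK_EQ(kb->GetResourceNonFirmamentTaskCount(res_id), 0ULL);
    }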