Added code which computes machine samples/resource stats statically and updates knowledge base
shivramsrivastava committed May 7, 2018
1 parent d0aef37 commit dc96ff6
Showing 2 changed files with 57 additions and 1 deletion.
29 changes: 29 additions & 0 deletions src/scheduling/firmament_scheduler_service.cc
@@ -353,6 +353,35 @@ class FirmamentSchedulerServiceImpl final :
// it such that Firmament does not mandatorily create an executor.
scheduler_->RegisterResource(rtnd_ptr, false, true);
reply->set_type(NodeReplyType::NODE_ADDED_OK);

// Add Node initial status simulation
ResourceStats resource_stats;
ResourceID_t res_id = ResourceIDFromString(rtnd_ptr->resource_desc().uuid());
ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id);
if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) {
reply->set_type(NodeReplyType::NODE_NOT_FOUND);
return Status::OK;
}
resource_stats.set_resource_id(rtnd_ptr->resource_desc().uuid());
resource_stats.set_timestamp(0);
CpuStats* cpu_stats = resource_stats.add_cpus_stats();
cpu_stats->set_cpu_capacity(rtnd_ptr->resource_desc().resource_capacity().cpu_cores());
// Assuming 80% of CPU/memory is allocatable, leaving 20% for other processes on the node.
cpu_stats->set_cpu_allocatable(rtnd_ptr->resource_desc().resource_capacity().cpu_cores()*0.80);
//resource_stats.cpus_stats(0).set_cpu_utilization(0.0);
//resource_stats.cpus_stats(0).set_cpu_reservation(0.0);
resource_stats.set_mem_capacity(rtnd_ptr->resource_desc().resource_capacity().ram_cap());
resource_stats.set_mem_allocatable(rtnd_ptr->resource_desc().resource_capacity().ram_cap()*0.80);
//resource_stats.set_mem_utilization(0.0);
//resource_stats.set_mem_reservation(0.0);
resource_stats.set_disk_bw(0);
resource_stats.set_net_rx_bw(0);
resource_stats.set_net_tx_bw(0);
//LOG(INFO) << "DEBUG: During node additions: CPU CAP: " << cpu_stats->cpu_capacity() << "\n"
// << " CPU ALLOC: " << cpu_stats->cpu_allocatable() << "\n"
// << " MEM CAP: " << resource_stats.mem_capacity() << "\n"
// << " MEM ALLOC: " << resource_stats.mem_allocatable();
knowledge_base_->AddMachineSample(resource_stats);
return Status::OK;
}
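The hunk above seeds the knowledge base with a synthetic first sample when a node registers: capacities come from the node's resource descriptor and 80% of each is reported as allocatable. A minimal standalone sketch of that bookkeeping, using a hypothetical NodeStats struct in place of the real ResourceStats proto, might look like this:

#include <cstdint>
#include <string>

// Hypothetical stand-in for the ResourceStats message used above.
struct NodeStats {
  std::string resource_id;
  double cpu_capacity = 0.0;
  double cpu_allocatable = 0.0;
  uint64_t mem_capacity = 0;
  uint64_t mem_allocatable = 0;
};

// Build the initial static sample for a newly added node: full capacity,
// with 80% marked allocatable and 20% left for system processes.
NodeStats MakeInitialSample(const std::string& uuid, double cpu_cores,
                            uint64_t ram_cap) {
  NodeStats stats;
  stats.resource_id = uuid;
  stats.cpu_capacity = cpu_cores;
  stats.cpu_allocatable = cpu_cores * 0.80;
  stats.mem_capacity = ram_cap;
  stats.mem_allocatable = static_cast<uint64_t>(ram_cap * 0.80);
  return stats;
}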

29 changes: 28 additions & 1 deletion src/scheduling/flow/flow_scheduler.cc
@@ -186,7 +186,8 @@ uint64_t FlowScheduler::ApplySchedulingDeltas(
const vector<SchedulingDelta*>& deltas) {
uint64_t num_scheduled = 0;
// Perform the necessary actions to apply the scheduling changes.
VLOG(2) << "Applying " << deltas.size() << " scheduling deltas...";
//VLOG(2) << "Applying " << deltas.size() << " scheduling deltas...";
LOG(INFO) << "Applying " << deltas.size() << " scheduling deltas...";
for (auto& delta : deltas) {
VLOG(2) << "Processing delta of type " << delta->type();
ResourceID_t res_id = ResourceIDFromString(delta->resource_id());
@@ -198,6 +199,19 @@ uint64_t FlowScheduler::ApplySchedulingDeltas(
// We should not get any NOOP deltas as they get filtered before.
continue;
} else if (delta->type() == SchedulingDelta::PLACE) {
// Resource stats simulation
ResourceStats resource_stats;
CpuStats* cpu_stats = resource_stats.add_cpus_stats();
bool have_sample =
knowledge_base_->GetLatestStatsForMachine(ResourceIDFromString(rs->mutable_topology_node()->parent_id()), &resource_stats);
if (have_sample) {
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() - td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() - td_ptr->resource_request().ram_cap());
//LOG(INFO) << "DEBUG: While applying PLACE scheduling deltas after iteration: \n"
// << "CPU CAP: " << cpu_stats->cpu_capacity() << ", " << "CPU ALLOC: " << cpu_stats->cpu_allocatable() << "\n"
// << "MEM CAP: " << resource_stats.mem_capacity() << ", " << "MEM ALLOC: " << resource_stats.mem_allocatable();
knowledge_base_->AddMachineSample(resource_stats);
}
// Tag the job to which this task belongs as running
JobDescriptor* jd =
FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id()));
@@ -206,6 +220,19 @@ uint64_t FlowScheduler::ApplySchedulingDeltas(
HandleTaskPlacement(td_ptr, rs->mutable_descriptor());
num_scheduled++;
} else if (delta->type() == SchedulingDelta::PREEMPT) {
// Resource stats simulation
ResourceStats resource_stats;
CpuStats* cpu_stats = resource_stats.add_cpus_stats();
bool have_sample =
knowledge_base_->GetLatestStatsForMachine(ResourceIDFromString(rs->mutable_topology_node()->parent_id()), &resource_stats);
if (have_sample) {
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() + td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() + td_ptr->resource_request().ram_cap());
//LOG(INFO) << "DEBUG: While applying PREEMPT scheduling deltas after iteration: \n"
// << "CPU CAP: " << cpu_stats->cpu_capacity() << ", " << "CPU ALLOC: " << cpu_stats->cpu_allocatable() << "\n"
// << "MEM CAP: " << resource_stats.mem_capacity() << ", " << "MEM ALLOC: " << resource_stats.mem_allocatable();
knowledge_base_->AddMachineSample(resource_stats);
}
HandleTaskEviction(td_ptr, rs->mutable_descriptor());
} else if (delta->type() == SchedulingDelta::MIGRATE) {
HandleTaskMigration(td_ptr, rs->mutable_descriptor());
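The PLACE and PREEMPT branches above keep that synthetic sample in step with scheduling decisions: they fetch the machine's latest sample, debit or credit the task's resource request against its allocatable CPU and memory, and push the adjusted sample back into the knowledge base. A hedged sketch of that update, reusing the hypothetical NodeStats struct from the earlier sketch (not the actual Firmament API):

// Hypothetical helper mirroring the PLACE/PREEMPT branches above: a
// placement debits the task's request from the machine's allocatable
// resources, a preemption credits it back. Underflow is not handled,
// matching the simple bookkeeping in the hunk above.
void ApplySchedulingDelta(NodeStats* sample, double task_cpu_cores,
                          uint64_t task_ram_cap, bool is_placement) {
  if (is_placement) {
    sample->cpu_allocatable -= task_cpu_cores;
    sample->mem_allocatable -= task_ram_cap;
  } else {
    sample->cpu_allocatable += task_cpu_cores;
    sample->mem_allocatable += task_ram_cap;
  }
}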
