diff --git a/config/firmament_scheduler.cfg b/config/firmament_scheduler.cfg index fb76b94f2..e452d1a7d 100644 --- a/config/firmament_scheduler.cfg +++ b/config/firmament_scheduler.cfg @@ -1,15 +1,15 @@ ---firmament_scheduler_service_address=127.0.0.1 +--firmament_scheduler_service_address=0.0.0.0 --firmament_scheduler_service_port=9090 --service_scheduler=flow --logtostderr ---max_tasks_per_pu=1 +--max_tasks_per_pu=50 --max_sample_queue_size=100 # Firmament should not reschedule pods upon node failure because Kubernetes # automatically creates new pods to replace the failed pods. --reschedule_tasks_upon_node_failure=false --flow_scheduling_cost_model=6 --flow_scheduling_solver=flowlessly ---flow_scheduling_binary=build/third_party/flowlessly/src/flowlessly-build/flow_scheduler +--flow_scheduling_binary=debug/third_party/flowlessly/src/flowlessly-build/flow_scheduler --flowlessly_algorithm=successive_shortest_path --log_solver_stderr --run_incremental_scheduler=false diff --git a/contrib/docker/firmament-default.conf b/contrib/docker/firmament-default.conf index 97af223dd..9ecfca55c 100644 --- a/contrib/docker/firmament-default.conf +++ b/contrib/docker/firmament-default.conf @@ -3,6 +3,6 @@ # Use the flow scheduler --scheduler=flow # Use the Whare-Map cost model ---flow_scheduling_cost_model=4 +--flow_scheduling_cost_model=8 # Enable flow graph debugging --debug_flow_graph diff --git a/src/scheduling/event_driven_scheduler.h b/src/scheduling/event_driven_scheduler.h index 71df5ae29..a966f8b55 100644 --- a/src/scheduling/event_driven_scheduler.h +++ b/src/scheduling/event_driven_scheduler.h @@ -170,7 +170,8 @@ class EventDrivenScheduler : public SchedulerInterface { MessagingAdapterInterface* m_adapter_ptr_; // A lock indicating if the scheduler is currently // in the process of making scheduling decisions. 
- boost::recursive_mutex scheduling_lock_; + // Jagadish moving this to scheduler interface class + // boost::recursive_mutex scheduling_lock_; // Map of reference subscriptions map> reference_subscriptions_; // The current resource to task bindings managed by this scheduler. diff --git a/src/scheduling/firmament_scheduler_service.cc b/src/scheduling/firmament_scheduler_service.cc index f67aa1f71..f5735e70d 100644 --- a/src/scheduling/firmament_scheduler_service.cc +++ b/src/scheduling/firmament_scheduler_service.cc @@ -37,6 +37,7 @@ #include "scheduling/scheduling_delta.pb.h" #include "scheduling/simple/simple_scheduler.h" #include "storage/simple_object_store.h" +#include "scheduling/event_driven_scheduler.h" using grpc::Server; using grpc::ServerBuilder; @@ -127,9 +128,11 @@ class FirmamentSchedulerServiceImpl final : vector deltas; scheduler_->ScheduleAllJobs(&sstat, &deltas); // Extract results - LOG(INFO) << "Got " << deltas.size() << " scheduling deltas"; + if(deltas.size()) { + LOG(INFO) << "Got " << deltas.size() << " scheduling deltas"; + } for (auto& d : deltas) { - LOG(INFO) << "Delta: " << d.DebugString(); + //LOG(INFO) << "Delta: " << d.DebugString(); SchedulingDelta* ret_delta = reply->add_deltas(); ret_delta->CopyFrom(d); if (d.type() == SchedulingDelta::PLACE) { @@ -197,6 +200,7 @@ class FirmamentSchedulerServiceImpl final : Status TaskRemoved(ServerContext* context, const TaskUID* tid_ptr, TaskRemovedResponse* reply) override { + boost::lock_guard lock(scheduler_->scheduling_lock_); TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, tid_ptr->task_uid()); if (td_ptr == NULL) { reply->set_type(TaskReplyType::TASK_NOT_FOUND); @@ -234,6 +238,7 @@ class FirmamentSchedulerServiceImpl final : Status TaskSubmitted(ServerContext* context, const TaskDescription* task_desc_ptr, TaskSubmittedResponse* reply) override { + boost::lock_guard lock(scheduler_->scheduling_lock_); TaskID_t task_id = task_desc_ptr->task_descriptor().uid(); if 
(FindPtrOrNull(*task_map_, task_id)) { reply->set_type(TaskReplyType::TASK_ALREADY_SUBMITTED); @@ -245,6 +250,7 @@ class FirmamentSchedulerServiceImpl final : } JobID_t job_id = JobIDFromString(task_desc_ptr->task_descriptor().job_id()); JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id); + //LOG(INFO) << "Job id is " << job_id ; if (jd_ptr == NULL) { CHECK(InsertIfNotPresent(job_map_.get(), job_id, task_desc_ptr->job_descriptor())); @@ -271,6 +277,8 @@ class FirmamentSchedulerServiceImpl final : uint64_t* num_tasks_to_remove = FindOrNull(job_num_tasks_to_remove_, job_id); (*num_tasks_to_remove)++; + if( (*num_tasks_to_remove) >= 3800 ) + LOG(INFO) << "All tasks are submitted" << job_id << ", " << 3800 ; reply->set_type(TaskReplyType::TASK_SUBMITTED_OK); return Status::OK; } @@ -320,6 +328,7 @@ class FirmamentSchedulerServiceImpl final : Status NodeAdded(ServerContext* context, const ResourceTopologyNodeDescriptor* submitted_rtnd_ptr, NodeAddedResponse* reply) override { + boost::lock_guard lock(scheduler_->scheduling_lock_); bool doesnt_exist = DFSTraverseResourceProtobufTreeWhileTrue( *submitted_rtnd_ptr, boost::bind(&FirmamentSchedulerServiceImpl::CheckResourceDoesntExist, @@ -469,6 +478,8 @@ class FirmamentSchedulerServiceImpl final : CHECK(InsertIfNotPresent(resource_map_.get(), res_id, rs_ptr)); return rs_ptr; } + boost::recursive_mutex task_submission_lock_; + boost::recursive_mutex node_addition_lock_; }; diff --git a/src/scheduling/scheduler_interface.h b/src/scheduling/scheduler_interface.h index 7a652d30e..5caf75271 100644 --- a/src/scheduling/scheduler_interface.h +++ b/src/scheduling/scheduler_interface.h @@ -66,6 +66,7 @@ struct SchedulerStats { class SchedulerInterface : public PrintableInterface { public: + boost::recursive_mutex scheduling_lock_; SchedulerInterface(shared_ptr job_map, shared_ptr knowledge_base, shared_ptr resource_map,