Skip to content

Commit

Permalink
Lock improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
shivramsrivastava committed May 7, 2018
1 parent c7c677c commit 3195a49
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 7 deletions.
6 changes: 3 additions & 3 deletions config/firmament_scheduler.cfg
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
--firmament_scheduler_service_address=127.0.0.1
--firmament_scheduler_service_address=0.0.0.0
--firmament_scheduler_service_port=9090
--service_scheduler=flow
--logtostderr
--max_tasks_per_pu=1
--max_tasks_per_pu=50
--max_sample_queue_size=100
# Firmament should not reschedule pods upon node failure because Kubernetes
# automatically creates new pods to replace the failed pods.
--reschedule_tasks_upon_node_failure=false
--flow_scheduling_cost_model=6
--flow_scheduling_solver=flowlessly
--flow_scheduling_binary=build/third_party/flowlessly/src/flowlessly-build/flow_scheduler
--flow_scheduling_binary=debug/third_party/flowlessly/src/flowlessly-build/flow_scheduler
--flowlessly_algorithm=successive_shortest_path
--log_solver_stderr
--run_incremental_scheduler=false
Expand Down
2 changes: 1 addition & 1 deletion contrib/docker/firmament-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
# Use the flow scheduler
--scheduler=flow
# Use the Whare-Map cost model
--flow_scheduling_cost_model=4
--flow_scheduling_cost_model=8
# Enable flow graph debugging
--debug_flow_graph
3 changes: 2 additions & 1 deletion src/scheduling/event_driven_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ class EventDrivenScheduler : public SchedulerInterface {
MessagingAdapterInterface<BaseMessage>* m_adapter_ptr_;
// A lock indicating if the scheduler is currently
// in the process of making scheduling decisions.
boost::recursive_mutex scheduling_lock_;
// Jagadish moving this to shceuler interface class
// boost::recursive_mutex scheduling_lock_;
// Map of reference subscriptions
map<DataObjectID_t, unordered_set<TaskDescriptor*>> reference_subscriptions_;
// The current resource to task bindings managed by this scheduler.
Expand Down
15 changes: 13 additions & 2 deletions src/scheduling/firmament_scheduler_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "scheduling/scheduling_delta.pb.h"
#include "scheduling/simple/simple_scheduler.h"
#include "storage/simple_object_store.h"
#include "scheduling/event_driven_scheduler.h"

using grpc::Server;
using grpc::ServerBuilder;
Expand Down Expand Up @@ -127,9 +128,11 @@ class FirmamentSchedulerServiceImpl final :
vector<SchedulingDelta> deltas;
scheduler_->ScheduleAllJobs(&sstat, &deltas);
// Extract results
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
if(deltas.size()) {
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
}
for (auto& d : deltas) {
LOG(INFO) << "Delta: " << d.DebugString();
//LOG(INFO) << "Delta: " << d.DebugString();
SchedulingDelta* ret_delta = reply->add_deltas();
ret_delta->CopyFrom(d);
if (d.type() == SchedulingDelta::PLACE) {
Expand Down Expand Up @@ -197,6 +200,7 @@ class FirmamentSchedulerServiceImpl final :
Status TaskRemoved(ServerContext* context,
const TaskUID* tid_ptr,
TaskRemovedResponse* reply) override {
boost::lock_guard<boost::recursive_mutex> lock(scheduler_->scheduling_lock_);
TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, tid_ptr->task_uid());
if (td_ptr == NULL) {
reply->set_type(TaskReplyType::TASK_NOT_FOUND);
Expand Down Expand Up @@ -234,6 +238,7 @@ class FirmamentSchedulerServiceImpl final :
Status TaskSubmitted(ServerContext* context,
const TaskDescription* task_desc_ptr,
TaskSubmittedResponse* reply) override {
boost::lock_guard<boost::recursive_mutex> lock(scheduler_->scheduling_lock_);
TaskID_t task_id = task_desc_ptr->task_descriptor().uid();
if (FindPtrOrNull(*task_map_, task_id)) {
reply->set_type(TaskReplyType::TASK_ALREADY_SUBMITTED);
Expand All @@ -245,6 +250,7 @@ class FirmamentSchedulerServiceImpl final :
}
JobID_t job_id = JobIDFromString(task_desc_ptr->task_descriptor().job_id());
JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id);
//LOG(INFO) << "Job id is " << job_id ;
if (jd_ptr == NULL) {
CHECK(InsertIfNotPresent(job_map_.get(), job_id,
task_desc_ptr->job_descriptor()));
Expand All @@ -271,6 +277,8 @@ class FirmamentSchedulerServiceImpl final :
uint64_t* num_tasks_to_remove =
FindOrNull(job_num_tasks_to_remove_, job_id);
(*num_tasks_to_remove)++;
if( (*num_tasks_to_remove) >= 3800 )
LOG(INFO) << "All tasks are submitted" << job_id << ", " << 3800 ;
reply->set_type(TaskReplyType::TASK_SUBMITTED_OK);
return Status::OK;
}
Expand Down Expand Up @@ -320,6 +328,7 @@ class FirmamentSchedulerServiceImpl final :
Status NodeAdded(ServerContext* context,
const ResourceTopologyNodeDescriptor* submitted_rtnd_ptr,
NodeAddedResponse* reply) override {
boost::lock_guard<boost::recursive_mutex> lock(scheduler_->scheduling_lock_);
bool doesnt_exist = DFSTraverseResourceProtobufTreeWhileTrue(
*submitted_rtnd_ptr,
boost::bind(&FirmamentSchedulerServiceImpl::CheckResourceDoesntExist,
Expand Down Expand Up @@ -469,6 +478,8 @@ class FirmamentSchedulerServiceImpl final :
CHECK(InsertIfNotPresent(resource_map_.get(), res_id, rs_ptr));
return rs_ptr;
}
boost::recursive_mutex task_submission_lock_;
boost::recursive_mutex node_addition_lock_;

};

Expand Down
1 change: 1 addition & 0 deletions src/scheduling/scheduler_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct SchedulerStats {

class SchedulerInterface : public PrintableInterface {
public:
boost::recursive_mutex scheduling_lock_;
SchedulerInterface(shared_ptr<JobMap_t> job_map,
shared_ptr<KnowledgeBase> knowledge_base,
shared_ptr<ResourceMap_t> resource_map,
Expand Down

0 comments on commit 3195a49

Please sign in to comment.