From 53dd4c255e079b9cbfe64f485a1d9c29e008fcf3 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 23:08:26 +0000 Subject: [PATCH 01/10] minor cpp improvement Signed-off-by: dentiny --- src/ray/common/scheduling/cluster_resource_data.h | 9 +++++---- src/ray/raylet/local_task_manager.cc | 13 ++++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h index 30336e8da001..68780e776cfe 100644 --- a/src/ray/common/scheduling/cluster_resource_data.h +++ b/src/ray/common/scheduling/cluster_resource_data.h @@ -39,7 +39,7 @@ class ResourceRequest { ResourceRequest() : ResourceRequest({}, false) {} /// Construct a ResourceRequest with a given resource map. - ResourceRequest(absl::flat_hash_map resource_map) + explicit ResourceRequest(absl::flat_hash_map resource_map) : ResourceRequest(resource_map, false){}; ResourceRequest(absl::flat_hash_map resource_map, @@ -131,15 +131,16 @@ class TaskResourceInstances { boost::select_first_range>>; /// Construct an empty TaskResourceInstances. - TaskResourceInstances() {} + TaskResourceInstances() = default; /// Construct a TaskResourceInstances with the values from a ResourceSet. - TaskResourceInstances(const ResourceSet &resources) { + explicit TaskResourceInstances(const ResourceSet &resources) { for (auto &resource_id : resources.ResourceIds()) { std::vector instances; auto value = resources.Get(resource_id); if (resource_id.IsUnitInstanceResource()) { size_t num_instances = static_cast(value.Double()); + instances.reserve(instances.size() + num_instances); for (size_t i = 0; i < num_instances; i++) { instances.push_back(1.0); }; @@ -196,7 +197,7 @@ class TaskResourceInstances { /// Set the per-instance values for a particular resource. TaskResourceInstances &Set(const ResourceID resource_id, const std::vector &instances) { - if (instances.size() == 0) { + if (instances.empty()) { Remove(resource_id); } else { resources_[resource_id] = instances; diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index ed1560fc65b9..ebbf22aafbf2 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -130,13 +130,16 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { auto &scheduling_class = shapes_it->first; auto &dispatch_queue = shapes_it->second; - if (info_by_sched_cls_.find(scheduling_class) == info_by_sched_cls_.end()) { + auto sched_cls_iter = info_by_sched_cls_.find(scheduling_class); + if (sched_cls_iter == info_by_sched_cls_.end()) { // Initialize the class info. - info_by_sched_cls_.emplace( - scheduling_class, - SchedulingClassInfo(MaxRunningTasksPerSchedulingClass(scheduling_class))); + sched_cls_iter = info_by_sched_cls_ + .emplace(scheduling_class, + SchedulingClassInfo(MaxRunningTasksPerSchedulingClass( + scheduling_class))) + .first; } - auto &sched_cls_info = info_by_sched_cls_.at(scheduling_class); + auto &sched_cls_info = sched_cls_iter->second; // Fair scheduling is applied only when the total CPU requests exceed the node's // capacity. This skips scheduling classes whose number of running tasks exceeds the From 940c5223de12fd5337c500591856fea3077802b7 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 23:22:03 +0000 Subject: [PATCH 02/10] move shared pointer -s Signed-off-by: dentiny --- src/ray/raylet/local_task_manager.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index ebbf22aafbf2..9576066f71cf 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -93,18 +93,18 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w {task.GetTaskSpecification().GetName(), task.GetTaskSpecification().IsRetry()}); if (args_ready) { RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id; - tasks_to_dispatch_[scheduling_key].push_back(work); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work)); } else { RAY_LOG(DEBUG) << "Waiting for args for task: " << task.GetTaskSpecification().TaskId(); can_dispatch = false; - auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), work); + auto it = waiting_task_queue_.insert(waiting_task_queue_.end(), std::move(work)); RAY_CHECK(waiting_tasks_index_.emplace(task_id, it).second); } } else { RAY_LOG(DEBUG) << "No args, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].push_back(work); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(task)); } return can_dispatch; } From 1e3ccdd70fc430b29285fd32c3073ca615cdb08c Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 23:22:39 +0000 Subject: [PATCH 03/10] empty over size Signed-off-by: dentiny --- src/ray/raylet/local_task_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 9576066f71cf..e6e131c31971 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -86,7 +86,7 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass(); auto object_ids = task.GetTaskSpecification().GetDependencies(); bool can_dispatch = true; - if (object_ids.size() > 0) { + if (!object_ids.empty()) { bool args_ready = task_dependency_manager_.RequestTaskDependencies( task_id, task.GetDependencies(), From 778a326607c3331d2b7e78e4ff50f242c79c3fe2 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 13 Nov 2024 23:50:07 +0000 Subject: [PATCH 04/10] fix var Signed-off-by: dentiny --- src/ray/raylet/local_task_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index e6e131c31971..0c7eab3158e6 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -104,7 +104,7 @@ bool LocalTaskManager::WaitForTaskArgsRequests(std::shared_ptr w } else { RAY_LOG(DEBUG) << "No args, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].emplace_back(std::move(task)); + tasks_to_dispatch_[scheduling_key].emplace_back(std::move(work)); } return can_dispatch; } From 7e4728e8daf420c4a754af962aef9ae1f15fc958 Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 00:29:35 +0000 Subject: [PATCH 05/10] move Signed-off-by: dentiny --- src/ray/raylet/local_task_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 0c7eab3158e6..f53fca5a365a 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -76,7 +76,7 @@ void LocalTaskManager::QueueAndScheduleTask(std::shared_ptr work // guarantee that the local node is not selected for scheduling. ASSERT_FALSE( cluster_resource_scheduler_->GetLocalResourceManager().IsLocalNodeDraining()); - WaitForTaskArgsRequests(work); + WaitForTaskArgsRequests(std::move(work)); ScheduleAndDispatchTasks(); } From 520b5ac823dee90205bcb966f56a27943680b5c9 Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 00:37:21 +0000 Subject: [PATCH 06/10] move and iterator Signed-off-by: dentiny --- src/ray/raylet/scheduling/cluster_task_manager.cc | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 99b998dc14fe..73ca9fd40b12 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -50,16 +50,21 @@ void ClusterTaskManager::QueueAndScheduleTask( RAY_LOG(DEBUG) << "Queuing and scheduling task " << task.GetTaskSpecification().TaskId(); auto work = std::make_shared( - task, grant_or_reject, is_selected_based_on_locality, reply, [send_reply_callback] { + task, + grant_or_reject, + is_selected_based_on_locality, + reply, + [send_reply_callback = std::move(send_reply_callback)] { send_reply_callback(Status::OK(), nullptr, nullptr); }); const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); // If the scheduling class is infeasible, just add the work to the infeasible queue // directly. - if (infeasible_tasks_.count(scheduling_class) > 0) { - infeasible_tasks_[scheduling_class].push_back(work); + auto infeasible_tasks_iter = infeasible_tasks_.find(scheduling_class); + if (infeasible_tasks_iter != infeasible_tasks_.end()) { + iter->second.emplace_back(std::move(work)); } else { - tasks_to_schedule_[scheduling_class].push_back(work); + tasks_to_schedule_[scheduling_class].emplace_back(std::move(work)); } ScheduleAndDispatchTasks(); } From e20edcf4207595ae01a4682ce4fc60aaccfb56bc Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 01:34:01 +0000 Subject: [PATCH 07/10] move task spec Signed-off-by: dentiny --- src/ray/core_worker/transport/normal_task_submitter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 9e8ac970db1b..043223fbd422 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -333,7 +333,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli // same TaskID to request a worker auto resource_spec_msg = scheduling_key_entry.resource_spec.GetMutableMessage(); resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary()); - const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg); + const TaskSpecification resource_spec = TaskSpecification(std::move(resource_spec_msg)); rpc::Address best_node_address; const bool is_spillback = (raylet_address != nullptr); bool is_selected_based_on_locality = false; From dec2b2b5797669c2deb30c2428a8e580b6d7ff98 Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 02:17:20 +0000 Subject: [PATCH 08/10] avoid iterator lookup Signed-off-by: dentiny --- src/ray/core_worker/transport/normal_task_submitter.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 043223fbd422..372918e649ba 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -41,8 +41,9 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { bool keep_executing = true; { absl::MutexLock lock(&mu_); - if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end()) { - cancelled_tasks_.erase(task_spec.TaskId()); + auto task_iter = cancelled_tasks_.find(task_spec.TaskId()); + if (task_iter != cancelled_tasks_.end()) { + cancelled_tasks_.erase(task_iter); keep_executing = false; } if (keep_executing) { From fde429903e2c0783e096a854a754f4562a2c881b Mon Sep 17 00:00:00 2001 From: dentiny Date: Thu, 14 Nov 2024 03:17:42 +0000 Subject: [PATCH 09/10] break early Signed-off-by: dentiny --- cpp/src/ray/runtime/task/native_task_submitter.cc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index bd1a544ca23a..58737b36949b 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -105,11 +105,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation, scheduling_strategy, ""); } - std::vector return_ids; - for (const auto &ref : return_refs) { - return_ids.push_back(ObjectID::FromBinary(ref.object_id())); - } - return return_ids[0]; + return ObjectID::FromBinary(return_refs[0].object_id()); } ObjectID NativeTaskSubmitter::SubmitTask(InvocationSpec &invocation, From 9911e529a0de4a77bdd820021e608ed5c7ae3633 Mon Sep 17 00:00:00 2001 From: dentiny Date: Fri, 15 Nov 2024 00:01:23 +0000 Subject: [PATCH 10/10] fix typo Signed-off-by: dentiny --- src/ray/raylet/scheduling/cluster_task_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 73ca9fd40b12..04580ee05672 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -62,7 +62,7 @@ void ClusterTaskManager::QueueAndScheduleTask( // directly. auto infeasible_tasks_iter = infeasible_tasks_.find(scheduling_class); if (infeasible_tasks_iter != infeasible_tasks_.end()) { - iter->second.emplace_back(std::move(work)); + infeasible_tasks_iter->second.emplace_back(std::move(work)); } else { tasks_to_schedule_[scheduling_class].emplace_back(std::move(work)); }