From 66e2d643df5e2b95ca4c044280c690999f2f79a3 Mon Sep 17 00:00:00 2001 From: s-morita <32567894+smorita-esol@users.noreply.github.com> Date: Tue, 6 Jun 2023 18:19:33 +0900 Subject: [PATCH 1/5] feat: Thread configuration prototype This is a prototype implementation of RCLCPP for discussion about the thread configuration feature to receive and apply a set of scheduling parameters for the threads controlled by the ROS 2 executor. Our basic idea is as below. 1. Implement a new class rclcpp::thread and modify rclcpp to use it. This class has the same function set as the std::thread but also additional features to control its thread attributions. 2. Modify the rcl layer to receive a set of scheduling parameters. The parameters are described in YAML format and passed via command line parameters, environment variables, or files. 3. the rclcpp reads the parameters from rcl and applies them to each thread in the thread pool. There have been some discussions about this pull request, as below. [ROS Discourse] https://discourse.ros.org/t/adding-thread-attributes-configuration-in-ros-2-framework/30701 [ROS 2 Real-Time Working Group] https://github.com/ros-realtime/ros-realtime.github.io/issues/18 Signed-off-by: Shoji Morita --- rclcpp/CMakeLists.txt | 10 + .../executors/multi_threaded_executor.hpp | 3 + .../executors/single_threaded_executor.hpp | 7 + rclcpp/include/rclcpp/threads.hpp | 26 +++ .../rclcpp/threads/posix/linux/cpu_set.hpp | 158 ++++++++++++++ .../include/rclcpp/threads/posix/thread.hpp | 205 ++++++++++++++++++ .../rclcpp/threads/posix/thread_attribute.hpp | 189 ++++++++++++++++ .../rclcpp/threads/posix/thread_func.hpp | 54 +++++ .../rclcpp/threads/posix/thread_id.hpp | 146 +++++++++++++ .../rclcpp/threads/posix/utilities.hpp | 42 ++++ rclcpp/include/rclcpp/threads/std/thread.hpp | 145 +++++++++++++ .../rclcpp/threads/std/thread_attribute.hpp | 165 ++++++++++++++ .../include/rclcpp/threads/windows/thread.hpp | 22 ++ .../executors/multi_threaded_executor.cpp | 64 +++++- .../executors/single_threaded_executor.cpp | 35 ++- rclcpp/src/rclcpp/threads/posix_thread.cpp | 171 +++++++++++++++ rclcpp/src/rclcpp/threads/windows_thread.cpp | 19 ++ 17 files changed, 1450 insertions(+), 11 deletions(-) create mode 100644 rclcpp/include/rclcpp/threads.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread_func.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread_id.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/utilities.hpp create mode 100644 rclcpp/include/rclcpp/threads/std/thread.hpp create mode 100644 rclcpp/include/rclcpp/threads/std/thread_attribute.hpp create mode 100644 rclcpp/include/rclcpp/threads/windows/thread.hpp create mode 100644 rclcpp/src/rclcpp/threads/posix_thread.cpp create mode 100644 rclcpp/src/rclcpp/threads/windows_thread.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index b2d1023064..85434df9a0 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -121,6 +121,16 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/waitable.cpp ) +if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/posix_thread.cpp + ) +elseif(${CMAKE_SYSTEM_NAME} MATCHES "Windows") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/windows_thread.cpp + ) +endif() + find_package(Python3 REQUIRED COMPONENTS Interpreter) # "watch" template for changes diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 119013ebfb..8c48c5fe1e 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -18,10 +18,12 @@ #include #include #include +#include #include #include #include +#include "rcl_yaml_param_parser/types.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -85,6 +87,7 @@ class MultiThreadedExecutor : public rclcpp::Executor size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; + rcl_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9dc6dec57b..526592da16 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -22,6 +22,7 @@ #include #include +#include "rcl_yaml_param_parser/types.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -65,8 +66,14 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; +protected: + RCLCPP_PUBLIC + void + run(); + private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) + rcl_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/threads.hpp b/rclcpp/include/rclcpp/threads.hpp new file mode 100644 index 0000000000..ac628de3e1 --- /dev/null +++ b/rclcpp/include/rclcpp/threads.hpp @@ -0,0 +1,26 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS_HPP_ +#define RCLCPP__THREADS_HPP_ + +#if defined(__linux__) +#include "rclcpp/threads/posix/thread.hpp" +#elif defined(_WIN32) +#include "rclcpp/threads/win32/thread.hpp" +#else +#include "rclcpp/threads/std/thread.hpp" +#endif + +#endif // RCLCPP__THREADS_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp new file mode 100644 index 0000000000..60e9a1fbd7 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp @@ -0,0 +1,158 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ +#define RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = cpu_set_t; + CpuSet() = default; + explicit CpuSet(std::size_t cpu) + { + init_cpu_set(); + CPU_ZERO_S(alloc_size(), cpu_set_.get()); + CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); + } + CpuSet(const CpuSet & other) + { + if (other.cpu_set_) { + init_cpu_set(); + memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); + } + } + CpuSet & operator=(const CpuSet & other) + { + if (other.cpu_set_) { + init_cpu_set(); + memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); + } else { + clear(); + } + return *this; + } + CpuSet(CpuSet && other) + : CpuSet() + { + swap(other); + } + CpuSet & operator=(CpuSet && other) + { + CpuSet tmp; + other.swap(tmp); + tmp.swap(*this); + return *this; + } + void swap(CpuSet & other) + { + using std::swap; + swap(cpu_set_, other.cpu_set_); + swap(num_proc_, other.num_proc_); + } + void set(std::size_t cpu) + { + init_cpu_set(); + valid_cpu(cpu); + CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); + } + void unset(std::size_t cpu) + { + init_cpu_set(); + valid_cpu(cpu); + CPU_CLR_S(cpu, alloc_size(), cpu_set_.get()); + } + void clear() + { + if (cpu_set_) { + CPU_ZERO_S(alloc_size(), cpu_set_.get()); + } + } + bool is_set(std::size_t cpu) const + { + if (cpu_set_) { + valid_cpu(cpu); + return CPU_ISSET_S(cpu, alloc_size(), cpu_set_.get()); + } else { + return false; + } + } + + std::size_t max_processors() const + { + return num_proc_; + } + std::size_t alloc_size() const + { + return CPU_ALLOC_SIZE(num_proc_); + } + NativeCpuSetType * native_cpu_set() const + { + return cpu_set_.get(); + } + +private: + void init_cpu_set() + { + if (cpu_set_) { + return; + } + auto num_proc = sysconf(_SC_NPROCESSORS_ONLN); + if (num_proc == -1) { + return; + } + auto p = CPU_ALLOC(CPU_ALLOC_SIZE(num_proc)); + cpu_set_ = std::unique_ptr(p); + num_proc_ = num_proc; + } + void valid_cpu(std::size_t cpu) const + { + if (num_proc_ <= cpu) { + auto ec = std::make_error_code(std::errc::invalid_argument); + throw std::system_error{ec, "cpu number is invaild"}; + } + } + struct CpuSetDeleter + { + void operator()(NativeCpuSetType * cpu_set) const + { + CPU_FREE(cpu_set); + } + }; + std::unique_ptr cpu_set_; + std::size_t num_proc_; +}; + +inline void swap(CpuSet & a, CpuSet & b) +{ + a.swap(b); +} + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread.hpp b/rclcpp/include/rclcpp/threads/posix/thread.hpp new file mode 100644 index 0000000000..2a249541f4 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread.hpp @@ -0,0 +1,205 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/macros.hpp" +#include "rclcpp/threads/posix/thread_attribute.hpp" +#include "rclcpp/threads/posix/thread_func.hpp" +#include "rclcpp/threads/posix/thread_id.hpp" +#include "rclcpp/threads/posix/utilities.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = pthread_t; + using Attribute = detail::ThreadAttribute; + using Id = detail::ThreadId; + + // Assume pthread_t is an invalid handle if it's 0 + Thread() noexcept + : handle_{} {} + Thread(Thread && other) + : handle_{} + { + swap(other); + } + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : Thread( + static_cast(nullptr), + make_thread_func(std::forward(f), std::forward(args)...)) + {} + template + Thread(Attribute const & attr, F && f, Args && ... args) + : Thread( + &attr, + make_thread_func_with_attr(attr, std::forward(f), std::forward(args)...)) + {} + Thread(Thread const &) = delete; + ~Thread() + { + if (handle_) { + std::terminate(); + } + } + + Thread & operator=(Thread && other) noexcept + { + if (handle_) { + std::terminate(); + } + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(handle_, other.handle_); + swap(name_, other.name_); + } + + void join() + { + void * p; + int r = pthread_join(handle_, &p); + detail::throw_if_error(r, "Error in pthread_join "); + handle_ = NativeHandleType{}; + } + + bool joinable() const noexcept + { + return 0 == pthread_equal(handle_, NativeHandleType{}); + } + + void detach() + { + int r = pthread_detach(handle_); + detail::throw_if_error(r, "Error in pthread_detach "); + handle_ = NativeHandleType{}; + } + + NativeHandleType native_handle() const + { + return handle_; + } + + Id get_id() const noexcept + { + return Id{handle_}; + } + + static unsigned int hardware_concurrency() noexcept + { + auto r = sysconf(_SC_NPROCESSORS_ONLN); + if (r == -1) { + return 0u; + } else { + return static_cast(r); + } + } + +private: + using ThreadFuncBase = detail::ThreadFuncBase; + template + static ThreadFuncBase::UniquePtr make_thread_func(F && f, Args && ... args) + { + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::ThreadFuncBase * func = new detail::ThreadFunc( + [f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable + { + std::apply(f, args); + }); + return ThreadFuncBase::UniquePtr(func); + } + template + static ThreadFuncBase::UniquePtr make_thread_func_with_attr( + Attribute const & attr, + F && f, + Args && ... args) + { + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::ThreadFuncBase * func = new detail::ThreadFunc( + [attr, f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable + { + std::apply(f, args); + }); + return ThreadFuncBase::UniquePtr(func); + } + + Thread(Attribute const * attr, ThreadFuncBase::UniquePtr func); + + static void apply_attr(Attribute const & attr); + + NativeHandleType handle_; + std::string name_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace detail +{ +void apply_attr_to_current_thread(ThreadAttribute const & attr); +} + +namespace this_thread +{ + +template +void run_with_thread_attribute( + detail::ThreadAttribute const & attr, F && f, Args && ... args) +{ + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::apply_attr_to_current_thread(attr); + std::invoke(std::forward(f), std::forward(args)...); +} + +} // namespace this_thread + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp new file mode 100644 index 0000000000..e2baa37696 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp @@ -0,0 +1,189 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ + +#include +#include +#include + +#include "rcl_yaml_param_parser/types.h" +#include "rclcpp/visibility_control.hpp" + +#ifdef __linux__ +#include "rclcpp/threads/posix/linux/cpu_set.hpp" +#endif + +namespace rclcpp +{ + +namespace detail +{ + +struct ThreadAttribute +{ + ThreadAttribute(); + + ThreadAttribute(const ThreadAttribute &) = default; + ThreadAttribute(ThreadAttribute &&) = default; + ThreadAttribute & operator=(const ThreadAttribute &) = default; + ThreadAttribute & operator=(ThreadAttribute &&) = default; + + using NativeAttributeType = pthread_attr_t; + + ThreadAttribute & set_affinity(CpuSet cs) + { + cpu_set_ = std::move(cs); + return *this; + } + const CpuSet & get_affinity() const + { + return cpu_set_; + } + + ThreadAttribute & set_sched_policy(rcl_thread_scheduling_policy_type_t sp) + { + sched_policy_ = rcl_scheduling_policy_to_sched_policy(sp); + return *this; + } + int get_sched_policy() const + { + return sched_policy_; + } + + ThreadAttribute & set_stack_size(std::size_t sz) + { + stack_size_ = sz; + return *this; + } + std::size_t get_stack_size() const + { + return stack_size_; + } + + ThreadAttribute & set_priority(int prio) + { + priority_ = prio; + return *this; + } + int get_priority() const + { + return priority_; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + detached_flag_ = detach; + return *this; + } + bool get_run_as_detached() const + { + return detached_flag_; + } + + ThreadAttribute & set_name(std::string name) + { + name_ = std::move(name); + return *this; + } + const std::string & get_name() const + { + return name_; + } + + void + set_thread_attribute( + const rcl_thread_attr_t & attr) + { + CpuSet cpu_set(attr.core_affinity); + set_affinity(std::move(cpu_set)); + set_sched_policy(attr.scheduling_policy); + set_priority(attr.priority); + set_name(attr.name); + } + + void + swap( + ThreadAttribute & other) + { + using std::swap; + swap(cpu_set_, other.cpu_set_); + swap(sched_policy_, other.sched_policy_); + swap(stack_size_, other.stack_size_); + swap(priority_, other.priority_); + swap(detached_flag_, other.detached_flag_); + swap(name_, other.name_); + } + +private: + CpuSet cpu_set_; + int sched_policy_; + std::size_t stack_size_; + int priority_; + bool detached_flag_; + std::string name_; + + int rcl_scheduling_policy_to_sched_policy( + rcl_thread_scheduling_policy_type_t sched_policy) + { + switch (sched_policy) { +#ifdef SCHED_FIFO + case RCL_THREAD_SCHEDULING_POLICY_FIFO: + return SCHED_FIFO; +#endif +#ifdef SCHED_RR + case RCL_THREAD_SCHEDULING_POLICY_RR: + return SCHED_RR; +#endif +#ifdef SCHED_OTHER + case RCL_THREAD_SCHEDULING_POLICY_OTHER: + return SCHED_OTHER; +#endif +#ifdef SCHED_IDLE + case RCL_THREAD_SCHEDULING_POLICY_IDLE: + return SCHED_IDLE; +#endif +#ifdef SCHED_BATCH + case RCL_THREAD_SCHEDULING_POLICY_BATCH: + return SCHED_BATCH; +#endif +#ifdef SCHED_SPORADIC + case RCL_THREAD_SCHEDULING_POLICY_SPORADIC: + return SCHED_SPORADIC; +#endif + /* Todo: Necessity and setting method need to be considered + #ifdef SCHED_DEADLINE + case RCL_THREAD_SCHEDULING_POLICY_DEADLINE: + return SCHED_DEADLINE; + break; + #endif + */ + default: + throw std::runtime_error("Invalid scheduling policy"); + } + return -1; + } +}; + +inline void swap(ThreadAttribute & a, ThreadAttribute & b) +{ + a.swap(b); +} + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_func.hpp b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp new file mode 100644 index 0000000000..d4ae497e14 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp @@ -0,0 +1,54 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ + +#include +#include +#include + +namespace rclcpp::detail +{ + +struct ThreadFuncBase +{ + virtual ~ThreadFuncBase() = default; + virtual void run() = 0; + RCLCPP_UNIQUE_PTR_DEFINITIONS(ThreadFuncBase) +}; + +template +struct ThreadFunc : ThreadFuncBase +{ + template + explicit ThreadFunc(G && g) + : func_(std::forward(g)) + {} + +private: + void run() override + { + func_(); + } + + F func_; +}; + +template +ThreadFunc(F &&)->ThreadFunc>; + +} // namespace rclcpp::detail + +#endif // RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_id.hpp b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp new file mode 100644 index 0000000000..20fa3b8a00 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp @@ -0,0 +1,146 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ + +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wmismatched-tags" +#endif + +struct Thread; + +namespace detail +{ + +namespace thread_id_ns +{ + +struct ThreadId; + +inline ThreadId get_id() noexcept; +inline bool operator==(ThreadId id1, ThreadId id2); +inline bool operator!=(ThreadId id1, ThreadId id2); +inline bool operator<(ThreadId id1, ThreadId id2); +inline bool operator>(ThreadId id1, ThreadId id2); +inline bool operator<=(ThreadId id1, ThreadId id2); +inline bool operator>=(ThreadId id1, ThreadId id2); +template +inline std::basic_ostream & operator<<( + std::basic_ostream &, + ThreadId); + +struct ThreadId +{ + ThreadId() = default; + ThreadId(ThreadId const &) = default; + ThreadId(ThreadId &&) = default; + ThreadId & operator=(ThreadId const &) = default; + ThreadId & operator=(ThreadId &&) = default; + + friend bool operator==(ThreadId id1, ThreadId id2) + { + return pthread_equal(id1.h, id2.h); + } + friend bool operator<(ThreadId id1, ThreadId id2) + { + return id1.h < id2.h; + } + template + friend std::basic_ostream & operator<<( + std::basic_ostream & ost, + ThreadId id) + { + return ost << id.h; + } + +private: + friend class rclcpp::Thread; + friend ThreadId get_id() noexcept; + friend struct std::hash; + explicit ThreadId(pthread_t h) + : h(h) {} + pthread_t h; +}; + +ThreadId get_id() noexcept +{ + return ThreadId{pthread_self()}; +} + +bool operator!=(ThreadId id1, ThreadId id2) +{ + return !(id1 == id2); +} + +bool operator>(ThreadId id1, ThreadId id2) +{ + return id2 < id1; +} + +bool operator<=(ThreadId id1, ThreadId id2) +{ + return !(id1 > id2); +} + +bool operator>=(ThreadId id1, ThreadId id2) +{ + return !(id1 < id2); +} + +} // namespace thread_id_ns + +using thread_id_ns::ThreadId; +using thread_id_ns::operator==; +using thread_id_ns::operator!=; +using thread_id_ns::operator<; // NOLINT +using thread_id_ns::operator>; // NOLINT +using thread_id_ns::operator<=; +using thread_id_ns::operator>=; +using thread_id_ns::operator<<; + +} // namespace detail + +namespace this_thread +{ + +using detail::thread_id_ns::get_id; + +} // namespace this_thread + +} // namespace rclcpp + +namespace std +{ + +template<> +struct hash +{ + std::size_t operator()(rclcpp::detail::thread_id_ns::ThreadId id) + { + return id.h; + } +}; + +} // namespace std + +#endif // RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/utilities.hpp b/rclcpp/include/rclcpp/threads/posix/utilities.hpp new file mode 100644 index 0000000000..4b648db276 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/utilities.hpp @@ -0,0 +1,42 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__UTILITIES_HPP_ +#define RCLCPP__THREADS__POSIX__UTILITIES_HPP_ + +#include + +namespace rclcpp +{ + +namespace detail +{ + +namespace +{ + +inline void throw_if_error(int r, char const * msg) +{ + if (r != 0) { + throw std::system_error(r, std::system_category(), msg); + } +} + +} // namespace + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__UTILITIES_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread.hpp b/rclcpp/include/rclcpp/threads/std/thread.hpp new file mode 100644 index 0000000000..3ce0b9e188 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/std/thread.hpp @@ -0,0 +1,145 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__STD__THREAD_HPP_ +#define RCLCPP__THREADS__STD__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/threads/std/thread_attribute.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = std::thread::native_handle_type; + using Attribute = detail::ThreadAttribute; + using Id = std::thread::id; + + Thread() noexcept + : thread_{} + {} + Thread(Thread && other) + : thread_{} + { + swap(other); + } + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + {} + template + Thread(Attribute & attr, F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + { + if (attr.set_unavailable_items_) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + if (attr.get_run_as_detached()) { + thread_.detach(); + } + } + Thread(Thread const &) = delete; + ~Thread() {} + + Thread & operator=(Thread && other) noexcept + { + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(thread_, other.thread_); + } + + void join() + { + thread_.join(); + thread_ = std::thread{}; + } + + bool joinable() const noexcept + { + return thread_.joinable(); + } + + void detach() + { + thread_.detach(); + thread_ = std::thread{}; + } + + NativeHandleType native_handle() + { + return thread_.native_handle(); + } + + Id get_id() const noexcept + { + return thread_.get_id(); + } + + static int hardware_concurrency() noexcept + { + return std::thread::hardware_concurrency(); + } + +private: + std::thread thread_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace this_thread +{ + +template +void run_with_thread_attribute(Thread::Attribute & attr, F && f, Args && ... args) +{ + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that is ineffective on thread"); + + if (attr.set_unavailable_items_) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + + std::invoke(f, std::forward(args)...); +} + +} // namespace this_thread + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__STD__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp new file mode 100644 index 0000000000..ef0da283be --- /dev/null +++ b/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp @@ -0,0 +1,165 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ +#define RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ + +#include +#include +#include + +#include "rcl_yaml_param_parser/types.h" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread; + +namespace detail +{ +struct ThreadAttribute; +} // namespace detail + +namespace this_thread +{ +template +void run_with_thread_attribute( + detail::ThreadAttribute & attr, F && f, Args && ... args); +} // namespace this_thread + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = std::size_t; + CpuSet() {} + explicit CpuSet(std::size_t) {} + CpuSet(const CpuSet &) {} + CpuSet & operator=(const CpuSet &) + { + return *this; + } + CpuSet(CpuSet &&) = delete; + CpuSet & operator=(CpuSet &&) = delete; + ~CpuSet() {} + void set(std::size_t) {} + void unset(std::size_t) {} + void clear() {} + bool is_set(std::size_t) + { + return false; + } + std::size_t get_max_processors() const + { + return 0; + } + NativeCpuSetType native_cpu_set() const + { + return 0; + } +}; + +struct ThreadAttribute +{ + using PriorityType = int; + + ThreadAttribute() + : set_unavailable_items_(false) {} + + ThreadAttribute(const ThreadAttribute &) = default; + ThreadAttribute(ThreadAttribute &&) = default; + ThreadAttribute & operator=(const ThreadAttribute &) = default; + ThreadAttribute & operator=(ThreadAttribute &&) = default; + + ThreadAttribute & set_affinity(CpuSet &) + { + set_unavailable_items_ = true; + return *this; + } + CpuSet get_affinity() + { + return CpuSet{}; + } + + ThreadAttribute & set_stack_size(std::size_t) + { + set_unavailable_items_ = true; + return *this; + } + std::size_t get_stack_size() const + { + return 0; + } + + ThreadAttribute & set_priority(int prio) + { + (void)prio; + set_unavailable_items_ = true; + return *this; + } + int get_priority() const + { + return 0; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + run_as_detached_ = detach; + return *this; + } + bool get_run_as_detached() const + { + return run_as_detached_; + } + + ThreadAttribute & set_name(std::string const &) + { + set_unavailable_items_ = true; + return *this; + } + const char * get_name() const + { + return ""; + } + + void + set_thread_attribute( + const rcl_thread_attr_t &) + { + set_unavailable_items_ = true; + } + + void swap( + ThreadAttribute & other) + { + std::swap(*this, other); + } + +private: + friend struct rclcpp::Thread; + template + friend void this_thread::run_with_thread_attribute( + ThreadAttribute & attr, F && f, Args && ... args); + + bool set_unavailable_items_; + bool run_as_detached_; +}; + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/windows/thread.hpp b/rclcpp/include/rclcpp/threads/windows/thread.hpp new file mode 100644 index 0000000000..49e6134fcf --- /dev/null +++ b/rclcpp/include/rclcpp/threads/windows/thread.hpp @@ -0,0 +1,22 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__WINDOWS__THREAD_HPP_ +#define RCLCPP__THREADS__WINDOWS__THREAD_HPP_ + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. + +#endif // RCLCPP__THREADS__WINDOWS__THREAD_HPP_ diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 507d47f913..e835ef4af9 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -23,6 +23,7 @@ #include "rclcpp/logging.hpp" #include "rclcpp/utilities.hpp" +#include "rclcpp/threads.hpp" using rclcpp::executors::MultiThreadedExecutor; @@ -33,13 +34,34 @@ MultiThreadedExecutor::MultiThreadedExecutor( std::chrono::nanoseconds next_exec_timeout) : rclcpp::Executor(options), yield_before_execute_(yield_before_execute), - next_exec_timeout_(next_exec_timeout) + next_exec_timeout_(next_exec_timeout), + thread_attributes_(nullptr) { + bool has_number_of_threads_arg = number_of_threads > 0; + rcl_ret_t ret; + number_of_threads_ = number_of_threads > 0 ? number_of_threads : std::max(std::thread::hardware_concurrency(), 2U); - if (number_of_threads_ == 1) { + ret = rcl_arguments_get_thread_attrs( + &options.context->get_rcl_context()->global_arguments, + &thread_attributes_); + if (ret != RCL_RET_OK) { + ret = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get(), + &thread_attributes_); + } + + if (has_number_of_threads_arg && thread_attributes_ && + thread_attributes_->num_attributes != number_of_threads) + { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "The number of threads argument passed to the MultiThreadedExecutor" + " is different from the number of thread attributes.\n" + "The executor runs using the thread attributes and ignores the former."); + } else if (number_of_threads_ == 1) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "MultiThreadedExecutor is used with a single thread.\n" @@ -56,17 +78,35 @@ MultiThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - std::vector threads; + std::vector threads; size_t thread_id = 0; - { - std::lock_guard wait_lock{wait_mutex_}; - for (; thread_id < number_of_threads_ - 1; ++thread_id) { - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(func); + + if (thread_attributes_) { + rclcpp::detail::ThreadAttribute thread_attr; + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) { + thread_attr.set_thread_attribute( + thread_attributes_->attributes[thread_id]); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(rclcpp::Thread(thread_attr, func)); + } + } + thread_attr.set_thread_attribute( + thread_attributes_->attributes[thread_id]); + this_thread::run_with_thread_attribute( + thread_attr, &MultiThreadedExecutor::run, this, thread_id); + } else { + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_ - 1; ++thread_id) { + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(func); + } } + run(thread_id); } - run(thread_id); for (auto & thread : threads) { thread.join(); } @@ -75,7 +115,11 @@ MultiThreadedExecutor::spin() size_t MultiThreadedExecutor::get_number_of_threads() { - return number_of_threads_; + if (thread_attributes_) { + return thread_attributes_->num_attributes; + } else { + return number_of_threads_; + } } void diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index e7f311c147..6c688851e6 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -16,16 +16,49 @@ #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/any_executable.hpp" +#include "rclcpp/threads.hpp" using rclcpp::executors::SingleThreadedExecutor; SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options) -: rclcpp::Executor(options) {} +: rclcpp::Executor(options), + thread_attributes_(nullptr) +{ + rcl_ret_t ret; + + ret = rcl_arguments_get_thread_attrs( + &options.context->get_rcl_context()->global_arguments, + &thread_attributes_); + if (ret != RCL_RET_OK) { + ret = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get(), + &thread_attributes_); + } + if (thread_attributes_ && thread_attributes_->num_attributes != 1) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Specified thread attributes contains multiple configurations.\n" + "The executor runs only using first configuration."); + } +} SingleThreadedExecutor::~SingleThreadedExecutor() {} void SingleThreadedExecutor::spin() +{ + if (thread_attributes_) { + rclcpp::detail::ThreadAttribute thread_attr; + thread_attr.set_thread_attribute( + thread_attributes_->attributes[0]); + rclcpp::this_thread::run_with_thread_attribute(thread_attr, &SingleThreadedExecutor::run, this); + } else { + run(); + } +} + +void +SingleThreadedExecutor::run() { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); diff --git a/rclcpp/src/rclcpp/threads/posix_thread.cpp b/rclcpp/src/rclcpp/threads/posix_thread.cpp new file mode 100644 index 0000000000..7599aa499f --- /dev/null +++ b/rclcpp/src/rclcpp/threads/posix_thread.cpp @@ -0,0 +1,171 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include "rclcpp/threads/posix/thread.hpp" +#include "rclcpp/threads/posix/utilities.hpp" + +static void set_pthread_attr(pthread_attr_t & native_attr, rclcpp::Thread::Attribute const & attr); +static void * thread_main(void * p); + +namespace rclcpp +{ + +Thread::Thread(Attribute const * attr, ThreadFuncBase::UniquePtr func) +: handle_(NativeHandleType{}), name_(attr ? attr->get_name() : std::string{}) +{ + Attribute::NativeAttributeType native_attr; + int r = pthread_attr_init(&native_attr); + detail::throw_if_error(r, "Error in pthread_attr_init "); + + if (attr != nullptr) { + set_pthread_attr(native_attr, *attr); + } + + NativeHandleType h; + r = pthread_create(&h, &native_attr, thread_main, func.get()); + detail::throw_if_error(r, "Error in pthread_create "); + + if (attr == nullptr || !attr->get_run_as_detached()) { + this->handle_ = h; + } + + pthread_attr_destroy(&native_attr); + + func.release(); +} + +void Thread::apply_attr(Attribute const & attr) +{ + int r; + int policy = attr.get_sched_policy(); +#if __linux__ + if (policy != SCHED_FIFO && policy != SCHED_RR && policy != SCHED_OTHER) { + sched_param param; + param.sched_priority = attr.get_priority(); + r = pthread_setschedparam(pthread_self(), policy, ¶m); + detail::throw_if_error(r, "Error in pthread_setschedparam "); + } +#endif // #if __linux__ +} + +namespace detail +{ + +ThreadAttribute::ThreadAttribute() +{ + NativeAttributeType attr; + int r; + + r = pthread_attr_init(&attr); + throw_if_error(r, "Error in pthread_attr_init "); + + r = pthread_attr_getschedpolicy(&attr, &sched_policy_); + throw_if_error(r, "Error in pthread_attr_getschedpolicy "); + + r = pthread_attr_getstacksize(&attr, &stack_size_); + throw_if_error(r, "Error in pthread_attr_getstacksize "); + + sched_param param; + r = pthread_attr_getschedparam(&attr, ¶m); + throw_if_error(r, "Error in pthread_attr_getschedparam "); + priority_ = param.sched_priority; + + int flag; + r = pthread_attr_getdetachstate(&attr, &flag); + throw_if_error(r, "Error in pthread_attr_getdetachstate "); + detached_flag_ = (flag == PTHREAD_CREATE_DETACHED); + + pthread_attr_destroy(&attr); +} + + +void apply_attr_to_current_thread(ThreadAttribute const & attr) +{ + int r; + +#if __linux__ + CpuSet cpu_set = attr.get_affinity(); + CpuSet::NativeCpuSetType * native_cpu_set = cpu_set.native_cpu_set(); + if (native_cpu_set) { + std::size_t alloc_size = cpu_set.alloc_size(); + r = pthread_setaffinity_np(pthread_self(), alloc_size, native_cpu_set); + throw_if_error(r, "Error in sched_setaffinity "); + } +#endif // #if __linux__ + + sched_param param; + param.sched_priority = attr.get_priority(); + int policy = attr.get_sched_policy(); + r = pthread_setschedparam(pthread_self(), policy, ¶m); + throw_if_error(r, "Error in sched_setscheduler"); +} + +} // namespace detail + +} // namespace rclcpp + +static void * thread_main(void * p) +{ + using rclcpp::detail::ThreadFuncBase; + ThreadFuncBase::UniquePtr func(reinterpret_cast(p)); + + try { + func->run(); + } catch (...) { + std::cerr << "failed to run thread" << std::endl; + std::terminate(); + } + + return nullptr; +} + +static void set_pthread_attr(pthread_attr_t & native_attr, rclcpp::Thread::Attribute const & attr) +{ + int r; + using rclcpp::detail::throw_if_error; + +#if defined(__linux__) + rclcpp::detail::CpuSet affinity = attr.get_affinity(); + size_t cpu_size = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); + r = pthread_attr_setaffinity_np(&native_attr, cpu_size, affinity.native_cpu_set()); + throw_if_error(r, "Error in pthread_attr_setaffinity_np "); +#endif + + std::size_t stack_size = attr.get_stack_size(); + r = pthread_attr_setstacksize(&native_attr, stack_size); + throw_if_error(r, "Error in pthread_attr_setstacksize "); + + int flag = attr.get_run_as_detached() ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE; + r = pthread_attr_setdetachstate(&native_attr, flag); + throw_if_error(r, "Error in pthread_attr_setdetachstate "); + + int sched_policy = attr.get_sched_policy(); + if (sched_policy == SCHED_FIFO || sched_policy == SCHED_RR) { + r = pthread_attr_setinheritsched(&native_attr, PTHREAD_EXPLICIT_SCHED); + throw_if_error(r, "Error in pthread_attr_setinheritsched "); + + r = pthread_attr_setschedpolicy(&native_attr, sched_policy); + throw_if_error(r, "Error in pthread_attr_setschedpolicy "); + + sched_param param; + param.sched_priority = attr.get_priority(); + r = pthread_attr_setschedparam(&native_attr, ¶m); + throw_if_error(r, "Error in pthread_attr_setschedparam "); + } +} diff --git a/rclcpp/src/rclcpp/threads/windows_thread.cpp b/rclcpp/src/rclcpp/threads/windows_thread.cpp new file mode 100644 index 0000000000..2f6ca50ec7 --- /dev/null +++ b/rclcpp/src/rclcpp/threads/windows_thread.cpp @@ -0,0 +1,19 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/threads/windows/thread.hpp" + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. From b0988dd4764b9c3ab11a6ce966a1c4d6454979fc Mon Sep 17 00:00:00 2001 From: Shoji Morita Date: Tue, 20 Jun 2023 20:12:55 +0900 Subject: [PATCH 2/5] Decoupled the additional feature from rclcpp to rcpputils, reflecting on the pointing out below. https://github.com/ros2/rclcpp/pull/2205#issuecomment-1593832758 Signed-off-by: Shoji Morita --- rclcpp/CMakeLists.txt | 10 - .../executors/multi_threaded_executor.hpp | 9 +- .../executors/single_threaded_executor.hpp | 4 +- rclcpp/include/rclcpp/threads.hpp | 26 --- .../rclcpp/threads/posix/linux/cpu_set.hpp | 158 -------------- .../include/rclcpp/threads/posix/thread.hpp | 205 ------------------ .../rclcpp/threads/posix/thread_attribute.hpp | 189 ---------------- .../rclcpp/threads/posix/thread_func.hpp | 54 ----- .../rclcpp/threads/posix/thread_id.hpp | 146 ------------- .../rclcpp/threads/posix/utilities.hpp | 42 ---- rclcpp/include/rclcpp/threads/std/thread.hpp | 145 ------------- .../rclcpp/threads/std/thread_attribute.hpp | 165 -------------- .../include/rclcpp/threads/windows/thread.hpp | 22 -- .../executors/multi_threaded_executor.cpp | 12 +- .../executors/single_threaded_executor.cpp | 7 +- rclcpp/src/rclcpp/threads/posix_thread.cpp | 171 --------------- rclcpp/src/rclcpp/threads/windows_thread.cpp | 19 -- 17 files changed, 16 insertions(+), 1368 deletions(-) delete mode 100644 rclcpp/include/rclcpp/threads.hpp delete mode 100644 rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp delete mode 100644 rclcpp/include/rclcpp/threads/posix/thread.hpp delete mode 100644 rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp delete mode 100644 rclcpp/include/rclcpp/threads/posix/thread_func.hpp delete mode 100644 rclcpp/include/rclcpp/threads/posix/thread_id.hpp delete mode 100644 rclcpp/include/rclcpp/threads/posix/utilities.hpp delete mode 100644 rclcpp/include/rclcpp/threads/std/thread.hpp delete mode 100644 rclcpp/include/rclcpp/threads/std/thread_attribute.hpp delete mode 100644 rclcpp/include/rclcpp/threads/windows/thread.hpp delete mode 100644 rclcpp/src/rclcpp/threads/posix_thread.cpp delete mode 100644 rclcpp/src/rclcpp/threads/windows_thread.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 85434df9a0..b2d1023064 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -121,16 +121,6 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/waitable.cpp ) -if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") - list(APPEND ${PROJECT_NAME}_SRCS - src/rclcpp/threads/posix_thread.cpp - ) -elseif(${CMAKE_SYSTEM_NAME} MATCHES "Windows") - list(APPEND ${PROJECT_NAME}_SRCS - src/rclcpp/threads/windows_thread.cpp - ) -endif() - find_package(Python3 REQUIRED COMPONENTS Interpreter) # "watch" template for changes diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 8c48c5fe1e..9ec7ace383 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -20,10 +20,9 @@ #include #include #include -#include #include -#include "rcl_yaml_param_parser/types.h" +#include "rcutils/thread_attr.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -41,7 +40,7 @@ class MultiThreadedExecutor : public rclcpp::Executor /// Constructor for MultiThreadedExecutor. /** - * For the yield_before_execute option, when true std::this_thread::yield() + * For the yield_before_execute option, when true rcpputils::this_thread::yield() * will be called after acquiring work (as an AnyExecutable) and * releasing the spinning lock, but before executing the work. * This is useful for reproducing some bugs related to taking work more than @@ -50,7 +49,7 @@ class MultiThreadedExecutor : public rclcpp::Executor * \param options common options for all executors * \param number_of_threads number of threads to have in the thread pool, * the default 0 will use the number of cpu cores found (minimum of 2) - * \param yield_before_execute if true std::this_thread::yield() is called + * \param yield_before_execute if true rcpputils::this_thread::yield() is called * \param timeout maximum time to wait */ RCLCPP_PUBLIC @@ -87,7 +86,7 @@ class MultiThreadedExecutor : public rclcpp::Executor size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; - rcl_thread_attrs_t * thread_attributes_; + rcutils_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 526592da16..95c2818a61 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -22,7 +22,7 @@ #include #include -#include "rcl_yaml_param_parser/types.h" +#include "rcutils/thread_attr.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -73,7 +73,7 @@ class SingleThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) - rcl_thread_attrs_t * thread_attributes_; + rcutils_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/threads.hpp b/rclcpp/include/rclcpp/threads.hpp deleted file mode 100644 index ac628de3e1..0000000000 --- a/rclcpp/include/rclcpp/threads.hpp +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS_HPP_ -#define RCLCPP__THREADS_HPP_ - -#if defined(__linux__) -#include "rclcpp/threads/posix/thread.hpp" -#elif defined(_WIN32) -#include "rclcpp/threads/win32/thread.hpp" -#else -#include "rclcpp/threads/std/thread.hpp" -#endif - -#endif // RCLCPP__THREADS_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp deleted file mode 100644 index 60e9a1fbd7..0000000000 --- a/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ -#define RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ - -#include -#include -#include -#include - -#include "rclcpp/visibility_control.hpp" - -namespace rclcpp -{ - -namespace detail -{ - -struct CpuSet -{ - using NativeCpuSetType = cpu_set_t; - CpuSet() = default; - explicit CpuSet(std::size_t cpu) - { - init_cpu_set(); - CPU_ZERO_S(alloc_size(), cpu_set_.get()); - CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); - } - CpuSet(const CpuSet & other) - { - if (other.cpu_set_) { - init_cpu_set(); - memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); - } - } - CpuSet & operator=(const CpuSet & other) - { - if (other.cpu_set_) { - init_cpu_set(); - memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); - } else { - clear(); - } - return *this; - } - CpuSet(CpuSet && other) - : CpuSet() - { - swap(other); - } - CpuSet & operator=(CpuSet && other) - { - CpuSet tmp; - other.swap(tmp); - tmp.swap(*this); - return *this; - } - void swap(CpuSet & other) - { - using std::swap; - swap(cpu_set_, other.cpu_set_); - swap(num_proc_, other.num_proc_); - } - void set(std::size_t cpu) - { - init_cpu_set(); - valid_cpu(cpu); - CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); - } - void unset(std::size_t cpu) - { - init_cpu_set(); - valid_cpu(cpu); - CPU_CLR_S(cpu, alloc_size(), cpu_set_.get()); - } - void clear() - { - if (cpu_set_) { - CPU_ZERO_S(alloc_size(), cpu_set_.get()); - } - } - bool is_set(std::size_t cpu) const - { - if (cpu_set_) { - valid_cpu(cpu); - return CPU_ISSET_S(cpu, alloc_size(), cpu_set_.get()); - } else { - return false; - } - } - - std::size_t max_processors() const - { - return num_proc_; - } - std::size_t alloc_size() const - { - return CPU_ALLOC_SIZE(num_proc_); - } - NativeCpuSetType * native_cpu_set() const - { - return cpu_set_.get(); - } - -private: - void init_cpu_set() - { - if (cpu_set_) { - return; - } - auto num_proc = sysconf(_SC_NPROCESSORS_ONLN); - if (num_proc == -1) { - return; - } - auto p = CPU_ALLOC(CPU_ALLOC_SIZE(num_proc)); - cpu_set_ = std::unique_ptr(p); - num_proc_ = num_proc; - } - void valid_cpu(std::size_t cpu) const - { - if (num_proc_ <= cpu) { - auto ec = std::make_error_code(std::errc::invalid_argument); - throw std::system_error{ec, "cpu number is invaild"}; - } - } - struct CpuSetDeleter - { - void operator()(NativeCpuSetType * cpu_set) const - { - CPU_FREE(cpu_set); - } - }; - std::unique_ptr cpu_set_; - std::size_t num_proc_; -}; - -inline void swap(CpuSet & a, CpuSet & b) -{ - a.swap(b); -} - -} // namespace detail - -} // namespace rclcpp - -#endif // RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread.hpp b/rclcpp/include/rclcpp/threads/posix/thread.hpp deleted file mode 100644 index 2a249541f4..0000000000 --- a/rclcpp/include/rclcpp/threads/posix/thread.hpp +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__POSIX__THREAD_HPP_ -#define RCLCPP__THREADS__POSIX__THREAD_HPP_ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rclcpp/macros.hpp" -#include "rclcpp/threads/posix/thread_attribute.hpp" -#include "rclcpp/threads/posix/thread_func.hpp" -#include "rclcpp/threads/posix/thread_id.hpp" -#include "rclcpp/threads/posix/utilities.hpp" -#include "rclcpp/visibility_control.hpp" - -namespace rclcpp -{ - -struct Thread -{ - using NativeHandleType = pthread_t; - using Attribute = detail::ThreadAttribute; - using Id = detail::ThreadId; - - // Assume pthread_t is an invalid handle if it's 0 - Thread() noexcept - : handle_{} {} - Thread(Thread && other) - : handle_{} - { - swap(other); - } - template, Attribute>::value>> - explicit Thread(F && f, Args && ... args) - : Thread( - static_cast(nullptr), - make_thread_func(std::forward(f), std::forward(args)...)) - {} - template - Thread(Attribute const & attr, F && f, Args && ... args) - : Thread( - &attr, - make_thread_func_with_attr(attr, std::forward(f), std::forward(args)...)) - {} - Thread(Thread const &) = delete; - ~Thread() - { - if (handle_) { - std::terminate(); - } - } - - Thread & operator=(Thread && other) noexcept - { - if (handle_) { - std::terminate(); - } - swap(other); - return *this; - } - - Thread & operator=(Thread const &) = delete; - - void swap(Thread & other) - { - using std::swap; - swap(handle_, other.handle_); - swap(name_, other.name_); - } - - void join() - { - void * p; - int r = pthread_join(handle_, &p); - detail::throw_if_error(r, "Error in pthread_join "); - handle_ = NativeHandleType{}; - } - - bool joinable() const noexcept - { - return 0 == pthread_equal(handle_, NativeHandleType{}); - } - - void detach() - { - int r = pthread_detach(handle_); - detail::throw_if_error(r, "Error in pthread_detach "); - handle_ = NativeHandleType{}; - } - - NativeHandleType native_handle() const - { - return handle_; - } - - Id get_id() const noexcept - { - return Id{handle_}; - } - - static unsigned int hardware_concurrency() noexcept - { - auto r = sysconf(_SC_NPROCESSORS_ONLN); - if (r == -1) { - return 0u; - } else { - return static_cast(r); - } - } - -private: - using ThreadFuncBase = detail::ThreadFuncBase; - template - static ThreadFuncBase::UniquePtr make_thread_func(F && f, Args && ... args) - { - static_assert( - !std::is_member_object_pointer_v>, - "F is a pointer to member, that has no effect on a thread"); - - detail::ThreadFuncBase * func = new detail::ThreadFunc( - [f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable - { - std::apply(f, args); - }); - return ThreadFuncBase::UniquePtr(func); - } - template - static ThreadFuncBase::UniquePtr make_thread_func_with_attr( - Attribute const & attr, - F && f, - Args && ... args) - { - static_assert( - !std::is_member_object_pointer_v>, - "F is a pointer to member, that has no effect on a thread"); - - detail::ThreadFuncBase * func = new detail::ThreadFunc( - [attr, f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable - { - std::apply(f, args); - }); - return ThreadFuncBase::UniquePtr(func); - } - - Thread(Attribute const * attr, ThreadFuncBase::UniquePtr func); - - static void apply_attr(Attribute const & attr); - - NativeHandleType handle_; - std::string name_; -}; - -inline void swap(Thread & t1, Thread & t2) -{ - t1.swap(t2); -} - -namespace detail -{ -void apply_attr_to_current_thread(ThreadAttribute const & attr); -} - -namespace this_thread -{ - -template -void run_with_thread_attribute( - detail::ThreadAttribute const & attr, F && f, Args && ... args) -{ - static_assert( - !std::is_member_object_pointer_v>, - "F is a pointer to member, that has no effect on a thread"); - - detail::apply_attr_to_current_thread(attr); - std::invoke(std::forward(f), std::forward(args)...); -} - -} // namespace this_thread - -} // namespace rclcpp - -#endif // RCLCPP__THREADS__POSIX__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp deleted file mode 100644 index e2baa37696..0000000000 --- a/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ -#define RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ - -#include -#include -#include - -#include "rcl_yaml_param_parser/types.h" -#include "rclcpp/visibility_control.hpp" - -#ifdef __linux__ -#include "rclcpp/threads/posix/linux/cpu_set.hpp" -#endif - -namespace rclcpp -{ - -namespace detail -{ - -struct ThreadAttribute -{ - ThreadAttribute(); - - ThreadAttribute(const ThreadAttribute &) = default; - ThreadAttribute(ThreadAttribute &&) = default; - ThreadAttribute & operator=(const ThreadAttribute &) = default; - ThreadAttribute & operator=(ThreadAttribute &&) = default; - - using NativeAttributeType = pthread_attr_t; - - ThreadAttribute & set_affinity(CpuSet cs) - { - cpu_set_ = std::move(cs); - return *this; - } - const CpuSet & get_affinity() const - { - return cpu_set_; - } - - ThreadAttribute & set_sched_policy(rcl_thread_scheduling_policy_type_t sp) - { - sched_policy_ = rcl_scheduling_policy_to_sched_policy(sp); - return *this; - } - int get_sched_policy() const - { - return sched_policy_; - } - - ThreadAttribute & set_stack_size(std::size_t sz) - { - stack_size_ = sz; - return *this; - } - std::size_t get_stack_size() const - { - return stack_size_; - } - - ThreadAttribute & set_priority(int prio) - { - priority_ = prio; - return *this; - } - int get_priority() const - { - return priority_; - } - - ThreadAttribute & set_run_as_detached(bool detach) - { - detached_flag_ = detach; - return *this; - } - bool get_run_as_detached() const - { - return detached_flag_; - } - - ThreadAttribute & set_name(std::string name) - { - name_ = std::move(name); - return *this; - } - const std::string & get_name() const - { - return name_; - } - - void - set_thread_attribute( - const rcl_thread_attr_t & attr) - { - CpuSet cpu_set(attr.core_affinity); - set_affinity(std::move(cpu_set)); - set_sched_policy(attr.scheduling_policy); - set_priority(attr.priority); - set_name(attr.name); - } - - void - swap( - ThreadAttribute & other) - { - using std::swap; - swap(cpu_set_, other.cpu_set_); - swap(sched_policy_, other.sched_policy_); - swap(stack_size_, other.stack_size_); - swap(priority_, other.priority_); - swap(detached_flag_, other.detached_flag_); - swap(name_, other.name_); - } - -private: - CpuSet cpu_set_; - int sched_policy_; - std::size_t stack_size_; - int priority_; - bool detached_flag_; - std::string name_; - - int rcl_scheduling_policy_to_sched_policy( - rcl_thread_scheduling_policy_type_t sched_policy) - { - switch (sched_policy) { -#ifdef SCHED_FIFO - case RCL_THREAD_SCHEDULING_POLICY_FIFO: - return SCHED_FIFO; -#endif -#ifdef SCHED_RR - case RCL_THREAD_SCHEDULING_POLICY_RR: - return SCHED_RR; -#endif -#ifdef SCHED_OTHER - case RCL_THREAD_SCHEDULING_POLICY_OTHER: - return SCHED_OTHER; -#endif -#ifdef SCHED_IDLE - case RCL_THREAD_SCHEDULING_POLICY_IDLE: - return SCHED_IDLE; -#endif -#ifdef SCHED_BATCH - case RCL_THREAD_SCHEDULING_POLICY_BATCH: - return SCHED_BATCH; -#endif -#ifdef SCHED_SPORADIC - case RCL_THREAD_SCHEDULING_POLICY_SPORADIC: - return SCHED_SPORADIC; -#endif - /* Todo: Necessity and setting method need to be considered - #ifdef SCHED_DEADLINE - case RCL_THREAD_SCHEDULING_POLICY_DEADLINE: - return SCHED_DEADLINE; - break; - #endif - */ - default: - throw std::runtime_error("Invalid scheduling policy"); - } - return -1; - } -}; - -inline void swap(ThreadAttribute & a, ThreadAttribute & b) -{ - a.swap(b); -} - -} // namespace detail - -} // namespace rclcpp - -#endif // RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_func.hpp b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp deleted file mode 100644 index d4ae497e14..0000000000 --- a/rclcpp/include/rclcpp/threads/posix/thread_func.hpp +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ -#define RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ - -#include -#include -#include - -namespace rclcpp::detail -{ - -struct ThreadFuncBase -{ - virtual ~ThreadFuncBase() = default; - virtual void run() = 0; - RCLCPP_UNIQUE_PTR_DEFINITIONS(ThreadFuncBase) -}; - -template -struct ThreadFunc : ThreadFuncBase -{ - template - explicit ThreadFunc(G && g) - : func_(std::forward(g)) - {} - -private: - void run() override - { - func_(); - } - - F func_; -}; - -template -ThreadFunc(F &&)->ThreadFunc>; - -} // namespace rclcpp::detail - -#endif // RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_id.hpp b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp deleted file mode 100644 index 20fa3b8a00..0000000000 --- a/rclcpp/include/rclcpp/threads/posix/thread_id.hpp +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ -#define RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ - -#include - -#include "rclcpp/visibility_control.hpp" - -namespace rclcpp -{ - -#ifdef __clang__ -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wmismatched-tags" -#endif - -struct Thread; - -namespace detail -{ - -namespace thread_id_ns -{ - -struct ThreadId; - -inline ThreadId get_id() noexcept; -inline bool operator==(ThreadId id1, ThreadId id2); -inline bool operator!=(ThreadId id1, ThreadId id2); -inline bool operator<(ThreadId id1, ThreadId id2); -inline bool operator>(ThreadId id1, ThreadId id2); -inline bool operator<=(ThreadId id1, ThreadId id2); -inline bool operator>=(ThreadId id1, ThreadId id2); -template -inline std::basic_ostream & operator<<( - std::basic_ostream &, - ThreadId); - -struct ThreadId -{ - ThreadId() = default; - ThreadId(ThreadId const &) = default; - ThreadId(ThreadId &&) = default; - ThreadId & operator=(ThreadId const &) = default; - ThreadId & operator=(ThreadId &&) = default; - - friend bool operator==(ThreadId id1, ThreadId id2) - { - return pthread_equal(id1.h, id2.h); - } - friend bool operator<(ThreadId id1, ThreadId id2) - { - return id1.h < id2.h; - } - template - friend std::basic_ostream & operator<<( - std::basic_ostream & ost, - ThreadId id) - { - return ost << id.h; - } - -private: - friend class rclcpp::Thread; - friend ThreadId get_id() noexcept; - friend struct std::hash; - explicit ThreadId(pthread_t h) - : h(h) {} - pthread_t h; -}; - -ThreadId get_id() noexcept -{ - return ThreadId{pthread_self()}; -} - -bool operator!=(ThreadId id1, ThreadId id2) -{ - return !(id1 == id2); -} - -bool operator>(ThreadId id1, ThreadId id2) -{ - return id2 < id1; -} - -bool operator<=(ThreadId id1, ThreadId id2) -{ - return !(id1 > id2); -} - -bool operator>=(ThreadId id1, ThreadId id2) -{ - return !(id1 < id2); -} - -} // namespace thread_id_ns - -using thread_id_ns::ThreadId; -using thread_id_ns::operator==; -using thread_id_ns::operator!=; -using thread_id_ns::operator<; // NOLINT -using thread_id_ns::operator>; // NOLINT -using thread_id_ns::operator<=; -using thread_id_ns::operator>=; -using thread_id_ns::operator<<; - -} // namespace detail - -namespace this_thread -{ - -using detail::thread_id_ns::get_id; - -} // namespace this_thread - -} // namespace rclcpp - -namespace std -{ - -template<> -struct hash -{ - std::size_t operator()(rclcpp::detail::thread_id_ns::ThreadId id) - { - return id.h; - } -}; - -} // namespace std - -#endif // RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/utilities.hpp b/rclcpp/include/rclcpp/threads/posix/utilities.hpp deleted file mode 100644 index 4b648db276..0000000000 --- a/rclcpp/include/rclcpp/threads/posix/utilities.hpp +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__POSIX__UTILITIES_HPP_ -#define RCLCPP__THREADS__POSIX__UTILITIES_HPP_ - -#include - -namespace rclcpp -{ - -namespace detail -{ - -namespace -{ - -inline void throw_if_error(int r, char const * msg) -{ - if (r != 0) { - throw std::system_error(r, std::system_category(), msg); - } -} - -} // namespace - -} // namespace detail - -} // namespace rclcpp - -#endif // RCLCPP__THREADS__POSIX__UTILITIES_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread.hpp b/rclcpp/include/rclcpp/threads/std/thread.hpp deleted file mode 100644 index 3ce0b9e188..0000000000 --- a/rclcpp/include/rclcpp/threads/std/thread.hpp +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__STD__THREAD_HPP_ -#define RCLCPP__THREADS__STD__THREAD_HPP_ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rclcpp/threads/std/thread_attribute.hpp" -#include "rclcpp/visibility_control.hpp" - -namespace rclcpp -{ - -struct Thread -{ - using NativeHandleType = std::thread::native_handle_type; - using Attribute = detail::ThreadAttribute; - using Id = std::thread::id; - - Thread() noexcept - : thread_{} - {} - Thread(Thread && other) - : thread_{} - { - swap(other); - } - template, Attribute>::value>> - explicit Thread(F && f, Args && ... args) - : thread_(std::forward(f), std::forward(args)...) - {} - template - Thread(Attribute & attr, F && f, Args && ... args) - : thread_(std::forward(f), std::forward(args)...) - { - if (attr.set_unavailable_items_) { - throw std::runtime_error("std::thread can't set thread attribute"); - } - if (attr.get_run_as_detached()) { - thread_.detach(); - } - } - Thread(Thread const &) = delete; - ~Thread() {} - - Thread & operator=(Thread && other) noexcept - { - swap(other); - return *this; - } - - Thread & operator=(Thread const &) = delete; - - void swap(Thread & other) - { - using std::swap; - swap(thread_, other.thread_); - } - - void join() - { - thread_.join(); - thread_ = std::thread{}; - } - - bool joinable() const noexcept - { - return thread_.joinable(); - } - - void detach() - { - thread_.detach(); - thread_ = std::thread{}; - } - - NativeHandleType native_handle() - { - return thread_.native_handle(); - } - - Id get_id() const noexcept - { - return thread_.get_id(); - } - - static int hardware_concurrency() noexcept - { - return std::thread::hardware_concurrency(); - } - -private: - std::thread thread_; -}; - -inline void swap(Thread & t1, Thread & t2) -{ - t1.swap(t2); -} - -namespace this_thread -{ - -template -void run_with_thread_attribute(Thread::Attribute & attr, F && f, Args && ... args) -{ - static_assert( - !std::is_member_object_pointer_v>, - "F is a pointer to member, that is ineffective on thread"); - - if (attr.set_unavailable_items_) { - throw std::runtime_error("std::thread can't set thread attribute"); - } - - std::invoke(f, std::forward(args)...); -} - -} // namespace this_thread - -} // namespace rclcpp - -#endif // RCLCPP__THREADS__STD__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp deleted file mode 100644 index ef0da283be..0000000000 --- a/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ -#define RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ - -#include -#include -#include - -#include "rcl_yaml_param_parser/types.h" -#include "rclcpp/visibility_control.hpp" - -namespace rclcpp -{ - -struct Thread; - -namespace detail -{ -struct ThreadAttribute; -} // namespace detail - -namespace this_thread -{ -template -void run_with_thread_attribute( - detail::ThreadAttribute & attr, F && f, Args && ... args); -} // namespace this_thread - -namespace detail -{ - -struct CpuSet -{ - using NativeCpuSetType = std::size_t; - CpuSet() {} - explicit CpuSet(std::size_t) {} - CpuSet(const CpuSet &) {} - CpuSet & operator=(const CpuSet &) - { - return *this; - } - CpuSet(CpuSet &&) = delete; - CpuSet & operator=(CpuSet &&) = delete; - ~CpuSet() {} - void set(std::size_t) {} - void unset(std::size_t) {} - void clear() {} - bool is_set(std::size_t) - { - return false; - } - std::size_t get_max_processors() const - { - return 0; - } - NativeCpuSetType native_cpu_set() const - { - return 0; - } -}; - -struct ThreadAttribute -{ - using PriorityType = int; - - ThreadAttribute() - : set_unavailable_items_(false) {} - - ThreadAttribute(const ThreadAttribute &) = default; - ThreadAttribute(ThreadAttribute &&) = default; - ThreadAttribute & operator=(const ThreadAttribute &) = default; - ThreadAttribute & operator=(ThreadAttribute &&) = default; - - ThreadAttribute & set_affinity(CpuSet &) - { - set_unavailable_items_ = true; - return *this; - } - CpuSet get_affinity() - { - return CpuSet{}; - } - - ThreadAttribute & set_stack_size(std::size_t) - { - set_unavailable_items_ = true; - return *this; - } - std::size_t get_stack_size() const - { - return 0; - } - - ThreadAttribute & set_priority(int prio) - { - (void)prio; - set_unavailable_items_ = true; - return *this; - } - int get_priority() const - { - return 0; - } - - ThreadAttribute & set_run_as_detached(bool detach) - { - run_as_detached_ = detach; - return *this; - } - bool get_run_as_detached() const - { - return run_as_detached_; - } - - ThreadAttribute & set_name(std::string const &) - { - set_unavailable_items_ = true; - return *this; - } - const char * get_name() const - { - return ""; - } - - void - set_thread_attribute( - const rcl_thread_attr_t &) - { - set_unavailable_items_ = true; - } - - void swap( - ThreadAttribute & other) - { - std::swap(*this, other); - } - -private: - friend struct rclcpp::Thread; - template - friend void this_thread::run_with_thread_attribute( - ThreadAttribute & attr, F && f, Args && ... args); - - bool set_unavailable_items_; - bool run_as_detached_; -}; - -} // namespace detail - -} // namespace rclcpp - -#endif // RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/windows/thread.hpp b/rclcpp/include/rclcpp/threads/windows/thread.hpp deleted file mode 100644 index 49e6134fcf..0000000000 --- a/rclcpp/include/rclcpp/threads/windows/thread.hpp +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__THREADS__WINDOWS__THREAD_HPP_ -#define RCLCPP__THREADS__WINDOWS__THREAD_HPP_ - -// Not implemented so far. -// The windows specific code will be implemented -// while discussing the scheduling parameter passing feature at Real-time WG. - -#endif // RCLCPP__THREADS__WINDOWS__THREAD_HPP_ diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index e835ef4af9..c54f0c33e1 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -20,10 +20,10 @@ #include #include "rcpputils/scope_exit.hpp" +#include "rcpputils/threads.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/utilities.hpp" -#include "rclcpp/threads.hpp" using rclcpp::executors::MultiThreadedExecutor; @@ -42,7 +42,7 @@ MultiThreadedExecutor::MultiThreadedExecutor( number_of_threads_ = number_of_threads > 0 ? number_of_threads : - std::max(std::thread::hardware_concurrency(), 2U); + std::max(rcpputils::Thread::hardware_concurrency(), 2U); ret = rcl_arguments_get_thread_attrs( &options.context->get_rcl_context()->global_arguments, @@ -78,23 +78,23 @@ MultiThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - std::vector threads; + std::vector threads; size_t thread_id = 0; if (thread_attributes_) { - rclcpp::detail::ThreadAttribute thread_attr; + rcpputils::Thread::Attribute thread_attr; { std::lock_guard wait_lock{wait_mutex_}; for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) { thread_attr.set_thread_attribute( thread_attributes_->attributes[thread_id]); auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(rclcpp::Thread(thread_attr, func)); + threads.emplace_back(rcpputils::Thread(thread_attr, func)); } } thread_attr.set_thread_attribute( thread_attributes_->attributes[thread_id]); - this_thread::run_with_thread_attribute( + rcpputils::this_thread::run_with_thread_attribute( thread_attr, &MultiThreadedExecutor::run, this, thread_id); } else { { diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 6c688851e6..cb471992b4 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -13,10 +13,10 @@ // limitations under the License. #include "rcpputils/scope_exit.hpp" +#include "rcpputils/threads.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/any_executable.hpp" -#include "rclcpp/threads.hpp" using rclcpp::executors::SingleThreadedExecutor; @@ -48,10 +48,11 @@ void SingleThreadedExecutor::spin() { if (thread_attributes_) { - rclcpp::detail::ThreadAttribute thread_attr; + rcpputils::Thread::Attribute thread_attr; thread_attr.set_thread_attribute( thread_attributes_->attributes[0]); - rclcpp::this_thread::run_with_thread_attribute(thread_attr, &SingleThreadedExecutor::run, this); + rcpputils::this_thread::run_with_thread_attribute( + thread_attr, &SingleThreadedExecutor::run, this); } else { run(); } diff --git a/rclcpp/src/rclcpp/threads/posix_thread.cpp b/rclcpp/src/rclcpp/threads/posix_thread.cpp deleted file mode 100644 index 7599aa499f..0000000000 --- a/rclcpp/src/rclcpp/threads/posix_thread.cpp +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include - -#include "rclcpp/threads/posix/thread.hpp" -#include "rclcpp/threads/posix/utilities.hpp" - -static void set_pthread_attr(pthread_attr_t & native_attr, rclcpp::Thread::Attribute const & attr); -static void * thread_main(void * p); - -namespace rclcpp -{ - -Thread::Thread(Attribute const * attr, ThreadFuncBase::UniquePtr func) -: handle_(NativeHandleType{}), name_(attr ? attr->get_name() : std::string{}) -{ - Attribute::NativeAttributeType native_attr; - int r = pthread_attr_init(&native_attr); - detail::throw_if_error(r, "Error in pthread_attr_init "); - - if (attr != nullptr) { - set_pthread_attr(native_attr, *attr); - } - - NativeHandleType h; - r = pthread_create(&h, &native_attr, thread_main, func.get()); - detail::throw_if_error(r, "Error in pthread_create "); - - if (attr == nullptr || !attr->get_run_as_detached()) { - this->handle_ = h; - } - - pthread_attr_destroy(&native_attr); - - func.release(); -} - -void Thread::apply_attr(Attribute const & attr) -{ - int r; - int policy = attr.get_sched_policy(); -#if __linux__ - if (policy != SCHED_FIFO && policy != SCHED_RR && policy != SCHED_OTHER) { - sched_param param; - param.sched_priority = attr.get_priority(); - r = pthread_setschedparam(pthread_self(), policy, ¶m); - detail::throw_if_error(r, "Error in pthread_setschedparam "); - } -#endif // #if __linux__ -} - -namespace detail -{ - -ThreadAttribute::ThreadAttribute() -{ - NativeAttributeType attr; - int r; - - r = pthread_attr_init(&attr); - throw_if_error(r, "Error in pthread_attr_init "); - - r = pthread_attr_getschedpolicy(&attr, &sched_policy_); - throw_if_error(r, "Error in pthread_attr_getschedpolicy "); - - r = pthread_attr_getstacksize(&attr, &stack_size_); - throw_if_error(r, "Error in pthread_attr_getstacksize "); - - sched_param param; - r = pthread_attr_getschedparam(&attr, ¶m); - throw_if_error(r, "Error in pthread_attr_getschedparam "); - priority_ = param.sched_priority; - - int flag; - r = pthread_attr_getdetachstate(&attr, &flag); - throw_if_error(r, "Error in pthread_attr_getdetachstate "); - detached_flag_ = (flag == PTHREAD_CREATE_DETACHED); - - pthread_attr_destroy(&attr); -} - - -void apply_attr_to_current_thread(ThreadAttribute const & attr) -{ - int r; - -#if __linux__ - CpuSet cpu_set = attr.get_affinity(); - CpuSet::NativeCpuSetType * native_cpu_set = cpu_set.native_cpu_set(); - if (native_cpu_set) { - std::size_t alloc_size = cpu_set.alloc_size(); - r = pthread_setaffinity_np(pthread_self(), alloc_size, native_cpu_set); - throw_if_error(r, "Error in sched_setaffinity "); - } -#endif // #if __linux__ - - sched_param param; - param.sched_priority = attr.get_priority(); - int policy = attr.get_sched_policy(); - r = pthread_setschedparam(pthread_self(), policy, ¶m); - throw_if_error(r, "Error in sched_setscheduler"); -} - -} // namespace detail - -} // namespace rclcpp - -static void * thread_main(void * p) -{ - using rclcpp::detail::ThreadFuncBase; - ThreadFuncBase::UniquePtr func(reinterpret_cast(p)); - - try { - func->run(); - } catch (...) { - std::cerr << "failed to run thread" << std::endl; - std::terminate(); - } - - return nullptr; -} - -static void set_pthread_attr(pthread_attr_t & native_attr, rclcpp::Thread::Attribute const & attr) -{ - int r; - using rclcpp::detail::throw_if_error; - -#if defined(__linux__) - rclcpp::detail::CpuSet affinity = attr.get_affinity(); - size_t cpu_size = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); - r = pthread_attr_setaffinity_np(&native_attr, cpu_size, affinity.native_cpu_set()); - throw_if_error(r, "Error in pthread_attr_setaffinity_np "); -#endif - - std::size_t stack_size = attr.get_stack_size(); - r = pthread_attr_setstacksize(&native_attr, stack_size); - throw_if_error(r, "Error in pthread_attr_setstacksize "); - - int flag = attr.get_run_as_detached() ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE; - r = pthread_attr_setdetachstate(&native_attr, flag); - throw_if_error(r, "Error in pthread_attr_setdetachstate "); - - int sched_policy = attr.get_sched_policy(); - if (sched_policy == SCHED_FIFO || sched_policy == SCHED_RR) { - r = pthread_attr_setinheritsched(&native_attr, PTHREAD_EXPLICIT_SCHED); - throw_if_error(r, "Error in pthread_attr_setinheritsched "); - - r = pthread_attr_setschedpolicy(&native_attr, sched_policy); - throw_if_error(r, "Error in pthread_attr_setschedpolicy "); - - sched_param param; - param.sched_priority = attr.get_priority(); - r = pthread_attr_setschedparam(&native_attr, ¶m); - throw_if_error(r, "Error in pthread_attr_setschedparam "); - } -} diff --git a/rclcpp/src/rclcpp/threads/windows_thread.cpp b/rclcpp/src/rclcpp/threads/windows_thread.cpp deleted file mode 100644 index 2f6ca50ec7..0000000000 --- a/rclcpp/src/rclcpp/threads/windows_thread.cpp +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rclcpp/threads/windows/thread.hpp" - -// Not implemented so far. -// The windows specific code will be implemented -// while discussing the scheduling parameter passing feature at Real-time WG. From 0e9616afc5c3580c79fe80c7de8450b6b9198209 Mon Sep 17 00:00:00 2001 From: Shoji Morita Date: Thu, 10 Aug 2023 09:54:04 +0900 Subject: [PATCH 3/5] Updated the rcl interface in accordance with the specifications outlined in the DRAFT REP shared below. https://discourse.ros.org/t/adding-thread-attributes-configuration-in-ros-2-framework/30701/5 Signed-off-by: Shoji Morita --- .../rclcpp/executors/multi_threaded_executor.cpp | 12 ++++-------- .../rclcpp/executors/single_threaded_executor.cpp | 14 +++++--------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index c54f0c33e1..079f5ddb5b 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -38,19 +38,15 @@ MultiThreadedExecutor::MultiThreadedExecutor( thread_attributes_(nullptr) { bool has_number_of_threads_arg = number_of_threads > 0; - rcl_ret_t ret; number_of_threads_ = number_of_threads > 0 ? number_of_threads : std::max(rcpputils::Thread::hardware_concurrency(), 2U); - ret = rcl_arguments_get_thread_attrs( - &options.context->get_rcl_context()->global_arguments, - &thread_attributes_); - if (ret != RCL_RET_OK) { - ret = rcl_context_get_thread_attrs( - options.context->get_rcl_context().get(), - &thread_attributes_); + if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get())) + { + thread_attributes_ = attrs; } if (has_number_of_threads_arg && thread_attributes_ && diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index cb471992b4..c4de845c51 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -24,16 +24,12 @@ SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & o : rclcpp::Executor(options), thread_attributes_(nullptr) { - rcl_ret_t ret; - - ret = rcl_arguments_get_thread_attrs( - &options.context->get_rcl_context()->global_arguments, - &thread_attributes_); - if (ret != RCL_RET_OK) { - ret = rcl_context_get_thread_attrs( - options.context->get_rcl_context().get(), - &thread_attributes_); + if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get())) + { + thread_attributes_ = attrs; } + if (thread_attributes_ && thread_attributes_->num_attributes != 1) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), From 35af6fa9ff56cbd14b49650839b784e6d7db2cd3 Mon Sep 17 00:00:00 2001 From: Shoji Morita Date: Thu, 14 Dec 2023 14:55:52 +0900 Subject: [PATCH 4/5] Modified the conventional executors (single/multithreaded) to use the infrastructure proposed in the REP-2017 below. https://github.com/ros-infrastructure/rep/pull/385 Signed-off-by: Shoji Morita --- rclcpp/include/rclcpp/executor_options.hpp | 3 + .../executors/multi_threaded_executor.hpp | 38 ++++- .../executors/single_threaded_executor.hpp | 27 +++- .../executors/multi_threaded_executor.cpp | 122 +++++++++++----- .../executors/single_threaded_executor.cpp | 78 +++++++--- .../test_multi_threaded_executor.cpp | 133 +++++++++++++++++- 6 files changed, 335 insertions(+), 66 deletions(-) diff --git a/rclcpp/include/rclcpp/executor_options.hpp b/rclcpp/include/rclcpp/executor_options.hpp index 066f29eb4e..54948d618d 100644 --- a/rclcpp/include/rclcpp/executor_options.hpp +++ b/rclcpp/include/rclcpp/executor_options.hpp @@ -15,6 +15,8 @@ #ifndef RCLCPP__EXECUTOR_OPTIONS_HPP_ #define RCLCPP__EXECUTOR_OPTIONS_HPP_ +#include + #include "rclcpp/context.hpp" #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/memory_strategies.hpp" @@ -36,6 +38,7 @@ struct ExecutorOptions rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy; rclcpp::Context::SharedPtr context; size_t max_conditions; + std::string name; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 9ec7ace383..88dd303b8d 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -21,12 +21,13 @@ #include #include #include +#include -#include "rcutils/thread_attr.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/thread/thread_attribute.hpp" namespace rclcpp { @@ -59,6 +60,21 @@ class MultiThreadedExecutor : public rclcpp::Executor bool yield_before_execute = false, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + size_t number_of_threads, + const rcpputils::ThreadAttribute & thread_attr, + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + const rcpputils::ThreadAttribute & thread_attr, + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); @@ -74,6 +90,16 @@ class MultiThreadedExecutor : public rclcpp::Executor size_t get_number_of_threads(); + RCLCPP_PUBLIC + const std::optional & + get_thread_attribute() const + { + return thread_attr_; + } + + RCLCPP_PUBLIC + static const char default_name[]; + protected: RCLCPP_PUBLIC void @@ -82,11 +108,19 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::optional thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds timeout); + std::mutex wait_mutex_; size_t number_of_threads_; + std::optional thread_attr_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; - rcutils_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 95c2818a61..819587052f 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -21,8 +21,8 @@ #include #include #include +#include -#include "rcutils/thread_attr.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -30,6 +30,7 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/rate.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/thread/thread_attribute.hpp" namespace rclcpp { @@ -50,6 +51,11 @@ class SingleThreadedExecutor : public rclcpp::Executor explicit SingleThreadedExecutor( const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); + RCLCPP_PUBLIC + explicit SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options, + const rcpputils::ThreadAttribute & thread_attr); + /// Default destructor. RCLCPP_PUBLIC virtual ~SingleThreadedExecutor(); @@ -66,6 +72,22 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; + RCLCPP_PUBLIC + bool has_thread_attribute() const + { + return thread_attr_.has_value(); + } + + RCLCPP_PUBLIC + const std::optional & + get_thread_attribute() const + { + return thread_attr_; + } + + RCLCPP_PUBLIC + static const char default_name[]; + protected: RCLCPP_PUBLIC void @@ -73,7 +95,8 @@ class SingleThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) - rcutils_thread_attrs_t * thread_attributes_; + + std::optional thread_attr_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 079f5ddb5b..8e61746f93 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -14,50 +14,70 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" +#include #include #include #include +#include #include #include "rcpputils/scope_exit.hpp" -#include "rcpputils/threads.hpp" +#include "rcpputils/thread.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/utilities.hpp" using rclcpp::executors::MultiThreadedExecutor; +const char MultiThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_MULTI_THREADED"; + +static std::optional +default_thread_attr(const rclcpp::ExecutorOptions & options); + MultiThreadedExecutor::MultiThreadedExecutor( const rclcpp::ExecutorOptions & options, size_t number_of_threads, bool yield_before_execute, std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + options, number_of_threads, default_thread_attr(options), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + size_t number_of_threads, + rcpputils::ThreadAttribute const & thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + rclcpp::ExecutorOptions(), number_of_threads, std::optional(thread_attr), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + rcpputils::ThreadAttribute const & thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + options, number_of_threads, std::optional(thread_attr), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::optional thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) : rclcpp::Executor(options), + thread_attr_(std::move(thread_attr)), yield_before_execute_(yield_before_execute), - next_exec_timeout_(next_exec_timeout), - thread_attributes_(nullptr) + next_exec_timeout_(next_exec_timeout) { - bool has_number_of_threads_arg = number_of_threads > 0; - number_of_threads_ = number_of_threads > 0 ? number_of_threads : std::max(rcpputils::Thread::hardware_concurrency(), 2U); - if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( - options.context->get_rcl_context().get())) - { - thread_attributes_ = attrs; - } - - if (has_number_of_threads_arg && thread_attributes_ && - thread_attributes_->num_attributes != number_of_threads) - { - RCLCPP_WARN( - rclcpp::get_logger("rclcpp"), - "The number of threads argument passed to the MultiThreadedExecutor" - " is different from the number of thread attributes.\n" - "The executor runs using the thread attributes and ignores the former."); - } else if (number_of_threads_ == 1) { + if (number_of_threads_ == 1) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "MultiThreadedExecutor is used with a single thread.\n" @@ -77,21 +97,12 @@ MultiThreadedExecutor::spin() std::vector threads; size_t thread_id = 0; - if (thread_attributes_) { - rcpputils::Thread::Attribute thread_attr; - { - std::lock_guard wait_lock{wait_mutex_}; - for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) { - thread_attr.set_thread_attribute( - thread_attributes_->attributes[thread_id]); - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(rcpputils::Thread(thread_attr, func)); - } + if (thread_attr_) { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_; ++thread_id) { + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(thread_attr_.value(), func); } - thread_attr.set_thread_attribute( - thread_attributes_->attributes[thread_id]); - rcpputils::this_thread::run_with_thread_attribute( - thread_attr, &MultiThreadedExecutor::run, this, thread_id); } else { { std::lock_guard wait_lock{wait_mutex_}; @@ -111,11 +122,7 @@ MultiThreadedExecutor::spin() size_t MultiThreadedExecutor::get_number_of_threads() { - if (thread_attributes_) { - return thread_attributes_->num_attributes; - } else { - return number_of_threads_; - } + return number_of_threads_; } void @@ -144,3 +151,40 @@ MultiThreadedExecutor::run(size_t this_thread_number) any_exec.callback_group.reset(); } } + +std::optional +default_thread_attr(rclcpp::ExecutorOptions const & options) +{ + const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get()); + if (!attrs) { + return std::nullopt; + } + + std::string name; + bool name_specified = !options.name.empty(); + if (name_specified) { + name = options.name; + } else { + name = MultiThreadedExecutor::default_name; + } + + const rcutils_thread_attr_t * attrs_beg = attrs->attributes; + const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes; + const rcutils_thread_attr_t * attr = std::find_if( + attrs_beg, attrs_end, + [&](const auto & attr) { + return attr.name == name; + }); + if (attr != attrs_end) { + return rcpputils::ThreadAttribute(*attr); + } else { + if (name_specified) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "MultiThreadedExecutor is named \"%s\", but not found corresponding thread attribute.", + name.c_str()); + } + return std::nullopt; + } +} diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index c4de845c51..8d3212efc9 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -12,43 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rcpputils/scope_exit.hpp" -#include "rcpputils/threads.hpp" +#include "rcpputils/thread.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/any_executable.hpp" using rclcpp::executors::SingleThreadedExecutor; -SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options) +const char SingleThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_SINGLE_THREADED"; + +static std::optional +default_thread_attr(const rclcpp::ExecutorOptions & options); + +SingleThreadedExecutor::SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options) : rclcpp::Executor(options), - thread_attributes_(nullptr) -{ - if (rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( - options.context->get_rcl_context().get())) - { - thread_attributes_ = attrs; - } + thread_attr_(default_thread_attr(options)) {} - if (thread_attributes_ && thread_attributes_->num_attributes != 1) { - RCLCPP_WARN( - rclcpp::get_logger("rclcpp"), - "Specified thread attributes contains multiple configurations.\n" - "The executor runs only using first configuration."); - } -} +SingleThreadedExecutor::SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options, + const rcpputils::ThreadAttribute & thread_attr) +: rclcpp::Executor(options), + thread_attr_(thread_attr) {} SingleThreadedExecutor::~SingleThreadedExecutor() {} void SingleThreadedExecutor::spin() { - if (thread_attributes_) { - rcpputils::Thread::Attribute thread_attr; - thread_attr.set_thread_attribute( - thread_attributes_->attributes[0]); - rcpputils::this_thread::run_with_thread_attribute( - thread_attr, &SingleThreadedExecutor::run, this); + if (thread_attr_) { + rcpputils::Thread thread(thread_attr_.value(), &SingleThreadedExecutor::run, this); + thread.join(); } else { run(); } @@ -68,3 +65,40 @@ SingleThreadedExecutor::run() } } } + +std::optional +default_thread_attr(rclcpp::ExecutorOptions const & options) +{ + const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get()); + if (!attrs) { + return std::nullopt; + } + + std::string name; + bool name_specified = !options.name.empty(); + if (name_specified) { + name = options.name; + } else { + name = SingleThreadedExecutor::default_name; + } + + const rcutils_thread_attr_t * attrs_beg = attrs->attributes; + const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes; + const rcutils_thread_attr_t * attr = std::find_if( + attrs_beg, attrs_end, + [&](const auto & attr) { + return attr.name == name; + }); + if (attr != attrs_end) { + return rcpputils::ThreadAttribute(*attr); + } else { + if (name_specified) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "SingleThreadedExecutor is named \"%s\", but not found corresponding thread attribute.", + name.c_str()); + } + return std::nullopt; + } +} diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index b6fd2c3fda..8b552a4c04 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -14,10 +14,16 @@ #include +#include + +#include #include +#include #include #include +#include +#include "rcpputils/thread.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/node.hpp" #include "rclcpp/rclcpp.hpp" @@ -28,10 +34,14 @@ using namespace std::chrono_literals; class TestMultiThreadedExecutor : public ::testing::Test { protected: - static void SetUpTestCase() + void SetUp() override { rclcpp::init(0, nullptr); } + void TearDown() override + { + rclcpp::shutdown(); + } }; constexpr std::chrono::milliseconds PERIOD_MS = 1000ms; @@ -97,3 +107,124 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) { executor.add_node(node); executor.spin(); } + +TEST_F(TestMultiThreadedExecutor, thread_attribute_apply) { + using rcpputils::Thread; + using rcpputils::ThreadAttribute; + using rcpputils::ThreadId; + using rcpputils::SchedPolicy; + + ThreadAttribute attr; + unsigned thread_count = 10; + +#if __linux__ + ThreadId parent_thread = rcpputils::this_thread::get_id(); + { + sched_param param = {0}; + int r = pthread_setschedparam(pthread_self(), SCHED_OTHER, ¶m); + EXPECT_EQ(0, r); + // SCHED_FIFO, SCHED_RR are require the privilege on linux + attr.set_sched_policy(SchedPolicy::batch); + } +#endif + + rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), thread_count, attr); + + std::atomic succ_count = 0; + auto node = std::make_shared("node", "ns"); + auto timer = node->create_wall_timer( + std::chrono::milliseconds(1), [&]() { +#if __linux__ + int policy; + sched_param param; + ThreadId worker_thread = rcpputils::this_thread::get_id(); + int r = pthread_getschedparam(pthread_self(), &policy, ¶m); + SchedPolicy rclcpp_policy = SchedPolicy(0x8000'0000 | policy); + EXPECT_EQ(0, r); + EXPECT_EQ(SchedPolicy::batch, rclcpp_policy); + EXPECT_NE(worker_thread, parent_thread); +#endif + unsigned n = succ_count.fetch_add(1); + if (n == thread_count - 1) { + executor.cancel(); + } + }); + + executor.add_node(node); + executor.spin(); + EXPECT_EQ(succ_count, thread_count); +} + +constexpr char const * argv[] = { + "test_multi_threaded_executor", + "--ros-args", + "--thread-attrs-value", + R"( +- name: RCLCPP_EXECUTOR_MULTI_THREADED + scheduling_policy: FIFO + priority: 10 + core_affinity: [] +- name: executor-1 + scheduling_policy: RR + priority: 20 + core_affinity: [0] + )", + NULL, +}; +constexpr int argc = sizeof(argv) / sizeof(*argv) - 1; + +class TestMultiThreadedExecutorAttribute : public ::testing::Test +{ +protected: + void SetUp() override + { + rclcpp::init(argc, argv); + } + void TearDown() override + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_default_name) { + using rcpputils::SchedPolicy; + rclcpp::executors::MultiThreadedExecutor executor; + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_TRUE(opt_attr); + + const auto & attr = opt_attr.value(); + EXPECT_EQ(SchedPolicy::fifo, attr.get_sched_policy()); + EXPECT_EQ(10, attr.get_priority()); + + const auto & affinity = attr.get_affinity(); + EXPECT_EQ(0, affinity.count()); +} + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_specified_name) { + using rcpputils::SchedPolicy; + auto options = rclcpp::ExecutorOptions(); + options.name = "executor-1"; + rclcpp::executors::MultiThreadedExecutor executor(options); + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_TRUE(opt_attr); + + const auto & attr = opt_attr.value(); + EXPECT_EQ(SchedPolicy::rr, attr.get_sched_policy()); + EXPECT_EQ(20, attr.get_priority()); + + const auto & affinity = attr.get_affinity(); + EXPECT_EQ(1, affinity.count()); + EXPECT_TRUE(affinity.is_set(0)); +} + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_not_found) { + using rcpputils::SchedPolicy; + auto options = rclcpp::ExecutorOptions(); + options.name = "executor-not-exists"; + rclcpp::executors::MultiThreadedExecutor executor(options); + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_FALSE(opt_attr); +} From 9c3af41b1adbeae19d75e5c418122005145cfa7c Mon Sep 17 00:00:00 2001 From: Shoji Morita Date: Fri, 26 Jan 2024 18:52:10 +0900 Subject: [PATCH 5/5] Modified structure member names to reflect the point made on the thread below. https://github.com/ros-infrastructure/rep/pull/385#discussion_r1350512216 Signed-off-by: Shoji Morita --- rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp | 2 +- rclcpp/src/rclcpp/executors/single_threaded_executor.cpp | 2 +- rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 8e61746f93..83adfb6a12 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -174,7 +174,7 @@ default_thread_attr(rclcpp::ExecutorOptions const & options) const rcutils_thread_attr_t * attr = std::find_if( attrs_beg, attrs_end, [&](const auto & attr) { - return attr.name == name; + return attr.tag == name; }); if (attr != attrs_end) { return rcpputils::ThreadAttribute(*attr); diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 8d3212efc9..1a3157c58d 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -88,7 +88,7 @@ default_thread_attr(rclcpp::ExecutorOptions const & options) const rcutils_thread_attr_t * attr = std::find_if( attrs_beg, attrs_end, [&](const auto & attr) { - return attr.name == name; + return attr.tag == name; }); if (attr != attrs_end) { return rcpputils::ThreadAttribute(*attr); diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index 8b552a4c04..06f8347c4f 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -160,11 +160,11 @@ constexpr char const * argv[] = { "--ros-args", "--thread-attrs-value", R"( -- name: RCLCPP_EXECUTOR_MULTI_THREADED +- tag: RCLCPP_EXECUTOR_MULTI_THREADED scheduling_policy: FIFO priority: 10 core_affinity: [] -- name: executor-1 +- tag: executor-1 scheduling_policy: RR priority: 20 core_affinity: [0]