diff --git a/rclcpp/include/rclcpp/executor_options.hpp b/rclcpp/include/rclcpp/executor_options.hpp index 066f29eb4e..54948d618d 100644 --- a/rclcpp/include/rclcpp/executor_options.hpp +++ b/rclcpp/include/rclcpp/executor_options.hpp @@ -15,6 +15,8 @@ #ifndef RCLCPP__EXECUTOR_OPTIONS_HPP_ #define RCLCPP__EXECUTOR_OPTIONS_HPP_ +#include + #include "rclcpp/context.hpp" #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/memory_strategies.hpp" @@ -36,6 +38,7 @@ struct ExecutorOptions rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy; rclcpp::Context::SharedPtr context; size_t max_conditions; + std::string name; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 119013ebfb..88dd303b8d 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -18,14 +18,16 @@ #include #include #include +#include #include -#include #include +#include #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/thread/thread_attribute.hpp" namespace rclcpp { @@ -39,7 +41,7 @@ class MultiThreadedExecutor : public rclcpp::Executor /// Constructor for MultiThreadedExecutor. /** - * For the yield_before_execute option, when true std::this_thread::yield() + * For the yield_before_execute option, when true rcpputils::this_thread::yield() * will be called after acquiring work (as an AnyExecutable) and * releasing the spinning lock, but before executing the work. * This is useful for reproducing some bugs related to taking work more than @@ -48,7 +50,7 @@ class MultiThreadedExecutor : public rclcpp::Executor * \param options common options for all executors * \param number_of_threads number of threads to have in the thread pool, * the default 0 will use the number of cpu cores found (minimum of 2) - * \param yield_before_execute if true std::this_thread::yield() is called + * \param yield_before_execute if true rcpputils::this_thread::yield() is called * \param timeout maximum time to wait */ RCLCPP_PUBLIC @@ -58,6 +60,21 @@ class MultiThreadedExecutor : public rclcpp::Executor bool yield_before_execute = false, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + size_t number_of_threads, + const rcpputils::ThreadAttribute & thread_attr, + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + const rcpputils::ThreadAttribute & thread_attr, + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); @@ -73,6 +90,16 @@ class MultiThreadedExecutor : public rclcpp::Executor size_t get_number_of_threads(); + RCLCPP_PUBLIC + const std::optional & + get_thread_attribute() const + { + return thread_attr_; + } + + RCLCPP_PUBLIC + static const char default_name[]; + protected: RCLCPP_PUBLIC void @@ -81,8 +108,17 @@ class MultiThreadedExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(MultiThreadedExecutor) + RCLCPP_PUBLIC + explicit MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::optional thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds timeout); + std::mutex wait_mutex_; size_t number_of_threads_; + std::optional thread_attr_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; }; diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9dc6dec57b..819587052f 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" @@ -29,6 +30,7 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/rate.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/thread/thread_attribute.hpp" namespace rclcpp { @@ -49,6 +51,11 @@ class SingleThreadedExecutor : public rclcpp::Executor explicit SingleThreadedExecutor( const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); + RCLCPP_PUBLIC + explicit SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options, + const rcpputils::ThreadAttribute & thread_attr); + /// Default destructor. RCLCPP_PUBLIC virtual ~SingleThreadedExecutor(); @@ -65,8 +72,31 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; + RCLCPP_PUBLIC + bool has_thread_attribute() const + { + return thread_attr_.has_value(); + } + + RCLCPP_PUBLIC + const std::optional & + get_thread_attribute() const + { + return thread_attr_; + } + + RCLCPP_PUBLIC + static const char default_name[]; + +protected: + RCLCPP_PUBLIC + void + run(); + private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) + + std::optional thread_attr_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 507d47f913..83adfb6a12 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -14,30 +14,68 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" +#include #include #include #include +#include #include #include "rcpputils/scope_exit.hpp" +#include "rcpputils/thread.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/utilities.hpp" using rclcpp::executors::MultiThreadedExecutor; +const char MultiThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_MULTI_THREADED"; + +static std::optional +default_thread_attr(const rclcpp::ExecutorOptions & options); + MultiThreadedExecutor::MultiThreadedExecutor( const rclcpp::ExecutorOptions & options, size_t number_of_threads, bool yield_before_execute, std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + options, number_of_threads, default_thread_attr(options), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + size_t number_of_threads, + rcpputils::ThreadAttribute const & thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + rclcpp::ExecutorOptions(), number_of_threads, std::optional(thread_attr), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + rcpputils::ThreadAttribute const & thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: MultiThreadedExecutor( + options, number_of_threads, std::optional(thread_attr), + yield_before_execute, next_exec_timeout) {} + +MultiThreadedExecutor::MultiThreadedExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::optional thread_attr, + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) : rclcpp::Executor(options), + thread_attr_(std::move(thread_attr)), yield_before_execute_(yield_before_execute), next_exec_timeout_(next_exec_timeout) { number_of_threads_ = number_of_threads > 0 ? number_of_threads : - std::max(std::thread::hardware_concurrency(), 2U); + std::max(rcpputils::Thread::hardware_concurrency(), 2U); if (number_of_threads_ == 1) { RCLCPP_WARN( @@ -56,17 +94,26 @@ MultiThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - std::vector threads; + std::vector threads; size_t thread_id = 0; - { + + if (thread_attr_) { std::lock_guard wait_lock{wait_mutex_}; - for (; thread_id < number_of_threads_ - 1; ++thread_id) { + for (; thread_id < number_of_threads_; ++thread_id) { auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(func); + threads.emplace_back(thread_attr_.value(), func); + } + } else { + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_ - 1; ++thread_id) { + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(func); + } } + run(thread_id); } - run(thread_id); for (auto & thread : threads) { thread.join(); } @@ -104,3 +151,40 @@ MultiThreadedExecutor::run(size_t this_thread_number) any_exec.callback_group.reset(); } } + +std::optional +default_thread_attr(rclcpp::ExecutorOptions const & options) +{ + const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get()); + if (!attrs) { + return std::nullopt; + } + + std::string name; + bool name_specified = !options.name.empty(); + if (name_specified) { + name = options.name; + } else { + name = MultiThreadedExecutor::default_name; + } + + const rcutils_thread_attr_t * attrs_beg = attrs->attributes; + const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes; + const rcutils_thread_attr_t * attr = std::find_if( + attrs_beg, attrs_end, + [&](const auto & attr) { + return attr.tag == name; + }); + if (attr != attrs_end) { + return rcpputils::ThreadAttribute(*attr); + } else { + if (name_specified) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "MultiThreadedExecutor is named \"%s\", but not found corresponding thread attribute.", + name.c_str()); + } + return std::nullopt; + } +} diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index e7f311c147..1a3157c58d 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -12,20 +12,47 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rcpputils/scope_exit.hpp" +#include "rcpputils/thread.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/any_executable.hpp" using rclcpp::executors::SingleThreadedExecutor; -SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options) -: rclcpp::Executor(options) {} +const char SingleThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_SINGLE_THREADED"; + +static std::optional +default_thread_attr(const rclcpp::ExecutorOptions & options); + +SingleThreadedExecutor::SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options) +: rclcpp::Executor(options), + thread_attr_(default_thread_attr(options)) {} + +SingleThreadedExecutor::SingleThreadedExecutor( + const rclcpp::ExecutorOptions & options, + const rcpputils::ThreadAttribute & thread_attr) +: rclcpp::Executor(options), + thread_attr_(thread_attr) {} SingleThreadedExecutor::~SingleThreadedExecutor() {} void SingleThreadedExecutor::spin() +{ + if (thread_attr_) { + rcpputils::Thread thread(thread_attr_.value(), &SingleThreadedExecutor::run, this); + thread.join(); + } else { + run(); + } +} + +void +SingleThreadedExecutor::run() { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); @@ -38,3 +65,40 @@ SingleThreadedExecutor::spin() } } } + +std::optional +default_thread_attr(rclcpp::ExecutorOptions const & options) +{ + const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get()); + if (!attrs) { + return std::nullopt; + } + + std::string name; + bool name_specified = !options.name.empty(); + if (name_specified) { + name = options.name; + } else { + name = SingleThreadedExecutor::default_name; + } + + const rcutils_thread_attr_t * attrs_beg = attrs->attributes; + const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes; + const rcutils_thread_attr_t * attr = std::find_if( + attrs_beg, attrs_end, + [&](const auto & attr) { + return attr.tag == name; + }); + if (attr != attrs_end) { + return rcpputils::ThreadAttribute(*attr); + } else { + if (name_specified) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "SingleThreadedExecutor is named \"%s\", but not found corresponding thread attribute.", + name.c_str()); + } + return std::nullopt; + } +} diff --git a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp index b6fd2c3fda..06f8347c4f 100644 --- a/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp +++ b/rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp @@ -14,10 +14,16 @@ #include +#include + +#include #include +#include #include #include +#include +#include "rcpputils/thread.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/node.hpp" #include "rclcpp/rclcpp.hpp" @@ -28,10 +34,14 @@ using namespace std::chrono_literals; class TestMultiThreadedExecutor : public ::testing::Test { protected: - static void SetUpTestCase() + void SetUp() override { rclcpp::init(0, nullptr); } + void TearDown() override + { + rclcpp::shutdown(); + } }; constexpr std::chrono::milliseconds PERIOD_MS = 1000ms; @@ -97,3 +107,124 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) { executor.add_node(node); executor.spin(); } + +TEST_F(TestMultiThreadedExecutor, thread_attribute_apply) { + using rcpputils::Thread; + using rcpputils::ThreadAttribute; + using rcpputils::ThreadId; + using rcpputils::SchedPolicy; + + ThreadAttribute attr; + unsigned thread_count = 10; + +#if __linux__ + ThreadId parent_thread = rcpputils::this_thread::get_id(); + { + sched_param param = {0}; + int r = pthread_setschedparam(pthread_self(), SCHED_OTHER, ¶m); + EXPECT_EQ(0, r); + // SCHED_FIFO, SCHED_RR are require the privilege on linux + attr.set_sched_policy(SchedPolicy::batch); + } +#endif + + rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), thread_count, attr); + + std::atomic succ_count = 0; + auto node = std::make_shared("node", "ns"); + auto timer = node->create_wall_timer( + std::chrono::milliseconds(1), [&]() { +#if __linux__ + int policy; + sched_param param; + ThreadId worker_thread = rcpputils::this_thread::get_id(); + int r = pthread_getschedparam(pthread_self(), &policy, ¶m); + SchedPolicy rclcpp_policy = SchedPolicy(0x8000'0000 | policy); + EXPECT_EQ(0, r); + EXPECT_EQ(SchedPolicy::batch, rclcpp_policy); + EXPECT_NE(worker_thread, parent_thread); +#endif + unsigned n = succ_count.fetch_add(1); + if (n == thread_count - 1) { + executor.cancel(); + } + }); + + executor.add_node(node); + executor.spin(); + EXPECT_EQ(succ_count, thread_count); +} + +constexpr char const * argv[] = { + "test_multi_threaded_executor", + "--ros-args", + "--thread-attrs-value", + R"( +- tag: RCLCPP_EXECUTOR_MULTI_THREADED + scheduling_policy: FIFO + priority: 10 + core_affinity: [] +- tag: executor-1 + scheduling_policy: RR + priority: 20 + core_affinity: [0] + )", + NULL, +}; +constexpr int argc = sizeof(argv) / sizeof(*argv) - 1; + +class TestMultiThreadedExecutorAttribute : public ::testing::Test +{ +protected: + void SetUp() override + { + rclcpp::init(argc, argv); + } + void TearDown() override + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_default_name) { + using rcpputils::SchedPolicy; + rclcpp::executors::MultiThreadedExecutor executor; + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_TRUE(opt_attr); + + const auto & attr = opt_attr.value(); + EXPECT_EQ(SchedPolicy::fifo, attr.get_sched_policy()); + EXPECT_EQ(10, attr.get_priority()); + + const auto & affinity = attr.get_affinity(); + EXPECT_EQ(0, affinity.count()); +} + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_specified_name) { + using rcpputils::SchedPolicy; + auto options = rclcpp::ExecutorOptions(); + options.name = "executor-1"; + rclcpp::executors::MultiThreadedExecutor executor(options); + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_TRUE(opt_attr); + + const auto & attr = opt_attr.value(); + EXPECT_EQ(SchedPolicy::rr, attr.get_sched_policy()); + EXPECT_EQ(20, attr.get_priority()); + + const auto & affinity = attr.get_affinity(); + EXPECT_EQ(1, affinity.count()); + EXPECT_TRUE(affinity.is_set(0)); +} + +TEST_F(TestMultiThreadedExecutorAttribute, thread_attribute_not_found) { + using rcpputils::SchedPolicy; + auto options = rclcpp::ExecutorOptions(); + options.name = "executor-not-exists"; + rclcpp::executors::MultiThreadedExecutor executor(options); + + const auto & opt_attr = executor.get_thread_attribute(); + ASSERT_FALSE(opt_attr); +}