diff --git a/rclcpp/executors/executor_comparison/CMakeLists.txt b/rclcpp/executors/executor_comparison/CMakeLists.txt new file mode 100644 index 00000000..ec92a0f7 --- /dev/null +++ b/rclcpp/executors/executor_comparison/CMakeLists.txt @@ -0,0 +1,34 @@ +cmake_minimum_required(VERSION 3.5) +project(simple_thread_tester) + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(std_msgs REQUIRED) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_BUILD_TYPE Debug) + +add_executable(publisher_node publisher_node.cc) +ament_target_dependencies(publisher_node rclcpp std_msgs) +install(TARGETS publisher_node DESTINATION lib/simple_thread_tester) + +add_executable(single_thread_subscriber_node single_thread_subscriber_node.cc) +ament_target_dependencies(single_thread_subscriber_node rclcpp std_msgs) +install(TARGETS single_thread_subscriber_node + DESTINATION lib/simple_thread_tester) + +add_executable(multi_thread_reentrant_subscriber_node + multi_thread_reentrant_subscriber_node.cc) +ament_target_dependencies(multi_thread_reentrant_subscriber_node rclcpp + std_msgs) +install(TARGETS multi_thread_reentrant_subscriber_node + DESTINATION lib/simple_thread_tester) + +add_executable(multi_thread_mutually_exclusive_subscriber_node + multi_thread_mutually_exclusive_subscriber_node.cc) +ament_target_dependencies(multi_thread_mutually_exclusive_subscriber_node + rclcpp std_msgs) +install(TARGETS multi_thread_mutually_exclusive_subscriber_node + DESTINATION lib/simple_thread_tester) + +ament_package() diff --git a/rclcpp/executors/executor_comparison/README.md b/rclcpp/executors/executor_comparison/README.md new file mode 100644 index 00000000..c9663401 --- /dev/null +++ b/rclcpp/executors/executor_comparison/README.md @@ -0,0 +1,31 @@ +# Simple ros2 package to understand threads + +Visit [here](https://motion-boseong.vercel.app/threading-ros2) for explanations. + +## Running node + +### 1. Single-thread test +In terminal A (also for the two belows) + +``` +ros2 run simple_thread_tester publisher_node +``` + +In terminal B + +``` +ros2 run simple_thread_tester single_thread_subscriber_node +``` +### 2. Multi-thread reentrant callback group test + +``` +ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node +``` + +### 3. Multi-thread mutually exclusive callback group test + +``` +ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node +# If simulate mutex +ros2 param set /subscriber_node use_mutex true +``` diff --git a/rclcpp/executors/executor_comparison/multi_thread_mutually_exclusive_subscriber_node.cc b/rclcpp/executors/executor_comparison/multi_thread_mutually_exclusive_subscriber_node.cc new file mode 100644 index 00000000..b93b68a0 --- /dev/null +++ b/rclcpp/executors/executor_comparison/multi_thread_mutually_exclusive_subscriber_node.cc @@ -0,0 +1,138 @@ +#include +#include +#include + +#include +#include + +using namespace std::chrono_literals; + +class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node { + +public: + MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options) + : Node("subscriber_node", + node_options.allow_undeclared_parameters(true)) { + rclcpp::SubscriptionOptions options; + options.callback_group = + create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + + short_subscriber_ = create_subscription( + "/short_topic", rclcpp::QoS(10), + std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback, + this, std::placeholders::_1), + options); + + options.callback_group = + create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + long_subscriber_ = create_subscription( + "/long_topic", rclcpp::QoS(10), + std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback, + this, std::placeholders::_1), + options); + + timer_ = create_wall_timer( + 1s, std::bind(&MultiThreadMutuallyExclusiveSubscriber::TimerCallback, + this)); + + auto param_change_callback = + [this](std::vector parameters) { + for (auto parameter : parameters) { + if (parameter.get_name() == "use_mutex") { + use_mutex = parameter.as_bool(); + if (use_mutex) + RCLCPP_INFO(get_logger(), "will use mutex"); + else + RCLCPP_INFO(get_logger(), "will not mutex"); + } + } + + auto result = rcl_interfaces::msg::SetParametersResult(); + result.successful = true; + return result; + }; + + callback_handler = + this->add_on_set_parameters_callback(param_change_callback); + } + +private: + bool use_mutex{false}; + struct { + std::mutex short_topic_mutex; + std::mutex long_topic_mutex; + } mutex_list; + std::string processed_short_string_; + std::string processed_long_string_; + void ProcessString(const std::string &raw_string, std::string &output); + + rclcpp::Subscription::SharedPtr short_subscriber_; + rclcpp::Subscription::SharedPtr long_subscriber_; + rclcpp::TimerBase::SharedPtr timer_; + OnSetParametersCallbackHandle::SharedPtr callback_handler; + + void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg); + void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg); + void TimerCallback(); +}; + +void MultiThreadMutuallyExclusiveSubscriber::ProcessString( + const std::string &raw_string, std::string &output) { + output = "** "; + + for (char c : raw_string) { + output.push_back(c); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + output += " **"; +}; + +void MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback( + const std_msgs::msg::String::SharedPtr msg) { + std::shared_ptr> lock; + if (use_mutex) + mutex_list.short_topic_mutex.lock(); + ProcessString(msg->data, processed_short_string_); + RCLCPP_INFO(get_logger(), "Setting processed: %s", + processed_short_string_.c_str()); + if (use_mutex) + mutex_list.short_topic_mutex.unlock(); +} + +void MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback( + const std_msgs::msg::String::SharedPtr msg) { + + if (use_mutex) + mutex_list.long_topic_mutex.lock(); + ProcessString(msg->data, processed_long_string_); + RCLCPP_INFO(get_logger(), "Setting processed: %s", + processed_long_string_.c_str()); + if (use_mutex) + mutex_list.long_topic_mutex.unlock(); +} + +void MultiThreadMutuallyExclusiveSubscriber::TimerCallback() { + if (use_mutex) { + mutex_list.short_topic_mutex.lock(); + mutex_list.long_topic_mutex.lock(); + } + RCLCPP_INFO(get_logger(), + "Getting processed strings: \n [long] %s \n [short] %s", + processed_long_string_.c_str(), processed_short_string_.c_str()); + + if (use_mutex) { + mutex_list.short_topic_mutex.unlock(); + mutex_list.long_topic_mutex.unlock(); + } +} + +int main(int argc, char **argv) { + rclcpp::init(argc, argv); + rclcpp::NodeOptions option; + auto node = std::make_shared(option); + rclcpp::executors::MultiThreadedExecutor executor; + executor.add_node(node); + executor.spin(); + rclcpp::shutdown(); + return 0; +} diff --git a/rclcpp/executors/executor_comparison/multi_thread_reentrant_subscriber_node.cc b/rclcpp/executors/executor_comparison/multi_thread_reentrant_subscriber_node.cc new file mode 100644 index 00000000..2d62b925 --- /dev/null +++ b/rclcpp/executors/executor_comparison/multi_thread_reentrant_subscriber_node.cc @@ -0,0 +1,76 @@ +#include +#include +#include + +#include +#include + +using namespace std::chrono_literals; + +class MultiThreadReentrantSubscriber : public rclcpp::Node { + +public: + MultiThreadReentrantSubscriber() : Node("subscriber_node") { + rclcpp::SubscriptionOptions options; + options.callback_group = + create_callback_group(rclcpp::CallbackGroupType::Reentrant); + + short_subscriber_ = create_subscription( + "/short_topic", rclcpp::QoS(10), + std::bind(&MultiThreadReentrantSubscriber::ShortTopicCallback, this, + std::placeholders::_1), + options); + + long_subscriber_ = create_subscription( + "/long_topic", rclcpp::QoS(10), + std::bind(&MultiThreadReentrantSubscriber::LongTopicCallback, this, + std::placeholders::_1), + options); + } + +private: + std::string processed_short_string_; + std::string processed_long_string_; + std::string ProcessString(const std::string &raw_string); + + rclcpp::Subscription::SharedPtr short_subscriber_; + rclcpp::Subscription::SharedPtr long_subscriber_; + void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg); + void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg); +}; + +std::string +MultiThreadReentrantSubscriber::ProcessString(const std::string &raw_string) { + std::thread::id this_id = std::this_thread::get_id(); + std::ostringstream oss; + oss << std::this_thread::get_id(); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + return "** " + raw_string + " **" + " ( by " + oss.str() + ") "; +}; + +void MultiThreadReentrantSubscriber::ShortTopicCallback( + const std_msgs::msg::String::SharedPtr msg) { + auto processed_string = ProcessString(msg->data); + + RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str()); + processed_short_string_ = processed_string; +} + +void MultiThreadReentrantSubscriber::LongTopicCallback( + const std_msgs::msg::String::SharedPtr msg) { + auto processed_string = ProcessString(msg->data); + + RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str()); + processed_long_string_ = processed_string; +} + +int main(int argc, char **argv) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::executors::MultiThreadedExecutor executor; + executor.add_node(node); + executor.spin(); + rclcpp::shutdown(); + return 0; +} diff --git a/rclcpp/executors/executor_comparison/package.xml b/rclcpp/executors/executor_comparison/package.xml new file mode 100644 index 00000000..113288fb --- /dev/null +++ b/rclcpp/executors/executor_comparison/package.xml @@ -0,0 +1,19 @@ + + + + simple_thread_tester + 0.0.0 + TODO: Package description + jbs + TODO: License declaration + + ament_cmake + + + ament_lint_auto + ament_lint_common + + + ament_cmake + + \ No newline at end of file diff --git a/rclcpp/executors/executor_comparison/publisher_node.cc b/rclcpp/executors/executor_comparison/publisher_node.cc new file mode 100644 index 00000000..5644b590 --- /dev/null +++ b/rclcpp/executors/executor_comparison/publisher_node.cc @@ -0,0 +1,54 @@ +#include + +#include +#include + +using namespace std::chrono_literals; + +class PublisherNode : public rclcpp::Node { +public: + PublisherNode() : Node("publisher_node") { + short_publisher_ = create_publisher( + "/short_topic", rclcpp::SystemDefaultsQoS()); + long_publisher_ = create_publisher( + "/long_topic", rclcpp::SystemDefaultsQoS()); + + short_timer_ = + create_wall_timer(100ms, [this]() { PublishShortMessage(); }); + + long_timer_ = create_wall_timer(1s, [this]() { PublishLongMessage(); }); + } + +private: + size_t short_seq_{0}; + size_t long_seq_{0}; + + rclcpp::Publisher::SharedPtr short_publisher_; + rclcpp::Publisher::SharedPtr long_publisher_; + + rclcpp::TimerBase::SharedPtr short_timer_; + rclcpp::TimerBase::SharedPtr long_timer_; + + void PublishShortMessage() { + std_msgs::msg::String message; + message.data = "Short message (seq=" + std::to_string(short_seq_++) + ")"; + RCLCPP_INFO(get_logger(), message.data, " published."); + short_publisher_->publish(message); + } + + void PublishLongMessage() { + std_msgs::msg::String message; + message.data = "Long message with more characters (seq=" + + std::to_string(long_seq_++) + ")"; + RCLCPP_INFO(get_logger(), message.data, " published."); + long_publisher_->publish(message); + } +}; + +int main(int argc, char **argv) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +} diff --git a/rclcpp/executors/executor_comparison/single_thread_subscriber_node.cc b/rclcpp/executors/executor_comparison/single_thread_subscriber_node.cc new file mode 100644 index 00000000..6ddb0042 --- /dev/null +++ b/rclcpp/executors/executor_comparison/single_thread_subscriber_node.cc @@ -0,0 +1,63 @@ +#include +#include + +#include +#include + +using namespace std::chrono_literals; + +class SingleThreadSubscriber : public rclcpp::Node { + +public: + SingleThreadSubscriber() : Node("subscriber_node") { + short_subscriber_ = create_subscription( + "/short_topic", rclcpp::QoS(10), + std::bind(&SingleThreadSubscriber::ShortTopicCallback, this, + std::placeholders::_1)); + + long_subscriber_ = create_subscription( + "/long_topic", rclcpp::QoS(10), + std::bind(&SingleThreadSubscriber::LongTopicCallback, this, + std::placeholders::_1)); + } + +private: + std::string processed_short_string_; + std::string processed_long_string_; + std::string ProcessString(const std::string &raw_string); + + rclcpp::Subscription::SharedPtr short_subscriber_; + rclcpp::Subscription::SharedPtr long_subscriber_; + void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg); + void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg); +}; + +std::string +SingleThreadSubscriber::ProcessString(const std::string &raw_string) { + std::this_thread::sleep_for(std::chrono::seconds(2)); + return "** " + raw_string + " **"; +}; + +void SingleThreadSubscriber::ShortTopicCallback( + const std_msgs::msg::String::SharedPtr msg) { + auto processed_string = ProcessString(msg->data); + + RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str()); + processed_short_string_ = processed_string; +} + +void SingleThreadSubscriber::LongTopicCallback( + const std_msgs::msg::String::SharedPtr msg) { + auto processed_string = ProcessString(msg->data); + + RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str()); + processed_long_string_ = processed_string; +} + +int main(int argc, char **argv) { + rclcpp::init(argc, argv); + auto node = std::make_shared(); + rclcpp::spin(node); + rclcpp::shutdown(); + return 0; +}