From 747a0191c24444f8fbeebac18f2ae940abc69b71 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 18 Aug 2015 18:41:40 -0700 Subject: [PATCH 1/6] added the mapped ring buffer class for intra-pc --- rclcpp/CMakeLists.txt | 9 + rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 205 +++++++++++++++++++ rclcpp/package.xml | 1 + rclcpp/test/test_mapped_ring_buffer.cpp | 154 ++++++++++++++ 4 files changed, 369 insertions(+) create mode 100644 rclcpp/include/rclcpp/mapped_ring_buffer.hpp create mode 100644 rclcpp/test/test_mapped_ring_buffer.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 0926518cfe..d4e3ad4b5c 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8.3) project(rclcpp) find_package(ament_cmake REQUIRED) +find_package(rcl_interfaces REQUIRED) ament_export_dependencies(rmw) ament_export_dependencies(rcl_interfaces) @@ -12,6 +13,14 @@ ament_export_include_directories(include) if(AMENT_ENABLE_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() + + if(NOT WIN32) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra") + endif() + + include_directories(include) + + ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) endif() ament_package( diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp new file mode 100644 index 0000000000..d4fc0f7288 --- /dev/null +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -0,0 +1,205 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_RING_BUFFER_HPP_ +#define RCLCPP_RCLCPP_RING_BUFFER_HPP_ + +#include + +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace mapped_ring_buffer +{ + +class MappedRingBufferBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase); +}; + +/// Ring buffer container of unique_ptr's of T, which can be accessed by a key. +/* T must be a CopyConstructable and CopyAssignable. + * This class can be used in a container by using the base class MappedRingBufferBase. + * This class must have a positive, non-zero size. + * This class cannot be resized nor can it reserve additional space after construction. + * This class is not CopyConstructable nor CopyAssignable. + * + * The key's are not guaranteed to be unique because push_and_replace does not + * check for colliding keys. + * It is up to the user to only use unique keys. + * A side effect of this is that when get_copy_at_key or pop_at_key are called, + * they return the first encountered instance of the key. + * But iteration does not begin with the ring buffer's head, and therefore + * there is no guarantee on which value is returned if a key is used multiple + * times. + */ +template +class MappedRingBuffer : public MappedRingBufferBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer); + + /// Constructor. + /* The constructor will allocate memory while reserving space. + * + * /param size size of the ring buffer; must be positive and non-zero. + */ + MappedRingBuffer(size_t size) : elements_(size), head_(0) + { + if (size == 0) { + throw std::invalid_argument("size must be a positive, non-zero value"); + } + } + virtual ~MappedRingBuffer() {} + + /// Return a copy of the value stored in the ring buffer at the given key. + /* The key is matched if an element in the ring buffer has a matching key. + * This method will allocate in order to return a copy. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * /param key the key associated with the stored value + * /param value if the key is found, the value is stored in this parameter + */ + void + get_copy_at_key(uint64_t key, std::unique_ptr & value) + { + auto it = get_iterator_of_key(key); + value = nullptr; + if (it != elements_.end() && it->in_use) { + value = std::unique_ptr(new T(*it->value)); + } + } + + /// Return ownership of the value stored in the ring buffer, leaving a copy. + /* The key is matched if an element in the ring bufer has a matching key. + * This method will allocate in order to store a copy. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * The ownership of the currently stored object is returned, but a copy is + * made and stored in its place. + * This means that multiple calls to this function for a particular element + * will result in returning the copied and stored object not the original. + * This also means that later calls to pop_at_key will not return the + * originally stored object, since it was returned by the first call to this + * method. + * + * /param key the key associated with the stored value + * /param value if the key is found, the value is stored in this parameter + */ + void + get_ownership_at_key(uint64_t key, std::unique_ptr & value) + { + auto it = get_iterator_of_key(key); + value = nullptr; + if (it != elements_.end() && it->in_use) { + // Make a copy. + auto copy = std::unique_ptr(new T(*it->value)); + // Return the original. + value.swap(it->value); + // Store the copy. + it->value.swap(copy); + } + } + + /// Return ownership of the value stored in the ring buffer at the given key. + /* The key is matched if an element in the ring buffer has a matching key. + * + * The key is not guaranteed to be unique, see the class docs for more. + * + * /param key the key associated with the stored value + * /param value if the key is found, the value is stored in this parameter + */ + void + pop_at_key(uint64_t key, std::unique_ptr & value) + { + auto it = get_iterator_of_key(key); + value = nullptr; + if (it != elements_.end() && it->in_use) { + value.swap(it->value); + it->in_use = false; + } + } + + /// Insert a key-value pair, displacing an existing pair if necessary. + /* The key's uniqueness is not checked on insertion. + * It is up to the user to ensure the key is unique. + * This method should not allocate memory. + * + * After insertion, if a pair was replaced, then value will contain ownership + * of that displaced value. Otherwise it will be a nullptr. + * + * /param key the key associated with the value to be stored + * /param value the value to store, and optionally the value displaced + */ + bool + push_and_replace(uint64_t key, std::unique_ptr & value) + { + bool did_replace = elements_[head_].in_use; + elements_[head_].key = key; + elements_[head_].value.swap(value); + elements_[head_].in_use = true; + head_ = (head_ + 1) % elements_.size(); + return did_replace; + } + + bool + push_and_replace(uint64_t key, std::unique_ptr && value) + { + std::unique_ptr temp = std::move(value); + return push_and_replace(key, temp); + } + + /// Return true if the key is found in the ring buffer, otherwise false. + bool + has_key(uint64_t key) + { + return elements_.end() != get_iterator_of_key(key); + } + +private: + RCLCPP_DISABLE_COPY(MappedRingBuffer); + + struct element + { + uint64_t key; + std::unique_ptr value; + bool in_use; + }; + + typename std::vector::iterator + get_iterator_of_key(uint64_t key) + { + auto it = std::find_if(elements_.begin(), elements_.end(), [key] (element & e) -> bool { + return e.key == key && e.in_use; + }); + return it; + } + + std::vector elements_; + size_t head_; + +}; + +} /* namespace ring_buffer */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_RING_BUFFER_HPP_ */ diff --git a/rclcpp/package.xml b/rclcpp/package.xml index 6962b8c658..50fb4120a0 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -13,6 +13,7 @@ rcl_interfaces rcl_interfaces + ament_cmake_gtest ament_lint_auto ament_lint_common diff --git a/rclcpp/test/test_mapped_ring_buffer.cpp b/rclcpp/test/test_mapped_ring_buffer.cpp new file mode 100644 index 0000000000..4e40882507 --- /dev/null +++ b/rclcpp/test/test_mapped_ring_buffer.cpp @@ -0,0 +1,154 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +/* +Tests get_copy and pop on an empty mrb. +*/ +TEST(test_mapped_ring_buffer, empty) { + // Cannot create a buffer of size zero. + EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(0), std::invalid_argument); + // Getting or popping an empty buffer should result in a nullptr. + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(1); + + std::unique_ptr actual; + mrb.get_copy_at_key(1, actual); + EXPECT_EQ(nullptr, actual); + + mrb.pop_at_key(1, actual); + EXPECT_EQ(nullptr, actual); +} + +/* +Tests push_and_replace with a temporary object. +*/ +TEST(test_mapped_ring_buffer, temporary_l_value) { + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + // Pass in value with temporary object + mrb.push_and_replace(1, std::unique_ptr(new char('a'))); + + std::unique_ptr actual; + mrb.get_copy_at_key(1, actual); + EXPECT_EQ('a', *actual); + + mrb.pop_at_key(1, actual); + EXPECT_EQ('a', *actual); + + mrb.get_copy_at_key(1, actual); + EXPECT_EQ(nullptr, actual); +} + +/* +Tests normal usage of the mrb. +*/ +TEST(test_mapped_ring_buffer, nominal) { + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + std::unique_ptr expected(new char('a')); + // Store expected value's address for later comparison. + char * expected_orig = expected.get(); + + EXPECT_FALSE(mrb.push_and_replace(1, expected)); + + std::unique_ptr actual; + mrb.get_copy_at_key(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('a', *actual); + EXPECT_NE(expected_orig, actual.get()); + + mrb.pop_at_key(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('a', *actual); + EXPECT_EQ(expected_orig, actual.get()); + + mrb.get_copy_at_key(1, actual); + EXPECT_EQ(nullptr, actual); + + expected.reset(new char('a')); + EXPECT_FALSE(mrb.push_and_replace(1, expected)); + + expected.reset(new char('b')); + EXPECT_FALSE(mrb.push_and_replace(2, expected)); + + expected.reset(new char('c')); + EXPECT_TRUE(mrb.push_and_replace(3, expected)); + + mrb.get_copy_at_key(1, actual); + EXPECT_EQ(nullptr, actual); + + mrb.get_copy_at_key(2, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('b', *actual); + + mrb.get_copy_at_key(3, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('c', *actual); +} + +/* +Tests get_ownership on a normal mrb. +*/ +TEST(test_mapped_ring_buffer, get_ownership) { + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + std::unique_ptr expected(new char('a')); + // Store expected value's address for later comparison. + char * expected_orig = expected.get(); + + EXPECT_FALSE(mrb.push_and_replace(1, expected)); + + std::unique_ptr actual; + mrb.get_copy_at_key(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('a', *actual); + EXPECT_NE(expected_orig, actual.get()); + + mrb.get_ownership_at_key(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('a', *actual); + EXPECT_EQ(expected_orig, actual.get()); + + mrb.pop_at_key(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('a', *actual); // The value should be the same. + EXPECT_NE(expected_orig, actual.get()); // Even though we pop'ed, we didn't get the original. + + mrb.get_copy_at_key(1, actual); + EXPECT_EQ(nullptr, actual); +} + +/* +Tests the affect of reusing keys (non-unique keys) in a mrb. +*/ +TEST(test_mapped_ring_buffer, non_unique_keys) { + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + + std::unique_ptr input(new char('a')); + mrb.push_and_replace(1, input); + input.reset(new char('b')); + + // Different value, same key. + mrb.push_and_replace(1, input); + + std::unique_ptr actual; + mrb.pop_at_key(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('a', *actual); + + actual = nullptr; + mrb.pop_at_key(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) EXPECT_EQ('b', *actual); +} From 94bf5ffb8ce69aefcddddddbda83af491c5e5938 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 18 Aug 2015 18:42:00 -0700 Subject: [PATCH 2/6] adding the intra process manager class --- rclcpp/CMakeLists.txt | 2 + .../include/rclcpp/intra_process_manager.hpp | 413 ++++++++++ rclcpp/test/test_intra_process_manager.cpp | 748 ++++++++++++++++++ 3 files changed, 1163 insertions(+) create mode 100644 rclcpp/include/rclcpp/intra_process_manager.hpp create mode 100644 rclcpp/test/test_intra_process_manager.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index d4e3ad4b5c..1501ef855e 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -21,6 +21,8 @@ if(AMENT_ENABLE_TESTING) include_directories(include) ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) + ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) + target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") endif() ament_package( diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp new file mode 100644 index 0000000000..38eee41d87 --- /dev/null +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -0,0 +1,413 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ +#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace intra_process_manager +{ + +/// This class facilitates intra process communication between nodes. +/* This class is used in the creation of publishers and subscriptions. + * A singleton instance of this class is owned by a rclcpp::Context and a + * rclcpp::Node can use an associated Context to get an instance of this class. + * Nodes which do not have a common Context will not exchange intra process + * messages because they will not share access to an instance of this class. + * + * When a Node creates a publisher or subscription, it will register them + * with this class. + * The node will also hook into the publisher's publish call + * in order to do intra process related work. + * + * When a publisher is created, it advertises on the topic the user provided, + * as well as a "shadowing" topic of type rcl_interfaces/IntraProcessMessage. + * For instance, if the user specified the topic '/namespace/chatter', then the + * corresponding intra process topic might be '/namespace/chatter__intra'. + * The publisher is also allocated an id which is unique among all publishers + * and subscriptions in this process. + * Additionally, when registered with this class a ring buffer is created and + * owned by this class as a temporary place to hold messages destined for intra + * process subscriptions. + * + * When a subscription is created, it subscribes to the topic provided by the + * user as well as to the corresponding intra process topic. + * It is also gets a unique id from the singleton instance of this class which + * is unique among publishers and subscriptions. + * + * When the user publishes a message, the message is stored by calling + * store_intra_process_message on this class. + * The instance of that message is uniquely identified by a publisher id and a + * message sequence number. + * The publisher id, message sequence pair is unique with in the process. + * At that point a list of the id's of intra process subscriptions which have + * been registered with the singleton instance of this class are stored with + * the message instance so that delivery is only made to those subscriptions. + * Then an instance of rcl_interfaces/IntraProcessMessage is published to the + * intra process topic which is specific to the topic specified by the user. + * + * When an instance of rcl_interfaces/IntraProcessMessage is received by a + * subscription, then it is handled by calling take_intra_process_message + * on a singleton of this class. + * The subscription passes a publisher id, message sequence pair which + * uniquely identifies the message instance it was suppose to receive as well + * as the subscriptions unique id. + * If the message is still being held by this class and the subscription's id + * is in the list of intended subscriptions then the message is returned. + * If either of those predicates are not satisfied then the message is not + * returned and the subscription does not call the users callback. + * + * Since the publisher builds a list of destined subscriptions on publish, and + * other requests are ignored, this class knows how many times a message + * instance should be requested. + * The final time a message is requested, the ownership is passed out of this + * class and passed to the final subscription, effectively freeing space in + * this class's internal storage. + * + * Since a topic is being used to ferry notifications about new intra process + * messages between publishers and subscriptions, it is possible for that + * notification to be lost. + * It is also possible that a subscription which was available when publish was + * called will no longer exist once the notification gets posted. + * In both cases this might result in a message instance getting requested + * fewer times than expected. + * This is why the internal storage of this class is a ring buffer. + * That way if a message is orphaned it will eventually be dropped from storage + * when a new message instance is stored and will not result in a memory leak. + * + * However, since the storage system is finite, this also means that a message + * instance might get displaced by an incoming message instance before all + * interested parties have called take_intra_process_message. + * Because of this the size of the internal storage should be carefully + * considered. + * + * /TODO(wjwwood): update to include information about handling latching. + * /TODO(wjwwood): consider thread safety of the class. + * + * This class is neither CopyConstructable nor CopyAssignable. + */ +class IntraProcessManager +{ +private: + RCLCPP_DISABLE_COPY(IntraProcessManager); +public: + RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); + + IntraProcessManager() = default; + + /// Register a subscription with the manager, returns subscriptions unique id. + /* In addition to generating a unique intra process id for the subscription, + * this method also stores the topic name of the subscription. + * + * This method is normally called during the creation of a subscription, + * but after it creates the internal intra process rmw_subscription_t. + * + * This method will allocate memory. + * + * /param subscription the Subscription to register. + * /return an unsigned 64-bit integer which is the subscription's unique id. + */ + uint64_t + add_subscription(subscription::SubscriptionBase::SharedPtr subscription) + { + auto id = IntraProcessManager::get_next_unique_id(); + subscriptions_[id] = subscription; + subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); + return id; + } + + /// Unregister a subscription using the subscription's unique id. + /* This method does not allocate memory. + * + * /param intra_process_subscription_id id of the subscription to remove. + */ + void + remove_subscription(uint64_t intra_process_subscription_id) + { + subscriptions_.erase(intra_process_subscription_id); + for (auto & pair : subscription_ids_by_topic_) { + pair.second.erase(intra_process_subscription_id); + } + // Iterate over all publisher infos and all stored subscription id's and + // remove references to this subscription's id. + for (auto & publisher_pair : publishers_) { + for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { + sub_pair.second.erase(intra_process_subscription_id); + } + } + } + + /// Register a publisher with the manager, returns the publisher unique id. + /* In addition to generating and returning a unique id for the publisher, + * this method creates internal ring buffer storage for "in-flight" intra + * process messages which are stored when store_intra_process_message is + * called with this publisher's unique id. + * + * The buffer_size must be less than or equal to the max uint64_t value. + * If the buffer_size is 0 then a buffer size is calculated using the + * publisher's QoS settings. + * The default is to use the depth field of the publisher's QoS. + * TODO(wjwwood): Consider doing depth *= 1.2, round up, or similar. + * TODO(wjwwood): Consider what to do for keep all. + * + * This method is templated on the publisher's message type so that internal + * storage of the same type can be allocated. + * + * This method will allocate memory. + * + * /param publisher publisher to be registered with the manager. + * /param buffer_size if 0 (default) a size is calculated based on the QoS. + * /return an unsigned 64-bit integer which is the publisher's unique id. + */ + template + uint64_t + add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size=0) + { + auto id = IntraProcessManager::get_next_unique_id(); + publishers_[id].publisher = publisher; + size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); + // As long as the size of the ring buffer is less than the max sequence number, we're safe. + assert(size <= std::numeric_limits::max()); + publishers_[id].sequence_number.store(0); + publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer::make_shared(size); + publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + return id; + } + + /// Unregister a publisher using the publisher's unique id. + /* This method does not allocate memory. + * + * /param intra_process_publisher_id id of the publisher to remove. + */ + void + remove_publisher(uint64_t intra_process_publisher_id) + { + publishers_.erase(intra_process_publisher_id); + } + + /// Store a message in the manager, and return the message sequence number. + /* The given message is stored in internal storage using the given publisher + * id and the newly generated message sequence, which is also returned. + * The combination of publisher id and message sequence number can later + * be used with a subscription id to retrieve the message by calling + * take_intra_process_message. + * The number of times take_intra_process_message can be called with this + * unique pair of id's is determined by the number of subscriptions currently + * subscribed to the same topic and which share the same Context, i.e. once + * for each subscription which should receive the intra process message. + * + * The ownership of the incoming message is transfered to the internal + * storage in order to avoid copying the message data. + * Therefore, the message parameter will no longer contain the original + * message after calling this method. + * Instead it will either be a nullptr or it will contain the ownership of + * the message instance which was displaced. + * If the message parameter is not equal to nullptr after calling this method + * then a message was prematurely displaced, i.e. take_intra_process_message + * had not been called on it as many times as was expected. + * + * This method can throw an exception if the publisher id is not found or + * if the publisher shared_ptr given to add_publisher has gone out of scope. + * + * This method does allocate memory. + * + * /param intra_process_publisher_id the id of the publisher of this message. + * /param message the message that is being stored. + * /return the message sequence number. + */ + template + uint64_t + store_intra_process_message( + uint64_t intra_process_publisher_id, + std::unique_ptr & message) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + publisher_info & info = it->second; + // Calculate the next message sequence number. + uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); + // Insert the message into the ring buffer using the message_seq to identify it. + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info.buffer); + bool did_replace = typed_buffer->push_and_replace(message_seq, message); + // TODO(wjwwood): do something when a message was displaced. log debug? + (void)did_replace; // Avoid unused variable warning. + // Figure out what subscriptions should receive the message. + auto publisher = info.publisher.lock(); + if (!publisher) { + throw std::runtime_error("publisher has unexpectedly gone out of scope"); + } + auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; + // Store the list for later comparison. + info.target_subscriptions_by_message_sequence[message_seq].clear(); + std::copy( + destined_subscriptions.begin(), destined_subscriptions.end(), + // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] + std::inserter( + info.target_subscriptions_by_message_sequence[message_seq], + // This ends up only being a hint to std::set, could also be .begin(). + info.target_subscriptions_by_message_sequence[message_seq].end() + ) + ); + // Return the message sequence which is sent to the subscription. + return message_seq; + } + + /// Take an intra process message. + /* The intra_process_publisher_id and message_sequence_number parameters + * uniquely identify a message instance, which should be taken. + * + * The requesting_subscriptions_intra_process_id parameter is used to make + * sure the requesting subscription was intended to receive this message + * instance. + * This check is made because it could happen that the requester + * comes up after the publish event, so it still receives the notification of + * a new intra process message, but it wasn't registered with the manager at + * the time of publishing, causing it to take when it wasn't intended. + * This should be avioded unless latching-like behavior is involved. + * + * The message parameter is used to store the taken message. + * On the last expected call to this method, the ownership is transfered out + * of internal storage and into the message parameter. + * On all previous calls a copy of the internally stored message is made and + * the ownership of the copy is transfered to the message parameter. + * TODO(wjwwood): update this documentation when latching is supported. + * + * The message parameter can be set to nullptr if: + * + * - The publisher id is not found. + * - The message sequence is not found for the given publisher id. + * - The requesting subscription's id is not in the list of intended takers. + * - The requesting subscription's id has been used before with this message. + * + * This method may allocate memory to copy the stored message. + * + * /param intra_process_publisher_id the id of the message's publisher. + * /param message_sequence_number the sequence number of the message. + * /param requesting_subscriptions_intra_process_id the subscription's id. + * /param message the message typed unique_ptr used to return the message. + */ + template + void + take_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + std::unique_ptr & message) + { + message = nullptr; + publisher_info * info; + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + // Publisher is either invalid or no longer exists. + return; + } + info = &it->second; + } + // Figure out how many subscriptions are left. + std::set * target_subs; + { + auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); + if (it == info->target_subscriptions_by_message_sequence.end()) { + // Message is no longer being stored by this publisher. + return; + } + target_subs = &it->second; + } + { + auto it = std::find( + target_subs->begin(), target_subs->end(), + requesting_subscriptions_intra_process_id); + if (it == target_subs->end()) { + // This publisher id/message seq pair was not intended for this subscription. + return; + } + target_subs->erase(it); + } + typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info->buffer); + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs->size()) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get_copy_at_key(message_sequence_number, message); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop_at_key(message_sequence_number, message); + } + } + +private: + static uint64_t get_next_unique_id() + { + auto next_id = next_unique_id_.fetch_add(1, std::memory_order_relaxed); + // Check for rollover (we started at 1). + if (0 == next_id) { + // This puts a technical limit on the number of times you can add a publisher or subscriber. + // But even if you could add (and remove) them at 1 kHz (very optimistic rate) + // it would still be a very long time before you could exhaust the pool of id's: + // 2^64 / 1000 times per sec / 60 sec / 60 min / 24 hours / 365 days = 584,942,417 years + // So around 585 million years. Even at 1 GHz, it would take 585 years. + // I think it's safe to avoid trying to handle overflow. + // If we roll over then it's most likely a bug. + throw std::overflow_error( + "exhausted the unique id's for publishers and subscribers in this process " + "(congratulations your computer is either extremely fast or extremely old)"); + } + return next_id; + } + + static std::atomic next_unique_id_; + + std::unordered_map subscriptions_; + std::map> subscription_ids_by_topic_; + + struct publisher_info + { + RCLCPP_DISABLE_COPY(publisher_info); + + publisher_info() = default; + + publisher::Publisher::WeakPtr publisher; + std::atomic sequence_number; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + std::unordered_map> target_subscriptions_by_message_sequence; + }; + + std::unordered_map publishers_; + +}; + +std::atomic IntraProcessManager::next_unique_id_{1}; + +} /* namespace intra_process_manager */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ */ diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp new file mode 100644 index 0000000000..29bc717040 --- /dev/null +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -0,0 +1,748 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +// Mock up publisher and subscription base to avoid needing an rmw impl. +namespace rclcpp { namespace publisher { namespace mock { + +class Publisher +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + + Publisher() : mock_topic_name(""), mock_queue_size(0) {} + + std::string get_topic_name() const {return mock_topic_name;} + size_t get_queue_size() const {return mock_queue_size;} + + std::string mock_topic_name; + size_t mock_queue_size; +}; + +}}} + +namespace rclcpp { namespace subscription { namespace mock { + +class SubscriptionBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionBase); + + SubscriptionBase() : mock_topic_name(""), mock_queue_size(0) {} + + std::string get_topic_name() const {return mock_topic_name;} + size_t get_queue_size() const {return mock_queue_size;} + + std::string mock_topic_name; + size_t mock_queue_size; +}; + +}}} + +// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. +#define RCLCPP_RCLCPP_PUBLISHER_HPP_ +#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ +// Force ipm to use our mock publisher class. +#define Publisher mock::Publisher +#define SubscriptionBase mock::SubscriptionBase +#include +#undef SubscriptionBase +#undef Publisher + +#include + +/* +This tests the "normal" usage of the class: + - Creates two publishers on different topics. + - Creates a subscription which matches one of them. + - Publishes on each publisher with different message content. + - Try's to take the message from the non-matching publish, should fail. + - Try's to take the message from the matching publish, should work. + - Asserts the message it got back was the one that went in (since there's only one subscription). + - Try's to take the message again, should fail. +*/ +TEST(test_intra_process_manager, nominal) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + auto p2 = std::make_shared(); + p2->mock_topic_name = "nominal2"; + p2->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p1_m1_original_address = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); // Should fail since p2 and s1 don't have the same topic. + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(p1_m1_original_address, unique_msg.get()); + } + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); // Should fail, since the message was already taken. + + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 45; + ipm_msg->publisher_id = 45; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 46; + ipm_msg->publisher_id = 46; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + } +} + +/* +Simulates the case where a publisher is removed between publishing and the matching take. + - Creates a publisher and subscription on the same topic. + - Publishes a message. + - Remove the publisher. + - Try's to take the message, should fail since the publisher (and its storage) is gone. +*/ +TEST(test_intra_process_manager, remove_publisher_before_trying_to_take) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.remove_publisher(p1_id); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); // Should fail, since the publisher is gone. +} + +/* +Tests whether or not removed subscriptions affect take behavior. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message, keep the original point for later comparison. + - Take with one subscription, should work. + - Remove a different subscription. + - Take with the final subscription, should work. + - Assert the previous take returned ownership of the original object published. +*/ +TEST(test_intra_process_manager, removed_subscription_affects_take) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s2 = std::make_shared(); + s2->mock_topic_name = "nominal1"; + s2->mock_queue_size = 10; + + auto s3 = std::make_shared(); + s3->mock_topic_name = "nominal1"; + s3->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + auto s2_id = ipm.add_subscription(s2); + auto s3_id = ipm.add_subscription(s3); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer, unique_msg.get()); + } + unique_msg.reset(); + + ipm.remove_subscription(s2_id); + + // Take using s3, the remaining subscription. + ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + // Should match the original pointer since s2 was removed first. + EXPECT_EQ(original_message_pointer, unique_msg.get()); + } + + // Take using s2, should fail since s2 was removed. + unique_msg.reset(); + ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +This tests normal operation with multiple subscriptions and one publisher. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message. + - Take with each subscription, checking that the last takes the original back. +*/ +TEST(test_intra_process_manager, multiple_subscriptions_one_publisher) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s2 = std::make_shared(); + s2->mock_topic_name = "nominal1"; + s2->mock_queue_size = 10; + + auto s3 = std::make_shared(); + s3->mock_topic_name = "nominal1"; + s3->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + auto s2_id = ipm.add_subscription(s2); + auto s3_id = ipm.add_subscription(s3); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + // Should match the original pointer. + EXPECT_EQ(original_message_pointer, unique_msg.get()); + } +} + +/* +This tests normal operation with multiple publishers and one subscription. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message. + - Take with each subscription, checking that the last takes the original back. +*/ +TEST(test_intra_process_manager, multiple_publishers_one_subscription) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto p2 = std::make_shared(); + p2->mock_topic_name = "nominal1"; + p2->mock_queue_size = 10; + + auto p3 = std::make_shared(); + p3->mock_topic_name = "nominal1"; + p3->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto p3_id = ipm.add_publisher(p3); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + // First publish + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer1 = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Second publish + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer2 = unique_msg.get(); + auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Third publish + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer3 = unique_msg.get(); + auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // First take + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + // Second take + ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + // Third take + ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); +} + +/* +This tests normal operation with multiple publishers and subscriptions. + - Creates three publishers and three subscriptions on the same topic. + - Publish a message on each publisher. + - Take from each publisher with each subscription, checking the pointer. +*/ +TEST(test_intra_process_manager, multiple_publishers_multiple_subscription) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 10; + + auto p2 = std::make_shared(); + p2->mock_topic_name = "nominal1"; + p2->mock_queue_size = 10; + + auto p3 = std::make_shared(); + p3->mock_topic_name = "nominal1"; + p3->mock_queue_size = 10; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s2 = std::make_shared(); + s2->mock_topic_name = "nominal1"; + s2->mock_queue_size = 10; + + auto s3 = std::make_shared(); + s3->mock_topic_name = "nominal1"; + s3->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto p3_id = ipm.add_publisher(p3); + auto s1_id = ipm.add_subscription(s1); + auto s2_id = ipm.add_subscription(s2); + auto s3_id = ipm.add_subscription(s3); + + auto ipm_msg = std::make_shared(); + // First publish + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer1 = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Second publish + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer2 = unique_msg.get(); + auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Third publish + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer3 = unique_msg.get(); + auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // First take + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer1, unique_msg.get()); // Final take. + } + unique_msg.reset(); + + // Second take + ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p2_id, p2_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p2_id, p2_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + // Third take + ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p3_id, p3_m1_id, s2_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_NE(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); + + ipm.take_intra_process_message(p3_id, p3_m1_id, s3_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(44ul, unique_msg->message_sequence); + EXPECT_EQ(44ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer3, unique_msg.get()); + } + unique_msg.reset(); +} + +/* +Tests displacing a message from the ring buffer before take is called. + - Creates a publisher (buffer_size = 2) and a subscription on the same topic. + - Publish a message on the publisher. + - Publish another message. + - Take the second message. + - Publish a message. + - Try to take the first message, should fail. +*/ +TEST(test_intra_process_manager, ring_buffer_displacement) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto p1_id = ipm.add_publisher(p1); + auto s1_id = ipm.add_subscription(s1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer1 = unique_msg.get(); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm_msg->message_sequence = 43; + ipm_msg->publisher_id = 43; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto original_message_pointer2 = unique_msg.get(); + auto p1_m2_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + ipm.take_intra_process_message(p1_id, p1_m2_id, s1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); + if (unique_msg) { + EXPECT_EQ(43ul, unique_msg->message_sequence); + EXPECT_EQ(43ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer2, unique_msg.get()); + } + unique_msg.reset(); + + ipm_msg->message_sequence = 44; + ipm_msg->publisher_id = 44; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + ipm.store_intra_process_message(p1_id, unique_msg); + EXPECT_NE(nullptr, unique_msg); // Should return the thing in the ring buffer it displaced. + if (unique_msg) { + // This should have been the first published message. + EXPECT_EQ(42ul, unique_msg->message_sequence); + EXPECT_EQ(42ul, unique_msg->publisher_id); + EXPECT_EQ(original_message_pointer1, unique_msg.get()); + } + unique_msg.reset(); + + // Since it just got displaced it should no longer be there to take. + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +Simulates race condition where a subscription is created after publish. + - Creates a publisher. + - Publish a message on the publisher. + - Create a subscription on the same topic. + - Try to take the message with the newly created subscription, should fail. +*/ +TEST(test_intra_process_manager, subscription_creation_race_condition) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + auto p1_id = ipm.add_publisher(p1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s1_id = ipm.add_subscription(s1); + + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +Simulates race condition where a publisher goes out of scope before take. + - Create a subscription. + - Creates a publisher on the same topic in a scope. + - Publish a message on the publisher in a scope. + - Let the scope expire. + - Try to take the message with the subscription, should fail. +*/ +TEST(test_intra_process_manager, publisher_out_of_scope_take) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + auto s1 = std::make_shared(); + s1->mock_topic_name = "nominal1"; + s1->mock_queue_size = 10; + + auto s1_id = ipm.add_subscription(s1); + + uint64_t p1_id; + uint64_t p1_m1_id; + { + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + p1_id = ipm.add_publisher(p1); + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + ASSERT_EQ(nullptr, unique_msg); + + // Explicitly remove publisher from ipm (emulate's publisher's destructor). + ipm.remove_publisher(p1_id); + } + + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(nullptr); + // Should fail because the publisher is out of scope. + ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); + EXPECT_EQ(nullptr, unique_msg); +} + +/* +Simulates race condition where a publisher goes out of scope before store. + - Creates a publisher in a scope. + - Let the scope expire. + - Publish a message on the publisher in a scope, should throw. +*/ +TEST(test_intra_process_manager, publisher_out_of_scope_store) { + rclcpp::intra_process_manager::IntraProcessManager ipm; + + uint64_t p1_id; + { + auto p1 = std::make_shared(); + p1->mock_topic_name = "nominal1"; + p1->mock_queue_size = 2; + + p1_id = ipm.add_publisher(p1); + } + + auto ipm_msg = std::make_shared(); + ipm_msg->message_sequence = 42; + ipm_msg->publisher_id = 42; + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; + unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + + EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error); + ASSERT_EQ(nullptr, unique_msg); +} From 5568cc2326c94eb2af7c6d130f0652bba3ec3fa5 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 18 Aug 2015 18:43:05 -0700 Subject: [PATCH 3/6] allow storing arbitrary singletons in the Context --- rclcpp/include/rclcpp/context.hpp | 39 +++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/context.hpp b/rclcpp/include/rclcpp/context.hpp index 55287926d8..f859a7f226 100644 --- a/rclcpp/include/rclcpp/context.hpp +++ b/rclcpp/include/rclcpp/context.hpp @@ -15,16 +15,24 @@ #ifndef RCLCPP_RCLCPP_CONTEXT_HPP_ #define RCLCPP_RCLCPP_CONTEXT_HPP_ +#include + +#include + #include +#include +#include +#include +#include -#include +#include namespace rclcpp { + namespace context { -/* ROS Context */ class Context { public: @@ -32,9 +40,36 @@ class Context Context() {} + template + std::shared_ptr + get_sub_context(Args && ... args) + { + std::lock_guard lock(mutex_); + + std::type_index type_i(typeid(SubContext)); + std::shared_ptr sub_context; + auto it = sub_contexts_.find(type_i); + if (it == sub_contexts_.end()) { + // It doesn't exist yet, make it + sub_context = std::shared_ptr( + new SubContext(std::forward(args) ...), + [] (SubContext * sub_context_ptr) { + delete sub_context_ptr; + }); + sub_contexts_[type_i] = sub_context; + } else { + // It exists, get it out and cast it. + sub_context = std::static_pointer_cast(it->second); + } + return sub_context; + } + private: RCLCPP_DISABLE_COPY(Context); + std::unordered_map> sub_contexts_; + std::mutex mutex_; + }; } /* namespace context */ From fb4e836da00dd7d97fceafc9a071eaef53fcadba Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 18 Aug 2015 18:46:51 -0700 Subject: [PATCH 4/6] changed default arguments for Node changes how the default context is gotten and adds an option for enabling/disabling intra process comms --- .../rclcpp/contexts/default_context.hpp | 7 +++++++ rclcpp/include/rclcpp/node.hpp | 8 ++++++-- rclcpp/include/rclcpp/node_impl.hpp | 20 +++++++++++-------- rclcpp/include/rclcpp/utilities.hpp | 1 + 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/rclcpp/include/rclcpp/contexts/default_context.hpp b/rclcpp/include/rclcpp/contexts/default_context.hpp index dc78f78338..c10225e68c 100644 --- a/rclcpp/include/rclcpp/contexts/default_context.hpp +++ b/rclcpp/include/rclcpp/contexts/default_context.hpp @@ -33,6 +33,13 @@ class DefaultContext : public rclcpp::context::Context }; +DefaultContext::SharedPtr +get_global_default_context() +{ + static DefaultContext::SharedPtr default_context = DefaultContext::make_shared(); + return default_context; +} + } // namespace default_context } // namespace contexts } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 95d916dd09..4266a0ca9e 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -103,9 +103,11 @@ class Node RCLCPP_SMART_PTR_DEFINITIONS(Node); /* Create a node based on the node name. */ - Node(const std::string & node_name); + Node(const std::string & node_name, bool use_intra_process_comms = false); /* Create a node based on the node name and a rclcpp::context::Context. */ - Node(const std::string & node_name, rclcpp::context::Context::SharedPtr context); + Node( + const std::string & node_name, rclcpp::context::Context::SharedPtr context, + bool use_intra_process_comms = false); /* Get the name of the node. */ const std::string & @@ -214,6 +216,8 @@ class Node size_t number_of_services_; size_t number_of_clients_; + bool use_intra_process_comms_; + mutable std::mutex mutex_; std::map parameters_; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index e67b8f3d23..22025617f5 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -39,15 +39,20 @@ using namespace rclcpp; using namespace rclcpp::node; -using rclcpp::contexts::default_context::DefaultContext; - -Node::Node(const std::string & node_name) -: Node(node_name, DefaultContext::make_shared()) +Node::Node(const std::string & node_name, bool use_intra_process_comms) +: Node( + node_name, + rclcpp::contexts::default_context::get_global_default_context(), + use_intra_process_comms) {} -Node::Node(const std::string & node_name, context::Context::SharedPtr context) +Node::Node( + const std::string & node_name, + context::Context::SharedPtr context, + bool use_intra_process_comms) : name_(node_name), context_(context), - number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0) + number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0), + use_intra_process_comms_(use_intra_process_comms) { size_t domain_id = 0; char * ros_domain_id = nullptr; @@ -163,8 +168,7 @@ Node::create_subscription( if (!subscriber_handle) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( - std::string("could not create subscription: ") + - rmw_get_error_string_safe()); + std::string("could not create subscription: ") + rmw_get_error_string_safe()); // *INDENT-ON* } diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index c0e067c3ea..9e77976d98 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -27,6 +27,7 @@ #include #include +#include #include #include From aedc494f8fa07b12662360f19a59e90626895803 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 18 Aug 2015 18:48:13 -0700 Subject: [PATCH 5/6] implement intra process comms this involves changes in the executor, node, publisher, and subscription classes I'd like a more decoupled way to integrate this into the executor and node but I was unable to find a good way to do so. --- rclcpp/include/rclcpp/any_executable.hpp | 1 + rclcpp/include/rclcpp/executor.hpp | 68 ++++++++++++---- rclcpp/include/rclcpp/node.hpp | 8 ++ rclcpp/include/rclcpp/node_impl.hpp | 79 ++++++++++++++++++- rclcpp/include/rclcpp/publisher.hpp | 99 ++++++++++++++++++++---- rclcpp/include/rclcpp/subscription.hpp | 68 +++++++++++++++- 6 files changed, 289 insertions(+), 34 deletions(-) diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 3f698f1911..1bdddb880b 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -33,6 +33,7 @@ struct AnyExecutable {} // Only one of the following pointers will be set. rclcpp::subscription::SubscriptionBase::SharedPtr subscription; + rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process; rclcpp::timer::TimerBase::SharedPtr timer; rclcpp::service::ServiceBase::SharedPtr service; rclcpp::client::ClientBase::SharedPtr client; diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index f32daa6938..c58370cc84 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -15,15 +15,16 @@ #ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_ #define RCLCPP_RCLCPP_EXECUTOR_HPP_ -#include - #include #include #include +#include #include #include #include +#include + #include #include #include @@ -159,6 +160,9 @@ class Executor if (any_exec->subscription) { execute_subscription(any_exec->subscription); } + if (any_exec->subscription_intra_process) { + execute_intra_process_subscription(any_exec->subscription_intra_process); + } if (any_exec->service) { execute_service(any_exec->service); } @@ -194,6 +198,24 @@ class Executor subscription->return_message(message); } + static void + execute_intra_process_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr & subscription) + { + rcl_interfaces::msg::IntraProcessMessage ipm; + bool taken = false; + rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken); + if (status == RMW_RET_OK) { + if (taken) { + subscription->handle_intra_process_message(ipm); + } + } else { + fprintf(stderr, + "[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n", + subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); + } + } + static void execute_timer( rclcpp::timer::TimerBase::SharedPtr & timer) @@ -293,22 +315,24 @@ class Executor })); } // Use the number of subscriptions to allocate memory in the handles - size_t number_of_subscriptions = subs.size(); + size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process. rmw_subscriptions_t subscriber_handles; - subscriber_handles.subscriber_count = number_of_subscriptions; + subscriber_handles.subscriber_count = 0; // TODO(wjwwood): Avoid redundant malloc's - subscriber_handles.subscribers = - memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions); + subscriber_handles.subscribers = memory_strategy_->borrow_handles( + HandleType::subscription_handle, max_number_of_subscriptions); if (subscriber_handles.subscribers == NULL) { // TODO(wjwwood): Use a different error here? maybe std::bad_alloc? throw std::runtime_error("Could not malloc for subscriber pointers."); } // Then fill the SubscriberHandles with ready subscriptions - size_t subscriber_handle_index = 0; for (auto & subscription : subs) { - subscriber_handles.subscribers[subscriber_handle_index] = \ + subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = subscription->subscription_handle_->data; - subscriber_handle_index += 1; + if (subscription->intra_process_subscription_handle_) { + subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = + subscription->intra_process_subscription_handle_->data; + } } // Use the number of services to allocate memory in the handles @@ -414,7 +438,7 @@ class Executor } // Add the new work to the class's list of things waiting to be executed // Starting with the subscribers - for (size_t i = 0; i < number_of_subscriptions; ++i) { + for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) { void * handle = subscriber_handles.subscribers[i]; if (handle) { subscriber_handles_.push_back(handle); @@ -463,13 +487,18 @@ class Executor } for (auto & weak_subscription : group->subscription_ptrs_) { auto subscription = weak_subscription.lock(); - if (subscription && subscription->subscription_handle_->data == subscriber_handle) { - return subscription; + if (subscription) { + if (subscription->subscription_handle_->data == subscriber_handle) { + return subscription; + } + if (subscription->intra_process_subscription_handle_->data == subscriber_handle) { + return subscription; + } } } } } - return rclcpp::subscription::SubscriptionBase::SharedPtr(); + return nullptr; } rclcpp::service::ServiceBase::SharedPtr @@ -653,6 +682,11 @@ class Executor while (it != subscriber_handles_.end()) { auto subscription = get_subscription_by_handle(*it); if (subscription) { + // Figure out if this is for intra-process or not. + bool is_intra_process = false; + if (subscription->intra_process_subscription_handle_) { + is_intra_process = subscription->intra_process_subscription_handle_->data == *it; + } // Find the group for this handle and see if it can be serviced auto group = get_group_by_subscription(subscription); if (!group) { @@ -668,7 +702,11 @@ class Executor continue; } // Otherwise it is safe to set and return the any_exec - any_exec->subscription = subscription; + if (is_intra_process) { + any_exec->subscription_intra_process = subscription; + } else { + any_exec->subscription = subscription; + } any_exec->callback_group = group; any_exec->node = get_node_by_group(group); subscriber_handles_.erase(it++); @@ -804,7 +842,7 @@ class Executor } // Check the subscriptions to see if there are any that are ready get_next_subscription(any_exec); - if (any_exec->subscription) { + if (any_exec->subscription || any_exec->subscription_intra_process) { return any_exec; } // Check the services to see if there are any that are ready diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 4266a0ca9e..a4d30a837f 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -199,6 +200,8 @@ class Node private: RCLCPP_DISABLE_COPY(Node); + static const rosidl_message_type_support_t * ipm_ts; + bool group_in_node(callback_group::CallbackGroup::SharedPtr & group); @@ -309,6 +312,11 @@ class Node } }; +const rosidl_message_type_support_t * Node::ipm_ts = + rosidl_generator_cpp::get_message_type_support_handle< + rcl_interfaces::msg::IntraProcessMessage + >(); + } /* namespace node */ } /* namespace rclcpp */ diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 22025617f5..a6a46e5b6d 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -24,12 +24,14 @@ #include #include +#include #include #include #include #include #include +#include #include #ifndef RCLCPP_RCLCPP_NODE_HPP_ @@ -128,7 +130,45 @@ Node::create_publisher( // *INDENT-ON* } - return publisher::Publisher::make_shared(node_handle_, publisher_handle); + auto publisher = publisher::Publisher::make_shared( + node_handle_, publisher_handle, topic_name, qos_profile.depth); + + if (use_intra_process_comms_) { + rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher( + node_handle_.get(), ipm_ts, (topic_name + "__intra").c_str(), qos_profile); + if (!intra_process_publisher_handle) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("could not create intra process publisher: ") + + rmw_get_error_string_safe()); + // *INDENT-ON* + } + + auto intra_process_manager = + context_->get_sub_context(); + uint64_t intra_process_publisher_id = + intra_process_manager->add_publisher(publisher); + rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; + auto shared_publish_callback = + [weak_ipm] (uint64_t publisher_id, std::shared_ptr msg) -> uint64_t + { + auto ipm = weak_ipm.lock(); + if (!ipm) { + // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning? + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + auto typed_msg = std::static_pointer_cast(msg); + std::unique_ptr unique_msg(new MessageT(*typed_msg)); + uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); + return message_seq; + }; + publisher->setup_intra_process( + intra_process_publisher_id, + shared_publish_callback, + intra_process_publisher_handle); + } + return publisher; } bool @@ -182,10 +222,45 @@ Node::create_subscription( callback, msg_mem_strat); auto sub_base_ptr = std::dynamic_pointer_cast(sub); + // Setup intra process. + if (use_intra_process_comms_) { + rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription( + node_handle_.get(), ipm_ts, + (topic_name + "__intra").c_str(), qos_profile, false); + if (!subscriber_handle) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("could not create intra process subscription: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + auto intra_process_manager = + context_->get_sub_context(); + rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; + uint64_t intra_process_subscription_id = + intra_process_manager->add_subscription(sub_base_ptr); + sub->setup_intra_process( + intra_process_subscription_id, + intra_process_subscriber_handle, + [weak_ipm] ( + uint64_t publisher_id, + uint64_t message_sequence, + uint64_t subscription_id, + std::unique_ptr & message) + { + auto ipm = weak_ipm.lock(); + if (!ipm) { + // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning? + throw std::runtime_error( + "intra process take called after destruction of intra process manager"); + } + ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); + }); + } + // Assign to a group. if (group) { if (!group_in_node(group)) { // TODO: use custom exception - throw std::runtime_error("Cannot create timer, group not in node."); + throw std::runtime_error("Cannot create subscription, group not in node."); } group->add_subscription(sub_base_ptr); } else { diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 3a8cc386ea..ad9102bee2 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -15,15 +15,18 @@ #ifndef RCLCPP_RCLCPP_PUBLISHER_HPP_ #define RCLCPP_RCLCPP_PUBLISHER_HPP_ +#include + #include #include #include +#include +#include +#include #include #include -#include - namespace rclcpp { @@ -38,23 +41,37 @@ namespace publisher class Publisher { + friend rclcpp::node::Node; public: RCLCPP_SMART_PTR_DEFINITIONS(Publisher); - Publisher(std::shared_ptr node_handle, rmw_publisher_t * publisher_handle) - : node_handle_(node_handle), publisher_handle_(publisher_handle) + Publisher( + std::shared_ptr node_handle, + rmw_publisher_t * publisher_handle, + std::string topic, + size_t queue_size) + : node_handle_(node_handle), publisher_handle_(publisher_handle), + intra_process_publisher_handle_(nullptr), + topic_(topic), queue_size_(queue_size), + intra_process_publisher_id_(0), store_intra_process_message_(nullptr) {} virtual ~Publisher() { + if (intra_process_publisher_handle_) { + if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) { + fprintf( + stderr, + "Error in destruction of intra process rmw publisher handle: %s\n", + rmw_get_error_string_safe()); + } + } if (publisher_handle_) { if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) { - // *INDENT-OFF* - std::stringstream ss; - ss << "Error in destruction of rmw publisher handle: " - << rmw_get_error_string_safe() << '\n'; - // *INDENT-ON* - (std::cerr << ss.str()).flush(); + fprintf( + stderr, + "Error in destruction of rmw publisher handle: %s\n", + rmw_get_error_string_safe()); } } } @@ -63,19 +80,69 @@ class Publisher void publish(std::shared_ptr & msg) { - rmw_ret_t status = rmw_publish(publisher_handle_, msg.get()); - if (status != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to publish message: ") + rmw_get_error_string_safe()); - // *INDENT-ON* + rmw_ret_t status; + if (!store_intra_process_message_) { + // TODO(wjwwood): for now, make intra process and inter process mutually exclusive. + // Later we'll have them together, when we have a way to filter more efficiently. + status = rmw_publish(publisher_handle_, msg.get()); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + if (store_intra_process_message_) { + uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg); + rcl_interfaces::msg::IntraProcessMessage ipm; + ipm.publisher_id = intra_process_publisher_id_; + ipm.message_sequence = message_seq; + status = rmw_publish(intra_process_publisher_handle_, &ipm); + if (status != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } } } + std::string + get_topic_name() const + { + return topic_; + } + + size_t + get_queue_size() const + { + return queue_size_; + } + + typedef std::function)> StoreSharedMessageCallbackT; +protected: + void + setup_intra_process( + uint64_t intra_process_publisher_id, + StoreSharedMessageCallbackT callback, + rmw_publisher_t * intra_process_publisher_handle) + { + intra_process_publisher_id_ = intra_process_publisher_id; + store_intra_process_message_ = callback; + intra_process_publisher_handle_ = intra_process_publisher_handle; + } + private: std::shared_ptr node_handle_; rmw_publisher_t * publisher_handle_; + rmw_publisher_t * intra_process_publisher_handle_; + + std::string topic_; + size_t queue_size_; + + uint64_t intra_process_publisher_id_; + StoreSharedMessageCallbackT store_intra_process_message_; }; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 141747c373..13235c3bf7 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -36,6 +37,11 @@ namespace executor class Executor; } // namespace executor +namespace node +{ +class Node; +} // namespace node + namespace subscription { @@ -51,7 +57,8 @@ class SubscriptionBase rmw_subscription_t * subscription_handle, const std::string & topic_name, bool ignore_local_publications) - : node_handle_(node_handle), + : intra_process_subscription_handle_(nullptr), + node_handle_(node_handle), subscription_handle_(subscription_handle), topic_name_(topic_name), ignore_local_publications_(ignore_local_publications) @@ -70,6 +77,15 @@ class SubscriptionBase (std::cerr << ss.str()).flush(); } } + if (intra_process_subscription_handle_) { + auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_); + if (ret != RMW_RET_OK) { + std::stringstream ss; + ss << "Error in destruction of rmw intra process subscription handle: " << + rmw_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); + } + } } const std::string & get_topic_name() const @@ -80,6 +96,10 @@ class SubscriptionBase virtual std::shared_ptr create_message() = 0; virtual void handle_message(std::shared_ptr & message) = 0; virtual void return_message(std::shared_ptr & message) = 0; + virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0; + +protected: + rmw_subscription_t * intra_process_subscription_handle_; private: RCLCPP_DISABLE_COPY(SubscriptionBase); @@ -87,6 +107,7 @@ class SubscriptionBase std::shared_ptr node_handle_; rmw_subscription_t * subscription_handle_; + std::string topic_name_; bool ignore_local_publications_; @@ -95,6 +116,8 @@ class SubscriptionBase template class Subscription : public SubscriptionBase { + friend class rclcpp::node::Node; + public: using CallbackType = std::function &)>; RCLCPP_SMART_PTR_DEFINITIONS(Subscription); @@ -135,13 +158,56 @@ class Subscription : public SubscriptionBase message_memory_strategy_->return_message(typed_message); } + void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) + { + if (!get_intra_process_message_callback_) { + // throw std::runtime_error( + // "handle_intra_process_message called before setup_intra_process"); + // TODO(wjwwood): for now, this could mean that intra process was just not enabled. + // However, this can only really happen if this node has it disabled, but the other doesn't. + return; + } + std::unique_ptr msg; + get_intra_process_message_callback_( + ipm.publisher_id, + ipm.message_sequence, + intra_process_subscription_id_, + msg); + if (!msg) { + // This either occurred because the publisher no longer exists or the + // message requested is no longer being stored. + // TODO(wjwwood): should we notify someone of this? log error, log warning? + return; + } + typename MessageT::SharedPtr shared_msg = std::move(msg); + callback_(shared_msg); + } + private: + typedef + std::function< + void (uint64_t, uint64_t, uint64_t, std::unique_ptr &) + > GetMessageCallbackType; + + void setup_intra_process( + uint64_t intra_process_subscription_id, + rmw_subscription_t * intra_process_subscription, + GetMessageCallbackType callback) + { + intra_process_subscription_id_ = intra_process_subscription_id; + intra_process_subscription_handle_ = intra_process_subscription; + get_intra_process_message_callback_ = callback; + } + RCLCPP_DISABLE_COPY(Subscription); CallbackType callback_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; + GetMessageCallbackType get_intra_process_message_callback_; + uint64_t intra_process_subscription_id_; + }; } /* namespace subscription */ From 12b939cf5a80fe221742aa510da0148c5a4dd622 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Wed, 19 Aug 2015 13:10:15 -0700 Subject: [PATCH 6/6] fixes to address comments and CI failures --- rclcpp/CMakeLists.txt | 4 +- rclcpp/include/rclcpp/context.hpp | 2 + .../include/rclcpp/intra_process_manager.hpp | 54 ++-- rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 31 ++- rclcpp/include/rclcpp/node.hpp | 8 +- rclcpp/include/rclcpp/node_impl.hpp | 20 +- rclcpp/include/rclcpp/publisher.hpp | 6 +- rclcpp/include/rclcpp/subscription.hpp | 6 +- rclcpp/test/test_intra_process_manager.cpp | 248 ++++++++++-------- rclcpp/test/test_mapped_ring_buffer.cpp | 66 +++-- 10 files changed, 264 insertions(+), 181 deletions(-) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 1501ef855e..2cc262dffe 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -22,7 +22,9 @@ if(AMENT_ENABLE_TESTING) ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) - target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") + if(TARGET test_intra_process_manager) + target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") + endif() endif() ament_package( diff --git a/rclcpp/include/rclcpp/context.hpp b/rclcpp/include/rclcpp/context.hpp index f859a7f226..1c56cbb11f 100644 --- a/rclcpp/include/rclcpp/context.hpp +++ b/rclcpp/include/rclcpp/context.hpp @@ -51,11 +51,13 @@ class Context auto it = sub_contexts_.find(type_i); if (it == sub_contexts_.end()) { // It doesn't exist yet, make it + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) sub_context = std::shared_ptr( new SubContext(std::forward(args) ...), [] (SubContext * sub_context_ptr) { delete sub_context_ptr; }); + // *INDENT-ON* sub_contexts_[type_i] = sub_context; } else { // It exists, get it out and cast it. diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 38eee41d87..e68973141f 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -21,8 +21,9 @@ #include #include -#include +#include #include +#include #include #include #include @@ -115,6 +116,7 @@ class IntraProcessManager { private: RCLCPP_DISABLE_COPY(IntraProcessManager); + public: RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); @@ -129,8 +131,8 @@ class IntraProcessManager * * This method will allocate memory. * - * /param subscription the Subscription to register. - * /return an unsigned 64-bit integer which is the subscription's unique id. + * \param subscription the Subscription to register. + * \return an unsigned 64-bit integer which is the subscription's unique id. */ uint64_t add_subscription(subscription::SubscriptionBase::SharedPtr subscription) @@ -144,7 +146,7 @@ class IntraProcessManager /// Unregister a subscription using the subscription's unique id. /* This method does not allocate memory. * - * /param intra_process_subscription_id id of the subscription to remove. + * \param intra_process_subscription_id id of the subscription to remove. */ void remove_subscription(uint64_t intra_process_subscription_id) @@ -180,19 +182,21 @@ class IntraProcessManager * * This method will allocate memory. * - * /param publisher publisher to be registered with the manager. - * /param buffer_size if 0 (default) a size is calculated based on the QoS. - * /return an unsigned 64-bit integer which is the publisher's unique id. + * \param publisher publisher to be registered with the manager. + * \param buffer_size if 0 (default) a size is calculated based on the QoS. + * \return an unsigned 64-bit integer which is the publisher's unique id. */ template uint64_t - add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size=0) + add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0) { auto id = IntraProcessManager::get_next_unique_id(); publishers_[id].publisher = publisher; size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); // As long as the size of the ring buffer is less than the max sequence number, we're safe. - assert(size <= std::numeric_limits::max()); + if (size > std::numeric_limits::max()) { + throw std::invalid_argument("the calculated buffer size is too large"); + } publishers_[id].sequence_number.store(0); publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer::make_shared(size); publishers_[id].target_subscriptions_by_message_sequence.reserve(size); @@ -202,7 +206,7 @@ class IntraProcessManager /// Unregister a publisher using the publisher's unique id. /* This method does not allocate memory. * - * /param intra_process_publisher_id id of the publisher to remove. + * \param intra_process_publisher_id id of the publisher to remove. */ void remove_publisher(uint64_t intra_process_publisher_id) @@ -236,9 +240,9 @@ class IntraProcessManager * * This method does allocate memory. * - * /param intra_process_publisher_id the id of the publisher of this message. - * /param message the message that is being stored. - * /return the message sequence number. + * \param intra_process_publisher_id the id of the publisher of this message. + * \param message the message that is being stored. + * \return the message sequence number. */ template uint64_t @@ -250,7 +254,7 @@ class IntraProcessManager if (it == publishers_.end()) { throw std::runtime_error("store_intra_process_message called with invalid publisher id"); } - publisher_info & info = it->second; + PublisherInfo & info = it->second; // Calculate the next message sequence number. uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); // Insert the message into the ring buffer using the message_seq to identify it. @@ -309,10 +313,10 @@ class IntraProcessManager * * This method may allocate memory to copy the stored message. * - * /param intra_process_publisher_id the id of the message's publisher. - * /param message_sequence_number the sequence number of the message. - * /param requesting_subscriptions_intra_process_id the subscription's id. - * /param message the message typed unique_ptr used to return the message. + * \param intra_process_publisher_id the id of the message's publisher. + * \param message_sequence_number the sequence number of the message. + * \param requesting_subscriptions_intra_process_id the subscription's id. + * \param message the message typed unique_ptr used to return the message. */ template void @@ -323,7 +327,7 @@ class IntraProcessManager std::unique_ptr & message) { message = nullptr; - publisher_info * info; + PublisherInfo * info; { auto it = publishers_.find(intra_process_publisher_id); if (it == publishers_.end()) { @@ -377,9 +381,11 @@ class IntraProcessManager // So around 585 million years. Even at 1 GHz, it would take 585 years. // I think it's safe to avoid trying to handle overflow. // If we roll over then it's most likely a bug. + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::overflow_error( "exhausted the unique id's for publishers and subscribers in this process " "(congratulations your computer is either extremely fast or extremely old)"); + // *INDENT-ON* } return next_id; } @@ -389,11 +395,11 @@ class IntraProcessManager std::unordered_map subscriptions_; std::map> subscription_ids_by_topic_; - struct publisher_info + struct PublisherInfo { - RCLCPP_DISABLE_COPY(publisher_info); + RCLCPP_DISABLE_COPY(PublisherInfo); - publisher_info() = default; + PublisherInfo() = default; publisher::Publisher::WeakPtr publisher; std::atomic sequence_number; @@ -401,11 +407,11 @@ class IntraProcessManager std::unordered_map> target_subscriptions_by_message_sequence; }; - std::unordered_map publishers_; + std::unordered_map publishers_; }; -std::atomic IntraProcessManager::next_unique_id_{1}; +std::atomic IntraProcessManager::next_unique_id_ {1}; } /* namespace intra_process_manager */ } /* namespace rclcpp */ diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index d4fc0f7288..4b5bb474a6 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -59,9 +59,10 @@ class MappedRingBuffer : public MappedRingBufferBase /// Constructor. /* The constructor will allocate memory while reserving space. * - * /param size size of the ring buffer; must be positive and non-zero. + * \param size size of the ring buffer; must be positive and non-zero. */ - MappedRingBuffer(size_t size) : elements_(size), head_(0) + MappedRingBuffer(size_t size) + : elements_(size), head_(0) { if (size == 0) { throw std::invalid_argument("size must be a positive, non-zero value"); @@ -75,8 +76,10 @@ class MappedRingBuffer : public MappedRingBufferBase * * The key is not guaranteed to be unique, see the class docs for more. * - * /param key the key associated with the stored value - * /param value if the key is found, the value is stored in this parameter + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter */ void get_copy_at_key(uint64_t key, std::unique_ptr & value) @@ -102,8 +105,10 @@ class MappedRingBuffer : public MappedRingBufferBase * originally stored object, since it was returned by the first call to this * method. * - * /param key the key associated with the stored value - * /param value if the key is found, the value is stored in this parameter + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter */ void get_ownership_at_key(uint64_t key, std::unique_ptr & value) @@ -125,8 +130,10 @@ class MappedRingBuffer : public MappedRingBufferBase * * The key is not guaranteed to be unique, see the class docs for more. * - * /param key the key associated with the stored value - * /param value if the key is found, the value is stored in this parameter + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter */ void pop_at_key(uint64_t key, std::unique_ptr & value) @@ -147,8 +154,8 @@ class MappedRingBuffer : public MappedRingBufferBase * After insertion, if a pair was replaced, then value will contain ownership * of that displaced value. Otherwise it will be a nullptr. * - * /param key the key associated with the value to be stored - * /param value the value to store, and optionally the value displaced + * \param key the key associated with the value to be stored + * \param value the value to store, and optionally the value displaced */ bool push_and_replace(uint64_t key, std::unique_ptr & value) @@ -188,9 +195,11 @@ class MappedRingBuffer : public MappedRingBufferBase typename std::vector::iterator get_iterator_of_key(uint64_t key) { - auto it = std::find_if(elements_.begin(), elements_.end(), [key] (element & e) -> bool { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + auto it = std::find_if(elements_.begin(), elements_.end(), [key](element & e) -> bool { return e.key == key && e.in_use; }); + // *INDENT-ON* return it; } diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index a4d30a837f..03ff9de492 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -200,7 +200,7 @@ class Node private: RCLCPP_DISABLE_COPY(Node); - static const rosidl_message_type_support_t * ipm_ts; + static const rosidl_message_type_support_t * ipm_ts_; bool group_in_node(callback_group::CallbackGroup::SharedPtr & group); @@ -312,10 +312,8 @@ class Node } }; -const rosidl_message_type_support_t * Node::ipm_ts = - rosidl_generator_cpp::get_message_type_support_handle< - rcl_interfaces::msg::IntraProcessMessage - >(); +const rosidl_message_type_support_t * Node::ipm_ts_ = + rosidl_generator_cpp::get_message_type_support_handle(); } /* namespace node */ } /* namespace rclcpp */ diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index a6a46e5b6d..23cdd27c9e 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -43,9 +43,9 @@ using namespace rclcpp::node; Node::Node(const std::string & node_name, bool use_intra_process_comms) : Node( - node_name, - rclcpp::contexts::default_context::get_global_default_context(), - use_intra_process_comms) + node_name, + rclcpp::contexts::default_context::get_global_default_context(), + use_intra_process_comms) {} Node::Node( @@ -87,6 +87,7 @@ Node::Node( // *INDENT-ON* } // Initialize node handle shared_ptr with custom deleter. + // *INDENT-OFF* node_handle_.reset(node, [](rmw_node_t * node) { auto ret = rmw_destroy_node(node); if (ret != RMW_RET_OK) { @@ -94,6 +95,7 @@ Node::Node( stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe()); } }); + // *INDENT-ON* using rclcpp::callback_group::CallbackGroupType; default_callback_group_ = create_callback_group( @@ -135,7 +137,7 @@ Node::create_publisher( if (use_intra_process_comms_) { rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher( - node_handle_.get(), ipm_ts, (topic_name + "__intra").c_str(), qos_profile); + node_handle_.get(), ipm_ts_, (topic_name + "__intra").c_str(), qos_profile); if (!intra_process_publisher_handle) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( @@ -149,8 +151,9 @@ Node::create_publisher( uint64_t intra_process_publisher_id = intra_process_manager->add_publisher(publisher); rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; + // *INDENT-OFF* auto shared_publish_callback = - [weak_ipm] (uint64_t publisher_id, std::shared_ptr msg) -> uint64_t + [weak_ipm](uint64_t publisher_id, std::shared_ptr msg) -> uint64_t { auto ipm = weak_ipm.lock(); if (!ipm) { @@ -163,6 +166,7 @@ Node::create_publisher( uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); return message_seq; }; + // *INDENT-ON* publisher->setup_intra_process( intra_process_publisher_id, shared_publish_callback, @@ -225,7 +229,7 @@ Node::create_subscription( // Setup intra process. if (use_intra_process_comms_) { rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription( - node_handle_.get(), ipm_ts, + node_handle_.get(), ipm_ts_, (topic_name + "__intra").c_str(), qos_profile, false); if (!subscriber_handle) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) @@ -238,10 +242,11 @@ Node::create_subscription( rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; uint64_t intra_process_subscription_id = intra_process_manager->add_subscription(sub_base_ptr); + // *INDENT-OFF* sub->setup_intra_process( intra_process_subscription_id, intra_process_subscriber_handle, - [weak_ipm] ( + [weak_ipm]( uint64_t publisher_id, uint64_t message_sequence, uint64_t subscription_id, @@ -255,6 +260,7 @@ Node::create_subscription( } ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); }); + // *INDENT-ON* } // Assign to a group. if (group) { diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index ad9102bee2..6f0ae14fd7 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -42,6 +42,7 @@ namespace publisher class Publisher { friend rclcpp::node::Node; + public: RCLCPP_SMART_PTR_DEFINITIONS(Publisher); @@ -107,7 +108,7 @@ class Publisher } } - std::string + const std::string & get_topic_name() const { return topic_; @@ -119,7 +120,8 @@ class Publisher return queue_size_; } - typedef std::function)> StoreSharedMessageCallbackT; + typedef std::function)> StoreSharedMessageCallbackT; + protected: void setup_intra_process( diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 13235c3bf7..33dc9ffefe 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -185,9 +185,9 @@ class Subscription : public SubscriptionBase private: typedef - std::function< - void (uint64_t, uint64_t, uint64_t, std::unique_ptr &) - > GetMessageCallbackType; + std::function< + void (uint64_t, uint64_t, uint64_t, std::unique_ptr &) + > GetMessageCallbackType; void setup_intra_process( uint64_t intra_process_subscription_id, diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 29bc717040..8a57705768 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -12,46 +12,76 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include #include // Mock up publisher and subscription base to avoid needing an rmw impl. -namespace rclcpp { namespace publisher { namespace mock { +namespace rclcpp +{ +namespace publisher +{ +namespace mock +{ class Publisher { public: RCLCPP_SMART_PTR_DEFINITIONS(Publisher); - Publisher() : mock_topic_name(""), mock_queue_size(0) {} + Publisher() + : mock_topic_name(""), mock_queue_size(0) {} - std::string get_topic_name() const {return mock_topic_name;} - size_t get_queue_size() const {return mock_queue_size;} + const std::string & get_topic_name() const + { + return mock_topic_name; + } + size_t get_queue_size() const + { + return mock_queue_size; + } std::string mock_topic_name; size_t mock_queue_size; }; -}}} +} +} +} -namespace rclcpp { namespace subscription { namespace mock { +namespace rclcpp +{ +namespace subscription +{ +namespace mock +{ class SubscriptionBase { public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionBase); - SubscriptionBase() : mock_topic_name(""), mock_queue_size(0) {} + SubscriptionBase() + : mock_topic_name(""), mock_queue_size(0) {} - std::string get_topic_name() const {return mock_topic_name;} - size_t get_queue_size() const {return mock_queue_size;} + const std::string & get_topic_name() const + { + return mock_topic_name; + } + size_t get_queue_size() const + { + return mock_queue_size; + } std::string mock_topic_name; size_t mock_queue_size; }; -}}} +} +} +} // Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. #define RCLCPP_RCLCPP_PUBLISHER_HPP_ @@ -66,16 +96,16 @@ class SubscriptionBase #include /* -This tests the "normal" usage of the class: - - Creates two publishers on different topics. - - Creates a subscription which matches one of them. - - Publishes on each publisher with different message content. - - Try's to take the message from the non-matching publish, should fail. - - Try's to take the message from the matching publish, should work. - - Asserts the message it got back was the one that went in (since there's only one subscription). - - Try's to take the message again, should fail. -*/ -TEST(test_intra_process_manager, nominal) { + This tests the "normal" usage of the class: + - Creates two publishers on different topics. + - Creates a subscription which matches one of them. + - Publishes on each publisher with different message content. + - Try's to take the message from the non-matching publish, should fail. + - Try's to take the message from the matching publish, should work. + - Asserts the message it got back was the one that went in (since there's only one subscription). + - Try's to take the message again, should fail. + */ +TEST(TestIntraProcessManager, nominal) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -97,8 +127,9 @@ TEST(test_intra_process_manager, nominal) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto p1_m1_original_address = unique_msg.get(); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); @@ -153,13 +184,13 @@ TEST(test_intra_process_manager, nominal) { } /* -Simulates the case where a publisher is removed between publishing and the matching take. - - Creates a publisher and subscription on the same topic. - - Publishes a message. - - Remove the publisher. - - Try's to take the message, should fail since the publisher (and its storage) is gone. -*/ -TEST(test_intra_process_manager, remove_publisher_before_trying_to_take) { + Simulates the case where a publisher is removed between publishing and the matching take. + - Creates a publisher and subscription on the same topic. + - Publishes a message. + - Remove the publisher. + - Try's to take the message, should fail since the publisher (and its storage) is gone. + */ +TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -176,8 +207,9 @@ TEST(test_intra_process_manager, remove_publisher_before_trying_to_take) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); ASSERT_EQ(nullptr, unique_msg); @@ -189,15 +221,15 @@ TEST(test_intra_process_manager, remove_publisher_before_trying_to_take) { } /* -Tests whether or not removed subscriptions affect take behavior. - - Creates a publisher and three subscriptions on the same topic. - - Publish a message, keep the original point for later comparison. - - Take with one subscription, should work. - - Remove a different subscription. - - Take with the final subscription, should work. - - Assert the previous take returned ownership of the original object published. -*/ -TEST(test_intra_process_manager, removed_subscription_affects_take) { + Tests whether or not removed subscriptions affect take behavior. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message, keep the original point for later comparison. + - Take with one subscription, should work. + - Remove a different subscription. + - Take with the final subscription, should work. + - Assert the previous take returned ownership of the original object published. + */ +TEST(TestIntraProcessManager, removed_subscription_affects_take) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -224,8 +256,9 @@ TEST(test_intra_process_manager, removed_subscription_affects_take) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto original_message_pointer = unique_msg.get(); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); @@ -259,12 +292,12 @@ TEST(test_intra_process_manager, removed_subscription_affects_take) { } /* -This tests normal operation with multiple subscriptions and one publisher. - - Creates a publisher and three subscriptions on the same topic. - - Publish a message. - - Take with each subscription, checking that the last takes the original back. -*/ -TEST(test_intra_process_manager, multiple_subscriptions_one_publisher) { + This tests normal operation with multiple subscriptions and one publisher. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message. + - Take with each subscription, checking that the last takes the original back. + */ +TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -291,8 +324,9 @@ TEST(test_intra_process_manager, multiple_subscriptions_one_publisher) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto original_message_pointer = unique_msg.get(); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); @@ -327,12 +361,12 @@ TEST(test_intra_process_manager, multiple_subscriptions_one_publisher) { } /* -This tests normal operation with multiple publishers and one subscription. - - Creates a publisher and three subscriptions on the same topic. - - Publish a message. - - Take with each subscription, checking that the last takes the original back. -*/ -TEST(test_intra_process_manager, multiple_publishers_one_subscription) { + This tests normal operation with multiple publishers and one subscription. + - Creates a publisher and three subscriptions on the same topic. + - Publish a message. + - Take with each subscription, checking that the last takes the original back. + */ +TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -360,8 +394,9 @@ TEST(test_intra_process_manager, multiple_publishers_one_subscription) { // First publish ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto original_message_pointer1 = unique_msg.get(); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); @@ -417,12 +452,12 @@ TEST(test_intra_process_manager, multiple_publishers_one_subscription) { } /* -This tests normal operation with multiple publishers and subscriptions. - - Creates three publishers and three subscriptions on the same topic. - - Publish a message on each publisher. - - Take from each publisher with each subscription, checking the pointer. -*/ -TEST(test_intra_process_manager, multiple_publishers_multiple_subscription) { + This tests normal operation with multiple publishers and subscriptions. + - Creates three publishers and three subscriptions on the same topic. + - Publish a message on each publisher. + - Take from each publisher with each subscription, checking the pointer. + */ +TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -460,8 +495,9 @@ TEST(test_intra_process_manager, multiple_publishers_multiple_subscription) { // First publish ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto original_message_pointer1 = unique_msg.get(); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); @@ -571,15 +607,15 @@ TEST(test_intra_process_manager, multiple_publishers_multiple_subscription) { } /* -Tests displacing a message from the ring buffer before take is called. - - Creates a publisher (buffer_size = 2) and a subscription on the same topic. - - Publish a message on the publisher. - - Publish another message. - - Take the second message. - - Publish a message. - - Try to take the first message, should fail. -*/ -TEST(test_intra_process_manager, ring_buffer_displacement) { + Tests displacing a message from the ring buffer before take is called. + - Creates a publisher (buffer_size = 2) and a subscription on the same topic. + - Publish a message on the publisher. + - Publish another message. + - Take the second message. + - Publish a message. + - Try to take the first message, should fail. + */ +TEST(TestIntraProcessManager, ring_buffer_displacement) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -596,8 +632,9 @@ TEST(test_intra_process_manager, ring_buffer_displacement) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto original_message_pointer1 = unique_msg.get(); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); @@ -640,13 +677,13 @@ TEST(test_intra_process_manager, ring_buffer_displacement) { } /* -Simulates race condition where a subscription is created after publish. - - Creates a publisher. - - Publish a message on the publisher. - - Create a subscription on the same topic. - - Try to take the message with the newly created subscription, should fail. -*/ -TEST(test_intra_process_manager, subscription_creation_race_condition) { + Simulates race condition where a subscription is created after publish. + - Creates a publisher. + - Publish a message on the publisher. + - Create a subscription on the same topic. + - Try to take the message with the newly created subscription, should fail. + */ +TEST(TestIntraProcessManager, subscription_creation_race_condition) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto p1 = std::make_shared(); @@ -658,8 +695,9 @@ TEST(test_intra_process_manager, subscription_creation_race_condition) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); ASSERT_EQ(nullptr, unique_msg); @@ -675,14 +713,14 @@ TEST(test_intra_process_manager, subscription_creation_race_condition) { } /* -Simulates race condition where a publisher goes out of scope before take. - - Create a subscription. - - Creates a publisher on the same topic in a scope. - - Publish a message on the publisher in a scope. - - Let the scope expire. - - Try to take the message with the subscription, should fail. -*/ -TEST(test_intra_process_manager, publisher_out_of_scope_take) { + Simulates race condition where a publisher goes out of scope before take. + - Create a subscription. + - Creates a publisher on the same topic in a scope. + - Publish a message on the publisher in a scope. + - Let the scope expire. + - Try to take the message with the subscription, should fail. + */ +TEST(TestIntraProcessManager, publisher_out_of_scope_take) { rclcpp::intra_process_manager::IntraProcessManager ipm; auto s1 = std::make_shared(); @@ -703,8 +741,9 @@ TEST(test_intra_process_manager, publisher_out_of_scope_take) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); ASSERT_EQ(nullptr, unique_msg); @@ -720,12 +759,12 @@ TEST(test_intra_process_manager, publisher_out_of_scope_take) { } /* -Simulates race condition where a publisher goes out of scope before store. - - Creates a publisher in a scope. - - Let the scope expire. - - Publish a message on the publisher in a scope, should throw. -*/ -TEST(test_intra_process_manager, publisher_out_of_scope_store) { + Simulates race condition where a publisher goes out of scope before store. + - Creates a publisher in a scope. + - Let the scope expire. + - Publish a message on the publisher in a scope, should throw. + */ +TEST(TestIntraProcessManager, publisher_out_of_scope_store) { rclcpp::intra_process_manager::IntraProcessManager ipm; uint64_t p1_id; @@ -740,8 +779,9 @@ TEST(test_intra_process_manager, publisher_out_of_scope_store) { auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; ipm_msg->publisher_id = 42; - rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg; - unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); + rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg( + new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) + ); EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error); ASSERT_EQ(nullptr, unique_msg); diff --git a/rclcpp/test/test_mapped_ring_buffer.cpp b/rclcpp/test/test_mapped_ring_buffer.cpp index 4e40882507..3cb6128515 100644 --- a/rclcpp/test/test_mapped_ring_buffer.cpp +++ b/rclcpp/test/test_mapped_ring_buffer.cpp @@ -17,9 +17,9 @@ #include /* -Tests get_copy and pop on an empty mrb. -*/ -TEST(test_mapped_ring_buffer, empty) { + Tests get_copy and pop on an empty mrb. + */ +TEST(TestMappedRingBuffer, empty) { // Cannot create a buffer of size zero. EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(0), std::invalid_argument); // Getting or popping an empty buffer should result in a nullptr. @@ -34,9 +34,9 @@ TEST(test_mapped_ring_buffer, empty) { } /* -Tests push_and_replace with a temporary object. -*/ -TEST(test_mapped_ring_buffer, temporary_l_value) { + Tests push_and_replace with a temporary object. + */ +TEST(TestMappedRingBuffer, temporary_l_value) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); // Pass in value with temporary object mrb.push_and_replace(1, std::unique_ptr(new char('a'))); @@ -53,9 +53,9 @@ TEST(test_mapped_ring_buffer, temporary_l_value) { } /* -Tests normal usage of the mrb. -*/ -TEST(test_mapped_ring_buffer, nominal) { + Tests normal usage of the mrb. + */ +TEST(TestMappedRingBuffer, nominal) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); std::unique_ptr expected(new char('a')); // Store expected value's address for later comparison. @@ -66,12 +66,16 @@ TEST(test_mapped_ring_buffer, nominal) { std::unique_ptr actual; mrb.get_copy_at_key(1, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('a', *actual); + if (actual) { + EXPECT_EQ('a', *actual); + } EXPECT_NE(expected_orig, actual.get()); mrb.pop_at_key(1, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('a', *actual); + if (actual) { + EXPECT_EQ('a', *actual); + } EXPECT_EQ(expected_orig, actual.get()); mrb.get_copy_at_key(1, actual); @@ -91,17 +95,21 @@ TEST(test_mapped_ring_buffer, nominal) { mrb.get_copy_at_key(2, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('b', *actual); + if (actual) { + EXPECT_EQ('b', *actual); + } mrb.get_copy_at_key(3, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('c', *actual); + if (actual) { + EXPECT_EQ('c', *actual); + } } /* -Tests get_ownership on a normal mrb. -*/ -TEST(test_mapped_ring_buffer, get_ownership) { + Tests get_ownership on a normal mrb. + */ +TEST(TestMappedRingBuffer, get_ownership) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); std::unique_ptr expected(new char('a')); // Store expected value's address for later comparison. @@ -112,17 +120,23 @@ TEST(test_mapped_ring_buffer, get_ownership) { std::unique_ptr actual; mrb.get_copy_at_key(1, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('a', *actual); + if (actual) { + EXPECT_EQ('a', *actual); + } EXPECT_NE(expected_orig, actual.get()); mrb.get_ownership_at_key(1, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('a', *actual); + if (actual) { + EXPECT_EQ('a', *actual); + } EXPECT_EQ(expected_orig, actual.get()); mrb.pop_at_key(1, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('a', *actual); // The value should be the same. + if (actual) { + EXPECT_EQ('a', *actual); // The value should be the same. + } EXPECT_NE(expected_orig, actual.get()); // Even though we pop'ed, we didn't get the original. mrb.get_copy_at_key(1, actual); @@ -130,9 +144,9 @@ TEST(test_mapped_ring_buffer, get_ownership) { } /* -Tests the affect of reusing keys (non-unique keys) in a mrb. -*/ -TEST(test_mapped_ring_buffer, non_unique_keys) { + Tests the affect of reusing keys (non-unique keys) in a mrb. + */ +TEST(TestMappedRingBuffer, non_unique_keys) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); std::unique_ptr input(new char('a')); @@ -145,10 +159,14 @@ TEST(test_mapped_ring_buffer, non_unique_keys) { std::unique_ptr actual; mrb.pop_at_key(1, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('a', *actual); + if (actual) { + EXPECT_EQ('a', *actual); + } actual = nullptr; mrb.pop_at_key(1, actual); EXPECT_NE(nullptr, actual); - if (actual) EXPECT_EQ('b', *actual); + if (actual) { + EXPECT_EQ('b', *actual); + } }