Skip to content

Commit

Permalink
Merge pull request #73 from ros2/intra_process
Browse files Browse the repository at this point in the history
Implement intra process communications for Pub/Sub
  • Loading branch information
wjwwood committed Aug 21, 2015
2 parents b9bf46e + 12b939c commit 7f016cd
Show file tree
Hide file tree
Showing 15 changed files with 1,967 additions and 46 deletions.
13 changes: 13 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8.3)
project(rclcpp)

find_package(ament_cmake REQUIRED)
find_package(rcl_interfaces REQUIRED)

ament_export_dependencies(rmw)
ament_export_dependencies(rcl_interfaces)
Expand All @@ -12,6 +13,18 @@ ament_export_include_directories(include)
if(AMENT_ENABLE_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()

if(NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra")
endif()

include_directories(include)

ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
if(TARGET test_intra_process_manager)
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
endif()
endif()

ament_package(
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct AnyExecutable
{}
// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;
Expand Down
41 changes: 39 additions & 2 deletions rclcpp/include/rclcpp/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,63 @@
#ifndef RCLCPP_RCLCPP_CONTEXT_HPP_
#define RCLCPP_RCLCPP_CONTEXT_HPP_

#include <rclcpp/macros.hpp>

#include <iostream>

#include <memory>
#include <mutex>
#include <typeinfo>
#include <typeindex>
#include <unordered_map>

#include <rclcpp/macros.hpp>
#include <rmw/rmw.h>

namespace rclcpp
{

namespace context
{

/* ROS Context */
class Context
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(Context);

Context() {}

template<typename SubContext, typename ... Args>
std::shared_ptr<SubContext>
get_sub_context(Args && ... args)
{
std::lock_guard<std::mutex> lock(mutex_);

std::type_index type_i(typeid(SubContext));
std::shared_ptr<SubContext> sub_context;
auto it = sub_contexts_.find(type_i);
if (it == sub_contexts_.end()) {
// It doesn't exist yet, make it
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
sub_context = std::shared_ptr<SubContext>(
new SubContext(std::forward<Args>(args) ...),
[] (SubContext * sub_context_ptr) {
delete sub_context_ptr;
});
// *INDENT-ON*
sub_contexts_[type_i] = sub_context;
} else {
// It exists, get it out and cast it.
sub_context = std::static_pointer_cast<SubContext>(it->second);
}
return sub_context;
}

private:
RCLCPP_DISABLE_COPY(Context);

std::unordered_map<std::type_index, std::shared_ptr<void>> sub_contexts_;
std::mutex mutex_;

};

} /* namespace context */
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/include/rclcpp/contexts/default_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class DefaultContext : public rclcpp::context::Context

};

DefaultContext::SharedPtr
get_global_default_context()
{
static DefaultContext::SharedPtr default_context = DefaultContext::make_shared();
return default_context;
}

} // namespace default_context
} // namespace contexts
} // namespace rclcpp
Expand Down
68 changes: 53 additions & 15 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTOR_HPP_

#include <iostream>

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <list>
#include <memory>
#include <vector>

#include <rcl_interfaces/msg/intra_process_message.hpp>

#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategy.hpp>
Expand Down Expand Up @@ -159,6 +160,9 @@ class Executor
if (any_exec->subscription) {
execute_subscription(any_exec->subscription);
}
if (any_exec->subscription_intra_process) {
execute_intra_process_subscription(any_exec->subscription_intra_process);
}
if (any_exec->service) {
execute_service(any_exec->service);
}
Expand Down Expand Up @@ -194,6 +198,24 @@ class Executor
subscription->return_message(message);
}

static void
execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr & subscription)
{
rcl_interfaces::msg::IntraProcessMessage ipm;
bool taken = false;
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
if (status == RMW_RET_OK) {
if (taken) {
subscription->handle_intra_process_message(ipm);
}
} else {
fprintf(stderr,
"[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
}

static void
execute_timer(
rclcpp::timer::TimerBase::SharedPtr & timer)
Expand Down Expand Up @@ -293,22 +315,24 @@ class Executor
}));
}
// Use the number of subscriptions to allocate memory in the handles
size_t number_of_subscriptions = subs.size();
size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process.
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = number_of_subscriptions;
subscriber_handles.subscriber_count = 0;
// TODO(wjwwood): Avoid redundant malloc's
subscriber_handles.subscribers =
memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions);
subscriber_handles.subscribers = memory_strategy_->borrow_handles(
HandleType::subscription_handle, max_number_of_subscriptions);
if (subscriber_handles.subscribers == NULL) {
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for subscriber pointers.");
}
// Then fill the SubscriberHandles with ready subscriptions
size_t subscriber_handle_index = 0;
for (auto & subscription : subs) {
subscriber_handles.subscribers[subscriber_handle_index] = \
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->subscription_handle_->data;
subscriber_handle_index += 1;
if (subscription->intra_process_subscription_handle_) {
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->intra_process_subscription_handle_->data;
}
}

// Use the number of services to allocate memory in the handles
Expand Down Expand Up @@ -414,7 +438,7 @@ class Executor
}
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers
for (size_t i = 0; i < number_of_subscriptions; ++i) {
for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) {
void * handle = subscriber_handles.subscribers[i];
if (handle) {
subscriber_handles_.push_back(handle);
Expand Down Expand Up @@ -463,13 +487,18 @@ class Executor
}
for (auto & weak_subscription : group->subscription_ptrs_) {
auto subscription = weak_subscription.lock();
if (subscription && subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
if (subscription) {
if (subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
}
if (subscription->intra_process_subscription_handle_->data == subscriber_handle) {
return subscription;
}
}
}
}
}
return rclcpp::subscription::SubscriptionBase::SharedPtr();
return nullptr;
}

rclcpp::service::ServiceBase::SharedPtr
Expand Down Expand Up @@ -653,6 +682,11 @@ class Executor
while (it != subscriber_handles_.end()) {
auto subscription = get_subscription_by_handle(*it);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->intra_process_subscription_handle_) {
is_intra_process = subscription->intra_process_subscription_handle_->data == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription);
if (!group) {
Expand All @@ -668,7 +702,11 @@ class Executor
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->subscription = subscription;
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
subscriber_handles_.erase(it++);
Expand Down Expand Up @@ -804,7 +842,7 @@ class Executor
}
// Check the subscriptions to see if there are any that are ready
get_next_subscription(any_exec);
if (any_exec->subscription) {
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
}
// Check the services to see if there are any that are ready
Expand Down
Loading

0 comments on commit 7f016cd

Please sign in to comment.