Skip to content

Commit

Permalink
Merge pull request ros2#10 from mauropasse/mauro/pr-events-executor
Browse files Browse the repository at this point in the history
Rename set_events_executor_callback->set_listener_callback
  • Loading branch information
iRobot ROS authored Nov 20, 2020
2 parents 54fea9c + 7bd43b2 commit fbc75da
Show file tree
Hide file tree
Showing 19 changed files with 162 additions and 162 deletions.
10 changes: 5 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,14 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
}

rmw_ret_t
rmw_client_set_events_executor_callback(
const void * executor_context,
EventsExecutorCallback callback,
rmw_client_set_listener_callback(
const void * callback_context,
rmw_listener_cb_t callback,
const void * client_handle,
rmw_client_t * rmw_client)
{
return rmw_fastrtps_shared_cpp::__rmw_client_set_events_executor_callback(
executor_context,
return rmw_fastrtps_shared_cpp::__rmw_client_set_listener_callback(
callback_context,
callback,
client_handle,
rmw_client);
Expand Down
10 changes: 5 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ rmw_subscription_event_init(
}

rmw_ret_t
rmw_event_set_events_executor_callback(
const void * executor_context,
EventsExecutorCallback callback,
rmw_event_set_listener_callback(
const void * callback_context,
rmw_listener_cb_t callback,
const void * waitable_handle,
rmw_event_t * rmw_event,
bool use_previous_events)
{
return rmw_fastrtps_shared_cpp::__rmw_event_set_events_executor_callback(
executor_context,
return rmw_fastrtps_shared_cpp::__rmw_event_set_listener_callback(
callback_context,
callback,
waitable_handle,
rmw_event,
Expand Down
10 changes: 5 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition)
}

rmw_ret_t
rmw_guard_condition_set_events_executor_callback(
const void * executor_context,
EventsExecutorCallback callback,
rmw_guard_condition_set_listener_callback(
const void * callback_context,
rmw_listener_cb_t callback,
const void * guard_condition_handle,
rmw_guard_condition_t * rmw_guard_condition,
bool use_previous_events)
{
return rmw_fastrtps_shared_cpp::__rmw_guard_condition_set_events_executor_callback(
executor_context,
return rmw_fastrtps_shared_cpp::__rmw_guard_condition_set_listener_callback(
callback_context,
callback,
guard_condition_handle,
rmw_guard_condition,
Expand Down
10 changes: 5 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,14 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
}

rmw_ret_t
rmw_service_set_events_executor_callback(
const void * executor_context,
EventsExecutorCallback callback,
rmw_service_set_listener_callback(
const void * callback_context,
rmw_listener_cb_t callback,
const void * service_handle,
rmw_service_t * rmw_service)
{
return rmw_fastrtps_shared_cpp::__rmw_service_set_events_executor_callback(
executor_context,
return rmw_fastrtps_shared_cpp::__rmw_service_set_listener_callback(
callback_context,
callback,
service_handle,
rmw_service);
Expand Down
10 changes: 5 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
}

rmw_ret_t
rmw_subscription_set_events_executor_callback(
const void * executor_context,
EventsExecutorCallback callback,
rmw_subscription_set_listener_callback(
const void * callback_context,
rmw_listener_cb_t callback,
const void * subscription_handle,
rmw_subscription_t * rmw_subscription)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_set_events_executor_callback(
executor_context,
return rmw_fastrtps_shared_cpp::__rmw_subscription_set_listener_callback(
callback_context,
callback,
subscription_handle,
rmw_subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/executor_event_types.h"
#include "rmw/listener_event_types.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -111,10 +111,10 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
}

// Add the client event to the event queue
std::unique_lock<std::mutex> lock_mutex(executor_callback_mutex_);
std::unique_lock<std::mutex> lock_mutex(listener_callback_mutex_);

if(executor_callback_) {
executor_callback_(executor_context_, { client_handle_, CLIENT_EVENT });
if(listener_callback_) {
listener_callback_(callback_context_, { client_handle_, CLIENT_EVENT });
} else {
unread_count_++;
}
Expand Down Expand Up @@ -179,28 +179,28 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
// new event from this listener has ocurred
void
clientSetExecutorCallback(
const void * executor_context,
EventsExecutorCallback callback,
const void * callback_context,
rmw_listener_cb_t callback,
const void * client_handle)
{
std::unique_lock<std::mutex> lock_mutex(executor_callback_mutex_);
std::unique_lock<std::mutex> lock_mutex(listener_callback_mutex_);

if(executor_context && client_handle && callback)
if(callback_context && client_handle && callback)
{
executor_context_ = executor_context;
executor_callback_ = callback;
callback_context_ = callback_context;
listener_callback_ = callback;
client_handle_ = client_handle;
} else {
// Unset callback: If any of the pointers is NULL, do not use callback.
executor_context_ = nullptr;
executor_callback_ = nullptr;
callback_context_ = nullptr;
listener_callback_ = nullptr;
client_handle_ = nullptr;
return;
}

// Push events arrived before setting the the executor callback
for(uint64_t i = 0; i < unread_count_; i++) {
executor_callback_(executor_context_, { client_handle_, CLIENT_EVENT });
listener_callback_(callback_context_, { client_handle_, CLIENT_EVENT });
}

// Reset unread count
Expand All @@ -227,10 +227,10 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;

EventsExecutorCallback executor_callback_{nullptr};
rmw_listener_cb_t listener_callback_{nullptr};
const void * client_handle_{nullptr};
const void * executor_context_{nullptr};
std::mutex executor_callback_mutex_;
const void * callback_context_{nullptr};
std::mutex listener_callback_mutex_;
uint64_t unread_count_ = 0;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include "fastrtps/publisher/PublisherListener.h"

#include "rmw/event.h"
#include "rmw/executor_event_types.h"
#include "rmw/listener_event_types.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -70,16 +70,16 @@ class EventListenerInterface
// Provide handlers to perform an action when a
// new event from this listener has ocurred
virtual void eventSetExecutorCallback(
const void * executor_context,
EventsExecutorCallback callback,
const void * callback_context,
rmw_listener_cb_t callback,
const void * waitable_handle,
bool use_previous_events) = 0;

EventsExecutorCallback executor_callback_{nullptr};
const void * executor_context_{nullptr};
rmw_listener_cb_t listener_callback_{nullptr};
const void * callback_context_{nullptr};
const void * waitable_handle_{nullptr};
uint64_t unread_events_count_ = 0;
std::mutex executor_callback_mutex_;
std::mutex listener_callback_mutex_;
};

class EventListenerInterface::ConditionalScopedLock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void eventSetExecutorCallback(
const void * executor_context,
EventsExecutorCallback callback,
const void * callback_context,
rmw_listener_cb_t callback,
const void * waitable_handle,
bool use_previous_events) final;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/executor_event_types.h"
#include "rmw/listener_event_types.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"
Expand Down Expand Up @@ -114,10 +114,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
}

// Add the service to the event queue
std::unique_lock<std::mutex> lock_mutex(executor_callback_mutex_);
std::unique_lock<std::mutex> lock_mutex(listener_callback_mutex_);

if(executor_callback_) {
executor_callback_(executor_context_, { service_handle_, SERVICE_EVENT });
if(listener_callback_) {
listener_callback_(callback_context_, { service_handle_, SERVICE_EVENT });
} else {
unread_count_++;
}
Expand Down Expand Up @@ -175,28 +175,28 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
// new event from this listener has ocurred
void
serviceSetExecutorCallback(
const void * executor_context,
EventsExecutorCallback callback,
const void * callback_context,
rmw_listener_cb_t callback,
const void * service_handle)
{
std::unique_lock<std::mutex> lock_mutex(executor_callback_mutex_);
std::unique_lock<std::mutex> lock_mutex(listener_callback_mutex_);

if(executor_context && service_handle && callback)
if(callback_context && service_handle && callback)
{
executor_context_ = executor_context;
executor_callback_ = callback;
callback_context_ = callback_context;
listener_callback_ = callback;
service_handle_ = service_handle;
} else {
// Unset callback: If any of the pointers is NULL, do not use callback.
executor_context_ = nullptr;
executor_callback_ = nullptr;
callback_context_ = nullptr;
listener_callback_ = nullptr;
service_handle_ = nullptr;
return;
}

// Push events arrived before setting the the executor callback
for(uint64_t i = 0; i < unread_count_; i++) {
executor_callback_(executor_context_, { service_handle_, SERVICE_EVENT });
listener_callback_(callback_context_, { service_handle_, SERVICE_EVENT });
}

// Reset unread count
Expand All @@ -211,10 +211,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

EventsExecutorCallback executor_callback_{nullptr};
rmw_listener_cb_t listener_callback_{nullptr};
const void * service_handle_{nullptr};
const void * executor_context_{nullptr};
std::mutex executor_callback_mutex_;
const void * callback_context_{nullptr};
std::mutex listener_callback_mutex_;
uint64_t unread_count_ = 0;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/executor_event_types.h"
#include "rmw/impl/cpp/macros.hpp"
#include "rmw/listener_event_types.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
Expand Down Expand Up @@ -85,10 +85,10 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final
{
// Callback: add the subscription event to the event queue
std::unique_lock<std::mutex> lock_mutex(executor_callback_mutex_);
std::unique_lock<std::mutex> lock_mutex(listener_callback_mutex_);

if(executor_callback_) {
executor_callback_(executor_context_, { subscription_handle_, SUBSCRIPTION_EVENT });
if(listener_callback_) {
listener_callback_(callback_context_, { subscription_handle_, SUBSCRIPTION_EVENT });
} else {
update_unread_count(sub);
new_data_unread_count_++;
Expand All @@ -114,8 +114,8 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void eventSetExecutorCallback(
const void * executor_context,
EventsExecutorCallback callback,
const void * callback_context,
rmw_listener_cb_t callback,
const void * waitable_handle,
bool use_previous_events) final;

Expand Down Expand Up @@ -172,28 +172,28 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
// new event from this listener has ocurred
void
subcriptionSetExecutorCallback(
const void * executor_context,
EventsExecutorCallback callback,
const void * callback_context,
rmw_listener_cb_t callback,
const void * subscription_handle)
{
std::unique_lock<std::mutex> lock_mutex(executor_callback_mutex_);
std::unique_lock<std::mutex> lock_mutex(listener_callback_mutex_);

if(executor_context && subscription_handle && callback)
if(callback_context && subscription_handle && callback)
{
executor_context_ = executor_context;
executor_callback_ = callback;
callback_context_ = callback_context;
listener_callback_ = callback;
subscription_handle_ = subscription_handle;
} else {
// Unset callback: If any of the pointers is NULL, do not use callback.
executor_context_ = nullptr;
executor_callback_ = nullptr;
callback_context_ = nullptr;
listener_callback_ = nullptr;
subscription_handle_ = nullptr;
return;
}

// Push events arrived before setting the executor's callback
for(uint64_t i = 0; i < new_data_unread_count_; i++) {
executor_callback_(executor_context_, { subscription_handle_, SUBSCRIPTION_EVENT });
listener_callback_(callback_context_, { subscription_handle_, SUBSCRIPTION_EVENT });
}

// Reset unread count
Expand All @@ -218,7 +218,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

ExecutorEvent subscription_event_{nullptr, SUBSCRIPTION_EVENT};
rmw_listener_event_t subscription_event_{nullptr, SUBSCRIPTION_EVENT};
const void * subscription_handle_;
uint64_t new_data_unread_count_ = 0;
};
Expand Down
Loading

0 comments on commit fbc75da

Please sign in to comment.