Skip to content

Commit

Permalink
Add functions for waiting for publishers and subscribers
Browse files Browse the repository at this point in the history
These blocking functions are especially useful for tests where we want
to wait for some number of publishers/subscribers to be available
before proceeding with some other checks.

Signed-off-by: Jacob Perron <[email protected]>
  • Loading branch information
jacobperron committed Apr 1, 2021
1 parent b7784eb commit b1dbca9
Show file tree
Hide file tree
Showing 3 changed files with 336 additions and 0 deletions.
90 changes: 90 additions & 0 deletions rcl/include/rcl/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern "C"
#include <rmw/get_topic_names_and_types.h>
#include <rmw/topic_endpoint_info_array.h>

#include "rcutils/time.h"
#include "rcutils/types.h"

#include "rosidl_runtime_c/service_type_support_struct.h"
Expand Down Expand Up @@ -581,6 +582,95 @@ rcl_count_subscribers(
const char * topic_name,
size_t * count);

/// Wait for there to be a specified number of publishers on a given topic.
/**
* The `node` parameter must point to a valid node.
*
* The `allocator` parameter must point to a valid allocator.
*
* The `topic_name` parameter must not be `NULL`, and must not be an empty string.
* It should also follow the topic name rules.
*
* This function blocks and will return when the number of publishers for `topic_name`
* is greater than or equal to the `count` parameter, or the specificed `timeout` is reached.
*
* The `timeout` parameter is in nanoseconds.
* A negative value disables the timeout (i.e. this function to blocks until the number of
* publishers is greater than or equals to `count`).
*
* The `success` parameter must point to a valid bool.
* The `success` parameter is the output for this function and will be set.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Maybe [1]
* <i>[1] implementation may need to protect the data structure with a lock</i>
*
* \param[in] node the handle to the node being used to query the ROS graph
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
* \param[in] topic_name the name of the topic in question
* \param[in] count number of publishers to wait for
* \param[in] timeout maximum duration to wait for publishers
* \param[out] success `true` if the number of publishers is equal to or greater than count, or
* `false` if a timeout occurred waiting for publishers.
* \return #RCL_RET_OK if there was no errors, or
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of publishers is detected, or
* \return #RCL_RET_ERROR if an unspecified error occurred.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_wait_for_publishers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t count,
rcutils_duration_value_t timeout,
bool * success);

/// Wait for there to be a specified number of subscribers on a given topic.
/**
* \see rcl_wait_for_publishers
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Maybe [1]
* <i>[1] implementation may need to protect the data structure with a lock</i>
*
* \param[in] node the handle to the node being used to query the ROS graph
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
* \param[in] topic_name the name of the topic in question
* \param[in] count number of subscribers to wait for
* \param[in] timeout maximum duration to wait for subscribers
* \param[out] success `true` if the number of subscribers is equal to or greater than count, or
* `false` if a timeout occurred waiting for subscribers.
* \return #RCL_RET_OK if there was no errors, or
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of subscribers is detected, or
* \return #RCL_RET_ERROR if an unspecified error occurred.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_wait_for_subscribers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t count,
rcutils_duration_value_t timeout,
bool * success);

/// Return a list of all publishers to a topic.
/**
* The `node` parameter must point to a valid node.
Expand Down
156 changes: 156 additions & 0 deletions rcl/src/rcl/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ extern "C"
#include "rcl/graph.h"

#include "rcl/error_handling.h"
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rcutils/allocator.h"
#include "rcutils/error_handling.h"
#include "rcutils/macros.h"
#include "rcutils/time.h"
#include "rcutils/types.h"
#include "rmw/error_handling.h"
#include "rmw/get_node_info_and_types.h"
Expand Down Expand Up @@ -452,6 +456,158 @@ rcl_count_subscribers(
return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
}

typedef rcl_ret_t (* count_entities_func_t)(
const rcl_node_t * node,
const char * topic_name,
size_t * count);

rcl_ret_t
_rcl_wait_for_entities(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success,
count_entities_func_t count_entities_func)
{
if (!rcl_node_is_valid(node)) {
return RCL_RET_NODE_INVALID;
}
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(success, RCL_RET_INVALID_ARGUMENT);

*success = false;

// We can avoid waiting if there are already the expected number of publishers
size_t count = 0u;
rcl_ret_t count_ret = count_entities_func(node, topic_name, &count);
if (count_ret != RCL_RET_OK) {
// Error message already set
return count_ret;
}
if (expected_count <= count) {
*success = true;
return RCL_RET_OK;
}

// Create a wait set and add the node graph guard condition to it
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
rcl_ret_t ret = rcl_wait_set_init(
&wait_set, 0, 1, 0, 0, 0, 0, node->context, *allocator);
if (ret != RCL_RET_OK) {
// Error message already set
return ret;
}

const rcl_guard_condition_t * guard_condition = rcl_node_get_graph_guard_condition(node);
if (!guard_condition) {
// Error message already set
return RCL_RET_ERROR;
}

// Add it to the wait set
ret = rcl_wait_set_add_guard_condition(&wait_set, guard_condition, NULL);
if (ret != RCL_RET_OK) {
// Error message already set
return ret;
}

// Get current time
rcutils_time_point_value_t start;
rcutils_ret_t time_ret = rcutils_steady_time_now(&start);
if (time_ret != RCUTILS_RET_OK) {
rcutils_error_string_t error = rcutils_get_error_string();
rcutils_reset_error();
RCL_SET_ERROR_MSG(error.str);
return RCL_RET_ERROR;
}

// Wait for expected count or timeout
while (true) {
ret = rcl_wait(&wait_set, timeout);
if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) {
// Error message already set
return ret;
}

// Check count
count_ret = count_entities_func(node, topic_name, &count);
if (count_ret != RCL_RET_OK) {
// Error already set
return count_ret;
}
if (expected_count <= count) {
*success = true;
break;
}

// If we're not waiting indefinitely, compute time remaining
if (timeout >= 0) {
rcutils_time_point_value_t now;
time_ret = rcutils_steady_time_now(&now);
if (time_ret != RCUTILS_RET_OK) {
rcutils_error_string_t error = rcutils_get_error_string();
rcutils_reset_error();
RCL_SET_ERROR_MSG(error.str);
return RCL_RET_ERROR;
}
timeout = timeout - (now - start);
if (timeout <= 0) {
return RCL_RET_TIMEOUT;
}
}

// Clear wait set for next iteration
ret = rcl_wait_set_clear(&wait_set);
if (ret != RCL_RET_OK) {
// Error message already set
return ret;
}
}

return RCL_RET_OK;
}

rcl_ret_t
rcl_wait_for_publishers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success)
{
return _rcl_wait_for_entities(
node,
allocator,
topic_name,
expected_count,
timeout,
success,
rcl_count_publishers);
}

rcl_ret_t
rcl_wait_for_subscribers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success)
{
return _rcl_wait_for_entities(
node,
allocator,
topic_name,
expected_count,
timeout,
success,
rcl_count_subscribers);
}

typedef rmw_ret_t (* get_topic_endpoint_info_func_t)(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
Expand Down
90 changes: 90 additions & 0 deletions rcl/test/rcl/test_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,96 @@ TEST_F(
rcl_reset_error();
}

/* Test the rcl_wait_for_publishers function.
*/
TEST_F(
CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION),
test_rcl_wait_for_publishers
) {
rcl_ret_t ret;
rcl_node_t zero_node = rcl_get_zero_initialized_node();
rcl_allocator_t zero_allocator = static_cast<rcl_allocator_t>(
rcutils_get_zero_initialized_allocator());
rcl_allocator_t allocator = rcl_get_default_allocator();
const char * topic_name = "/topic_test_rcl_wait_for_publishers";
bool success = false;

// Invalid node
ret = rcl_wait_for_publishers(nullptr, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_NODE_INVALID, ret);
rcl_reset_error();
ret = rcl_wait_for_publishers(&zero_node, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_NODE_INVALID, ret);
rcl_reset_error();
ret = rcl_wait_for_publishers(this->old_node_ptr, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_NODE_INVALID, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Invalid allocator
ret = rcl_wait_for_publishers(this->node_ptr, nullptr, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
ret = rcl_wait_for_publishers(this->node_ptr, &zero_allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Invalid topic name
ret = rcl_wait_for_publishers(this->node_ptr, &allocator, nullptr, 1u, 100, &success);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Invalid output arg
ret = rcl_wait_for_publishers(this->node_ptr, &allocator, topic_name, 1u, 100, nullptr);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Valid call (expect timeout since there are no publishers)
ret = rcl_wait_for_publishers(this->node_ptr, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_TIMEOUT, ret) << rcl_get_error_string().str;
rcl_reset_error();
}

/* Test the rcl_wait_for_subscribers function.
*/
TEST_F(
CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION),
test_rcl_wait_for_subscribers
) {
rcl_ret_t ret;
rcl_node_t zero_node = rcl_get_zero_initialized_node();
rcl_allocator_t zero_allocator = static_cast<rcl_allocator_t>(
rcutils_get_zero_initialized_allocator());
rcl_allocator_t allocator = rcl_get_default_allocator();
const char * topic_name = "/topic_test_rcl_wait_for_subscribers";
bool success = false;

// Invalid node
ret = rcl_wait_for_subscribers(nullptr, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_NODE_INVALID, ret);
rcl_reset_error();
ret = rcl_wait_for_subscribers(&zero_node, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_NODE_INVALID, ret);
rcl_reset_error();
ret = rcl_wait_for_subscribers(this->old_node_ptr, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_NODE_INVALID, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Invalid allocator
ret = rcl_wait_for_subscribers(this->node_ptr, nullptr, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
ret = rcl_wait_for_subscribers(this->node_ptr, &zero_allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Invalid topic name
ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, nullptr, 1u, 100, &success);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Invalid output arg
ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, topic_name, 1u, 100, nullptr);
EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
rcl_reset_error();
// Valid call (expect timeout since there are no subscribers)
ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, topic_name, 1u, 100, &success);
EXPECT_EQ(RCL_RET_TIMEOUT, ret) << rcl_get_error_string().str;
rcl_reset_error();
}

void
check_graph_state(
const rcl_node_t * node_ptr,
Expand Down

0 comments on commit b1dbca9

Please sign in to comment.