From 9866e115cd7c7a98f4ec86b0415c9e29337b0627 Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Wed, 29 Jul 2015 11:35:06 -0700 Subject: [PATCH] fix returning from rmw_wait while any samples have not been taken yet --- rmw_connext_cpp/src/functions.cpp | 60 ++++++++++++++--------- rmw_connext_dynamic_cpp/src/functions.cpp | 57 ++++++++++++--------- 2 files changed, 71 insertions(+), 46 deletions(-) diff --git a/rmw_connext_cpp/src/functions.cpp b/rmw_connext_cpp/src/functions.cpp index aa47a799..d58df4c7 100644 --- a/rmw_connext_cpp/src/functions.cpp +++ b/rmw_connext_cpp/src/functions.cpp @@ -71,6 +71,7 @@ struct ConnextStaticSubscriberInfo { DDSSubscriber * dds_subscriber_; DDSDataReader * topic_reader_; + DDSReadCondition * read_condition_; bool ignore_local_publications; const message_type_support_callbacks_t * callbacks_; }; @@ -584,6 +585,7 @@ rmw_create_subscription(const rmw_node_t * node, DDSTopicDescription * topic_description = nullptr; DDS_DataReaderQos datareader_qos; DDSDataReader * topic_reader = nullptr; + DDSReadCondition * read_condition = nullptr; void * buf = nullptr; ConnextStaticSubscriberInfo * subscriber_info = nullptr; // Begin initializing elements. @@ -663,6 +665,13 @@ rmw_create_subscription(const rmw_node_t * node, goto fail; } + read_condition = topic_reader->create_readcondition( + DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE); + if (!read_condition) { + RMW_SET_ERROR_MSG("failed to create read condition"); + goto fail; + } + // Allocate memory for the ConnextStaticSubscriberInfo object. buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo)); if (!buf) { @@ -674,6 +683,7 @@ rmw_create_subscription(const rmw_node_t * node, buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore. subscriber_info->dds_subscriber_ = dds_subscriber; subscriber_info->topic_reader_ = topic_reader; + subscriber_info->read_condition_ = read_condition; subscriber_info->callbacks_ = callbacks; subscriber_info->ignore_local_publications = ignore_local_publications; @@ -687,6 +697,14 @@ rmw_create_subscription(const rmw_node_t * node, // Assumption: participant is valid. if (dds_subscriber) { if (topic_reader) { + if (read_condition) { + if (topic_reader->delete_readcondition(read_condition) != DDS_RETCODE_OK) { + std::stringstream ss; + ss << "leaking readcondition while handling failure at " << + __FILE__ << ":" << __LINE__ << '\n'; + (std::cerr << ss.str()).flush(); + } + } if (dds_subscriber->delete_datareader(topic_reader) != DDS_RETCODE_OK) { std::stringstream ss; ss << "leaking datareader while handling failure at " << @@ -746,11 +764,22 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) if (dds_subscriber) { auto topic_reader = subscriber_info->topic_reader_; if (topic_reader) { + auto read_condition = subscriber_info->read_condition_; + if (read_condition) { + if (topic_reader->delete_readcondition(read_condition) != DDS_RETCODE_OK) { + RMW_SET_ERROR_MSG("failed to delete readcondition"); + return RMW_RET_ERROR; + } + subscriber_info->read_condition_ = nullptr; + } if (dds_subscriber->delete_datareader(topic_reader) != DDS_RETCODE_OK) { RMW_SET_ERROR_MSG("failed to delete datareader"); return RMW_RET_ERROR; } subscriber_info->topic_reader_ = nullptr; + } else if (subscriber_info->read_condition_) { + RMW_SET_ERROR_MSG("cannot delete readcondition because the datareader is null"); + return RMW_RET_ERROR; } if (participant->delete_subscriber(dds_subscriber) != DDS_RETCODE_OK) { RMW_SET_ERROR_MSG("failed to delete subscriber"); @@ -910,22 +939,12 @@ rmw_wait(rmw_subscriptions_t * subscriptions, RMW_SET_ERROR_MSG("subscriber info handle is null"); return RMW_RET_ERROR; } - DDSDataReader * topic_reader = subscriber_info->topic_reader_; - if (!topic_reader) { - RMW_SET_ERROR_MSG("topic reader handle is null"); - return RMW_RET_ERROR; - } - DDSStatusCondition * condition = topic_reader->get_statuscondition(); - if (!condition) { - RMW_SET_ERROR_MSG("condition handle is null"); - return RMW_RET_ERROR; - } - DDS_ReturnCode_t status = condition->set_enabled_statuses(DDS_DATA_AVAILABLE_STATUS); - if (status != DDS_RETCODE_OK) { - RMW_SET_ERROR_MSG("failed to set enabled statuses"); + DDSReadCondition * read_condition = subscriber_info->read_condition_; + if (!read_condition) { + RMW_SET_ERROR_MSG("read condition handle is null"); return RMW_RET_ERROR; } - status = waitset.attach_condition(condition); + DDS_ReturnCode_t status = waitset.attach_condition(read_condition); if (status != DDS_RETCODE_OK) { RMW_SET_ERROR_MSG("failed to attach condition"); return RMW_RET_ERROR; @@ -1033,21 +1052,16 @@ rmw_wait(rmw_subscriptions_t * subscriptions, RMW_SET_ERROR_MSG("subscriber info handle is null"); return RMW_RET_ERROR; } - DDSDataReader * topic_reader = subscriber_info->topic_reader_; - if (!topic_reader) { - RMW_SET_ERROR_MSG("topic reader handle is null"); - return RMW_RET_ERROR; - } - DDSStatusCondition * condition = topic_reader->get_statuscondition(); - if (!condition) { - RMW_SET_ERROR_MSG("condition handle is null"); + DDSReadCondition * read_condition = subscriber_info->read_condition_; + if (!read_condition) { + RMW_SET_ERROR_MSG("read condition handle is null"); return RMW_RET_ERROR; } // search for subscriber condition in active set DDS_Long j = 0; for (; j < active_conditions.length(); ++j) { - if (active_conditions[j] == condition) { + if (active_conditions[j] == read_condition) { break; } } diff --git a/rmw_connext_dynamic_cpp/src/functions.cpp b/rmw_connext_dynamic_cpp/src/functions.cpp index ebf60849..89aa2fe4 100644 --- a/rmw_connext_dynamic_cpp/src/functions.cpp +++ b/rmw_connext_dynamic_cpp/src/functions.cpp @@ -1211,6 +1211,7 @@ struct CustomSubscriberInfo DDSDynamicDataTypeSupport * dynamic_data_type_support_; DDSDynamicDataReader * dynamic_reader_; DDSDataReader * data_reader_; + DDSReadCondition * read_condition_; DDSSubscriber * dds_subscriber_; bool ignore_local_publications; DDS_TypeCode * type_code_; @@ -1275,6 +1276,7 @@ rmw_create_subscription( DDSTopic * topic; DDSTopicDescription * topic_description = nullptr; DDSDataReader * topic_reader = nullptr; + DDSReadCondition * read_condition = nullptr; DDSDynamicDataReader * dynamic_reader = nullptr; DDS_DataReaderQos datareader_qos; DDS_DynamicData * dynamic_data = nullptr; @@ -1378,6 +1380,13 @@ rmw_create_subscription( goto fail; } + read_condition = topic_reader->create_readcondition( + DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE); + if (!read_condition) { + RMW_SET_ERROR_MSG("failed to create read condition"); + goto fail; + } + dynamic_reader = DDSDynamicDataReader::narrow(topic_reader); if (!dynamic_reader) { RMW_SET_ERROR_MSG("failed to narrow datareader"); @@ -1402,6 +1411,7 @@ rmw_create_subscription( custom_subscriber_info->dynamic_data_type_support_ = ddts; custom_subscriber_info->dynamic_reader_ = dynamic_reader; custom_subscriber_info->data_reader_ = topic_reader; + custom_subscriber_info->read_condition_ = read_condition; custom_subscriber_info->dds_subscriber_ = dds_subscriber; custom_subscriber_info->ignore_local_publications = ignore_local_publications; custom_subscriber_info->type_code_ = type_code; @@ -1432,6 +1442,14 @@ rmw_create_subscription( } } if (topic_reader) { + if (read_condition) { + if (topic_reader->delete_readcondition(read_condition) != DDS_RETCODE_OK) { + std::stringstream ss; + ss << "leaking readcondition while handling failure at " << + __FILE__ << ":" << __LINE__ << '\n'; + (std::cerr << ss.str()).flush(); + } + } if (dds_subscriber) { if (dds_subscriber->delete_datareader(topic_reader) != DDS_RETCODE_OK) { std::stringstream ss; @@ -1555,6 +1573,14 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) if (dds_subscriber) { auto data_reader = custom_subscription_info->data_reader_; if (data_reader) { + auto read_condition = custom_subscription_info->read_condition_; + if (read_condition) { + if (data_reader->delete_readcondition(read_condition) != DDS_RETCODE_OK) { + RMW_SET_ERROR_MSG("failed to delete readcondition"); + return RMW_RET_ERROR; + } + custom_subscription_info->read_condition_ = nullptr; + } if (dds_subscriber->delete_datareader(data_reader) != DDS_RETCODE_OK) { RMW_SET_ERROR_MSG("failed to delete datareader"); return RMW_RET_ERROR; @@ -2161,22 +2187,12 @@ rmw_wait( RMW_SET_ERROR_MSG("subscriber info handle is null"); return RMW_RET_ERROR; } - DDSDynamicDataReader * dynamic_reader = subscriber_info->dynamic_reader_; - if (!dynamic_reader) { - RMW_SET_ERROR_MSG("data reader handle is null"); + DDSReadCondition * read_condition = subscriber_info->read_condition_; + if (!read_condition) { + RMW_SET_ERROR_MSG("read condition handle is null"); return RMW_RET_ERROR; } - DDSStatusCondition * condition = dynamic_reader->get_statuscondition(); - if (!condition) { - RMW_SET_ERROR_MSG("condition handle is null"); - return RMW_RET_ERROR; - } - DDS_ReturnCode_t status = condition->set_enabled_statuses(DDS_DATA_AVAILABLE_STATUS); - if (status != DDS_RETCODE_OK) { - RMW_SET_ERROR_MSG("failed to set enabled statuses"); - return RMW_RET_ERROR; - } - status = waitset.attach_condition(condition); + DDS_ReturnCode_t status = waitset.attach_condition(read_condition); if (status != DDS_RETCODE_OK) { RMW_SET_ERROR_MSG("failed to attach condition"); return RMW_RET_ERROR; @@ -2284,21 +2300,16 @@ rmw_wait( RMW_SET_ERROR_MSG("subscriber info handle is null"); return RMW_RET_ERROR; } - DDSDynamicDataReader * dynamic_reader = subscriber_info->dynamic_reader_; - if (!dynamic_reader) { - RMW_SET_ERROR_MSG("topic reader handle is null"); - return RMW_RET_ERROR; - } - DDSCondition * condition = dynamic_reader->get_statuscondition(); - if (!condition) { - RMW_SET_ERROR_MSG("condition handle is null"); + DDSReadCondition * read_condition = subscriber_info->read_condition_; + if (!read_condition) { + RMW_SET_ERROR_MSG("read condition handle is null"); return RMW_RET_ERROR; } // search for subscriber condition in active set DDS_Long j = 0; for (; j < active_conditions.length(); ++j) { - if (active_conditions[j] == condition) { + if (active_conditions[j] == read_condition) { break; } }