diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index d356d1279ba..e73b1043b6d 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -353,22 +353,14 @@ bool DataReaderHistory::get_first_untaken_info( { std::lock_guard lock(*getMutex()); - CacheChange_t* change = nullptr; - WriterProxy* wp = nullptr; - if (mp_reader->nextUntakenCache(&change, &wp)) - { - auto it = data_available_instances_.find(change->instanceHandle); - assert(it != data_available_instances_.end()); - auto& instance_changes = it->second->cache_changes; - auto item = - std::find_if(instance_changes.cbegin(), instance_changes.cend(), - [change](const DataReaderCacheChange& v) - { - return v == change; - }); - ReadTakeCommand::generate_info(info, *(it->second), *item); - mp_reader->change_read_by_user(change, wp, false); - return true; + for (auto& it : data_available_instances_) + { + auto& instance_changes = it.second->cache_changes; + if (!instance_changes.empty()) + { + ReadTakeCommand::generate_info(info, *(it.second), instance_changes.front()); + return true; + } } return false; diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 3eb7ad94f6f..5f473ae29e6 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -278,7 +278,8 @@ class PubSubReader PubSubReader( const std::string& topic_name, bool take = true, - bool statistics = false) + bool statistics = false, + bool read = true) : participant_listener_(*this) , listener_(*this) , participant_(nullptr) @@ -293,10 +294,12 @@ class PubSubReader , receiving_(false) , current_processed_count_(0) , number_samples_expected_(0) + , current_unread_count_(0) , discovery_result_(false) , onDiscovery_(nullptr) , onEndpointDiscovery_(nullptr) , take_(take) + , read_(read) , statistics_(statistics) #if HAVE_SECURITY , authorized_(0) @@ -550,6 +553,16 @@ class PubSubReader return current_processed_count_; } + size_t block_for_unread_count_of( + size_t n_unread) + { + block([this, n_unread]() -> bool + { + return current_unread_count_ >= n_unread; + }); + return current_unread_count_; + } + void block( std::function checker) { @@ -1677,6 +1690,14 @@ class PubSubReader type data; eprosima::fastdds::dds::SampleInfo info; + if (!take_ && !read_) + { + current_unread_count_ = datareader->get_unread_count(); + std::cout << "Total unread count " << current_unread_count_ << std::endl; + cv_.notify_one(); + return; + } + ReturnCode_t success = take_ ? datareader->take_next_sample((void*)&data, &info) : datareader->read_next_sample((void*)&data, &info); @@ -1845,6 +1866,7 @@ class PubSubReader std::map last_seq; std::atomic current_processed_count_; std::atomic number_samples_expected_; + std::atomic current_unread_count_; bool discovery_result_; std::string xml_file_ = ""; @@ -1854,9 +1876,12 @@ class PubSubReader std::function onDiscovery_; std::function onEndpointDiscovery_; - //! True to take data from history. False to read + //! True to take data from history. On False, read_ is checked. bool take_; + //! True to read data from history. False, do nothing on data reception. + bool read_; + //! True if the class is called from the statistics blackbox (specific topic name and domain id). bool statistics_; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 37427bec878..3ea9e7b7545 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -507,6 +507,7 @@ class PubSubWriter bool send_sample( type& msg) { + default_send_print(msg); return datawriter_->write((void*)&msg); } diff --git a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp index 559c2f05f57..2d092f059cc 100644 --- a/test/blackbox/common/DDSBlackboxTestsDataReader.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDataReader.cpp @@ -159,6 +159,75 @@ TEST_P(DDSDataReader, LivelinessChangedStatusGet) } +// Regression test of Refs #16608, Github #3203. Checks that total_unread_ variable is consistent with +// unread changes in reader's history after performing a get_first_untaken_info() on a change with no writer matched. +TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo) +{ + if (enable_datasharing) + { + //! TODO: Datasharing changes the behavior of this test. Changes are + //! instantly removed on removePublisher() call and on the PUBListener callback + GTEST_SKIP() << "Data-sharing removes the changes instantly changing the behavior of this test. Skipping"; + } + + //! Spawn a couple of participants writer/reader + PubSubWriter pubsub_writer(TEST_TOPIC_NAME); + //! Create a reader that does nothing when new data is available. Neither take nor read it. + PubSubReader pubsub_reader(TEST_TOPIC_NAME, false, false, false); + + // Initialization of all the participants + std::cout << "Initializing PubSubs for topic " << TEST_TOPIC_NAME << std::endl; + + //! Participant Writer configuration and qos + pubsub_writer.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS) + .durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS) + .history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS) + .init(); + ASSERT_EQ(pubsub_writer.isInitialized(), true); + + //! Participant Reader configuration and qos + pubsub_reader.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS) + .durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS) + .history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS) + .init(); + ASSERT_EQ(pubsub_reader.isInitialized(), true); + + eprosima::fastdds::dds::DataReader& reader = pubsub_reader.get_native_reader(); + eprosima::fastdds::dds::SampleInfo info; + + EXPECT_EQ(ReturnCode_t::RETCODE_NO_DATA, reader.get_first_untaken_info(&info)); + + // Wait for discovery. + pubsub_reader.wait_discovery(); + pubsub_writer.wait_discovery(); + + auto data = default_helloworld_data_generator(); + + pubsub_reader.startReception(data); + + pubsub_writer.send(data); + EXPECT_TRUE(data.empty()); + + pubsub_reader.block_for_unread_count_of(3); + pubsub_writer.removePublisher(); + pubsub_reader.wait_writer_undiscovery(); + + //! Try reading the first untaken info. + //! Checks whether total_unread_ is consistent with + //! the number of unread changes in history + //! This API call should NOT modify the history + EXPECT_EQ(ReturnCode_t::RETCODE_OK, reader.get_first_untaken_info(&info)); + + HelloWorld msg; + eprosima::fastdds::dds::SampleInfo sinfo; + + //! Try getting a sample + auto result = reader.take_next_sample((void*)&msg, &sinfo); + + //! Assert last operation + ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else