diff --git a/rosbag2_cpp/CMakeLists.txt b/rosbag2_cpp/CMakeLists.txt index 208cd1bf00..7f5a13f509 100644 --- a/rosbag2_cpp/CMakeLists.txt +++ b/rosbag2_cpp/CMakeLists.txt @@ -53,6 +53,7 @@ add_library(${PROJECT_NAME} SHARED src/rosbag2_cpp/cache/message_cache.cpp src/rosbag2_cpp/converter.cpp src/rosbag2_cpp/info.cpp + src/rosbag2_cpp/player_clock.cpp src/rosbag2_cpp/reader.cpp src/rosbag2_cpp/readers/sequential_reader.cpp src/rosbag2_cpp/rmw_implemented_serialization_format_converter.cpp @@ -217,6 +218,13 @@ if(BUILD_TESTING) ament_target_dependencies(test_multifile_reader rosbag2_storage) target_link_libraries(test_multifile_reader ${PROJECT_NAME}) endif() + + ament_add_gmock(test_player_clock + test/rosbag2_cpp/test_player_clock.cpp) + if(TARGET test_player_clock) + # ament_target_dependencies(test_player_clock rosbag2_cpp) + target_link_libraries(test_player_clock ${PROJECT_NAME}) + endif() endif() ament_package() diff --git a/rosbag2_cpp/include/rosbag2_cpp/player_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/player_clock.hpp new file mode 100644 index 0000000000..b61e065fff --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/player_clock.hpp @@ -0,0 +1,84 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__PLAYER_CLOCK_HPP_ +#define ROSBAG2_CPP__PLAYER_CLOCK_HPP_ + +#include +#include +#include + +#include "rcutils/time.h" + +namespace rosbag2_cpp +{ +class PlayerClockImpl; + +/** + * Used to control the timing of bag playback. + * This clock should be used to query times and sleep between message playing, + * so that the complexity involved around time control and time sources + * is encapsulated in this one place. + */ +class PlayerClock +{ +public: + typedef rcutils_time_point_value_t PlayerTimePoint; + typedef std::chrono::time_point RealTimePoint; + typedef std::function NowFunction; + + /** + * Constructor. + * + * \param starting_time: provides an initial offset for managing time + * This will likely be the timestamp of the first message in the bag + * \param rate: Rate of playback, a unit-free ratio. 1.0 is real-time + * \param now_fn: Function used to get the current steady time + * defaults to std::chrono::steady_clock::now + * Used to control for unit testing, or for specialized needs + * \throws std::runtime_error if rate is <= 0 + */ + PlayerClock( + PlayerTimePoint starting_time, + double rate = 1.0, + NowFunction now_fn = std::chrono::steady_clock::now); + + virtual ~PlayerClock(); + + /** + * Calculate and return current PlayerTimePoint based on starting time, playback rate, pause state. + */ + PlayerTimePoint now() const; + + /** + * Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock + * + * Return true if the time has been reached, false if it was not successfully reached after sleeping + * for the appropriate duration. + * The user should not take action based on this sleep until it returns true. + */ + bool sleep_until(PlayerTimePoint until); + + /** + * Return the current playback rate. + */ + float get_rate() const; + +private: + std::unique_ptr impl_; +}; + +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__PLAYER_CLOCK_HPP_ diff --git a/rosbag2_cpp/src/rosbag2_cpp/player_clock.cpp b/rosbag2_cpp/src/rosbag2_cpp/player_clock.cpp new file mode 100644 index 0000000000..b44af6723c --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/player_clock.cpp @@ -0,0 +1,99 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rosbag2_cpp/player_clock.hpp" +#include "rosbag2_cpp/types.hpp" + +namespace +{ + +/** + * Stores an exact time match between a system steady clock and the semantic play clock. + * This is created whenever a factor changes such that a new base reference is needed + * such as pause, resume, rate change, or jump + */ +struct TimeSync +{ + rosbag2_cpp::PlayerClock::PlayerTimePoint player_time; + rosbag2_cpp::PlayerClock::RealTimePoint real_time; +}; + +template +rcutils_duration_value_t duration_nanos(const T & duration) +{ + return std::chrono::duration_cast(duration).count(); +} + +} // namespace + +namespace rosbag2_cpp +{ + +class PlayerClockImpl +{ +public: + PlayerClockImpl() = default; + + TimeSync reference; + double rate = 1.0; + PlayerClock::NowFunction now_fn; + std::mutex mutex; +}; + +PlayerClock::PlayerClock(PlayerTimePoint starting_time, double rate, NowFunction now_fn) +: impl_(std::make_unique()) +{ + std::lock_guard lock(impl_->mutex); + impl_->now_fn = now_fn; + impl_->reference.player_time = starting_time; + impl_->reference.real_time = impl_->now_fn(); + impl_->rate = rate; +} + +PlayerClock::~PlayerClock() +{} + +PlayerClock::PlayerTimePoint PlayerClock::now() const +{ + std::lock_guard lock(impl_->mutex); + const auto real_diff = impl_->now_fn() - impl_->reference.real_time; + const int64_t player_diff = duration_nanos(real_diff) * impl_->rate; + return impl_->reference.player_time + player_diff; +} + +bool PlayerClock::sleep_until(PlayerTimePoint until) +{ + RealTimePoint real_until; + { + std::lock_guard lock(impl_->mutex); + const rcutils_duration_value_t diff_nanos = + (until - impl_->reference.player_time) / impl_->rate; + real_until = impl_->reference.real_time + std::chrono::nanoseconds(diff_nanos); + } + // TODO(emersonknapp) - when we have methods that can change timeflow during a sleep, + // it will probably be better to use a condition_variable::wait_until + std::this_thread::sleep_until(real_until); + return now() >= until; +} + +float PlayerClock::get_rate() const +{ + return impl_->rate; +} + +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_player_clock.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_player_clock.cpp new file mode 100644 index 0000000000..d287b9fe6f --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/test_player_clock.cpp @@ -0,0 +1,110 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rosbag2_cpp/player_clock.hpp" + +using namespace testing; // NOLINT +using RealTimePoint = rosbag2_cpp::PlayerClock::RealTimePoint; +using PlayerTimePoint = rosbag2_cpp::PlayerClock::PlayerTimePoint; +using NowFunction = rosbag2_cpp::PlayerClock::NowFunction; + +PlayerTimePoint as_nanos(const RealTimePoint & time) +{ + return std::chrono::duration_cast(time.time_since_epoch()).count(); +} + +TEST(PlayerClock, realtime_precision) +{ + RealTimePoint return_time; + NowFunction now_fn = [&return_time]() { + return return_time; + }; + rosbag2_cpp::PlayerClock pclock(0, 1.0, now_fn); + + const RealTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const RealTimePoint ten_seconds(std::chrono::seconds(10)); + return_time = ten_seconds; + EXPECT_EQ(pclock.now(), as_nanos(ten_seconds)); + + // NOTE: this would have already lost precision at 100 seconds if we were multiplying by float + const RealTimePoint hundred_seconds(std::chrono::seconds(100)); + return_time = hundred_seconds; + EXPECT_EQ(pclock.now(), as_nanos(hundred_seconds)); + + const int64_t near_limit_nanos = 1LL << 61; + const auto near_limit_time = RealTimePoint(std::chrono::nanoseconds(near_limit_nanos)); + return_time = near_limit_time; + EXPECT_EQ(pclock.now(), near_limit_nanos); + + // Expect to lose exact equality at this point due to precision limit of double*int mult + const int64_t over_limit_nanos = (1LL << 61) + 1LL; + const auto over_limit_time = RealTimePoint(std::chrono::nanoseconds(over_limit_nanos)); + return_time = over_limit_time; + EXPECT_NE(pclock.now(), over_limit_nanos); + EXPECT_LT(std::abs(pclock.now() - over_limit_nanos), 10LL); +} + +TEST(PlayerClock, nonzero_start_time) +{ + RealTimePoint return_time; + NowFunction now_fn = [&return_time]() { + return return_time; + }; + const PlayerTimePoint start_time = 1234567890LL; + rosbag2_cpp::PlayerClock pclock(start_time, 1.0, now_fn); + + EXPECT_EQ(pclock.now(), start_time); + + return_time = RealTimePoint(std::chrono::seconds(1)); + EXPECT_EQ(pclock.now(), start_time + as_nanos(return_time)); +} + +TEST(PlayerClock, fast_rate) +{ + RealTimePoint return_time; + NowFunction now_fn = [&return_time]() { + return return_time; + }; + rosbag2_cpp::PlayerClock pclock(0, 2.5, now_fn); + + const RealTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const RealTimePoint some_time(std::chrono::seconds(3)); + return_time = some_time; + EXPECT_EQ(pclock.now(), as_nanos(some_time) * 2.5); +} + +TEST(PlayerClock, slow_rate) +{ + RealTimePoint return_time; + NowFunction now_fn = [&return_time]() { + return return_time; + }; + rosbag2_cpp::PlayerClock pclock(0, 0.4, now_fn); + + const RealTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const RealTimePoint some_time(std::chrono::seconds(12)); + return_time = some_time; + EXPECT_EQ(pclock.now(), as_nanos(some_time) * 0.4); +} diff --git a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp index ffce2bd650..2831999077 100644 --- a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp +++ b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp @@ -31,6 +31,8 @@ struct SerializedBagMessage std::string topic_name; }; +typedef std::shared_ptr SerializedBagMessagePtr; + } // namespace rosbag2_storage #endif // ROSBAG2_STORAGE__SERIALIZED_BAG_MESSAGE_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 8cf8c962c4..a70215ad6f 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -38,7 +38,6 @@ #include "qos.hpp" #include "rosbag2_node.hpp" -#include "replayable_message.hpp" namespace { @@ -94,6 +93,26 @@ bool Player::is_storage_completely_loaded() const void Player::play(const PlayOptions & options) { + // Initialize Clock + { + rosbag2_cpp::PlayerClock::PlayerTimePoint time_first_message; + float rate = 1.0; + + if (options.rate > 0.0) { + rate = options.rate; + } + if (reader_->has_next()) { + auto message = reader_->read_next(); + time_first_message = message->time_stamp; + // Could not peek, so need to enqueue this popped first message to be played. + message_queue_.enqueue(message); + } else { + // There are no messages in the storage at all + return; + } + clock_ = std::make_unique(time_first_message, rate); + } + topic_qos_profile_overrides_ = options.topic_qos_profile_overrides; prepare_publishers(options); @@ -103,7 +122,7 @@ void Player::play(const PlayOptions & options) wait_for_filled_queue(options); - play_messages_from_queue(options); + play_messages_from_queue(); } void Player::wait_for_filled_queue(const PlayOptions & options) const @@ -118,49 +137,35 @@ void Player::wait_for_filled_queue(const PlayOptions & options) const void Player::load_storage_content(const PlayOptions & options) { - TimePoint time_first_message; - - ReplayableMessage message; - if (reader_->has_next()) { - message.message = reader_->read_next(); - message.time_since_start = std::chrono::nanoseconds(0); - time_first_message = TimePoint(std::chrono::nanoseconds(message.message->time_stamp)); - message_queue_.enqueue(message); - } - auto queue_lower_boundary = static_cast(options.read_ahead_queue_size * read_ahead_lower_bound_percentage_); auto queue_upper_boundary = options.read_ahead_queue_size; while (reader_->has_next() && rclcpp::ok()) { if (message_queue_.size_approx() < queue_lower_boundary) { - enqueue_up_to_boundary(time_first_message, queue_upper_boundary); + enqueue_up_to_boundary(queue_upper_boundary); } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } -void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary) +void Player::enqueue_up_to_boundary(uint64_t boundary) { - ReplayableMessage message; + rosbag2_storage::SerializedBagMessagePtr message; for (size_t i = message_queue_.size_approx(); i < boundary; i++) { if (!reader_->has_next()) { break; } - message.message = reader_->read_next(); - message.time_since_start = - TimePoint(std::chrono::nanoseconds(message.message->time_stamp)) - time_first_message; - + message = reader_->read_next(); message_queue_.enqueue(message); } } -void Player::play_messages_from_queue(const PlayOptions & options) +void Player::play_messages_from_queue() { - start_time_ = std::chrono::system_clock::now(); do { - play_messages_until_queue_empty(options); + play_messages_until_queue_empty(); if (!is_storage_completely_loaded() && rclcpp::ok()) { ROSBAG2_TRANSPORT_LOG_WARN( "Message queue starved. Messages will be delayed. Consider " @@ -169,24 +174,15 @@ void Player::play_messages_from_queue(const PlayOptions & options) } while (!is_storage_completely_loaded() && rclcpp::ok()); } -void Player::play_messages_until_queue_empty(const PlayOptions & options) +void Player::play_messages_until_queue_empty() { - ReplayableMessage message; - - float rate = 1.0; - // Use rate if in valid range - if (options.rate > 0.0) { - rate = options.rate; - } - + rosbag2_storage::SerializedBagMessagePtr message; while (message_queue_.try_dequeue(message) && rclcpp::ok()) { - std::this_thread::sleep_until( - start_time_ + std::chrono::duration_cast( - 1.0 / rate * message.time_since_start)); + clock_->sleep_until(message->time_stamp); if (rclcpp::ok()) { - auto publisher_iter = publishers_.find(message.message->topic_name); + auto publisher_iter = publishers_.find(message->topic_name); if (publisher_iter != publishers_.end()) { - publisher_iter->second->publish(message.message->serialized_data); + publisher_iter->second->publish(message->serialized_data); } } } diff --git a/rosbag2_transport/src/rosbag2_transport/player.hpp b/rosbag2_transport/src/rosbag2_transport/player.hpp index cc7a612ae1..b6bdd348c2 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.hpp +++ b/rosbag2_transport/src/rosbag2_transport/player.hpp @@ -26,12 +26,10 @@ #include "rclcpp/qos.hpp" +#include "rosbag2_cpp/player_clock.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_transport/play_options.hpp" -#include "replayable_message.hpp" - -using TimePoint = std::chrono::time_point; - namespace rosbag2_cpp { class Reader; @@ -55,21 +53,21 @@ class Player private: void load_storage_content(const PlayOptions & options); bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary); + void enqueue_up_to_boundary(uint64_t boundary); void wait_for_filled_queue(const PlayOptions & options) const; - void play_messages_from_queue(const PlayOptions & options); - void play_messages_until_queue_empty(const PlayOptions & options); + void play_messages_from_queue(); + void play_messages_until_queue_empty(); void prepare_publishers(const PlayOptions & options); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; std::shared_ptr reader_; - moodycamel::ReaderWriterQueue message_queue_; - std::chrono::time_point start_time_; + moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; std::shared_ptr rosbag2_transport_; std::unordered_map> publishers_; std::unordered_map topic_qos_profile_overrides_; + std::unique_ptr clock_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp b/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp deleted file mode 100644 index 70b07f31df..0000000000 --- a/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2018 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_ -#define ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_ - -#include -#include - -#include "rosbag2_storage/serialized_bag_message.hpp" - -namespace rosbag2_transport -{ - -struct ReplayableMessage -{ - std::shared_ptr message; - std::chrono::nanoseconds time_since_start; -}; - -} // namespace rosbag2_transport - -#endif // ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_