diff --git a/rosbag2_cpp/CMakeLists.txt b/rosbag2_cpp/CMakeLists.txt index e13d7dd4b8..24faf74ccd 100644 --- a/rosbag2_cpp/CMakeLists.txt +++ b/rosbag2_cpp/CMakeLists.txt @@ -48,6 +48,7 @@ add_library(${PROJECT_NAME} SHARED src/rosbag2_cpp/cache/cache_consumer.cpp src/rosbag2_cpp/cache/message_cache_buffer.cpp src/rosbag2_cpp/cache/message_cache.cpp + src/rosbag2_cpp/clocks/time_controller_clock.cpp src/rosbag2_cpp/converter.cpp src/rosbag2_cpp/info.cpp src/rosbag2_cpp/reader.cpp @@ -206,6 +207,12 @@ if(BUILD_TESTING) ament_target_dependencies(test_multifile_reader rosbag2_storage) target_link_libraries(test_multifile_reader ${PROJECT_NAME}) endif() + + ament_add_gmock(test_time_controller_clock + test/rosbag2_cpp/test_time_controller_clock.cpp) + if(TARGET test_time_controller_clock) + target_link_libraries(test_time_controller_clock ${PROJECT_NAME}) + endif() endif() ament_package() diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp new file mode 100644 index 0000000000..78f25f95c3 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp @@ -0,0 +1,71 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ +#define ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ + +#include +#include +#include + +#include "rcutils/time.h" +#include "rosbag2_cpp/visibility_control.hpp" + +namespace rosbag2_cpp +{ + +/** + * Virtual interface used to drive the timing of bag playback. + * This clock should be used to query times and sleep between message playing, + * so that the complexity involved around time control and time sources + * is encapsulated in this one place. + */ +class PlayerClock +{ +public: + /** + * Type representing an arbitrary steady time, used to measure real-time durations + * This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock. + */ + typedef std::function NowFunction; + + ROSBAG2_CPP_PUBLIC + virtual ~PlayerClock() = default; + + /** + * Calculate and return current rcutils_time_point_value_t based on starting time, playback rate, pause state. + */ + ROSBAG2_CPP_PUBLIC + virtual rcutils_time_point_value_t now() const = 0; + + /** + * Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock + * + * Return true if the time has been reached, false if it was not successfully reached after sleeping + * for the appropriate duration. + * The user should not take action based on this sleep until it returns true. + */ + ROSBAG2_CPP_PUBLIC + virtual bool sleep_until(rcutils_time_point_value_t until) = 0; + + /** + * Return the current playback rate. + */ + ROSBAG2_CPP_PUBLIC + virtual double get_rate() const = 0; +}; + +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp new file mode 100644 index 0000000000..980e43545f --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp @@ -0,0 +1,83 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CLOCKS__TIME_CONTROLLER_CLOCK_HPP_ +#define ROSBAG2_CPP__CLOCKS__TIME_CONTROLLER_CLOCK_HPP_ + +#include +#include + +#include "rosbag2_cpp/clocks/player_clock.hpp" + +namespace rosbag2_cpp +{ + +/** + * Version of the PlayerClock interface that has control over time. + * It does not listen to any external ROS Time Source and can optionally publish /clock + */ +class TimeControllerClockImpl; +class TimeControllerClock : public PlayerClock +{ +public: + /** + * Constructor. + * + * \param starting_time: provides an initial offset for managing time + * This will likely be the timestamp of the first message in the bag + * \param rate: Rate of playback, a unit-free ratio. 1.0 is real-time + * \param now_fn: Function used to get the current steady time + * defaults to std::chrono::steady_clock::now + * Used to control for unit testing, or for specialized needs + * \throws std::runtime_error if rate is <= 0 + */ + ROSBAG2_CPP_PUBLIC + TimeControllerClock( + rcutils_time_point_value_t starting_time, + double rate = 1.0, + NowFunction now_fn = std::chrono::steady_clock::now); + + ROSBAG2_CPP_PUBLIC + virtual ~TimeControllerClock(); + + /** + * Calculate and return current rcutils_time_point_value_t based on starting time, playback rate, pause state. + */ + ROSBAG2_CPP_PUBLIC + rcutils_time_point_value_t now() const override; + + /** + * Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock + * + * Return true if the time has been reached, false if it was not successfully reached after sleeping + * for the appropriate duration. + * The user should not take action based on this sleep until it returns true. + */ + ROSBAG2_CPP_PUBLIC + bool sleep_until(rcutils_time_point_value_t until) override; + + /** + * Return the current playback rate. + */ + ROSBAG2_CPP_PUBLIC + double get_rate() const override; + +private: + std::unique_ptr impl_; +}; + + +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__CLOCKS__TIME_CONTROLLER_CLOCK_HPP_ diff --git a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp new file mode 100644 index 0000000000..dc00bf14d2 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp @@ -0,0 +1,108 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "rcpputils/thread_safety_annotations.hpp" +#include "rosbag2_cpp/clocks/time_controller_clock.hpp" +#include "rosbag2_cpp/types.hpp" + +namespace rosbag2_cpp +{ + +class TimeControllerClockImpl +{ +public: + /** + * Stores an exact time match between a system steady clock and the playback ROS clock. + * This snapshot is taken whenever a factor changes such that a new reference is needed, + * such as pause, resume, rate change, or jump + */ + struct TimeReference + { + rcutils_time_point_value_t ros; + std::chrono::steady_clock::time_point steady; + }; + + TimeControllerClockImpl() = default; + virtual ~TimeControllerClockImpl() = default; + + template + rcutils_duration_value_t duration_nanos(const T & duration) + { + return std::chrono::duration_cast(duration).count(); + } + + rcutils_time_point_value_t steady_to_ros(std::chrono::steady_clock::time_point steady_time) + { + return reference.ros + static_cast( + rate * duration_nanos(steady_time - reference.steady)); + } + + std::chrono::steady_clock::time_point ros_to_steady(rcutils_time_point_value_t ros_time) + { + const auto diff_nanos = static_cast( + (ros_time - reference.ros) / rate); + return reference.steady + std::chrono::nanoseconds(diff_nanos); + } + + std::mutex mutex; + std::condition_variable cv; + PlayerClock::NowFunction now_fn RCPPUTILS_TSA_GUARDED_BY(mutex); + double rate RCPPUTILS_TSA_GUARDED_BY(mutex) = 1.0; + TimeReference reference RCPPUTILS_TSA_GUARDED_BY(mutex); +}; + +TimeControllerClock::TimeControllerClock( + rcutils_time_point_value_t starting_time, + double rate, + NowFunction now_fn) +: impl_(std::make_unique()) +{ + std::lock_guard lock(impl_->mutex); + impl_->now_fn = now_fn; + impl_->reference.ros = starting_time; + impl_->reference.steady = impl_->now_fn(); + impl_->rate = rate; +} + +TimeControllerClock::~TimeControllerClock() +{} + +rcutils_time_point_value_t TimeControllerClock::now() const +{ + std::lock_guard lock(impl_->mutex); + return impl_->steady_to_ros(impl_->now_fn()); +} + +bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until) +{ + { + std::unique_lock lock(impl_->mutex); + const auto steady_until = impl_->ros_to_steady(until); + impl_->cv.wait_until(lock, steady_until); + } + return now() >= until; +} + +double TimeControllerClock::get_rate() const +{ + std::lock_guard lock(impl_->mutex); + return impl_->rate; +} + +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp new file mode 100644 index 0000000000..9953bc0550 --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp @@ -0,0 +1,114 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rosbag2_cpp/clocks/time_controller_clock.hpp" + +using namespace testing; // NOLINT +using SteadyTimePoint = std::chrono::steady_clock::time_point; +using NowFunction = rosbag2_cpp::PlayerClock::NowFunction; + +class TimeControllerClockTest : public Test +{ +public: + TimeControllerClockTest() + { + now_fn = [this]() { + return return_time; + }; + } + + rcutils_time_point_value_t as_nanos(const SteadyTimePoint & time) + { + return std::chrono::duration_cast(time.time_since_epoch()).count(); + } + + NowFunction now_fn; + SteadyTimePoint return_time; // defaults to 0 + rcutils_time_point_value_t ros_start_time = 0; +}; + +TEST_F(TimeControllerClockTest, steadytime_precision) +{ + const double playback_rate = 1.0; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const SteadyTimePoint ten_seconds(std::chrono::seconds(10)); + return_time = ten_seconds; + EXPECT_EQ(pclock.now(), as_nanos(ten_seconds)); + + // NOTE: this would have already lost precision at 100 seconds if we were multiplying by float + const SteadyTimePoint hundred_seconds(std::chrono::seconds(100)); + return_time = hundred_seconds; + EXPECT_EQ(pclock.now(), as_nanos(hundred_seconds)); + + const int64_t near_limit_nanos = 1LL << 61; + const auto near_limit_time = SteadyTimePoint(std::chrono::nanoseconds(near_limit_nanos)); + return_time = near_limit_time; + EXPECT_EQ(pclock.now(), near_limit_nanos); + + // Expect to lose exact equality at this point due to precision limit of double*int mult + const int64_t over_limit_nanos = (1LL << 61) + 1LL; + const auto over_limit_time = SteadyTimePoint(std::chrono::nanoseconds(over_limit_nanos)); + return_time = over_limit_time; + EXPECT_NE(pclock.now(), over_limit_nanos); + EXPECT_LT(std::abs(pclock.now() - over_limit_nanos), 10LL); +} + +TEST_F(TimeControllerClockTest, nonzero_start_time) +{ + ros_start_time = 1234567890LL; + const double playback_rate = 1.0; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), ros_start_time); + + return_time = SteadyTimePoint(std::chrono::seconds(1)); + EXPECT_EQ(pclock.now(), ros_start_time + as_nanos(return_time)); +} + +TEST_F(TimeControllerClockTest, fast_rate) +{ + const double playback_rate = 2.5; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const SteadyTimePoint some_time(std::chrono::seconds(3)); + return_time = some_time; + EXPECT_EQ(pclock.now(), as_nanos(some_time) * playback_rate); +} + +TEST_F(TimeControllerClockTest, slow_rate) +{ + const double playback_rate = 0.4; + rosbag2_cpp::TimeControllerClock pclock(ros_start_time, playback_rate, now_fn); + + const SteadyTimePoint begin_time(std::chrono::seconds(0)); + return_time = begin_time; + EXPECT_EQ(pclock.now(), as_nanos(begin_time)); + + const SteadyTimePoint some_time(std::chrono::seconds(12)); + return_time = some_time; + EXPECT_EQ(pclock.now(), as_nanos(some_time) * playback_rate); +} diff --git a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp index ffce2bd650..e971e4889f 100644 --- a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp +++ b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp @@ -31,6 +31,8 @@ struct SerializedBagMessage std::string topic_name; }; +typedef std::shared_ptr SerializedBagMessageSharedPtr; + } // namespace rosbag2_storage #endif // ROSBAG2_STORAGE__SERIALIZED_BAG_MESSAGE_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 8cf8c962c4..6088cf200b 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -29,6 +29,7 @@ #include "rcutils/time.h" +#include "rosbag2_cpp/clocks/time_controller_clock.hpp" #include "rosbag2_cpp/reader.hpp" #include "rosbag2_cpp/typesupport_helpers.hpp" @@ -38,7 +39,6 @@ #include "qos.hpp" #include "rosbag2_node.hpp" -#include "replayable_message.hpp" namespace { @@ -94,6 +94,17 @@ bool Player::is_storage_completely_loaded() const void Player::play(const PlayOptions & options) { + if (reader_->has_next()) { + // Reader does not have "peek", so we must "pop" the first message to see its timestamp + auto message = reader_->read_next(); + prepare_clock(options, message->time_stamp); + // Make sure that first message gets played by putting it into the play queue + message_queue_.enqueue(message); + } else { + // The bag contains no messages - there is nothing to play + return; + } + topic_qos_profile_overrides_ = options.topic_qos_profile_overrides; prepare_publishers(options); @@ -103,7 +114,7 @@ void Player::play(const PlayOptions & options) wait_for_filled_queue(options); - play_messages_from_queue(options); + play_messages_from_queue(); } void Player::wait_for_filled_queue(const PlayOptions & options) const @@ -118,49 +129,35 @@ void Player::wait_for_filled_queue(const PlayOptions & options) const void Player::load_storage_content(const PlayOptions & options) { - TimePoint time_first_message; - - ReplayableMessage message; - if (reader_->has_next()) { - message.message = reader_->read_next(); - message.time_since_start = std::chrono::nanoseconds(0); - time_first_message = TimePoint(std::chrono::nanoseconds(message.message->time_stamp)); - message_queue_.enqueue(message); - } - auto queue_lower_boundary = static_cast(options.read_ahead_queue_size * read_ahead_lower_bound_percentage_); auto queue_upper_boundary = options.read_ahead_queue_size; while (reader_->has_next() && rclcpp::ok()) { if (message_queue_.size_approx() < queue_lower_boundary) { - enqueue_up_to_boundary(time_first_message, queue_upper_boundary); + enqueue_up_to_boundary(queue_upper_boundary); } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } -void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary) +void Player::enqueue_up_to_boundary(uint64_t boundary) { - ReplayableMessage message; + rosbag2_storage::SerializedBagMessageSharedPtr message; for (size_t i = message_queue_.size_approx(); i < boundary; i++) { if (!reader_->has_next()) { break; } - message.message = reader_->read_next(); - message.time_since_start = - TimePoint(std::chrono::nanoseconds(message.message->time_stamp)) - time_first_message; - + message = reader_->read_next(); message_queue_.enqueue(message); } } -void Player::play_messages_from_queue(const PlayOptions & options) +void Player::play_messages_from_queue() { - start_time_ = std::chrono::system_clock::now(); do { - play_messages_until_queue_empty(options); + play_messages_until_queue_empty(); if (!is_storage_completely_loaded() && rclcpp::ok()) { ROSBAG2_TRANSPORT_LOG_WARN( "Message queue starved. Messages will be delayed. Consider " @@ -169,24 +166,17 @@ void Player::play_messages_from_queue(const PlayOptions & options) } while (!is_storage_completely_loaded() && rclcpp::ok()); } -void Player::play_messages_until_queue_empty(const PlayOptions & options) +void Player::play_messages_until_queue_empty() { - ReplayableMessage message; - - float rate = 1.0; - // Use rate if in valid range - if (options.rate > 0.0) { - rate = options.rate; - } - + rosbag2_storage::SerializedBagMessageSharedPtr message; while (message_queue_.try_dequeue(message) && rclcpp::ok()) { - std::this_thread::sleep_until( - start_time_ + std::chrono::duration_cast( - 1.0 / rate * message.time_since_start)); + // Do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause + while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {} if (rclcpp::ok()) { - auto publisher_iter = publishers_.find(message.message->topic_name); + auto publisher_iter = publishers_.find(message->topic_name); if (publisher_iter != publishers_.end()) { - publisher_iter->second->publish(message.message->serialized_data); + publisher_iter->second->publish(message->serialized_data); } } } @@ -224,4 +214,10 @@ void Player::prepare_publishers(const PlayOptions & options) } } +void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time) +{ + double rate = options.rate > 0.0 ? options.rate : 1.0; + clock_ = std::make_unique(starting_time, rate); +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.hpp b/rosbag2_transport/src/rosbag2_transport/player.hpp index cc7a612ae1..d65a6dd530 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.hpp +++ b/rosbag2_transport/src/rosbag2_transport/player.hpp @@ -26,12 +26,10 @@ #include "rclcpp/qos.hpp" +#include "rosbag2_cpp/clocks/player_clock.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_transport/play_options.hpp" -#include "replayable_message.hpp" - -using TimePoint = std::chrono::time_point; - namespace rosbag2_cpp { class Reader; @@ -55,21 +53,22 @@ class Player private: void load_storage_content(const PlayOptions & options); bool is_storage_completely_loaded() const; - void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary); + void enqueue_up_to_boundary(uint64_t boundary); void wait_for_filled_queue(const PlayOptions & options) const; - void play_messages_from_queue(const PlayOptions & options); - void play_messages_until_queue_empty(const PlayOptions & options); + void play_messages_from_queue(); + void play_messages_until_queue_empty(); void prepare_publishers(const PlayOptions & options); + void prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; std::shared_ptr reader_; - moodycamel::ReaderWriterQueue message_queue_; - std::chrono::time_point start_time_; + moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; std::shared_ptr rosbag2_transport_; std::unordered_map> publishers_; std::unordered_map topic_qos_profile_overrides_; + std::unique_ptr clock_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp b/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp deleted file mode 100644 index 70b07f31df..0000000000 --- a/rosbag2_transport/src/rosbag2_transport/replayable_message.hpp +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2018 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_ -#define ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_ - -#include -#include - -#include "rosbag2_storage/serialized_bag_message.hpp" - -namespace rosbag2_transport -{ - -struct ReplayableMessage -{ - std::shared_ptr message; - std::chrono::nanoseconds time_since_start; -}; - -} // namespace rosbag2_transport - -#endif // ROSBAG2_TRANSPORT__REPLAYABLE_MESSAGE_HPP_