Skip to content

Commit

Permalink
Restructure cache interfaces and cleanup cache implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Cameron Miller <[email protected]>
  • Loading branch information
Cameron Miller committed Sep 8, 2021
1 parent d38d470 commit 5fecccc
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__CACHE_INTERFACES__BASE_CACHE_BUFFER_INTERFACE_HPP_
#define ROSBAG2_CPP__CACHE_INTERFACES__BASE_CACHE_BUFFER_INTERFACE_HPP_
#ifndef ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_
#define ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_

#include <memory>
#include <vector>
Expand All @@ -23,14 +23,13 @@

namespace rosbag2_cpp
{
using buffer_element_t = std::shared_ptr<const rosbag2_storage::SerializedBagMessage>;
namespace cache_interfaces
namespace cache
{

class ROSBAG2_CPP_PUBLIC BaseCacheBufferInterface
using buffer_element_t = std::shared_ptr<const rosbag2_storage::SerializedBagMessage>;
class ROSBAG2_CPP_PUBLIC CacheBufferInterface
{
public:
virtual ~BaseCacheBufferInterface() {}
virtual ~CacheBufferInterface() {}

virtual bool push(buffer_element_t msg) = 0;

Expand All @@ -41,7 +40,7 @@ class ROSBAG2_CPP_PUBLIC BaseCacheBufferInterface
virtual const std::vector<buffer_element_t> & data() = 0;
};

} // namespace cache_interfaces
} // namespace cache
} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__CACHE_INTERFACES__BASE_CACHE_BUFFER_INTERFACE_HPP_
#endif // ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_
8 changes: 4 additions & 4 deletions rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <vector>

#include "rosbag2_cpp/cache/message_cache.hpp"
#include "rosbag2_cpp/cache_interfaces/base_message_cache_interface.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"

// This is necessary because of using stl types here. It is completely safe, because
// a) the member is not accessible from the outside
Expand Down Expand Up @@ -62,10 +62,10 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer
{
public:
using consume_callback_function_t = std::function<void (const
std::vector<rosbag2_cpp::buffer_element_t> &)>;
std::vector<buffer_element_t> &)>;

CacheConsumer(
std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseMessageCacheInterface> message_cache,
std::shared_ptr<MessageCacheInterface> message_cache,
consume_callback_function_t consume_callback);

~CacheConsumer();
Expand All @@ -77,7 +77,7 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer
void change_consume_callback(consume_callback_function_t callback);

private:
std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseMessageCacheInterface> message_cache_;
std::shared_ptr<MessageCacheInterface> message_cache_;
consume_callback_function_t consume_callback_;

/// Write buffer data to a storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <string>

#include "rosbag2_cpp/cache/message_cache_circular_buffer.hpp"
#include "rosbag2_cpp/cache_interfaces/base_message_cache_interface.hpp"
#include "rosbag2_cpp/cache_interfaces/base_cache_buffer_interface.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_storage/serialized_bag_message.hpp"
Expand All @@ -40,7 +40,7 @@ namespace cache
{

class ROSBAG2_CPP_PUBLIC CircularMessageCache
: public rosbag2_cpp::cache_interfaces::BaseMessageCacheInterface
: public MessageCacheInterface
{
public:
explicit CircularMessageCache(uint64_t max_buffer_size);
Expand All @@ -49,8 +49,7 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;

/// get current buffer to consume
std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseCacheBufferInterface>
consumer_buffer() override;
std::shared_ptr<CacheBufferInterface> consumer_buffer() override;

/// Swap the primary and secondary buffer before consumption.
/// NOTE: consumer_buffer() should be called sequentially after
Expand Down
9 changes: 4 additions & 5 deletions rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include <vector>

#include "rosbag2_cpp/cache/message_cache_buffer.hpp"
#include "rosbag2_cpp/cache_interfaces/base_message_cache_interface.hpp"
#include "rosbag2_cpp/cache_interfaces/base_cache_buffer_interface.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_storage/serialized_bag_message.hpp"
Expand Down Expand Up @@ -62,7 +62,7 @@ namespace cache
* performance issues, most likely with the CacheConsumer consumer callback.
*/
class ROSBAG2_CPP_PUBLIC MessageCache
: public rosbag2_cpp::cache_interfaces::BaseMessageCacheInterface
: public MessageCacheInterface
{
public:
explicit MessageCache(uint64_t max_buffer_size);
Expand Down Expand Up @@ -91,8 +91,7 @@ class ROSBAG2_CPP_PUBLIC MessageCache
void swap_buffers() override;

/// Consumer API: get current buffer to consume
std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseCacheBufferInterface>
consumer_buffer() override;
std::shared_ptr<CacheBufferInterface> consumer_buffer() override;

/// Exposes counts of messages dropped per topic
std::unordered_map<std::string, uint32_t> messages_dropped() const;
Expand Down
10 changes: 5 additions & 5 deletions rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <vector>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/cache_interfaces/base_cache_buffer_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand All @@ -46,7 +46,7 @@ namespace cache
* ->byte_size() - like interface
*/
class ROSBAG2_CPP_PUBLIC MessageCacheBuffer
: public rosbag2_cpp::cache_interfaces::BaseCacheBufferInterface
: public CacheBufferInterface
{
public:
explicit MessageCacheBuffer(const uint64_t max_cache_size);
Expand All @@ -56,7 +56,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheBuffer
* this results in exceeding buffer size, we mark buffer to drop all new incoming messages.
* This flag is cleared when buffers are swapped.
*/
bool push(rosbag2_cpp::buffer_element_t msg) override;
bool push(buffer_element_t msg) override;

/// Clear buffer
void clear() override;
Expand All @@ -65,10 +65,10 @@ class ROSBAG2_CPP_PUBLIC MessageCacheBuffer
size_t size() override;

/// Get buffer data
const std::vector<rosbag2_cpp::buffer_element_t> & data() override;
const std::vector<buffer_element_t> & data() override;

private:
std::vector<rosbag2_cpp::buffer_element_t> buffer_;
std::vector<buffer_element_t> buffer_;
uint64_t buffer_bytes_size_ {0u};
const uint64_t max_bytes_size_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <vector>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/cache_interfaces/base_cache_buffer_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand All @@ -46,7 +46,7 @@ namespace cache
* size.
*/
class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
: public rosbag2_cpp::cache_interfaces::BaseCacheBufferInterface
: public CacheBufferInterface
{
public:
// Delete default constructor since max_cache_size is required
Expand All @@ -57,7 +57,7 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
* If buffer size has some space left, we push the message regardless of its size,
* but if this results in exceeding buffer size, we begin dropping old messages.
*/
bool push(rosbag2_cpp::buffer_element_t msg) override;
bool push(buffer_element_t msg) override;

/// Clear buffer
void clear() override;
Expand All @@ -66,11 +66,11 @@ class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer
size_t size() override;

/// Get buffer data
const std::vector<rosbag2_cpp::buffer_element_t> & data() override;
const std::vector<buffer_element_t> & data() override;

private:
std::deque<rosbag2_cpp::buffer_element_t> buffer_;
std::vector<rosbag2_cpp::buffer_element_t> msg_vector_;
std::deque<buffer_element_t> buffer_;
std::vector<buffer_element_t> msg_vector_;
uint64_t buffer_bytes_size_ {0u};
const uint64_t max_bytes_size_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__CACHE_INTERFACES__BASE_MESSAGE_CACHE_INTERFACE_HPP_
#define ROSBAG2_CPP__CACHE_INTERFACES__BASE_MESSAGE_CACHE_INTERFACE_HPP_
#ifndef ROSBAG2_CPP__CACHE__MESSAGE_CACHE_INTERFACE_HPP_
#define ROSBAG2_CPP__CACHE__MESSAGE_CACHE_INTERFACE_HPP_

#include <memory>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/cache_interfaces/base_cache_buffer_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

namespace rosbag2_cpp
{
namespace cache_interfaces
namespace cache
{

class ROSBAG2_CPP_PUBLIC BaseMessageCacheInterface
class ROSBAG2_CPP_PUBLIC MessageCacheInterface
{
public:
virtual ~BaseMessageCacheInterface() {}
virtual ~MessageCacheInterface() {}

virtual void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) = 0;

virtual std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseCacheBufferInterface>
consumer_buffer() = 0;
virtual std::shared_ptr<CacheBufferInterface> consumer_buffer() = 0;

virtual void swap_buffers() = 0;

Expand All @@ -45,7 +44,7 @@ class ROSBAG2_CPP_PUBLIC BaseMessageCacheInterface
virtual void finalize() {}
};

} // namespace cache_interfaces
} // namespace cache
} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__CACHE_INTERFACES__BASE_MESSAGE_CACHE_INTERFACE_HPP_
#endif // ROSBAG2_CPP__CACHE__MESSAGE_CACHE_INTERFACE_HPP_
4 changes: 2 additions & 2 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

#include "rosbag2_cpp/cache/cache_consumer.hpp"
#include "rosbag2_cpp/cache/message_cache.hpp"
#include "rosbag2_cpp/cache_interfaces/base_message_cache_interface.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"
Expand Down Expand Up @@ -116,7 +116,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::unique_ptr<Converter> converter_;

bool use_cache_;
std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseMessageCacheInterface> message_cache_;
std::shared_ptr<rosbag2_cpp::cache::MessageCacheInterface> message_cache_;
std::unique_ptr<rosbag2_cpp::cache::CacheConsumer> cache_consumer_;

void switch_to_next_storage();
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace cache
{

CacheConsumer::CacheConsumer(
std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseMessageCacheInterface> message_cache,
std::shared_ptr<MessageCacheInterface> message_cache,
consume_callback_function_t consume_callback)
: message_cache_(message_cache),
consume_callback_(consume_callback)
Expand Down
5 changes: 2 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include "rosbag2_cpp/cache/circular_message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache_circular_buffer.hpp"
#include "rosbag2_cpp/cache_interfaces/base_cache_buffer_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/logging.hpp"

namespace rosbag2_cpp
Expand All @@ -39,8 +39,7 @@ void CircularMessageCache::push(std::shared_ptr<const rosbag2_storage::Serialize
primary_buffer_->push(msg);
}

std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseCacheBufferInterface> CircularMessageCache::
consumer_buffer()
std::shared_ptr<CacheBufferInterface> CircularMessageCache::consumer_buffer()
{
return secondary_buffer_;
}
Expand Down
5 changes: 2 additions & 3 deletions rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <utility>

#include "rosbag2_cpp/cache/message_cache.hpp"
#include "rosbag2_cpp/cache_interfaces/base_message_cache_interface.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/logging.hpp"

namespace rosbag2_cpp
Expand Down Expand Up @@ -88,8 +88,7 @@ void MessageCache::swap_buffers()
std::swap(primary_buffer_, secondary_buffer_);
}

std::shared_ptr<rosbag2_cpp::cache_interfaces::BaseCacheBufferInterface> MessageCache::
consumer_buffer()
std::shared_ptr<CacheBufferInterface> MessageCache::consumer_buffer()
{
return secondary_buffer_;
}
Expand Down

0 comments on commit 5fecccc

Please sign in to comment.