diff --git a/iceoryx_posh/CMakeLists.txt b/iceoryx_posh/CMakeLists.txt index 8958baa83c..e1b6bf4562 100644 --- a/iceoryx_posh/CMakeLists.txt +++ b/iceoryx_posh/CMakeLists.txt @@ -72,7 +72,9 @@ add_library(iceoryx_posh source/popo/sender_port.cpp source/popo/sender_port_data.cpp source/popo/receiver_handler.cpp - source/popo/building_blocks/chunk_queue.cpp + source/popo/building_blocks/chunk_queue_pusher.cpp + source/popo/building_blocks/chunk_queue_popper.cpp + source/popo/building_blocks/chunk_receiver.cpp source/runtime/message_queue_interface.cpp source/runtime/message_queue_message.cpp source/runtime/port_config_info.cpp diff --git a/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp b/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp index f2c14b991f..a83733c958 100644 --- a/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp +++ b/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp @@ -54,8 +54,15 @@ constexpr uint32_t MAX_PORT_NUMBER = 1024u; constexpr uint32_t MAX_INTERFACE_NUMBER = 4u; constexpr uint32_t MAX_RECEIVERS_PER_SENDERPORT = 256u; constexpr uint32_t MAX_CHUNKS_ALLOCATE_PER_SENDER = 8u; -constexpr uint64_t MAX_SENDER_SAMPLE_HISTORY_CAPACITY = 16u; -constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = 256u; +constexpr uint64_t MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR = 16u; +constexpr uint32_t MAX_CHUNKS_HELD_PER_RECEIVER = 256u; +constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER; +/// With MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER we couple the maximum number of chunks a user is +/// allowed to hold with the maximum queue capacity. +/// This allows that a polling user can replace all the held chunks in one execution with all new ones +/// from a completely filled queue. Or the other way round, when we have a contract with the user +/// regarding how many chunks they are allowed to hold, then the queue size needs not be bigger. 
We +/// can provide this number of newest chunks, more the user would not be allowed to hold anyway constexpr uint32_t MAX_INTERFACE_CAPRO_FIFO_SIZE = MAX_PORT_NUMBER; constexpr uint32_t MAX_APPLICATION_CAPRO_FIFO_SIZE = 128u; @@ -119,4 +126,3 @@ using FindServiceHandler = std::function +/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an +/// inter-process mutex. But this can lead to deadlocks if a user process gets terminated while one of its +/// threads is in the ChunkDistributor and holds a lock. An easier setup would be if changing the queues +/// by a middleware thread and sending chunks by the user process would not interleave. I.e. there is no concurrent +/// access to the containers. Then a memory synchronization would be sufficient. +/// The cleanup() call is the biggest challenge. This is used to free chunks that are still held by a not properly +/// terminated user application. Even if access from middleware and user threads do not overlap, the history +/// container to cleanup could be in an inconsistent state as the application was hard terminated while changing it. +/// We would need a container like the UsedChunkList to have one that is robust against such inconsistencies.... +/// A perfect job for our future selves +template class ChunkDistributor { public: - using MemberType_t = ChunkDistributorData; + using MemberType_t = ChunkDistributorDataType; + using ChunkQueueData_t = typename ChunkDistributorDataType::ChunkQueueData_t; + using ChunkQueuePusher_t = typename ChunkDistributorDataType::ChunkQueuePusher_t; ChunkDistributor(MemberType_t* const chunkDistrubutorDataPtr) noexcept; @@ -62,11 +68,11 @@ class ChunkDistributor /// @param[in] requestedHistory number of last chunks from history to send if available. 
If history size is smaller /// then the available history size chunks are provided /// @return true on success otherwise false - bool addQueue(ChunkQueue::MemberType_t* const queueToAdd, uint64_t requestedHistory = 0) noexcept; + bool addQueue(ChunkQueueData_t* const queueToAdd, uint64_t requestedHistory = 0) noexcept; /// @brief Remove a queue from the internal list of chunk queues /// @param[in] chunk queue to remove from the list - void removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept; + void removeQueue(ChunkQueueData_t* const queueToRemove) noexcept; /// @brief Delete all the stored chunk queues void removeAllQueues() noexcept; @@ -84,7 +90,7 @@ class ChunkDistributor /// history /// @param[in] chunk queue to which this chunk shall be delivered /// @param[in] shared chunk to be delivered - void deliverToQueue(ChunkQueue::MemberType_t* const queue, mepoo::SharedChunk chunk) noexcept; + void deliverToQueue(ChunkQueueData_t* const queue, mepoo::SharedChunk chunk) noexcept; /// @brief Update the chunk history but do not deliver the chunk to any chunk queue. E.g. 
use case is to to update a /// non offered field in ara @@ -117,3 +123,5 @@ class ChunkDistributor } // namespace iox #include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl" + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl index 573a9b7de4..8694c366fe 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl @@ -17,58 +17,57 @@ namespace iox { namespace popo { -template -inline ChunkDistributor::ChunkDistributor( +template +inline ChunkDistributor::ChunkDistributor( MemberType_t* const chunkDistrubutorDataPtr) noexcept : m_chunkDistrubutorDataPtr(chunkDistrubutorDataPtr) { } -template -inline const typename ChunkDistributor::MemberType_t* -ChunkDistributor::getMembers() const noexcept +template +inline const typename ChunkDistributor::MemberType_t* +ChunkDistributor::getMembers() const noexcept { return m_chunkDistrubutorDataPtr; } -template -inline typename ChunkDistributor::MemberType_t* -ChunkDistributor::getMembers() noexcept +template +inline typename ChunkDistributor::MemberType_t* +ChunkDistributor::getMembers() noexcept { return m_chunkDistrubutorDataPtr; } -template -inline bool ChunkDistributor::addQueue(ChunkQueue::MemberType_t* const queueToAdd, +template +inline bool ChunkDistributor::addQueue(ChunkQueueData_t* const queueToAdd, uint64_t requestedHistory) noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename MemberType_t::LockGuard_t lock(*getMembers()); if (nullptr == queueToAdd) { return false; } - auto l_alreadyKnownReceiver = - std::find_if(getMembers()->m_queues.begin(), - getMembers()->m_queues.end(), - [&](ChunkQueue::MemberType_t* const queue) { return queue == queueToAdd; }); + auto alreadyKnownReceiver = 
std::find_if(getMembers()->m_queues.begin(), + getMembers()->m_queues.end(), + [&](ChunkQueueData_t* const queue) { return queue == queueToAdd; }); // check if the queue is not already in the list - if (l_alreadyKnownReceiver == getMembers()->m_queues.end()) + if (alreadyKnownReceiver == getMembers()->m_queues.end()) { if (getMembers()->m_queues.size() < getMembers()->m_queues.capacity()) { getMembers()->m_queues.push_back(queueToAdd); - uint64_t currChunkHistorySize = getMembers()->m_sampleHistory.size(); + uint64_t currChunkHistorySize = getMembers()->m_history.size(); // if the current history is large enough we send the requested number of chunks, else we send the // total history auto startIndex = (requestedHistory <= currChunkHistorySize) ? currChunkHistorySize - requestedHistory : 0u; for (auto i = startIndex; i < currChunkHistorySize; ++i) { - deliverToQueue(queueToAdd, getMembers()->m_sampleHistory[i]); + deliverToQueue(queueToAdd, getMembers()->m_history[i]); } } else @@ -81,39 +80,38 @@ inline bool ChunkDistributor::addQueue(ChunkQueue::Mem return true; } -template -inline void -ChunkDistributor::removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept +template +inline void ChunkDistributor::removeQueue(ChunkQueueData_t* const queueToRemove) noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename MemberType_t::LockGuard_t lock(*getMembers()); - auto l_iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove); - if (l_iter != getMembers()->m_queues.end()) + auto iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove); + if (iter != getMembers()->m_queues.end()) { - getMembers()->m_queues.erase(l_iter); + getMembers()->m_queues.erase(iter); } } -template -inline void ChunkDistributor::removeAllQueues() noexcept +template +inline void ChunkDistributor::removeAllQueues() noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename 
MemberType_t::LockGuard_t lock(*getMembers()); getMembers()->m_queues.clear(); } -template -inline bool ChunkDistributor::hasStoredQueues() noexcept +template +inline bool ChunkDistributor::hasStoredQueues() noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename MemberType_t::LockGuard_t lock(*getMembers()); return !getMembers()->m_queues.empty(); } -template -inline void ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept +template +inline void ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename MemberType_t::LockGuard_t lock(*getMembers()); // send to all the queues for (auto& queue : getMembers()->m_queues) @@ -125,52 +123,52 @@ inline void ChunkDistributor::deliverToAllStoredQueues addToHistoryWithoutDelivery(chunk); } -template -inline void ChunkDistributor::deliverToQueue(ChunkQueue::MemberType_t* const queue, +template +inline void ChunkDistributor::deliverToQueue(ChunkQueueData_t* const queue, mepoo::SharedChunk chunk) noexcept { - ChunkQueue(queue).push(chunk); + ChunkQueuePusher_t(queue).push(chunk); } -template -inline void ChunkDistributor::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept +template +inline void ChunkDistributor::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename MemberType_t::LockGuard_t lock(*getMembers()); if (0 < getMembers()->m_historyCapacity) { - if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity) + if (getMembers()->m_history.size() >= getMembers()->m_historyCapacity) { - getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin()); + getMembers()->m_history.erase(getMembers()->m_history.begin()); } - getMembers()->m_sampleHistory.push_back(chunk); + getMembers()->m_history.push_back(chunk); } } -template -inline uint64_t 
ChunkDistributor::getHistorySize() noexcept +template +inline uint64_t ChunkDistributor::getHistorySize() noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename MemberType_t::LockGuard_t lock(*getMembers()); - return getMembers()->m_sampleHistory.size(); + return getMembers()->m_history.size(); } -template -inline uint64_t ChunkDistributor::getHistoryCapacity() const noexcept +template +inline uint64_t ChunkDistributor::getHistoryCapacity() const noexcept { return getMembers()->m_historyCapacity; } -template -inline void ChunkDistributor::clearHistory() noexcept +template +inline void ChunkDistributor::clearHistory() noexcept { - typename MemberType_t::lockGuard_t lock(*getMembers()); + typename MemberType_t::LockGuard_t lock(*getMembers()); - getMembers()->m_sampleHistory.clear(); + getMembers()->m_history.clear(); } -template -inline void ChunkDistributor::cleanup() noexcept +template +inline void ChunkDistributor::cleanup() noexcept { if (getMembers()->tryLock()) { diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp index d6a87e5469..34a362d815 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#pragma once +#ifndef IOX_POPO_CHUNK_DISTRIBUTOR_DATA_HPP_ +#define IOX_POPO_CHUNK_DISTRIBUTOR_DATA_HPP_ #include "iceoryx_posh/iceoryx_posh_types.hpp" #include "iceoryx_posh/internal/log/posh_logging.hpp" #include "iceoryx_posh/internal/mepoo/shared_chunk.hpp" -#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp" #include "iceoryx_utils/cxx/algorithm.hpp" #include "iceoryx_utils/cxx/vector.hpp" #include "iceoryx_utils/internal/posix_wrapper/mutex.hpp" @@ -29,8 +30,6 @@ namespace iox { namespace popo { -struct ChunkQueueData; - class ThreadSafePolicy { public: // needs to be public since we want to use std::lock_guard @@ -66,33 +65,37 @@ class SingleThreadedPolicy } }; -template +template struct ChunkDistributorData : public LockingPolicy { - using lockGuard_t = std::lock_guard>; + using LockGuard_t = std::lock_guard>; + using ChunkQueuePusher_t = ChunkQueuePusherType; + using ChunkQueueData_t = typename ChunkQueuePusherType::MemberType_t; ChunkDistributorData(uint64_t historyCapacity = 0u) noexcept - : m_historyCapacity(algorithm::min(historyCapacity, MAX_SENDER_SAMPLE_HISTORY_CAPACITY)) + : m_historyCapacity(algorithm::min(historyCapacity, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR)) { if (m_historyCapacity != historyCapacity) { LogWarn() << "Chunk history too large, reducing from " << historyCapacity << " to " - << MAX_SENDER_SAMPLE_HISTORY_CAPACITY; + << MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR; } } const uint64_t m_historyCapacity; - using QueueContainer_t = cxx::vector; + using QueueContainer_t = cxx::vector; QueueContainer_t m_queues; /// @todo using ChunkManagement instead of SharedChunk as in UsedChunkList? /// When to store a SharedChunk and when the included ChunkManagement must be used? /// If we would make the ChunkDistributor lock-free, can we than extend the UsedChunkList to /// be like a ring buffer and use this for the history? 
This would be needed to be able to safely cleanup - using SampleHistoryContainer_t = cxx::vector; - SampleHistoryContainer_t m_sampleHistory; + using HistoryContainer_t = cxx::vector; + HistoryContainer_t m_history; }; } // namespace popo } // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp index ce3583b9e7..c22f0bb595 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp @@ -12,44 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -#pragma once +#ifndef IOX_POPO_CHUNK_QUEUE_DATA_HPP_ +#define IOX_POPO_CHUNK_QUEUE_DATA_HPP_ #include "iceoryx_posh/iceoryx_posh_types.hpp" -#include "iceoryx_utils/cxx/variant_queue.hpp" #include "iceoryx_posh/internal/mepoo/shared_pointer.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_types.hpp" +#include "iceoryx_utils/cxx/variant_queue.hpp" #include "iceoryx_utils/posix_wrapper/semaphore.hpp" namespace iox { namespace popo { - struct ChunkQueueData { - struct ChunkTuple - { - ChunkTuple() = default; - ChunkTuple(iox::relative_ptr f_chunk) noexcept - : m_segmentId(f_chunk.getId()) - , m_chunkOffset(f_chunk.getOffset()) - { - } - - RelativePointer::id_t m_segmentId{iox::RelativePointer::NULL_POINTER_ID}; - RelativePointer::offset_t m_chunkOffset{iox::RelativePointer::NULL_POINTER_OFFSET}; - }; - ChunkQueueData(cxx::VariantQueueTypes queueType) noexcept - : m_queue(queueType) + : m_queue(queueType) { - } cxx::VariantQueue m_queue; mepoo::SharedPointer m_semaphore; std::atomic_bool m_semaphoreAttached{false}; - }; } // namespace popo } // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp 
b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp similarity index 67% rename from iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp rename to iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp index c52dc9cb9c..9a03912a51 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#pragma once +#ifndef IOX_POPO_CHUNK_QUEUE_POPPER_HPP_ +#define IOX_POPO_CHUNK_QUEUE_POPPER_HPP_ #include "iceoryx_posh/internal/mepoo/shared_chunk.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_types.hpp" #include "iceoryx_utils/cxx/optional.hpp" @@ -23,34 +25,23 @@ namespace iox { namespace popo { -enum class ChunkQueueError -{ - SEMAPHORE_ALREADY_SET -}; - -/// @brief The ChunkQueue is the low layer building block to receive SharedChunks. It follows a first-in-first-out -/// principle. Together with the ChunkDistributor, the ChunkQueue builds the infrastructure to exchange memory chunks -/// between different data producers and consumers that could be located in different processes. A ChunkQueue is used -/// to build elements of higher abstraction layers that also do memory managemet and provide an API towards the real -/// user -class ChunkQueue +/// @brief The ChunkQueuePopper is the low layer building block to receive SharedChunks. It follows a first-in-first-out +/// principle. Together with the ChunkDistributor and the ChunkQueuePusher, the ChunkQueuePopper builds the +/// infrastructure to exchange memory chunks between different data producers and consumers that could be located in +/// different processes. 
A ChunkQueuePopper is used to build elements of higher abstraction layers that also do memory +/// managemet and provide an API towards the real user +class ChunkQueuePopper { public: using MemberType_t = ChunkQueueData; - ChunkQueue(MemberType_t* const chunkQueueDataPtr) noexcept; + ChunkQueuePopper(MemberType_t* const chunkQueueDataPtr) noexcept; - ChunkQueue(const ChunkQueue& other) = delete; - ChunkQueue& operator=(const ChunkQueue&) = delete; - ChunkQueue(ChunkQueue&& rhs) = default; - ChunkQueue& operator=(ChunkQueue&& rhs) = default; - ~ChunkQueue() = default; - - /// @brief push a new chunk to the chunk queue - /// @param[in] shared chunk object - /// @return if the values was pushed successfully into the chunk queue it returns - /// true, otherwise false - bool push(mepoo::SharedChunk chunk) noexcept; + ChunkQueuePopper(const ChunkQueuePopper& other) = delete; + ChunkQueuePopper& operator=(const ChunkQueuePopper&) = delete; + ChunkQueuePopper(ChunkQueuePopper&& rhs) = default; + ChunkQueuePopper& operator=(ChunkQueuePopper&& rhs) = default; + ~ChunkQueuePopper() = default; /// @brief pop a chunk from the chunk queue /// @return optional for a shared chunk that is set if the queue is not empty @@ -67,8 +58,7 @@ class ChunkQueue /// @brief set the capacity of the queue /// @param[in] newCapacity valid values are 0 < newCapacity < MAX_RECEIVER_QUEUE_CAPACITY - /// @pre it is important that no pop or push calls occur during - /// this call + /// @pre it is important that no pop or push calls occur during this call /// @concurrent not thread safe void setCapacity(const uint32_t newCapacity) noexcept; @@ -99,3 +89,5 @@ class ChunkQueue } // namespace popo } // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp new file mode 100644 index 0000000000..53a918fe4f --- /dev/null +++ 
b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp @@ -0,0 +1,62 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef IOX_POPO_CHUNK_QUEUE_PUSHER_HPP_ +#define IOX_POPO_CHUNK_QUEUE_PUSHER_HPP_ + +#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_types.hpp" +#include "iceoryx_utils/cxx/expected.hpp" + + +namespace iox +{ +namespace popo +{ +/// @brief The ChunkQueuePusher is the low layer building block to push SharedChunks in a chunk queue. +/// Together with the ChunkDistributor and ChunkQueuePopper the ChunkQueuePusher builds the infrastructure +/// to exchange memory chunks between different data producers and consumers that could be located in different +/// processes. 
A ChunkQueuePusher is the part of the chunk queue that is known by the ChunkDistributor +class ChunkQueuePusher +{ + public: + using MemberType_t = ChunkQueueData; + + ChunkQueuePusher(MemberType_t* const chunkQueueDataPtr) noexcept; + + ChunkQueuePusher(const ChunkQueuePusher& other) = delete; + ChunkQueuePusher& operator=(const ChunkQueuePusher&) = delete; + ChunkQueuePusher(ChunkQueuePusher&& rhs) = default; + ChunkQueuePusher& operator=(ChunkQueuePusher&& rhs) = default; + ~ChunkQueuePusher() = default; + + /// @brief push a new chunk to the chunk queue + /// @param[in] shared chunk object + /// @return if the value was pushed successfully into the chunk queue it returns + /// success, otherwise a ChunkQueueError + cxx::expected push(mepoo::SharedChunk chunk) noexcept; + + protected: + const MemberType_t* getMembers() const noexcept; + MemberType_t* getMembers() noexcept; + + private: + MemberType_t* m_chunkQueueDataPtr{nullptr}; +}; + +} // namespace popo +} // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_types.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_types.hpp new file mode 100644 index 0000000000..6a9505e7f1 --- /dev/null +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_queue_types.hpp @@ -0,0 +1,47 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef IOX_POPO_CHUNK_QUEUE_TYPES_HPP_ +#define IOX_POPO_CHUNK_QUEUE_TYPES_HPP_ + +#include "iceoryx_posh/internal/mepoo/chunk_management.hpp" +#include "iceoryx_utils/internal/relocatable_pointer/relative_ptr.hpp" + +namespace iox +{ +namespace popo +{ +enum class ChunkQueueError +{ + SEMAPHORE_ALREADY_SET, + QUEUE_OVERFLOW +}; + +struct ChunkTuple +{ + ChunkTuple() = default; + ChunkTuple(iox::relative_ptr f_chunk) noexcept + : m_segmentId(f_chunk.getId()) + , m_chunkOffset(f_chunk.getOffset()) + { + } + + RelativePointer::id_t m_segmentId{iox::RelativePointer::NULL_POINTER_ID}; + RelativePointer::offset_t m_chunkOffset{iox::RelativePointer::NULL_POINTER_OFFSET}; +}; + +} // namespace popo +} // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver.hpp new file mode 100644 index 0000000000..94b2f12714 --- /dev/null +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver.hpp @@ -0,0 +1,76 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef IOX_POPO_CHUNK_RECEIVER_HPP_ +#define IOX_POPO_CHUNK_RECEIVER_HPP_ + +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_receiver_data.hpp" +#include "iceoryx_posh/mepoo/chunk_header.hpp" +#include "iceoryx_utils/cxx/expected.hpp" +#include "iceoryx_utils/cxx/optional.hpp" + +namespace iox +{ +namespace popo +{ +enum class ChunkReceiverError +{ + TOO_MANY_CHUNKS_HELD_IN_PARALLEL +}; + +/// @brief The ChunkReceiver is a building block of the shared memory communication infrastructure. It extends +/// the functionality of a ChunkQueuePopper with the ability to pass chunks to the user side (user process). +/// Together with the ChunkSender, they are the next abstraction layer on top of ChunkDistributor and ChunkQueuePopper. +/// The +/// ChunkReceiver holds the ownership of the SharedChunks and does a bookkeeping which chunks are currently passed to the +/// user side. +class ChunkReceiver : public ChunkQueuePopper +{ + public: + using MemberType_t = ChunkReceiverData; + + ChunkReceiver(MemberType_t* const chunkReceiverDataPtr) noexcept; + + ChunkReceiver(const ChunkReceiver& other) = delete; + ChunkReceiver& operator=(const ChunkReceiver&) = delete; + ChunkReceiver(ChunkReceiver&& rhs) = default; + ChunkReceiver& operator=(ChunkReceiver&& rhs) = default; + ~ChunkReceiver() = default; + + /// @brief Tries to get the next received chunk. 
If there is a new one the ChunkHeader of this new chunk is received + /// The ownership of the SharedChunk remains in the ChunkReceiver for being able to cleanup if the user process + /// disappears + /// @return optional that has a new chunk header or no value if there are no new chunks in the underlying queue, + /// ChunkReceiverError on error + cxx::expected, ChunkReceiverError> get() noexcept; + + /// @brief Release a chunk that was obtained with get + /// @param[in] chunkHeader, pointer to the ChunkHeader to free + void release(const mepoo::ChunkHeader* chunkHeader) noexcept; + + /// @brief Release all the chunks that are currently held. Caution: Only call this if the user process is no more + /// running E.g. This cleans up chunks that were held by a user process that died unexpectedly, for avoiding lost + /// chunks in the system + void releaseAll() noexcept; + + private: + const MemberType_t* getMembers() const noexcept; + MemberType_t* getMembers() noexcept; +}; + +} // namespace popo +} // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver_data.hpp new file mode 100644 index 0000000000..3d173003b4 --- /dev/null +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_receiver_data.hpp @@ -0,0 +1,50 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef IOX_POPO_CHUNK_RECEIVER_DATA_HPP_ +#define IOX_POPO_CHUNK_RECEIVER_DATA_HPP_ + +#include "iceoryx_posh/iceoryx_posh_types.hpp" +#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp" +#include "iceoryx_posh/internal/popo/used_chunk_list.hpp" +#include "iceoryx_posh/mepoo/memory_info.hpp" +#include "iceoryx_utils/cxx/variant_queue.hpp" + +namespace iox +{ +namespace popo +{ +struct ChunkReceiverData : public ChunkQueueData +{ + ChunkReceiverData(cxx::VariantQueueTypes queueType, + const mepoo::MemoryInfo& memoryInfo = mepoo::MemoryInfo()) noexcept + : ChunkQueueData(queueType) + , m_memoryInfo(memoryInfo) + { + } + + mepoo::MemoryInfo m_memoryInfo; + + /// we use one more than MAX_CHUNKS_HELD_PER_RECEIVER for being able to provide one new chunk + /// to the user if they already have the allowed MAX_CHUNKS_HELD_PER_RECEIVER. But then the user + /// has to return one to not break the contract. This is aligned with AUTOSAR Adaptive ara::com + static constexpr uint32_t MAX_CHUNKS_IN_USE = MAX_CHUNKS_HELD_PER_RECEIVER + 1; + UsedChunkList m_chunksInUse; +}; + +} // namespace popo +} // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp index b2522e6674..c6cb109dbb 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#ifndef IOX_POSH_POPO_CHUNK_SENDER_HPP_ -#define IOX_POSH_POPO_CHUNK_SENDER_HPP_ +#ifndef IOX_POPO_CHUNK_SENDER_HPP_ +#define IOX_POPO_CHUNK_SENDER_HPP_ #include "iceoryx_posh/internal/mepoo/shared_chunk.hpp" -#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp" -#include "iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp" #include "iceoryx_posh/mepoo/chunk_header.hpp" #include "iceoryx_utils/cxx/expected.hpp" @@ -28,17 +26,16 @@ namespace iox { namespace popo { -/// @brief error which can occur in the VariantQueue enum class ChunkSenderError { RUNNING_OUT_OF_CHUNKS, - TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL + TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL }; /// @brief The ChunkSender is a building block of the shared memory communication infrastructure. It extends /// the functionality of a ChunkDistributor with the abililty to allocate and free memory chunks. /// For getting chunks of memory the MemoryManger is used. Together with the ChunkReceiver, they are the next -/// abstraction layer on top of ChunkDistributor and ChunkQueue. The ChunkSender holds the ownership of the +/// abstraction layer on top of ChunkDistributor and ChunkQueuePopper. The ChunkSender holds the ownership of the /// SharedChunks and does a bookkeeping which chunks are currently passed to the user side. 
template class ChunkSender : public ChunkDistributorType @@ -65,7 +62,7 @@ class ChunkSender : public ChunkDistributorType /// @param[in] chunkHeader, pointer to the ChunkHeader to free void free(mepoo::ChunkHeader* const chunkHeader) noexcept; - /// @brief Send an allocated chunk to all connected ChunkQueue + /// @brief Send an allocated chunk to all connected ChunkQueuePopper /// @param[in] chunkHeader, pointer to the ChunkHeader to send void send(mepoo::ChunkHeader* const chunkHeader) noexcept; @@ -75,12 +72,12 @@ class ChunkSender : public ChunkDistributorType /// @brief Returns the last sent chunk if there is one /// @return pointer to the ChunkHeader of the last sent Chunk if there is one, empty optional if not - cxx::optional getLastChunk() const noexcept; + cxx::optional getLast() const noexcept; /// @brief Release all the chunks that are currently held. Caution: Only call this if the user process is no more /// running E.g. This cleans up chunks that were held by a user process that died unexpectetly, for avoiding lost /// chunks in the system - void releaseAllChunks() noexcept; + void releaseAll() noexcept; private: /// @brief Get the SharedChunk from the provided ChunkHeader and do all that is required to send the chunk diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.inl b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.inl index 95ffd4773f..b6dd90303a 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.inl +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.inl @@ -54,7 +54,7 @@ ChunkSender::allocate(const uint32_t payloadSize) noexcept } else { - return cxx::error(ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL); + return cxx::error(ChunkSenderError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL); } } else @@ -75,7 +75,7 @@ ChunkSender::allocate(const uint32_t payloadSize) noexcept { // release the allocated chunk 
chunk = nullptr; - return cxx::error(ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL); + return cxx::error(ChunkSenderError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL); } } else @@ -122,7 +122,7 @@ inline void ChunkSender::pushToHistory(mepoo::ChunkHeader* } template -inline cxx::optional ChunkSender::getLastChunk() const noexcept +inline cxx::optional ChunkSender::getLast() const noexcept { if (getMembers()->m_lastChunk) { @@ -135,7 +135,7 @@ inline cxx::optional ChunkSender -inline void ChunkSender::releaseAllChunks() noexcept +inline void ChunkSender::releaseAll() noexcept { getMembers()->m_chunksInUse.cleanup(); this->cleanup(); @@ -151,12 +151,15 @@ inline bool ChunkSender::getChunkReadyForSend(mepoo::Chunk auto& chunkInfo = chunk.getChunkHeader()->m_info; if (!chunkInfo.m_externalSequenceNumber_bl) { + // if the sequence number is NOT set by the user, we take the one from the chunk sender for the chunk info chunkInfo.m_sequenceNumber = getMembers()->m_sequenceNumber; getMembers()->m_sequenceNumber++; } else { - getMembers()->m_sequenceNumber++; // for Introspection, else nobody updates. + // if the sequence number in the chunk info is set by the user, we still increment the internal one as this + // might be needed by middleware specific evaluation (like in introspection) + getMembers()->m_sequenceNumber++; } return true; } diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp index 8fdceb8c04..2129b5cd06 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#ifndef IOX_POSH_POPO_CHUNK_SENDER_DATA_HPP_ -#define IOX_POSH_POPO_CHUNK_SENDER_DATA_HPP_ +#ifndef IOX_POPO_CHUNK_SENDER_DATA_HPP_ +#define IOX_POPO_CHUNK_SENDER_DATA_HPP_ #include "iceoryx_posh/iceoryx_posh_types.hpp" #include "iceoryx_posh/internal/mepoo/memory_manager.hpp" diff --git a/iceoryx_posh/source/popo/building_blocks/chunk_queue.cpp b/iceoryx_posh/source/popo/building_blocks/chunk_queue_popper.cpp similarity index 50% rename from iceoryx_posh/source/popo/building_blocks/chunk_queue.cpp rename to iceoryx_posh/source/popo/building_blocks/chunk_queue_popper.cpp index f75ee34fa8..e977493a95 100644 --- a/iceoryx_posh/source/popo/building_blocks/chunk_queue.cpp +++ b/iceoryx_posh/source/popo/building_blocks/chunk_queue_popper.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp" #include "iceoryx_posh/internal/log/posh_logging.hpp" @@ -20,49 +20,22 @@ namespace iox { namespace popo { -ChunkQueue::ChunkQueue(MemberType_t* const chunkQueueDataPtr) noexcept +ChunkQueuePopper::ChunkQueuePopper(MemberType_t* const chunkQueueDataPtr) noexcept : m_chunkQueueDataPtr(chunkQueueDataPtr) { } -const ChunkQueue::MemberType_t* ChunkQueue::getMembers() const noexcept +const ChunkQueuePopper::MemberType_t* ChunkQueuePopper::getMembers() const noexcept { return m_chunkQueueDataPtr; } -ChunkQueue::MemberType_t* ChunkQueue::getMembers() noexcept +ChunkQueuePopper::MemberType_t* ChunkQueuePopper::getMembers() noexcept { return m_chunkQueueDataPtr; } -bool ChunkQueue::push(mepoo::SharedChunk chunk) noexcept -{ - ChunkQueueData::ChunkTuple chunkTupleIn(chunk.releaseWithRelativePtr()); - - return !getMembers() - ->m_queue.push(chunkTupleIn) - .on_success( - [this](cxx::expected, cxx::VariantQueueError>& retVal) { - // drop the chunk if one is returned by a safe 
overflow - if (*retVal) - { - auto chunkTupleOut = **retVal; - auto chunkManagement = iox::relative_ptr( - chunkTupleOut.m_chunkOffset, chunkTupleOut.m_segmentId); - auto chunk = mepoo::SharedChunk(chunkManagement); - } - - if (getMembers()->m_semaphoreAttached.load(std::memory_order_acquire) - && getMembers()->m_semaphore) - { - std::cout << "fubar" << std::endl; - getMembers()->m_semaphore->post(); - } - }) - .has_error(); -} - -cxx::optional ChunkQueue::pop() noexcept +cxx::optional ChunkQueuePopper::pop() noexcept { auto retVal = getMembers()->m_queue.pop(); @@ -81,34 +54,47 @@ cxx::optional ChunkQueue::pop() noexcept } } -bool ChunkQueue::empty() noexcept +bool ChunkQueuePopper::empty() noexcept { return getMembers()->m_queue.empty(); } -uint64_t ChunkQueue::size() noexcept +uint64_t ChunkQueuePopper::size() noexcept { return getMembers()->m_queue.size(); } -void ChunkQueue::setCapacity(const uint32_t newCapacity) noexcept +void ChunkQueuePopper::setCapacity(const uint32_t newCapacity) noexcept { getMembers()->m_queue.setCapacity(newCapacity); } -uint64_t ChunkQueue::capacity() noexcept +uint64_t ChunkQueuePopper::capacity() noexcept { return getMembers()->m_queue.capacity(); } -void ChunkQueue::clear() noexcept +void ChunkQueuePopper::clear() noexcept { - while (getMembers()->m_queue.pop()) + do { - } + auto retVal = getMembers()->m_queue.pop(); + if (retVal) + { + auto chunkTupleOut = *retVal; + auto chunkManagement = + iox::relative_ptr(chunkTupleOut.m_chunkOffset, chunkTupleOut.m_segmentId); + auto chunk = mepoo::SharedChunk(chunkManagement); + } + else + { + break; + } + } while (true); } -cxx::expected ChunkQueue::attachSemaphore(mepoo::SharedPointer semaphore) noexcept +cxx::expected +ChunkQueuePopper::attachSemaphore(mepoo::SharedPointer semaphore) noexcept { if (isSemaphoreAttached()) { @@ -123,7 +109,7 @@ cxx::expected ChunkQueue::attachSemaphore(mepoo::SharedPointer< } } -bool ChunkQueue::isSemaphoreAttached() noexcept +bool 
ChunkQueuePopper::isSemaphoreAttached() noexcept { return getMembers()->m_semaphoreAttached.load(std::memory_order_relaxed); } diff --git a/iceoryx_posh/source/popo/building_blocks/chunk_queue_pusher.cpp b/iceoryx_posh/source/popo/building_blocks/chunk_queue_pusher.cpp new file mode 100644 index 0000000000..c0776412ca --- /dev/null +++ b/iceoryx_posh/source/popo/building_blocks/chunk_queue_pusher.cpp @@ -0,0 +1,70 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp" + +#include "iceoryx_posh/internal/log/posh_logging.hpp" + +namespace iox +{ +namespace popo +{ +ChunkQueuePusher::ChunkQueuePusher(MemberType_t* const chunkQueueDataPtr) noexcept + : m_chunkQueueDataPtr(chunkQueueDataPtr) +{ +} + +const ChunkQueuePusher::MemberType_t* ChunkQueuePusher::getMembers() const noexcept +{ + return m_chunkQueueDataPtr; +} + +ChunkQueuePusher::MemberType_t* ChunkQueuePusher::getMembers() noexcept +{ + return m_chunkQueueDataPtr; +} + +cxx::expected ChunkQueuePusher::push(mepoo::SharedChunk chunk) noexcept +{ + ChunkTuple chunkTupleIn(chunk.releaseWithRelativePtr()); + + auto pushRet = getMembers()->m_queue.push(chunkTupleIn); + + if (pushRet.has_error()) + { + return cxx::error(ChunkQueueError::QUEUE_OVERFLOW); + } + else + { + // drop the chunk if one is returned by a safe overflow + if ((*pushRet).has_value()) + { + auto chunkTupleOut = **pushRet; + auto chunkManagement = + iox::relative_ptr(chunkTupleOut.m_chunkOffset, chunkTupleOut.m_segmentId); + // this will release the chunk + auto returnedChunk = mepoo::SharedChunk(chunkManagement); + } + + if (getMembers()->m_semaphoreAttached.load(std::memory_order_acquire) && getMembers()->m_semaphore) + { + getMembers()->m_semaphore->post(); + } + + return cxx::success(); + } +} + +} // namespace popo +} // namespace iox diff --git a/iceoryx_posh/source/popo/building_blocks/chunk_receiver.cpp b/iceoryx_posh/source/popo/building_blocks/chunk_receiver.cpp new file mode 100644 index 0000000000..8b4d7be166 --- /dev/null +++ b/iceoryx_posh/source/popo/building_blocks/chunk_receiver.cpp @@ -0,0 +1,81 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "iceoryx_posh/internal/popo/building_blocks/chunk_receiver.hpp" +#include "iceoryx_posh/internal/log/posh_logging.hpp" +#include "iceoryx_utils/error_handling/error_handling.hpp" + +namespace iox +{ +namespace popo +{ +ChunkReceiver::ChunkReceiver(MemberType_t* const chunkReceiverDataPtr) noexcept + : ChunkQueuePopper(chunkReceiverDataPtr) +{ +} + +const ChunkReceiver::MemberType_t* ChunkReceiver::getMembers() const noexcept +{ + return reinterpret_cast(ChunkQueuePopper::getMembers()); +} + +ChunkReceiver::MemberType_t* ChunkReceiver::getMembers() noexcept +{ + return reinterpret_cast(ChunkQueuePopper::getMembers()); +} + +cxx::expected, ChunkReceiverError> ChunkReceiver::get() noexcept +{ + auto popRet = this->pop(); + + if (popRet.has_value()) + { + auto sharedChunk = *popRet; + + // if the application holds too many chunks, don't provide more + if (getMembers()->m_chunksInUse.insert(sharedChunk)) + { + return cxx::success>(sharedChunk.getChunkHeader()); + } + else + { + // release the chunk + sharedChunk = nullptr; + return cxx::error(ChunkReceiverError::TOO_MANY_CHUNKS_HELD_IN_PARALLEL); + } + } + else + { + // no new chunk + return cxx::success>(cxx::nullopt_t()); + } +} + +void ChunkReceiver::release(const mepoo::ChunkHeader* chunkHeader) noexcept +{ + mepoo::SharedChunk chunk(nullptr); + if (!getMembers()->m_chunksInUse.remove(chunkHeader, chunk)) + { + errorHandler(Error::kPOPO__CHUNK_RECEIVER_INVALID_CHUNK_TO_RELEASE_FROM_USER, nullptr, ErrorLevel::SEVERE); + } +} + +void ChunkReceiver::releaseAll() noexcept +{ + 
getMembers()->m_chunksInUse.cleanup(); + this->clear(); +} + +} // namespace popo +} // namespace iox diff --git a/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp index 85908467b8..f2aeddd5b1 100644 --- a/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp @@ -16,6 +16,8 @@ #include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp" #include "iceoryx_posh/mepoo/chunk_header.hpp" #include "iceoryx_utils/cxx/variant_queue.hpp" #include "test.hpp" @@ -57,8 +59,8 @@ class ChunkDistributor_test : public Test iox::posix::Allocator allocator{memory, MEMORY_SIZE}; MemPool mempool{128, 20, &allocator, &allocator}; MemPool chunkMgmtPool{128, 20, &allocator, &allocator}; - using ChunkDistributorData_t = ChunkDistributorData; - using ChunkDistributor_t = ChunkDistributor; + using ChunkDistributorData_t = ChunkDistributorData; + using ChunkDistributor_t = ChunkDistributor; void SetUp(){}; void TearDown(){}; @@ -68,9 +70,9 @@ class ChunkDistributor_test : public Test return std::make_shared(VariantQueueTypes::SoFi_SingleProducerSingleConsumer); } - std::shared_ptr> getChunkDistributorData() + std::shared_ptr getChunkDistributorData() { - return std::make_shared>(HISTORY_SIZE); + return std::make_shared(HISTORY_SIZE); } }; @@ -125,7 +127,7 @@ TYPED_TEST(ChunkDistributor_test, QueueOverflow) auto sutData = this->getChunkDistributorData(); typename TestFixture::ChunkDistributor_t sut(sutData.get()); - for (auto i = 0; i < this->MAX_NUMBER_QUEUES; ++i) + for (uint32_t i = 0; i < this->MAX_NUMBER_QUEUES; ++i) 
{ auto queueData = this->getChunkQueueData(); EXPECT_THAT(sut.addQueue(queueData.get()), Eq(true)); @@ -204,7 +206,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToAllStoredQueuesWithOneQueue) auto chunk = this->allocateChunk(4451); sut.deliverToAllStoredQueues(chunk); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); auto result = queue.pop(); ASSERT_THAT(result.has_value(), Eq(true)); @@ -222,7 +224,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToAllStoredQueuesWithOneQueueDeliversOn auto chunk = this->allocateChunk(4451); sut.deliverToAllStoredQueues(chunk); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); EXPECT_THAT(queue.size(), Eq(1)); EXPECT_THAT(sut.getHistorySize(), Eq(1)); } @@ -240,7 +242,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToAllStoredQueuesWithDuplicatedQueueDel auto chunk = this->allocateChunk(4451); sut.deliverToAllStoredQueues(chunk); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); EXPECT_THAT(queue.size(), Eq(1)); EXPECT_THAT(sut.getHistorySize(), Eq(1)); } @@ -258,7 +260,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToAllStoredQueuesWithOneQueueMultipleCh for (auto i = 0; i < limit; ++i) sut.deliverToAllStoredQueues(this->allocateChunk(i * 123)); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); for (auto i = 0; i < limit; ++i) { auto result = queue.pop(); @@ -280,7 +282,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToAllStoredQueuesWithOneQueueDeliverMul for (auto i = 0; i < limit; ++i) sut.deliverToAllStoredQueues(this->allocateChunk(i * 123)); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); EXPECT_THAT(queue.size(), Eq(limit)); EXPECT_THAT(sut.getHistorySize(), Eq(limit)); } @@ -303,7 +305,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToAllStoredQueuesWithMultipleQueues) for (auto i = 0; i < limit; ++i) { - ChunkQueue queue(queueData[i].get()); + ChunkQueuePopper 
queue(queueData[i].get()); auto result = queue.pop(); ASSERT_THAT(result.has_value(), Eq(true)); EXPECT_THAT(this->getSharedChunkValue(*result), Eq(24451)); @@ -331,7 +333,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToAllStoredQueuesWithMultipleQueuesMult { for (auto k = 0; k < limit; ++k) { - ChunkQueue queue(queueData[i].get()); + ChunkQueuePopper queue(queueData[i].get()); auto result = queue.pop(); ASSERT_THAT(result.has_value(), Eq(true)); EXPECT_THAT(this->getSharedChunkValue(*result), Eq(k * 34)); @@ -391,7 +393,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToQueueDirectlyWhenNotAdded) auto chunk = this->allocateChunk(4451); sut.deliverToQueue(queueData.get(), chunk); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); auto result = queue.pop(); ASSERT_THAT(result.has_value(), Eq(true)); @@ -409,7 +411,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverToQueueDirectlyWhenAdded) auto chunk = this->allocateChunk(451); sut.deliverToQueue(queueData.get(), chunk); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); auto result = queue.pop(); ASSERT_THAT(result.has_value(), Eq(true)); @@ -456,7 +458,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverHistoryOnAddWithLessThanAvailable) // add a queue with a requested history of one must deliver the latest sample auto queueData = this->getChunkQueueData(); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); sut.addQueue(queueData.get(), 1); EXPECT_THAT(queue.size(), Eq(1)); @@ -479,7 +481,7 @@ TYPED_TEST(ChunkDistributor_test, DeliverHistoryOnAddWithExactAvailable) // add a queue with a requested history of 3 must deliver all three in the order oldest to newest auto queueData = this->getChunkQueueData(); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); sut.addQueue(queueData.get(), 3); EXPECT_THAT(queue.size(), Eq(3)); @@ -507,7 +509,7 @@ TYPED_TEST(ChunkDistributor_test, 
DeliverHistoryOnAddWithMoreThanAvailable) // add a queue with a requested history of 5 must deliver only the three available in the order oldest to newest auto queueData = this->getChunkQueueData(); - ChunkQueue queue(queueData.get()); + ChunkQueuePopper queue(queueData.get()); sut.addQueue(queueData.get(), 5); EXPECT_THAT(queue.size(), Eq(3)); diff --git a/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp index d873843b1f..15c6fc4a2b 100644 --- a/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp" #include "iceoryx_posh/internal/mepoo/memory_manager.hpp" #include "iceoryx_posh/internal/mepoo/shared_chunk.hpp" @@ -60,7 +61,8 @@ class ChunkQueue_test : public TestWithParam, publi void TearDown() override{}; ChunkQueueData m_chunkData{GetParam()}; - ChunkQueue m_dut{&m_chunkData}; + ChunkQueuePopper m_popper{&m_chunkData}; + ChunkQueuePusher m_pusher{&m_chunkData}; }; INSTANTIATE_TEST_CASE_P(ChunkQueueAll, @@ -70,37 +72,38 @@ INSTANTIATE_TEST_CASE_P(ChunkQueueAll, TEST_P(ChunkQueue_test, InitialEmpty) { - EXPECT_THAT(m_dut.empty(), Eq(true)); + EXPECT_THAT(m_popper.empty(), Eq(true)); } TEST_P(ChunkQueue_test, InitialSemaphoreAttached) { - EXPECT_THAT(m_dut.isSemaphoreAttached(), Eq(false)); + EXPECT_THAT(m_popper.isSemaphoreAttached(), Eq(false)); } TEST_P(ChunkQueue_test, PushOneChunk) { auto chunk = allocateChunk(); - EXPECT_THAT(m_dut.push(chunk), Eq(true)); - EXPECT_THAT(m_dut.empty(), Eq(false)); + auto ret = 
m_pusher.push(chunk); + EXPECT_FALSE(ret.has_error()); + EXPECT_THAT(m_popper.empty(), Eq(false)); /// @note size not implemented on FIFO if (GetParam() != iox::cxx::VariantQueueTypes::FiFo_SingleProducerSingleConsumer) { - EXPECT_THAT(m_dut.size(), Eq(1u)); + EXPECT_THAT(m_popper.size(), Eq(1u)); } } TEST_P(ChunkQueue_test, PopOneChunk) { auto chunk = allocateChunk(); - m_dut.push(chunk); + m_pusher.push(chunk); - EXPECT_THAT(m_dut.pop().has_value(), Eq(true)); - EXPECT_THAT(m_dut.empty(), Eq(true)); + EXPECT_THAT(m_popper.pop().has_value(), Eq(true)); + EXPECT_THAT(m_popper.empty(), Eq(true)); /// @note size not implemented on FIFO if (GetParam() != iox::cxx::VariantQueueTypes::FiFo_SingleProducerSingleConsumer) { - EXPECT_THAT(m_dut.size(), Eq(0u)); + EXPECT_THAT(m_popper.size(), Eq(0u)); } } @@ -111,12 +114,12 @@ TEST_P(ChunkQueue_test, PushedChunksMustBePoppedInTheSameOrder) { auto chunk = allocateChunk(); *reinterpret_cast(chunk.getPayload()) = i; - m_dut.push(chunk); + m_pusher.push(chunk); } for (int i = 0; i < NUMBER_CHUNKS; ++i) { - auto chunk = m_dut.pop(); + auto chunk = m_popper.pop(); ASSERT_THAT(chunk.has_value(), Eq(true)); auto data = *reinterpret_cast(chunk->getPayload()); EXPECT_THAT(data, Eq(i)); @@ -125,16 +128,16 @@ TEST_P(ChunkQueue_test, PushedChunksMustBePoppedInTheSameOrder) TEST_P(ChunkQueue_test, ClearOnEmpty) { - m_dut.clear(); - EXPECT_THAT(m_dut.empty(), Eq(true)); + m_popper.clear(); + EXPECT_THAT(m_popper.empty(), Eq(true)); } TEST_P(ChunkQueue_test, ClearWithData) { auto chunk = allocateChunk(); - m_dut.push(chunk); - m_dut.clear(); - EXPECT_THAT(m_dut.empty(), Eq(true)); + m_pusher.push(chunk); + m_popper.clear(); + EXPECT_THAT(m_popper.empty(), Eq(true)); } TEST_P(ChunkQueue_test, AttachSemaphore) @@ -142,10 +145,10 @@ TEST_P(ChunkQueue_test, AttachSemaphore) auto semaphore = semaphorePool.createObjectWithCreationPattern(0); ASSERT_THAT(semaphore.has_error(), Eq(false)); - auto ret = m_dut.attachSemaphore(*semaphore); + auto ret 
= m_popper.attachSemaphore(*semaphore); EXPECT_FALSE(ret.has_error()); - EXPECT_THAT(m_dut.isSemaphoreAttached(), Eq(true)); + EXPECT_THAT(m_popper.isSemaphoreAttached(), Eq(true)); } TEST_P(ChunkQueue_test, DISABLED_PushAndTriggersSemaphore) @@ -153,13 +156,13 @@ TEST_P(ChunkQueue_test, DISABLED_PushAndTriggersSemaphore) auto semaphore = semaphorePool.createObjectWithCreationPattern(0); ASSERT_THAT(semaphore.has_error(), Eq(false)); - auto ret = m_dut.attachSemaphore(*semaphore); + auto ret = m_popper.attachSemaphore(*semaphore); EXPECT_FALSE(ret.has_error()); EXPECT_THAT(semaphore->get()->tryWait(), Eq(false)); auto chunk = allocateChunk(); - m_dut.push(chunk); + m_pusher.push(chunk); EXPECT_THAT(semaphore->get()->tryWait(), Eq(true)); EXPECT_THAT(semaphore->get()->tryWait(), Eq(false)); // shouldn't trigger a second time @@ -172,10 +175,10 @@ TEST_P(ChunkQueue_test, DISABLED_AttachSecondSemaphore) auto semaphore2 = semaphorePool.createObjectWithCreationPattern(0); ASSERT_THAT(semaphore2.has_error(), Eq(false)); - auto ret1 = m_dut.attachSemaphore(*semaphore1); + auto ret1 = m_popper.attachSemaphore(*semaphore1); EXPECT_FALSE(ret1.has_error()); - auto ret2 = m_dut.attachSemaphore(*semaphore2); + auto ret2 = m_popper.attachSemaphore(*semaphore2); EXPECT_TRUE(ret2.has_error()); ASSERT_THAT(ret2.get_error(), Eq(ChunkQueueError::SEMAPHORE_ALREADY_SET)); @@ -183,7 +186,7 @@ TEST_P(ChunkQueue_test, DISABLED_AttachSecondSemaphore) EXPECT_THAT(semaphore2->get()->tryWait(), Eq(false)); auto chunk = allocateChunk(); - m_dut.push(chunk); + m_pusher.push(chunk); EXPECT_THAT(semaphore1->get()->tryWait(), Eq(true)); EXPECT_THAT(semaphore2->get()->tryWait(), Eq(false)); @@ -197,27 +200,28 @@ class ChunkQueueFiFo_test : public Test, public ChunkQueue_testBase void TearDown() override{}; ChunkQueueData m_chunkData{iox::cxx::VariantQueueTypes::FiFo_SingleProducerSingleConsumer}; - ChunkQueue m_dut{&m_chunkData}; + ChunkQueuePopper m_popper{&m_chunkData}; + ChunkQueuePusher 
m_pusher{&m_chunkData}; }; /// @note API currently not supported TEST_F(ChunkQueueFiFo_test, DISABLED_InitialSize) { - EXPECT_THAT(m_dut.size(), Eq(0u)); + EXPECT_THAT(m_popper.size(), Eq(0u)); } /// @note API currently not supported TEST_F(ChunkQueueFiFo_test, DISABLED_Capacity) { - EXPECT_THAT(m_dut.capacity(), Eq(iox::MAX_RECEIVER_QUEUE_CAPACITY)); + EXPECT_THAT(m_popper.capacity(), Eq(iox::MAX_RECEIVER_QUEUE_CAPACITY)); } /// @note API currently not supported TEST_F(ChunkQueueFiFo_test, DISABLED_SetCapacity) { - m_dut.setCapacity(RESIZED_CAPACITY); - EXPECT_THAT(m_dut.capacity(), Eq(RESIZED_CAPACITY)); + m_popper.setCapacity(RESIZED_CAPACITY); + EXPECT_THAT(m_popper.capacity(), Eq(RESIZED_CAPACITY)); } TEST_F(ChunkQueueFiFo_test, PushFull) @@ -225,11 +229,13 @@ TEST_F(ChunkQueueFiFo_test, PushFull) for (auto i = 0u; i < iox::MAX_RECEIVER_QUEUE_CAPACITY; ++i) { auto chunk = allocateChunk(); - m_dut.push(chunk); + m_pusher.push(chunk); } auto chunk = allocateChunk(); - EXPECT_THAT(m_dut.push(chunk), Eq(false)); - EXPECT_THAT(m_dut.empty(), Eq(false)); + auto ret = m_pusher.push(chunk); + EXPECT_TRUE(ret.has_error()); + EXPECT_THAT(ret.get_error(), Eq(iox::popo::ChunkQueueError::QUEUE_OVERFLOW)); + EXPECT_THAT(m_popper.empty(), Eq(false)); } /// @note this could be changed to a parameterized ChunkQueueOverflowingFIFO_test when there are more FIFOs available @@ -240,24 +246,25 @@ class ChunkQueueSoFi_test : public Test, public ChunkQueue_testBase void TearDown() override{}; ChunkQueueData m_chunkData{iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer}; - ChunkQueue m_dut{&m_chunkData}; + ChunkQueuePopper m_popper{&m_chunkData}; + ChunkQueuePusher m_pusher{&m_chunkData}; }; TEST_F(ChunkQueueSoFi_test, InitialSize) { - EXPECT_THAT(m_dut.size(), Eq(0u)); + EXPECT_THAT(m_popper.size(), Eq(0u)); } TEST_F(ChunkQueueSoFi_test, Capacity) { - EXPECT_THAT(m_dut.capacity(), Eq(iox::MAX_RECEIVER_QUEUE_CAPACITY)); + EXPECT_THAT(m_popper.capacity(), 
Eq(iox::MAX_RECEIVER_QUEUE_CAPACITY)); } TEST_F(ChunkQueueSoFi_test, SetCapacity) { - m_dut.setCapacity(RESIZED_CAPACITY); - EXPECT_THAT(m_dut.capacity(), Eq(RESIZED_CAPACITY)); + m_popper.setCapacity(RESIZED_CAPACITY); + EXPECT_THAT(m_popper.capacity(), Eq(RESIZED_CAPACITY)); } TEST_F(ChunkQueueSoFi_test, PushFull) @@ -265,11 +272,21 @@ for (auto i = 0u; i < iox::MAX_RECEIVER_QUEUE_CAPACITY * 2; ++i) { auto chunk = allocateChunk(); - m_dut.push(chunk); + m_pusher.push(chunk); } - auto chunk = allocateChunk(); - EXPECT_THAT(m_dut.push(chunk), Eq(true)); - EXPECT_THAT(m_dut.empty(), Eq(false)); - constexpr uint32_t SOFI_SIZE_WHEN_FULL = iox::MAX_RECEIVER_QUEUE_CAPACITY + 1; - EXPECT_THAT(m_dut.size(), Eq(SOFI_SIZE_WHEN_FULL)); + + { + // pushing is still fine + auto chunk = allocateChunk(); + auto ret = m_pusher.push(chunk); + EXPECT_FALSE(ret.has_error()); + EXPECT_THAT(m_popper.empty(), Eq(false)); + } + // get all the chunks in the queue + while (m_popper.pop().has_value()) + { + } + + // now all chunks are released + EXPECT_THAT(mempool.getUsedChunks(), Eq(0u)); } diff --git a/iceoryx_posh/test/moduletests/test_popo_chunk_receiver.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_receiver.cpp new file mode 100644 index 0000000000..c032b59d50 --- /dev/null +++ b/iceoryx_posh/test/moduletests/test_popo_chunk_receiver.cpp @@ -0,0 +1,213 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include "iceoryx_posh/iceoryx_posh_types.hpp" +#include "iceoryx_posh/internal/mepoo/memory_manager.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_receiver.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_receiver_data.hpp" +#include "iceoryx_posh/mepoo/mepoo_config.hpp" +#include "iceoryx_utils/error_handling/error_handling.hpp" +#include "iceoryx_utils/internal/posix_wrapper/shared_memory_object/allocator.hpp" +#include "test.hpp" + +#include + +using namespace ::testing; + +struct DummySample +{ + uint64_t dummy{42}; +}; + +class ChunkReceiver_test : public Test +{ + protected: + ChunkReceiver_test() + { + m_mempoolconf.addMemPool({CHUNK_SIZE, NUM_CHUNKS_IN_POOL}); + m_memoryManager.configureMemoryManager(m_mempoolconf, &m_memoryAllocator, &m_memoryAllocator); + } + + ~ChunkReceiver_test() + { + } + + void SetUp() + { + } + + void TearDown() + { + } + + static constexpr size_t MEMORY_SIZE = 1024 * 1024; + uint8_t m_memory[MEMORY_SIZE]; + static constexpr uint32_t NUM_CHUNKS_IN_POOL = iox::MAX_CHUNKS_HELD_PER_RECEIVER + iox::MAX_RECEIVER_QUEUE_CAPACITY; + static constexpr uint32_t CHUNK_SIZE = 128; + + iox::posix::Allocator m_memoryAllocator{m_memory, MEMORY_SIZE}; + iox::mepoo::MePooConfig m_mempoolconf; + iox::mepoo::MemoryManager m_memoryManager; + + iox::popo::ChunkReceiverData m_chunkReceiverData{iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer}; + iox::popo::ChunkReceiver m_chunkReceiver{&m_chunkReceiverData}; + iox::popo::ChunkQueuePusher m_chunkQueuePusher{&m_chunkReceiverData}; +}; + +TEST_F(ChunkReceiver_test, getNoChunkFromEmptyQueue) +{ + auto getRet = m_chunkReceiver.get(); + EXPECT_FALSE(getRet.has_error()); + EXPECT_FALSE((*getRet).has_value()); +} + +TEST_F(ChunkReceiver_test, getAndReleaseOneChunk) +{ + { + // 
have a scope her to release the shared chunk we allocate + auto sharedChunk = m_memoryManager.getChunk(sizeof(DummySample)); + EXPECT_TRUE(sharedChunk); + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); + auto pushRet = m_chunkQueuePusher.push(sharedChunk); + EXPECT_FALSE(pushRet.has_error()); + + auto getRet = m_chunkReceiver.get(); + EXPECT_FALSE(getRet.has_error()); + EXPECT_TRUE((*getRet).has_value()); + + EXPECT_TRUE(sharedChunk.getPayload() == (**getRet)->payload()); + m_chunkReceiver.release(**getRet); + } + + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(0u)); +} + +TEST_F(ChunkReceiver_test, getAndReleaseMultipleChunks) +{ + std::vector chunks; + + for (size_t i = 0; i < iox::MAX_CHUNKS_HELD_PER_RECEIVER; i++) + { + auto sharedChunk = m_memoryManager.getChunk(sizeof(DummySample)); + EXPECT_TRUE(sharedChunk); + auto sample = sharedChunk.getPayload(); + new (sample) DummySample(); + static_cast(sample)->dummy = i; + + auto pushRet = m_chunkQueuePusher.push(sharedChunk); + EXPECT_FALSE(pushRet.has_error()); + + auto getRet = m_chunkReceiver.get(); + EXPECT_FALSE(getRet.has_error()); + EXPECT_TRUE((*getRet).has_value()); + chunks.push_back(**getRet); + } + + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(iox::MAX_CHUNKS_HELD_PER_RECEIVER)); + + for (size_t i = 0; i < iox::MAX_CHUNKS_HELD_PER_RECEIVER; i++) + { + const auto chunk = chunks.back(); + chunks.pop_back(); + auto dummySample = *reinterpret_cast(chunk->payload()); + EXPECT_THAT(dummySample.dummy, Eq(iox::MAX_CHUNKS_HELD_PER_RECEIVER - 1 - i)); + m_chunkReceiver.release(chunk); + } + + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(0u)); +} + +TEST_F(ChunkReceiver_test, getTooMuchWithoutRelease) +{ + // one more is OK, but we assume that one is released then (aligned with ara::com behavior) + // therefore MAX_CHUNKS_HELD_PER_RECEIVER+1 + for (size_t i = 0; i < iox::MAX_CHUNKS_HELD_PER_RECEIVER + 1; i++) + { + auto sharedChunk = 
m_memoryManager.getChunk(sizeof(DummySample)); + EXPECT_TRUE(sharedChunk); + + auto pushRet = m_chunkQueuePusher.push(sharedChunk); + EXPECT_FALSE(pushRet.has_error()); + + auto getRet = m_chunkReceiver.get(); + EXPECT_FALSE(getRet.has_error()); + EXPECT_TRUE((*getRet).has_value()); + } + + // but now it breaks + auto sharedChunk = m_memoryManager.getChunk(sizeof(DummySample)); + EXPECT_TRUE(sharedChunk); + + auto pushRet = m_chunkQueuePusher.push(sharedChunk); + EXPECT_FALSE(pushRet.has_error()); + + auto getRet = m_chunkReceiver.get(); + EXPECT_TRUE(getRet.has_error()); + EXPECT_THAT(getRet.get_error(), Eq(iox::popo::ChunkReceiverError::TOO_MANY_CHUNKS_HELD_IN_PARALLEL)); +} + +TEST_F(ChunkReceiver_test, releaseInvalidChunk) +{ + { + // have a scope her to release the shared chunk we allocate + auto sharedChunk = m_memoryManager.getChunk(sizeof(DummySample)); + EXPECT_TRUE(sharedChunk); + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); + auto pushRet = m_chunkQueuePusher.push(sharedChunk); + EXPECT_FALSE(pushRet.has_error()); + + auto getRet = m_chunkReceiver.get(); + EXPECT_FALSE(getRet.has_error()); + EXPECT_TRUE((*getRet).has_value()); + + EXPECT_TRUE(sharedChunk.getPayload() == (**getRet)->payload()); + } + + auto errorHandlerCalled{false}; + auto errorHandlerGuard = iox::ErrorHandler::SetTemporaryErrorHandler([&errorHandlerCalled]( + const iox::Error, const std::function, const iox::ErrorLevel) { errorHandlerCalled = true; }); + + auto myCrazyChunk = std::make_shared(); + m_chunkReceiver.release(myCrazyChunk.get()); + + EXPECT_TRUE(errorHandlerCalled); + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); +} + +TEST_F(ChunkReceiver_test, Cleanup) +{ + for (size_t i = 0; i < iox::MAX_CHUNKS_HELD_PER_RECEIVER + iox::MAX_RECEIVER_QUEUE_CAPACITY; i++) + { + // MAX_CHUNKS_HELD_PER_RECEIVER on user side and MAX_RECEIVER_QUEUE_CAPACITY in the queue + auto sharedChunk = m_memoryManager.getChunk(sizeof(DummySample)); + 
EXPECT_TRUE(sharedChunk); + auto pushRet = m_chunkQueuePusher.push(sharedChunk); + EXPECT_FALSE(pushRet.has_error()); + + if (i < iox::MAX_CHUNKS_HELD_PER_RECEIVER) + { + auto getRet = m_chunkReceiver.get(); + EXPECT_FALSE(getRet.has_error()); + EXPECT_TRUE((*getRet).has_value()); + } + } + + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, + Eq(iox::MAX_CHUNKS_HELD_PER_RECEIVER + iox::MAX_RECEIVER_QUEUE_CAPACITY)); + + m_chunkReceiver.releaseAll(); + + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(0u)); +} \ No newline at end of file diff --git a/iceoryx_posh/test/moduletests/test_popo_chunk_sender.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_sender.cpp index 231673649f..ef31d1b8b4 100644 --- a/iceoryx_posh/test/moduletests/test_popo_chunk_sender.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_chunk_sender.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -16,8 +16,9 @@ #include "iceoryx_posh/internal/mepoo/memory_manager.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp" -#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_popper.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp" #include "iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp" #include "iceoryx_posh/mepoo/mepoo_config.hpp" @@ -34,17 +35,17 @@ struct DummySample uint64_t dummy{42}; }; -class ChunkSender_testBase : public Test +class ChunkSender_test : public Test { protected: - ChunkSender_testBase() + ChunkSender_test() { m_mempoolconf.addMemPool({SMALL_CHUNK, NUM_CHUNKS_IN_POOL}); m_mempoolconf.addMemPool({BIG_CHUNK, NUM_CHUNKS_IN_POOL}); m_memoryManager.configureMemoryManager(m_mempoolconf, &m_memoryAllocator, &m_memoryAllocator); } - ~ChunkSender_testBase() + ~ChunkSender_test() { } @@ -57,7 +58,7 @@ class ChunkSender_testBase : public Test } static constexpr size_t MEMORY_SIZE = 1024 * 1024; - uint8_t m_memory[1024 * 1024]; + uint8_t m_memory[MEMORY_SIZE]; static constexpr uint32_t NUM_CHUNKS_IN_POOL = 20; static constexpr uint32_t SMALL_CHUNK = 128; static constexpr uint32_t BIG_CHUNK = 256; @@ -70,24 +71,16 @@ class ChunkSender_testBase : public Test iox::popo::ChunkQueueData m_chunkQueueData{iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer}; - using ChunkDistributorData_t = iox::popo::ChunkDistributorData; + using ChunkDistributorData_t = + iox::popo::ChunkDistributorData; iox::popo::ChunkSenderData m_chunkSenderData{&m_memoryManager, 0}; // must be 0 for test iox::popo::ChunkSenderData m_chunkSenderDataWithHistory{&m_memoryManager, 
HISTORY_CAPACITY}; - using ChunkDistributor_t = iox::popo::ChunkDistributor; + using ChunkDistributor_t = iox::popo::ChunkDistributor; iox::popo::ChunkSender m_chunkSender{&m_chunkSenderData}; iox::popo::ChunkSender m_chunkSenderWithHistory{&m_chunkSenderDataWithHistory}; }; -class ChunkSender_test : public ChunkSender_testBase -{ - public: - ChunkSender_test() - : ChunkSender_testBase() - { - } -}; - TEST_F(ChunkSender_test, allocate_OneChunk) { auto chunk = m_chunkSender.allocate(sizeof(DummySample)); @@ -133,7 +126,7 @@ TEST_F(ChunkSender_test, allocate_Overflow) // Allocate one more sample for overflow auto chunk = m_chunkSender.allocate(sizeof(DummySample)); EXPECT_TRUE(chunk.has_error()); - EXPECT_THAT(chunk.get_error(), Eq(iox::popo::ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL)); + EXPECT_THAT(chunk.get_error(), Eq(iox::popo::ChunkSenderError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL)); EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(iox::MAX_CHUNKS_ALLOCATE_PER_SENDER)); } @@ -200,7 +193,7 @@ TEST_F(ChunkSender_test, sendMultipleWithoutReceiverAndAlwaysLast) { auto chunk = m_chunkSender.allocate(sizeof(DummySample)); EXPECT_FALSE(chunk.has_error()); - auto lastChunk = m_chunkSender.getLastChunk(); + auto lastChunk = m_chunkSender.getLast(); if (i > 0) { EXPECT_TRUE(lastChunk.has_value()); @@ -227,7 +220,7 @@ TEST_F(ChunkSender_test, sendMultipleWithoutReceiverWithHistoryNoLastReuse) { auto chunk = m_chunkSenderWithHistory.allocate(sizeof(DummySample)); EXPECT_FALSE(chunk.has_error()); - auto lastChunk = m_chunkSenderWithHistory.getLastChunk(); + auto lastChunk = m_chunkSenderWithHistory.getLast(); if (i > 0) { EXPECT_TRUE(lastChunk.has_value()); @@ -264,7 +257,7 @@ TEST_F(ChunkSender_test, sendOneWithReceiver) // consume the sample { - iox::popo::ChunkQueue myQueue(&m_chunkQueueData); + iox::popo::ChunkQueuePopper myQueue(&m_chunkQueueData); EXPECT_FALSE(myQueue.empty()); auto popRet = myQueue.pop(); EXPECT_TRUE(popRet.has_value()); 
@@ -277,7 +270,7 @@ TEST_F(ChunkSender_test, sendOneWithReceiver) TEST_F(ChunkSender_test, sendMultipleWithReceiver) { m_chunkSender.addQueue(&m_chunkQueueData); - iox::popo::ChunkQueue checkQueue(&m_chunkQueueData); + iox::popo::ChunkQueuePopper checkQueue(&m_chunkQueueData); EXPECT_TRUE(NUM_CHUNKS_IN_POOL < checkQueue.capacity()); for (size_t i = 0; i < NUM_CHUNKS_IN_POOL; i++) @@ -296,7 +289,7 @@ TEST_F(ChunkSender_test, sendMultipleWithReceiver) for (size_t i = 0; i < NUM_CHUNKS_IN_POOL; i++) { - iox::popo::ChunkQueue myQueue(&m_chunkQueueData); + iox::popo::ChunkQueuePopper myQueue(&m_chunkQueueData); EXPECT_FALSE(myQueue.empty()); auto popRet = myQueue.pop(); EXPECT_TRUE(popRet.has_value()); @@ -309,7 +302,7 @@ TEST_F(ChunkSender_test, sendMultipleWithReceiver) TEST_F(ChunkSender_test, sendMultipleWithReceiverExternalSequenceNumber) { m_chunkSender.addQueue(&m_chunkQueueData); - iox::popo::ChunkQueue checkQueue(&m_chunkQueueData); + iox::popo::ChunkQueuePopper checkQueue(&m_chunkQueueData); EXPECT_TRUE(NUM_CHUNKS_IN_POOL < checkQueue.capacity()); for (size_t i = 0; i < NUM_CHUNKS_IN_POOL; i++) @@ -327,7 +320,7 @@ TEST_F(ChunkSender_test, sendMultipleWithReceiverExternalSequenceNumber) for (size_t i = 0; i < NUM_CHUNKS_IN_POOL; i++) { - iox::popo::ChunkQueue myQueue(&m_chunkQueueData); + iox::popo::ChunkQueuePopper myQueue(&m_chunkQueueData); EXPECT_FALSE(myQueue.empty()); auto popRet = myQueue.pop(); EXPECT_TRUE(popRet.has_value()); @@ -339,7 +332,7 @@ TEST_F(ChunkSender_test, sendMultipleWithReceiverExternalSequenceNumber) TEST_F(ChunkSender_test, sendTillRunningOutOfChunks) { m_chunkSender.addQueue(&m_chunkQueueData); - iox::popo::ChunkQueue checkQueue(&m_chunkQueueData); + iox::popo::ChunkQueuePopper checkQueue(&m_chunkQueueData); EXPECT_TRUE(NUM_CHUNKS_IN_POOL < checkQueue.capacity()); for (size_t i = 0; i < NUM_CHUNKS_IN_POOL; i++) @@ -377,12 +370,11 @@ TEST_F(ChunkSender_test, sendInvalidChunk) auto errorHandlerGuard = 
iox::ErrorHandler::SetTemporaryErrorHandler([&errorHandlerCalled]( const iox::Error, const std::function, const iox::ErrorLevel) { errorHandlerCalled = true; }); - iox::mepoo::ChunkHeader* myCrazyChunk = new iox::mepoo::ChunkHeader(); - m_chunkSender.send(myCrazyChunk); + auto myCrazyChunk = std::make_shared(); + m_chunkSender.send(myCrazyChunk.get()); EXPECT_TRUE(errorHandlerCalled); EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); - delete myCrazyChunk; } TEST_F(ChunkSender_test, pushToHistory) @@ -423,7 +415,7 @@ TEST_F(ChunkSender_test, sendMultipleWithReceiverNoLastReuse) { auto chunk = m_chunkSender.allocate(sizeof(DummySample)); EXPECT_FALSE(chunk.has_error()); - auto lastChunk = m_chunkSender.getLastChunk(); + auto lastChunk = m_chunkSender.getLast(); if (i > 0) { EXPECT_TRUE(lastChunk.has_value()); @@ -452,7 +444,7 @@ TEST_F(ChunkSender_test, sendMultipleWithReceiverLastReuseBecauseAlreadyConsumed { auto chunk = m_chunkSender.allocate(sizeof(DummySample)); EXPECT_FALSE(chunk.has_error()); - auto lastChunk = m_chunkSender.getLastChunk(); + auto lastChunk = m_chunkSender.getLast(); if (i > 0) { EXPECT_TRUE(lastChunk.has_value()); @@ -468,7 +460,7 @@ TEST_F(ChunkSender_test, sendMultipleWithReceiverLastReuseBecauseAlreadyConsumed new (sample) DummySample(); m_chunkSender.send(*chunk); - iox::popo::ChunkQueue myQueue(&m_chunkQueueData); + iox::popo::ChunkQueuePopper myQueue(&m_chunkQueueData); EXPECT_FALSE(myQueue.empty()); auto popRet = myQueue.pop(); EXPECT_TRUE(popRet.has_value()); @@ -494,7 +486,7 @@ TEST_F(ChunkSender_test, ReuseLastIfSmaller) EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(0u)); EXPECT_THAT(m_memoryManager.getMemPoolInfo(1).m_usedChunks, Eq(1u)); - auto lastChunk = m_chunkSender.getLastChunk(); + auto lastChunk = m_chunkSender.getLast(); EXPECT_TRUE(lastChunk.has_value()); // We get the last chunk again EXPECT_TRUE(*chunkSmaller == *lastChunk); @@ -517,7 +509,7 @@ TEST_F(ChunkSender_test, 
NoReuseOfLastIfBigger) EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); EXPECT_THAT(m_memoryManager.getMemPoolInfo(1).m_usedChunks, Eq(1u)); - auto lastChunk = m_chunkSender.getLastChunk(); + auto lastChunk = m_chunkSender.getLast(); EXPECT_TRUE(lastChunk.has_value()); // not the last chunk EXPECT_FALSE(*chunkBigger == *lastChunk); @@ -540,7 +532,7 @@ TEST_F(ChunkSender_test, ReuseOfLastIfBiggerButFitsInChunk) EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); EXPECT_THAT(m_memoryManager.getMemPoolInfo(1).m_usedChunks, Eq(0u)); - auto lastChunk = m_chunkSender.getLastChunk(); + auto lastChunk = m_chunkSender.getLast(); EXPECT_TRUE(lastChunk.has_value()); // not the last chunk EXPECT_TRUE(*chunkBigger == *lastChunk); @@ -567,7 +559,7 @@ TEST_F(ChunkSender_test, Cleanup) EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(HISTORY_CAPACITY + iox::MAX_CHUNKS_ALLOCATE_PER_SENDER)); - m_chunkSenderWithHistory.releaseAllChunks(); + m_chunkSenderWithHistory.releaseAll(); EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(0u)); } \ No newline at end of file diff --git a/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp b/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp index 31e9cc46bf..e3ffe2f707 100644 --- a/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp +++ b/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp @@ -51,6 +51,7 @@ namespace iox error(POPO__CHUNK_DISTRIBUTOR_CLEANUP_DEADLOCK_BECAUSE_BAD_APPLICATION_TERMINATION) \ error(POPO__CHUNK_SENDER_INVALID_CHUNK_TO_FREE_FROM_USER) \ error(POPO__CHUNK_SENDER_INVALID_CHUNK_TO_SEND_FROM_USER) \ + error(POPO__CHUNK_RECEIVER_INVALID_CHUNK_TO_RELEASE_FROM_USER) \ error(MEPOO__MEMPOOL_CONFIG_MUST_BE_ORDERED_BY_INCREASING_SIZE) \ error(MEPOO__MEMPOOL_GETCHUNK_CHUNK_IS_TOO_LARGE) \ error(MEPOO__MEMPOOL_CHUNKSIZE_MUST_BE_LARGER_THAN_SHARED_MEMORY_ALIGNMENT_AND_MULTIPLE_OF_ALIGNMENT) \ 
diff --git a/iceoryx_utils/include/iceoryx_utils/internal/cxx/variant_queue.inl b/iceoryx_utils/include/iceoryx_utils/internal/cxx/variant_queue.inl index 30fabd9e8b..4c2ca204ff 100644 --- a/iceoryx_utils/include/iceoryx_utils/internal/cxx/variant_queue.inl +++ b/iceoryx_utils/include/iceoryx_utils/internal/cxx/variant_queue.inl @@ -68,16 +68,16 @@ VariantQueue::push(const ValueType& value) noexcept case VariantQueueTypes::SoFi_SingleProducerSingleConsumer: { ValueType overriddenValue; - auto override = + auto notskipped = m_fifo.template get_at_index(VariantQueueTypes::SoFi_SingleProducerSingleConsumer)>() ->push(value, overriddenValue); - if (override) + if (notskipped) { - ret = success>(optional(std::move(overriddenValue))); + ret = success>(nullopt_t()); } else { - ret = success>(nullopt_t()); + ret = success>(optional(std::move(overriddenValue))); } break; } diff --git a/iceoryx_utils/test/moduletests/test_cxx_variant_queue.cpp b/iceoryx_utils/test/moduletests/test_cxx_variant_queue.cpp index 2fff85caa7..daedf6d355 100644 --- a/iceoryx_utils/test/moduletests/test_cxx_variant_queue.cpp +++ b/iceoryx_utils/test/moduletests/test_cxx_variant_queue.cpp @@ -108,10 +108,13 @@ TEST_F(VariantQueue_test, handlesOverflow) { PerformTestForQueueTypes([](uint64_t typeID) { VariantQueue sut(static_cast(typeID)); + // current SOFI can hold capacity +1 values, so push some more to ensure overflow sut.push(14123); sut.push(24123); + sut.push(22222); + sut.push(33333); auto hasPushed = sut.push(667); - EXPECT_THAT((hasPushed.has_error() || hasPushed.get_value().has_value()), Eq(true)); + EXPECT_THAT((hasPushed.has_error() || (hasPushed.get_value()).has_value()), Eq(true)); }); }