diff --git a/iceoryx_posh/CMakeLists.txt b/iceoryx_posh/CMakeLists.txt index 3e6abe8bb4..3dfae645ca 100644 --- a/iceoryx_posh/CMakeLists.txt +++ b/iceoryx_posh/CMakeLists.txt @@ -74,6 +74,7 @@ add_library(iceoryx_posh source/popo/receiver_handler.cpp source/popo/building_blocks/chunk_queue.cpp source/popo/building_blocks/chunk_distributor.cpp + source/popo/building_blocks/chunk_sender.cpp source/runtime/message_queue_interface.cpp source/runtime/message_queue_message.cpp source/runtime/port_config_info.cpp diff --git a/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp b/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp index 24aecb512a..142fd7e4e0 100644 --- a/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp +++ b/iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp @@ -49,7 +49,7 @@ constexpr units::Duration PROCESS_KEEP_ALIVE_TIMEOUT = 5 * PROCESS_KEEP_ALIVE_IN constexpr uint32_t MAX_PORT_NUMBER = 4096u; constexpr uint32_t MAX_INTERFACE_NUMBER = 16u; constexpr uint32_t MAX_RECEIVERS_PER_SENDERPORT = 256u; -constexpr uint32_t MAX_SAMPLE_ALLOCATE_PER_SENDER = 16u; +constexpr uint32_t MAX_CHUNKS_ALLOCATE_PER_SENDER = 16u; constexpr uint64_t MAX_SENDER_SAMPLE_HISTORY_CAPACITY = 16u; constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = 256u; constexpr uint32_t MAX_INTERFACE_CAPRO_FIFO_SIZE = MAX_PORT_NUMBER; diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp index 1efa48bf51..efbaba0be6 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp @@ -31,7 +31,7 @@ struct ChunkQueueData; struct ChunkDistributorData { - ChunkDistributorData(uint64_t historyCapacity = MAX_SENDER_SAMPLE_HISTORY_CAPACITY) noexcept + ChunkDistributorData(uint64_t historyCapacity = 0u) noexcept : m_historyCapacity(algorithm::min(historyCapacity, MAX_SENDER_SAMPLE_HISTORY_CAPACITY)) { if (m_historyCapacity != historyCapacity) diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp new file mode 100644 index 0000000000..fcd6895d10 --- /dev/null +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp @@ -0,0 +1,91 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef IOX_POSH_POPO_CHUNK_SENDER_HPP_ +#define IOX_POSH_POPO_CHUNK_SENDER_HPP_ + +#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp" +#include "iceoryx_posh/mepoo/chunk_header.hpp" +#include "iceoryx_utils/cxx/expected.hpp" + +namespace iox +{ +namespace popo +{ +/// @brief error which can occur in the VariantQueue +enum class ChunkSenderError +{ + RUNNING_OUT_OF_CHUNKS, + TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL +}; + +/// @brief The ChunkSender is a building block of the shared memory communication infrastructure. It extends +/// the functionality of a ChunkDistributor with the abililty to allocate and free memory chunks. +/// For getting chunks of memory the MemoryManger is used. Together with the ChunkReceiver, they are the next +/// abstraction layer on top of ChunkDistributor and ChunkQueue. The ChunkSender holds the ownership of the +/// SharedChunks and does a bookkeeping which chunks are currently passed to the user side. +class ChunkSender : public ChunkDistributor +{ + public: + using MemberType_t = ChunkSenderData; + + ChunkSender(MemberType_t* const chunkDistributorDataPtr) noexcept; + + ChunkSender(const ChunkSender& other) = delete; + ChunkSender& operator=(const ChunkSender&) = delete; + ChunkSender(ChunkSender&& rhs) = default; + ChunkSender& operator=(ChunkSender&& rhs) = default; + ~ChunkSender() = default; + + /// @brief Allocate a chunk, the ownerhip of the SharedChunk remains in the ChunkSender for being able to cleanup if + /// the user process disappears + /// @param[in] payloadSize, size of the user paylaod without additional headers + /// @return on success pointer to a ChunkHeader which can be used to access the payload and header fields, error if + /// not + cxx::expected allocate(const uint32_t payloadSize) noexcept; + + /// @brief Free an allocated chunk without sending it + /// @param[in] chunkHeader, pointer to the ChunkHeader to free + void free(mepoo::ChunkHeader* const chunkHeader) noexcept; + + /// @brief Send an allocated chunk to all connected ChunkQueue + /// @param[in] chunkHeader, pointer to the ChunkHeader to send + void send(mepoo::ChunkHeader* const chunkHeader) noexcept; + + /// @brief Push an allocated chunk to the history without sending it + /// @param[in] chunkHeader, pointer to the ChunkHeader to push to the history + void pushToHistory(mepoo::ChunkHeader* const chunkHeader) noexcept; + + /// @brief Release all the chunks that are currently held. Caution: Only call this if the user process is no more + /// running E.g. This cleans up chunks that were held by a user process that died unexpectetly, for avoiding lost + /// chunks in the system + void cleanup() noexcept; + + private: + /// @brief Get the SharedChunk from the provided ChunkHeader and do all that is required to send the chunk + /// @param[in] chunkHeader of the chunk that shall be send + /// @param[in][out] chunk that corresponds to the chunk header + /// @return true if there was a matching chunk with this header, false if not + bool getChunkReadyForSend(mepoo::ChunkHeader* chunkHeader, mepoo::SharedChunk& chunk) noexcept; + + const MemberType_t* getMembers() const noexcept; + MemberType_t* getMembers() noexcept; +}; + +} // namespace popo +} // namespace iox + +#endif diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp index e5a70d6bd6..2c4d816344 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp @@ -27,24 +27,25 @@ namespace iox { namespace popo { -struct ChunkSenderData +struct ChunkSenderData : public ChunkDistributorData { - ChunkSenderData(mepoo::MemoryManager* const memMgr, - uint64_t historyCapacity = MAX_SENDER_SAMPLE_HISTORY_CAPACITY, - const MemoryInfo& memoryInfo = MemoryInfo()) noexcept - : m_memoryMgr(memMgr) + ChunkSenderData(mepoo::MemoryManager* const memoryManager, + uint64_t historyCapacity = 0u, + const mepoo::MemoryInfo& memoryInfo = mepoo::MemoryInfo()) noexcept + : ChunkDistributorData(historyCapacity) + , m_memoryMgr(memoryManager) , m_memoryInfo(memoryInfo) - , m_chunkDistributor(historyCapacity) { + assert(nullptr != memoryManager); } - iox::relative_ptr m_memoryMgr; - MemoryInfo m_memoryInfo; - ChunkDistributorData m_chunkDistributorData; - UsedChunkList m_chunksInUse; + relative_ptr m_memoryMgr; + mepoo::MemoryInfo m_memoryInfo; + UsedChunkList m_chunksInUse; mepoo::SequenceNumberType m_sequenceNumber{0u}; mepoo::SharedChunk m_lastChunk{nullptr}; + /// @todo This here? // bool m_isUnique{false}; // throughput related members // std::atomic m_activePayloadSize{0u}; diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/sender_port_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/sender_port_data.hpp index 0925be20cd..487fec697e 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/sender_port_data.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/sender_port_data.hpp @@ -64,7 +64,7 @@ struct SenderPortData : public BasePortData bool m_isUnique{false}; - UsedChunkList m_allocatedChunksList; + UsedChunkList m_allocatedChunksList; mepoo::SequenceNumberType m_sequenceNumber{0u}; // throughput related members @@ -88,4 +88,3 @@ struct SenderPortData : public BasePortData } // namespace popo } // namespace iox - diff --git a/iceoryx_posh/source/popo/building_blocks/chunk_distributor.cpp b/iceoryx_posh/source/popo/building_blocks/chunk_distributor.cpp index f3326a2be5..f921e600cb 100644 --- a/iceoryx_posh/source/popo/building_blocks/chunk_distributor.cpp +++ b/iceoryx_posh/source/popo/building_blocks/chunk_distributor.cpp @@ -113,11 +113,14 @@ void ChunkDistributor::deliverToQueue(ChunkQueue::MemberType_t* const queue, mep void ChunkDistributor::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept { - if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity) + if (0 < getMembers()->m_historyCapacity) { - getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin()); + if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity) + { + getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin()); + } + getMembers()->m_sampleHistory.push_back(chunk); } - getMembers()->m_sampleHistory.push_back(chunk); } uint64_t ChunkDistributor::getHistorySize() noexcept diff --git a/iceoryx_posh/source/popo/building_blocks/chunk_sender.cpp b/iceoryx_posh/source/popo/building_blocks/chunk_sender.cpp new file mode 100644 index 0000000000..f03a6cef1d --- /dev/null +++ b/iceoryx_posh/source/popo/building_blocks/chunk_sender.cpp @@ -0,0 +1,149 @@ +// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp" +#include "iceoryx_utils/error_handling/error_handling.hpp" + +namespace iox +{ +namespace popo +{ +ChunkSender::ChunkSender(MemberType_t* const chunkSenderDataPtr) noexcept + : ChunkDistributor(chunkSenderDataPtr) +{ +} + +const ChunkSender::MemberType_t* ChunkSender::getMembers() const noexcept +{ + return reinterpret_cast(ChunkDistributor::getMembers()); +} + +ChunkSender::MemberType_t* ChunkSender::getMembers() noexcept +{ + return reinterpret_cast(ChunkDistributor::getMembers()); +} + +cxx::expected ChunkSender::allocate(const uint32_t payloadSize) noexcept +{ + // use the chunk stored in m_lastChunk if there is one, there is no other owner and the new payload fits in this + // chunk + if (getMembers()->m_lastChunk && getMembers()->m_lastChunk.hasNoOtherOwners() + && getMembers()->m_lastChunk.getChunkHeader()->m_info.m_usedSizeOfChunk + >= getMembers()->m_memoryMgr->sizeWithChunkHeaderStruct(payloadSize)) + { + if (getMembers()->m_chunksInUse.insert(getMembers()->m_lastChunk)) + { + getMembers()->m_lastChunk.getChunkHeader()->m_info.m_payloadSize = payloadSize; + getMembers()->m_lastChunk.getChunkHeader()->m_info.m_usedSizeOfChunk = + getMembers()->m_memoryMgr->sizeWithChunkHeaderStruct(payloadSize); + return cxx::success(getMembers()->m_lastChunk.getChunkHeader()); + } + else + { + return cxx::error(ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL); + } + } + else + { + // START of critical section, chunk will be lost if process gets hard terminated in between + // get a new chunk + mepoo::SharedChunk chunk = getMembers()->m_memoryMgr->getChunk(payloadSize); + + if (chunk) + { + // if the application allocated too much chunks, return no more chunks + if (getMembers()->m_chunksInUse.insert(chunk)) + { + // STOP of critical section, chunk will be lost if process gets hard terminated in between + return cxx::success(chunk.getChunkHeader()); + } + else + { + // release the allocated chunk + chunk = nullptr; + return cxx::error(ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL); + } + } + else + { + return cxx::error(ChunkSenderError::RUNNING_OUT_OF_CHUNKS); + } + } +} + +void ChunkSender::free(mepoo::ChunkHeader* const chunkHeader) noexcept +{ + mepoo::SharedChunk chunk(nullptr); + if (!getMembers()->m_chunksInUse.remove(chunkHeader, chunk)) + { + errorHandler(Error::kPOPO__CHUNK_SENDER_INVALID_CHUNK_TO_FREE_FROM_USER, nullptr, ErrorLevel::SEVERE); + } +} + +void ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept +{ + mepoo::SharedChunk chunk(nullptr); + // START of critical section, chunk will be lost if process gets hard terminated in between + if (getChunkReadyForSend(chunkHeader, chunk)) + { + this->deliverToAllStoredQueues(chunk); + getMembers()->m_lastChunk = chunk; + } + // STOP of critical section, chunk will be lost if process gets hard terminated in between +} + +void ChunkSender::pushToHistory(mepoo::ChunkHeader* const chunkHeader) noexcept +{ + mepoo::SharedChunk chunk(nullptr); + // START of critical section, chunk will be lost if process gets hard terminated in between + if (getChunkReadyForSend(chunkHeader, chunk)) + { + this->addToHistoryWithoutDelivery(chunk); + getMembers()->m_lastChunk = chunk; + } + // STOP of critical section, chunk will be lost if process gets hard terminated in between +} + +bool ChunkSender::getChunkReadyForSend(mepoo::ChunkHeader* chunkHeader, mepoo::SharedChunk& chunk) noexcept +{ + if (getMembers()->m_chunksInUse.remove(chunkHeader, chunk)) + { + auto& chunkInfo = chunk.getChunkHeader()->m_info; + if (!chunkInfo.m_externalSequenceNumber_bl) + { + chunkInfo.m_sequenceNumber = getMembers()->m_sequenceNumber; + getMembers()->m_sequenceNumber++; + } + else + { + getMembers()->m_sequenceNumber++; // for Introspection, else nobody updates. + } + return true; + } + else + { + errorHandler(Error::kPOPO__CHUNK_SENDER_INVALID_CHUNK_TO_SEND_FROM_USER, nullptr, ErrorLevel::SEVERE); + return false; + } +} + +void ChunkSender::cleanup() noexcept +{ + getMembers()->m_chunksInUse.cleanup(); + this->clearHistory(); + getMembers()->m_lastChunk = nullptr; +} + +} // namespace popo +} // namespace iox diff --git a/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp index 7a15d66c82..7c6d2123dd 100644 --- a/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp @@ -47,6 +47,7 @@ class ChunkDistributor_test : public Test static constexpr size_t MEGABYTE = 1 << 20; static constexpr size_t MEMORY_SIZE = 1 * MEGABYTE; + const uint64_t HISTORY_SIZE = 16; char memory[MEMORY_SIZE]; iox::posix::Allocator allocator{memory, MEMORY_SIZE}; MemPool mempool{128, 20, &allocator, &allocator}; @@ -63,7 +64,7 @@ class ChunkDistributor_test : public Test std::shared_ptr getChunkDistributorData() { - return std::make_shared(); + return std::make_shared(HISTORY_SIZE); } }; diff --git a/iceoryx_posh/test/moduletests/test_chunk_queue.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp similarity index 100% rename from iceoryx_posh/test/moduletests/test_chunk_queue.cpp rename to iceoryx_posh/test/moduletests/test_popo_chunk_queue.cpp diff --git a/iceoryx_posh/test/moduletests/test_popo_chunk_sender.cpp b/iceoryx_posh/test/moduletests/test_popo_chunk_sender.cpp new file mode 100644 index 0000000000..9a56ac1b4a --- /dev/null +++ b/iceoryx_posh/test/moduletests/test_popo_chunk_sender.cpp @@ -0,0 +1,386 @@ +// Copyright (c) 2019 by Robert Bosch GmbH. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "iceoryx_posh/iceoryx_posh_types.hpp" +#include "iceoryx_posh/internal/mepoo/memory_manager.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_data.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp" +#include "iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp" +#include "iceoryx_posh/mepoo/mepoo_config.hpp" +#include "iceoryx_utils/error_handling/error_handling.hpp" +#include "iceoryx_utils/internal/posix_wrapper/shared_memory_object/allocator.hpp" +#include "test.hpp" + +using namespace ::testing; + +struct DummySample +{ + uint64_t dummy{42}; +}; + +class ChunkSender_testBase : public Test +{ + protected: + ChunkSender_testBase() + { + m_mempoolconf.addMemPool({128, 20}); + m_mempoolconf.addMemPool({256, 20}); + m_memoryManager.configureMemoryManager(m_mempoolconf, &m_memoryAllocator, &m_memoryAllocator); + } + + ~ChunkSender_testBase() + { + } + + void SetUp() + { + } + + void TearDown() + { + } + + void ReceiveDummyData() + { + // // Be sure to receive the chunk we just sent to be able to recycle it + // const iox::mepoo::ChunkHeader* receivedSample1; + // m_receiver->getChunk(receivedSample1); + // m_receiver->releaseChunk(receivedSample1); + } + + static constexpr size_t MEMORY_SIZE = 1024 * 1024; + uint8_t m_memory[1024 * 1024]; + iox::posix::Allocator m_memoryAllocator{m_memory, MEMORY_SIZE}; + iox::mepoo::MePooConfig m_mempoolconf; + iox::mepoo::MemoryManager m_memoryManager; + + iox::popo::ChunkQueueData m_chunkQueueData{iox::cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer}; + iox::popo::ChunkSenderData m_chunkSenderData{&m_memoryManager}; + iox::popo::ChunkSender m_chunkSender{&m_chunkSenderData}; +}; + +class ChunkSender_test : public ChunkSender_testBase +{ + public: + ChunkSender_test() + : ChunkSender_testBase() + { + } +}; + +TEST_F(ChunkSender_test, allocate_OneChunk) +{ + auto chunk = m_chunkSender.allocate(sizeof(DummySample)); + EXPECT_FALSE(chunk.has_error()); + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); +} + +TEST_F(ChunkSender_test, allocate_MultipleChunks) +{ + auto chunk1 = m_chunkSender.allocate(sizeof(DummySample)); + auto chunk2 = m_chunkSender.allocate(sizeof(DummySample)); + + EXPECT_FALSE(chunk1.has_error()); + EXPECT_FALSE(chunk2.has_error()); + if (!chunk1.has_error() && !chunk2.has_error()) + { + // must be different chunks + EXPECT_THAT(*chunk1, Ne(*chunk2)); + } + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(2u)); +} + +TEST_F(ChunkSender_test, allocate_Overflow) +{ + std::vector chunks; + + // allocate chunks until MAX_CHUNKS_ALLOCATE_PER_SENDER level + for (size_t i = 0; i < iox::MAX_CHUNKS_ALLOCATE_PER_SENDER; i++) + { + auto chunk = m_chunkSender.allocate(sizeof(DummySample)); + if (!chunk.has_error()) + { + chunks.push_back(*chunk); + } + } + + for (size_t i = 0; i < iox::MAX_CHUNKS_ALLOCATE_PER_SENDER; i++) + { + EXPECT_THAT(chunks[i], Ne(nullptr)); + } + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(iox::MAX_CHUNKS_ALLOCATE_PER_SENDER)); + + // Allocate one more sample for overflow + auto chunk = m_chunkSender.allocate(sizeof(DummySample)); + EXPECT_TRUE(chunk.has_error()); + EXPECT_THAT(chunk.get_error(), Eq(iox::popo::ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL)); + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(iox::MAX_CHUNKS_ALLOCATE_PER_SENDER)); +} + +TEST_F(ChunkSender_test, freeChunk) +{ + std::vector chunks; + + // allocate chunks until MAX_CHUNKS_ALLOCATE_PER_SENDER level + for (size_t i = 0; i < iox::MAX_CHUNKS_ALLOCATE_PER_SENDER; i++) + { + auto chunk = m_chunkSender.allocate(sizeof(DummySample)); + if (!chunk.has_error()) + { + chunks.push_back(*chunk); + } + } + + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(iox::MAX_CHUNKS_ALLOCATE_PER_SENDER)); + + // free them all + for (size_t i = 0; i < iox::MAX_CHUNKS_ALLOCATE_PER_SENDER; i++) + { + m_chunkSender.free(chunks[i]); + } + + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(0u)); +} + +TEST_F(ChunkSender_test, freeInvalidChunk) +{ + auto chunk = m_chunkSender.allocate(sizeof(DummySample)); + EXPECT_FALSE(chunk.has_error()); + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); + + auto errorHandlerCalled{false}; + auto errorHandlerGuard = iox::ErrorHandler::SetTemporaryErrorHandler( + [&errorHandlerCalled](const iox::Error, const std::function, const iox::ErrorLevel) { + errorHandlerCalled = true; + }); + + iox::mepoo::ChunkHeader* myCrazyChunk = new iox::mepoo::ChunkHeader(); + m_chunkSender.free(myCrazyChunk); + + EXPECT_TRUE(errorHandlerCalled); + EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(1u)); + delete myCrazyChunk; +} + +// TEST_F(ChunkSender_test, reserveSample_DynamicSamplesSameSizeReturningValidLastChunk) +// { +// auto sentSample1 = m_sender->reserveChunk(sizeof(DummySample), m_useDynamicPayloadSizes); +// m_sender->deliverChunk(sentSample1); + +// ReceiveDummyData(); + +// // Do it again to see whether the same chunk is returned +// auto sentSample2 = m_sender->reserveChunk(sizeof(DummySample), m_useDynamicPayloadSizes); +// m_sender->deliverChunk(sentSample2); +// EXPECT_THAT(sentSample2->m_info.m_payloadSize, Eq(sizeof(DummySample))); +// EXPECT_THAT(sentSample2->payload(), Eq(sentSample1->payload())); +// } + + +// TEST_F(ChunkSender_test, reserveSample_DynamicSamplesSmallerSizeReturningValidLastChunk) +// { +// auto sentSample1 = m_sender->reserveChunk(sizeof(DummySample), m_useDynamicPayloadSizes); +// m_sender->deliverChunk(sentSample1); + +// ReceiveDummyData(); + +// // Reserve a smaller chunk to see whether the same chunk is returned +// auto sentSample2 = m_sender->reserveChunk(sizeof(DummySample) - 7, m_useDynamicPayloadSizes); +// m_sender->deliverChunk(sentSample2); +// EXPECT_THAT(sentSample2->m_info.m_payloadSize, Eq(sizeof(DummySample) - 7)); +// EXPECT_THAT(sentSample2->payload(), Eq(sentSample1->payload())); +// } + +// TEST_F(ChunkSender_test, reserveSample_DynamicSamplesLargerSizeReturningNotLastChunk) +// { +// auto sentSample1 = m_sender->reserveChunk(sizeof(DummySample), m_useDynamicPayloadSizes); +// m_sender->deliverChunk(sentSample1); + +// ReceiveDummyData(); + +// // Reserve a larger chunk to see whether a chunk of the larger mempool is supplied +// auto sentSample2 = m_sender->reserveChunk(sizeof(DummySample) + 200, m_useDynamicPayloadSizes); +// m_sender->deliverChunk(sentSample2); +// EXPECT_THAT(sentSample2->m_info.m_payloadSize, Eq(sizeof(DummySample) + 200)); +// EXPECT_THAT(sentSample2->payload(), Ne(sentSample1->payload())); +// } + + +// TEST_F(ChunkSender_test, doNotDeliverDataOnSubscription) +// { +// EXPECT_THAT(m_receiver->newData(), Eq(false)); +// } + +// TEST_F(ChunkSender_test, deliverSample_OneSample) +// { +// auto sample = m_sender->reserveChunk(sizeof(DummySample)); + +// new (sample) DummySample(); +// sample->m_info.m_payloadSize = sizeof(DummySample); +// sample->m_info.m_externalSequenceNumber_bl = true; +// sample->m_info.m_sequenceNumber = 1337; +// m_sender->deliverChunk(sample); + +// ASSERT_THAT(m_receiver->newData(), Eq(true)); +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver->getChunk(receivedSample), Eq(true)); +// ASSERT_THAT(m_receiver->releaseChunk(receivedSample), Eq(true)); +// ASSERT_THAT(receivedSample->m_info.m_sequenceNumber, Eq(1337u)); +// } + +// TEST_F(ChunkSender_test, deliverSample_MultipleSample) +// { +// auto sample1 = m_sender->reserveChunk(sizeof(DummySample)); +// new (sample1->payload()) DummySample(); +// sample1->m_info.m_payloadSize = sizeof(DummySample); +// sample1->m_info.m_externalSequenceNumber_bl = true; +// sample1->m_info.m_sequenceNumber = 14337; +// m_sender->deliverChunk(sample1); + +// auto sample2 = m_sender->reserveChunk(sizeof(DummySample)); +// new (sample2->payload()) DummySample(); +// sample2->m_info.m_payloadSize = sizeof(DummySample); +// sample2->m_info.m_externalSequenceNumber_bl = true; +// sample2->m_info.m_sequenceNumber = 42u; +// m_sender->deliverChunk(sample2); + + +// ASSERT_THAT(m_receiver->newData(), Eq(true)); +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver->getChunk(receivedSample), Eq(true)); +// ASSERT_THAT(m_receiver->releaseChunk(receivedSample), Eq(true)); +// ASSERT_THAT(receivedSample->m_info.m_sequenceNumber, Eq(14337u)); + +// ASSERT_THAT(m_receiver->getChunk(receivedSample), Eq(true)); +// ASSERT_THAT(m_receiver->releaseChunk(receivedSample), Eq(true)); +// ASSERT_THAT(receivedSample->m_info.m_sequenceNumber, Eq(42u)); +// } + +// TEST_F(ChunkSender_test, DISABLED_doDeliverOnSubscription_InitialValue) +// { +// ServiceDescription l_service2{2, 2, 2}; +// auto m_sender2 = CreateSender(l_service2); +// m_sender2->enableDoDeliverOnSubscription(); + +// auto latestValue = m_sender2->reserveChunk(sizeof(DummySample)); +// latestValue->m_info.m_externalSequenceNumber_bl = true; +// latestValue->m_info.m_sequenceNumber = 4711; +// m_sender2->deliverChunk(latestValue); + +// auto m_receiver2 = CreateReceiver(m_service); +// SubscribeReceiverToSender(m_receiver2, m_sender2); + +// ASSERT_THAT(m_receiver2->newData(), Eq(true)); +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver2->getChunk(receivedSample), Eq(true)); +// ASSERT_THAT(receivedSample->m_info.m_sequenceNumber, Eq(4711u)); +// m_receiver2->releaseChunk(receivedSample); +// } + +// TEST_F(ChunkSender_test, doDeliverOnSubscription_LatestValue) +// { +// m_sender->enableDoDeliverOnSubscription(); + +// auto latestValue = m_sender->reserveChunk(sizeof(DummySample)); +// latestValue->m_info.m_externalSequenceNumber_bl = true; +// latestValue->m_info.m_sequenceNumber = 41112; +// m_sender->deliverChunk(latestValue); + +// auto m_receiver2 = CreateReceiver(m_service); +// SubscribeReceiverToSender(m_receiver2, m_sender); + + +// EXPECT_THAT(m_sender->isPortActive(), Eq(true)); +// ASSERT_THAT(m_receiver2->newData(), Eq(true)); +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver2->getChunk(receivedSample), Eq(true)); +// ASSERT_THAT(receivedSample->m_info.m_sequenceNumber, Eq(41112u)); +// m_receiver2->releaseChunk(latestValue); +// } + +// TEST_F(ChunkSender_test, testCaPro) +// { +// m_sender->enableDoDeliverOnSubscription(); + +// auto latestValue = m_sender->reserveChunk(sizeof(DummySample)); +// latestValue->m_info.m_externalSequenceNumber_bl = true; +// latestValue->m_info.m_sequenceNumber = 47112; +// m_sender->deliverChunk(latestValue); + +// auto m_receiver2 = CreateReceiver(m_service); +// SubscribeReceiverToSender(m_receiver2, m_sender); + + +// EXPECT_THAT(m_sender->isPortActive(), Eq(true)); +// ASSERT_THAT(m_receiver2->newData(), Eq(true)); +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver2->getChunk(receivedSample), Eq(true)); +// ASSERT_THAT(receivedSample->m_info.m_sequenceNumber, Eq(47112u)); +// m_receiver2->releaseChunk(receivedSample); +// } + +// TEST_F(ChunkSender_testLatchedTopic, getSameSampleAfterOneDeliver) +// { +// auto sample = m_sender->reserveChunk(sizeof(DummySample)); +// new (sample) DummySample(); +// sample->m_info.m_payloadSize = sizeof(DummySample); +// m_sender->deliverChunk(sample); + + +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver->getChunk(receivedSample), Eq(true)); +// m_receiver->releaseChunk(receivedSample); + +// uint64_t sampleAddress = reinterpret_cast(sample); +// EXPECT_THAT(reinterpret_cast(m_sender->reserveChunk(sizeof(DummySample))), Eq(sampleAddress)); +// } + +// TEST_F(ChunkSender_testLatchedTopic, getDifferentSampleWhenStillInUse) +// { +// auto sample = m_sender->reserveChunk(sizeof(DummySample)); +// new (sample) DummySample(); +// sample->m_info.m_payloadSize = sizeof(DummySample); +// m_sender->deliverChunk(sample); + +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver->getChunk(receivedSample), Eq(true)); + +// uint64_t sampleAddress = reinterpret_cast(sample); +// EXPECT_THAT(reinterpret_cast(m_sender->reserveChunk(sizeof(DummySample))), Ne(sampleAddress)); +// m_receiver->releaseChunk(receivedSample); +// } + +// TEST_F(ChunkSender_testLatchedTopic, getSameSampleAfterSecondDelivery) +// { +// auto sample = m_sender->reserveChunk(sizeof(DummySample)); +// new (sample) DummySample(); +// sample->m_info.m_payloadSize = sizeof(DummySample); +// m_sender->deliverChunk(sample); + +// sample = m_sender->reserveChunk(sizeof(DummySample)); +// new (sample) DummySample(); +// sample->m_info.m_payloadSize = sizeof(DummySample); +// m_sender->deliverChunk(sample); + +// const iox::mepoo::ChunkHeader* receivedSample; +// ASSERT_THAT(m_receiver->getChunk(receivedSample), Eq(true)); +// m_receiver->releaseChunk(receivedSample); + +// ASSERT_THAT(m_receiver->getChunk(receivedSample), Eq(true)); +// m_receiver->releaseChunk(receivedSample); + +// uint64_t sampleAddress = reinterpret_cast(sample); +// EXPECT_THAT(reinterpret_cast(m_sender->reserveChunk(sizeof(DummySample))), Eq(sampleAddress)); +// } diff --git a/iceoryx_posh/test/moduletests/test_posh_senderport.cpp b/iceoryx_posh/test/moduletests/test_posh_senderport.cpp index 5f902a43ed..777f039919 100644 --- a/iceoryx_posh/test/moduletests/test_posh_senderport.cpp +++ b/iceoryx_posh/test/moduletests/test_posh_senderport.cpp @@ -225,13 +225,13 @@ TEST_F(SenderPort_test, reserveSample_Overflow) { std::vector samples; - // allocate samples until MAX_SAMPLE_ALLOCATE_PER_SENDER level - for (size_t i = 0; i < iox::MAX_SAMPLE_ALLOCATE_PER_SENDER; i++) + // allocate samples until MAX_CHUNKS_ALLOCATE_PER_SENDER level + for (size_t i = 0; i < iox::MAX_CHUNKS_ALLOCATE_PER_SENDER; i++) { samples.push_back(m_sender->reserveChunk(sizeof(DummySample))); } - for (size_t i = 0; i < iox::MAX_SAMPLE_ALLOCATE_PER_SENDER; i++) + for (size_t i = 0; i < iox::MAX_CHUNKS_ALLOCATE_PER_SENDER; i++) { EXPECT_THAT(samples[i], Ne(nullptr)); } diff --git a/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp b/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp index 47fe08ad70..f447aa778a 100644 --- a/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp +++ b/iceoryx_utils/include/iceoryx_utils/error_handling/error_handling.hpp @@ -48,6 +48,8 @@ namespace iox error(POSH__SERVICE_DISCOVERY_INSTANCE_CONTAINER_OVERFLOW) \ error(POSH__SERVICE_DISCOVERY_FIND_SERVICE_CALLBACKS_CONTAINER_OVERFLOW) \ error(POPO__CHUNK_DISTRIBUTOR_OVERFLOW_OF_QUEUE_CONTAINER) \ + error(POPO__CHUNK_SENDER_INVALID_CHUNK_TO_FREE_FROM_USER) \ + error(POPO__CHUNK_SENDER_INVALID_CHUNK_TO_SEND_FROM_USER) \ error(MEPOO__MEMPOOL_CONFIG_MUST_BE_ORDERED_BY_INCREASING_SIZE) \ error(MEPOO__MEMPOOL_GETCHUNK_CHUNK_IS_TOO_LARGE) \ error(MEPOO__MEMPOOL_CHUNKSIZE_MUST_BE_LARGER_THAN_SHARED_MEMORY_ALIGNMENT_AND_MULTIPLE_OF_ALIGNMENT) \