Skip to content

Commit

Permalink
iox-#25: first version of ChunkSender and first tests
Browse files Browse the repository at this point in the history
Signed-off-by: Poehnl Michael (CC-AD/ESW1) <[email protected]>
  • Loading branch information
Poehnl Michael (CC-AD/ESW1) committed Mar 29, 2020
1 parent 731bb84 commit c5d5675
Show file tree
Hide file tree
Showing 13 changed files with 654 additions and 21 deletions.
1 change: 1 addition & 0 deletions iceoryx_posh/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ add_library(iceoryx_posh
source/popo/receiver_handler.cpp
source/popo/building_blocks/chunk_queue.cpp
source/popo/building_blocks/chunk_distributor.cpp
source/popo/building_blocks/chunk_sender.cpp
source/runtime/message_queue_interface.cpp
source/runtime/message_queue_message.cpp
source/runtime/port_config_info.cpp
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ constexpr units::Duration PROCESS_KEEP_ALIVE_TIMEOUT = 5 * PROCESS_KEEP_ALIVE_IN
constexpr uint32_t MAX_PORT_NUMBER = 4096u;
constexpr uint32_t MAX_INTERFACE_NUMBER = 16u;
constexpr uint32_t MAX_RECEIVERS_PER_SENDERPORT = 256u;
constexpr uint32_t MAX_SAMPLE_ALLOCATE_PER_SENDER = 16u;
constexpr uint32_t MAX_CHUNKS_ALLOCATE_PER_SENDER = 16u;
constexpr uint64_t MAX_SENDER_SAMPLE_HISTORY_CAPACITY = 16u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = 256u;
constexpr uint32_t MAX_INTERFACE_CAPRO_FIFO_SIZE = MAX_PORT_NUMBER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct ChunkQueueData;

struct ChunkDistributorData
{
ChunkDistributorData(uint64_t historyCapacity = MAX_SENDER_SAMPLE_HISTORY_CAPACITY) noexcept
ChunkDistributorData(uint64_t historyCapacity = 0u) noexcept
: m_historyCapacity(algorithm::min(historyCapacity, MAX_SENDER_SAMPLE_HISTORY_CAPACITY))
{
if (m_historyCapacity != historyCapacity)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef IOX_POSH_POPO_CHUNK_SENDER_HPP_
#define IOX_POSH_POPO_CHUNK_SENDER_HPP_

#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_sender_data.hpp"
#include "iceoryx_posh/mepoo/chunk_header.hpp"
#include "iceoryx_utils/cxx/expected.hpp"

namespace iox
{
namespace popo
{
/// @brief error which can occur in the VariantQueue
enum class ChunkSenderError
{
RUNNING_OUT_OF_CHUNKS,
TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL
};

/// @brief The ChunkSender is a building block of the shared memory communication infrastructure. It extends
/// the functionality of a ChunkDistributor with the abililty to allocate and free memory chunks.
/// For getting chunks of memory the MemoryManger is used. Together with the ChunkReceiver, they are the next
/// abstraction layer on top of ChunkDistributor and ChunkQueue. The ChunkSender holds the ownership of the
/// SharedChunks and does a bookkeeping which chunks are currently passed to the user side.
class ChunkSender : public ChunkDistributor
{
public:
using MemberType_t = ChunkSenderData;

ChunkSender(MemberType_t* const chunkDistributorDataPtr) noexcept;

ChunkSender(const ChunkSender& other) = delete;
ChunkSender& operator=(const ChunkSender&) = delete;
ChunkSender(ChunkSender&& rhs) = default;
ChunkSender& operator=(ChunkSender&& rhs) = default;
~ChunkSender() = default;

/// @brief Allocate a chunk, the ownerhip of the SharedChunk remains in the ChunkSender for being able to cleanup if
/// the user process disappears
/// @param[in] payloadSize, size of the user paylaod without additional headers
/// @return on success pointer to a ChunkHeader which can be used to access the payload and header fields, error if
/// not
cxx::expected<mepoo::ChunkHeader*, ChunkSenderError> allocate(const uint32_t payloadSize) noexcept;

/// @brief Free an allocated chunk without sending it
/// @param[in] chunkHeader, pointer to the ChunkHeader to free
void free(mepoo::ChunkHeader* const chunkHeader) noexcept;

/// @brief Send an allocated chunk to all connected ChunkQueue
/// @param[in] chunkHeader, pointer to the ChunkHeader to send
void send(mepoo::ChunkHeader* const chunkHeader) noexcept;

/// @brief Push an allocated chunk to the history without sending it
/// @param[in] chunkHeader, pointer to the ChunkHeader to push to the history
void pushToHistory(mepoo::ChunkHeader* const chunkHeader) noexcept;

/// @brief Release all the chunks that are currently held. Caution: Only call this if the user process is no more
/// running E.g. This cleans up chunks that were held by a user process that died unexpectetly, for avoiding lost
/// chunks in the system
void cleanup() noexcept;

private:
/// @brief Get the SharedChunk from the provided ChunkHeader and do all that is required to send the chunk
/// @param[in] chunkHeader of the chunk that shall be send
/// @param[in][out] chunk that corresponds to the chunk header
/// @return true if there was a matching chunk with this header, false if not
bool getChunkReadyForSend(mepoo::ChunkHeader* chunkHeader, mepoo::SharedChunk& chunk) noexcept;

const MemberType_t* getMembers() const noexcept;
MemberType_t* getMembers() noexcept;
};

} // namespace popo
} // namespace iox

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,25 @@ namespace iox
{
namespace popo
{
struct ChunkSenderData
struct ChunkSenderData : public ChunkDistributorData
{
ChunkSenderData(mepoo::MemoryManager* const memMgr,
uint64_t historyCapacity = MAX_SENDER_SAMPLE_HISTORY_CAPACITY,
const MemoryInfo& memoryInfo = MemoryInfo()) noexcept
: m_memoryMgr(memMgr)
ChunkSenderData(mepoo::MemoryManager* const memoryManager,
uint64_t historyCapacity = 0u,
const mepoo::MemoryInfo& memoryInfo = mepoo::MemoryInfo()) noexcept
: ChunkDistributorData(historyCapacity)
, m_memoryMgr(memoryManager)
, m_memoryInfo(memoryInfo)
, m_chunkDistributor(historyCapacity)
{
assert(nullptr != memoryManager);
}

iox::relative_ptr<mepoo::MemoryManager> m_memoryMgr;
MemoryInfo m_memoryInfo;
ChunkDistributorData m_chunkDistributorData;
UsedChunkList<MAX_SAMPLE_ALLOCATE_PER_SENDER> m_chunksInUse;
relative_ptr<mepoo::MemoryManager> m_memoryMgr;
mepoo::MemoryInfo m_memoryInfo;
UsedChunkList<MAX_CHUNKS_ALLOCATE_PER_SENDER> m_chunksInUse;
mepoo::SequenceNumberType m_sequenceNumber{0u};
mepoo::SharedChunk m_lastChunk{nullptr};

/// @todo This here?
// bool m_isUnique{false};
// throughput related members
// std::atomic<uint32_t> m_activePayloadSize{0u};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct SenderPortData : public BasePortData
bool m_isUnique{false};


UsedChunkList<MAX_SAMPLE_ALLOCATE_PER_SENDER> m_allocatedChunksList;
UsedChunkList<MAX_CHUNKS_ALLOCATE_PER_SENDER> m_allocatedChunksList;

mepoo::SequenceNumberType m_sequenceNumber{0u};
// throughput related members
Expand All @@ -88,4 +88,3 @@ struct SenderPortData : public BasePortData

} // namespace popo
} // namespace iox

Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,14 @@ void ChunkDistributor::deliverToQueue(ChunkQueue::MemberType_t* const queue, mep

void ChunkDistributor::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
{
if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity)
if (0 < getMembers()->m_historyCapacity)
{
getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin());
if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity)
{
getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin());
}
getMembers()->m_sampleHistory.push_back(chunk);
}
getMembers()->m_sampleHistory.push_back(chunk);
}

uint64_t ChunkDistributor::getHistorySize() noexcept
Expand Down
149 changes: 149 additions & 0 deletions iceoryx_posh/source/popo/building_blocks/chunk_sender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "iceoryx_posh/internal/popo/building_blocks/chunk_sender.hpp"
#include "iceoryx_utils/error_handling/error_handling.hpp"

namespace iox
{
namespace popo
{
ChunkSender::ChunkSender(MemberType_t* const chunkSenderDataPtr) noexcept
: ChunkDistributor(chunkSenderDataPtr)
{
}

const ChunkSender::MemberType_t* ChunkSender::getMembers() const noexcept
{
return reinterpret_cast<const MemberType_t*>(ChunkDistributor::getMembers());
}

ChunkSender::MemberType_t* ChunkSender::getMembers() noexcept
{
return reinterpret_cast<MemberType_t*>(ChunkDistributor::getMembers());
}

cxx::expected<mepoo::ChunkHeader*, ChunkSenderError> ChunkSender::allocate(const uint32_t payloadSize) noexcept
{
// use the chunk stored in m_lastChunk if there is one, there is no other owner and the new payload fits in this
// chunk
if (getMembers()->m_lastChunk && getMembers()->m_lastChunk.hasNoOtherOwners()
&& getMembers()->m_lastChunk.getChunkHeader()->m_info.m_usedSizeOfChunk
>= getMembers()->m_memoryMgr->sizeWithChunkHeaderStruct(payloadSize))
{
if (getMembers()->m_chunksInUse.insert(getMembers()->m_lastChunk))
{
getMembers()->m_lastChunk.getChunkHeader()->m_info.m_payloadSize = payloadSize;
getMembers()->m_lastChunk.getChunkHeader()->m_info.m_usedSizeOfChunk =
getMembers()->m_memoryMgr->sizeWithChunkHeaderStruct(payloadSize);
return cxx::success<mepoo::ChunkHeader*>(getMembers()->m_lastChunk.getChunkHeader());
}
else
{
return cxx::error<ChunkSenderError>(ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL);
}
}
else
{
// START of critical section, chunk will be lost if process gets hard terminated in between
// get a new chunk
mepoo::SharedChunk chunk = getMembers()->m_memoryMgr->getChunk(payloadSize);

if (chunk)
{
// if the application allocated too much chunks, return no more chunks
if (getMembers()->m_chunksInUse.insert(chunk))
{
// STOP of critical section, chunk will be lost if process gets hard terminated in between
return cxx::success<mepoo::ChunkHeader*>(chunk.getChunkHeader());
}
else
{
// release the allocated chunk
chunk = nullptr;
return cxx::error<ChunkSenderError>(ChunkSenderError::TOO_MANY_CHUKS_ALLOCATED_IN_PARALLEL);
}
}
else
{
return cxx::error<ChunkSenderError>(ChunkSenderError::RUNNING_OUT_OF_CHUNKS);
}
}
}

void ChunkSender::free(mepoo::ChunkHeader* const chunkHeader) noexcept
{
mepoo::SharedChunk chunk(nullptr);
if (!getMembers()->m_chunksInUse.remove(chunkHeader, chunk))
{
errorHandler(Error::kPOPO__CHUNK_SENDER_INVALID_CHUNK_TO_FREE_FROM_USER, nullptr, ErrorLevel::SEVERE);
}
}

void ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
mepoo::SharedChunk chunk(nullptr);
// START of critical section, chunk will be lost if process gets hard terminated in between
if (getChunkReadyForSend(chunkHeader, chunk))
{
this->deliverToAllStoredQueues(chunk);
getMembers()->m_lastChunk = chunk;
}
// STOP of critical section, chunk will be lost if process gets hard terminated in between
}

void ChunkSender::pushToHistory(mepoo::ChunkHeader* const chunkHeader) noexcept
{
mepoo::SharedChunk chunk(nullptr);
// START of critical section, chunk will be lost if process gets hard terminated in between
if (getChunkReadyForSend(chunkHeader, chunk))
{
this->addToHistoryWithoutDelivery(chunk);
getMembers()->m_lastChunk = chunk;
}
// STOP of critical section, chunk will be lost if process gets hard terminated in between
}

bool ChunkSender::getChunkReadyForSend(mepoo::ChunkHeader* chunkHeader, mepoo::SharedChunk& chunk) noexcept
{
if (getMembers()->m_chunksInUse.remove(chunkHeader, chunk))
{
auto& chunkInfo = chunk.getChunkHeader()->m_info;
if (!chunkInfo.m_externalSequenceNumber_bl)
{
chunkInfo.m_sequenceNumber = getMembers()->m_sequenceNumber;
getMembers()->m_sequenceNumber++;
}
else
{
getMembers()->m_sequenceNumber++; // for Introspection, else nobody updates.
}
return true;
}
else
{
errorHandler(Error::kPOPO__CHUNK_SENDER_INVALID_CHUNK_TO_SEND_FROM_USER, nullptr, ErrorLevel::SEVERE);
return false;
}
}

void ChunkSender::cleanup() noexcept
{
getMembers()->m_chunksInUse.cleanup();
this->clearHistory();
getMembers()->m_lastChunk = nullptr;
}

} // namespace popo
} // namespace iox
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ChunkDistributor_test : public Test

static constexpr size_t MEGABYTE = 1 << 20;
static constexpr size_t MEMORY_SIZE = 1 * MEGABYTE;
const uint64_t HISTORY_SIZE = 16;
char memory[MEMORY_SIZE];
iox::posix::Allocator allocator{memory, MEMORY_SIZE};
MemPool mempool{128, 20, &allocator, &allocator};
Expand All @@ -63,7 +64,7 @@ class ChunkDistributor_test : public Test

std::shared_ptr<ChunkDistributorData> getChunkDistributorData()
{
return std::make_shared<ChunkDistributorData>();
return std::make_shared<ChunkDistributorData>(HISTORY_SIZE);
}
};

Expand Down
Loading

0 comments on commit c5d5675

Please sign in to comment.