Iox #25 chunk receiver building block #84

Merged
3 changes: 2 additions & 1 deletion iceoryx_posh/CMakeLists.txt
@@ -72,7 +72,8 @@ add_library(iceoryx_posh
source/popo/sender_port.cpp
source/popo/sender_port_data.cpp
source/popo/receiver_handler.cpp
source/popo/building_blocks/chunk_queue.cpp
source/popo/building_blocks/chunk_queue.cpp
source/popo/building_blocks/chunk_receiver.cpp
source/runtime/message_queue_interface.cpp
source/runtime/message_queue_message.cpp
source/runtime/port_config_info.cpp
12 changes: 9 additions & 3 deletions iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp
@@ -54,8 +54,15 @@ constexpr uint32_t MAX_PORT_NUMBER = 1024u;
constexpr uint32_t MAX_INTERFACE_NUMBER = 4u;
constexpr uint32_t MAX_RECEIVERS_PER_SENDERPORT = 256u;
constexpr uint32_t MAX_CHUNKS_ALLOCATE_PER_SENDER = 8u;
constexpr uint64_t MAX_SENDER_SAMPLE_HISTORY_CAPACITY = 16u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = 256u;
constexpr uint64_t MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR = 16u;
constexpr uint32_t MAX_CHUNKS_HELD_PER_RECEIVER = 256u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER;
/// With MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER we couple the maximum number of chunks a user is
/// allowed to hold with the maximum queue capacity.
/// This allows a polling user to replace all held chunks in one execution with new ones
/// from a completely filled queue. Conversely, when we have a contract with the user
/// regarding how many chunks they are allowed to hold, the queue does not need to be bigger: we
/// can provide this number of newest chunks, since the user would not be allowed to hold more anyway.
constexpr uint32_t MAX_INTERFACE_CAPRO_FIFO_SIZE = MAX_PORT_NUMBER;
constexpr uint32_t MAX_APPLICATION_CAPRO_FIFO_SIZE = 128u;

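As a side note, the coupling described in the comment above can be sketched with a minimal, self-contained example: because the queue capacity equals the maximum number of held chunks, a polling user can drop everything it currently holds and take over a completely filled queue in one execution. Standard-library containers stand in for the iceoryx queue and chunk types here; this is an illustration, not the iceoryx implementation.

#include <cstdint>
#include <deque>
#include <vector>

constexpr uint32_t MAX_CHUNKS_HELD_PER_RECEIVER = 256u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER;

struct Chunk
{
};

// One polling cycle: release everything that is currently held, then take every
// chunk from a completely filled queue; the holding limit can never be exceeded
// because both bounds are the same constant.
void pollOnce(std::deque<Chunk>& queue, std::vector<Chunk>& heldChunks)
{
    heldChunks.clear();
    while (!queue.empty() && heldChunks.size() < MAX_CHUNKS_HELD_PER_RECEIVER)
    {
        heldChunks.push_back(queue.front());
        queue.pop_front();
    }
}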
@@ -119,4 +126,3 @@ using FindServiceHandler = std::function<void(InstanceContainer&, FindServiceHan
} // namespace runtime

} // namespace iox

@@ -31,22 +31,24 @@ namespace popo
///
/// About Concurrency:
/// This ChunkDistributor can be used with different LockingPolicies for different scenarios
/// When different threads operate on it (e.g. application sends chunks and RouDi adds and removes queues),
/// a locking policy must be used that ensures consistent data in the ChunkDistributorData.
Comment on lines -34 to -35

Contributor: Keep fighting the trailing whitespaces :-)

Member: doesn't your IDE automatically remove trailing whitespaces ... me starting IDE war and running away ;)

/// When different threads operate on it (e.g. application sends chunks and RouDi adds and removes queues),
/// a locking policy must be used that ensures consistent data in the ChunkDistributorData.
/// @todo There are currently some challenges:
/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an inter-process
/// mutex. But this can lead to deadlocks if a user process gets terminated while one of its threads is in the ChunkDistributor
/// and holds a lock. An easier setup would be if changing the queues by a middleware thread and sending chunks by the user process
/// would not interleave. I.e. there is no concurrent access to the containers. Then a memory synchronization would be sufficient
/// The cleanup() call is the biggest challenge. This is used to free chunks that are still held by a not properly terminated
/// user application. Even if access from middleware and user threads do not overlap, the history container to cleanup could be
/// in an inconsistent state as the application was hard terminated while changing it. We would need a container like the
/// UsedChunkList to have one that is robust against such inconsistencies.... A perfect job for our future selves
template <uint32_t MaxQueues, typename LockingPolicy>
/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an
/// inter-process mutex. But this can lead to deadlocks if a user process gets terminated while one of its
/// threads is in the ChunkDistributor and holds a lock. An easier setup would be if changing the queues
/// by a middleware thread and sending chunks by the user process would not interleave. I.e. there is no concurrent
/// access to the containers. Then a memory synchronization would be sufficient.
/// The cleanup() call is the biggest challenge. This is used to free chunks that are still held by a not properly
/// terminated user application. Even if access from middleware and user threads do not overlap, the history
/// container to cleanup could be in an inconsistent state as the application was hard terminated while changing it.
/// We would need a container like the UsedChunkList to have one that is robust against such inconsistencies....
/// A perfect job for our future selves
template <typename ChunkDistributorDataType>
class ChunkDistributor
{
public:
using MemberType_t = ChunkDistributorData<MaxQueues, LockingPolicy>;
using MemberType_t = ChunkDistributorDataType;

ChunkDistributor(MemberType_t* const chunkDistrubutorDataPtr) noexcept;

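To make the LockingPolicy idea from the concurrency notes above more concrete, here is a rough sketch of the pattern, assuming the policy only has to provide lock()/unlock()/tryLock(); it uses a plain std::mutex instead of the inter-process capable mutex the real ChunkDistributorData needs.

#include <mutex>

struct ThreadSafePolicy
{
    void lock() { m_mutex.lock(); }
    void unlock() { m_mutex.unlock(); }
    bool tryLock() { return m_mutex.try_lock(); }
    std::mutex m_mutex; // the real code needs an inter-process capable mutex
};

struct SingleThreadedPolicy
{
    void lock() {}
    void unlock() {}
    bool tryLock() { return true; }
};

template <typename LockingPolicy>
struct DistributorDataSketch : public LockingPolicy
{
    using lockGuard_t = std::lock_guard<DistributorDataSketch<LockingPolicy>>;
    // stored queues and history would live here
};

template <typename DataType>
void deliverSketch(DataType& data)
{
    typename DataType::lockGuard_t lock(data); // a no-op with SingleThreadedPolicy
    // push the chunk to all stored queues ...
}

With DistributorDataSketch<ThreadSafePolicy> the guard really locks a mutex; with DistributorDataSketch<SingleThreadedPolicy> it compiles down to nothing, which is exactly the trade-off discussed in the comment above.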
@@ -17,29 +17,29 @@ namespace iox
{
namespace popo
{
template <uint32_t MaxQueues, typename LockingPolicy>
inline ChunkDistributor<MaxQueues, LockingPolicy>::ChunkDistributor(
template <typename ChunkDistributorDataType>
inline ChunkDistributor<ChunkDistributorDataType>::ChunkDistributor(
MemberType_t* const chunkDistrubutorDataPtr) noexcept
: m_chunkDistrubutorDataPtr(chunkDistrubutorDataPtr)
{
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline const typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() const noexcept
template <typename ChunkDistributorDataType>
inline const typename ChunkDistributor<ChunkDistributorDataType>::MemberType_t*
ChunkDistributor<ChunkDistributorDataType>::getMembers() const noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() noexcept
template <typename ChunkDistributorDataType>
inline typename ChunkDistributor<ChunkDistributorDataType>::MemberType_t*
ChunkDistributor<ChunkDistributorDataType>::getMembers() noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::MemberType_t* const queueToAdd,
template <typename ChunkDistributorDataType>
inline bool ChunkDistributor<ChunkDistributorDataType>::addQueue(ChunkQueue::MemberType_t* const queueToAdd,
uint64_t requestedHistory) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
@@ -49,26 +49,26 @@ inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::Mem
return false;
}

auto l_alreadyKnownReceiver =
auto alreadyKnownReceiver =
std::find_if(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
[&](ChunkQueue::MemberType_t* const queue) { return queue == queueToAdd; });

// check if the queue is not already in the list
if (l_alreadyKnownReceiver == getMembers()->m_queues.end())
if (alreadyKnownReceiver == getMembers()->m_queues.end())
{
if (getMembers()->m_queues.size() < getMembers()->m_queues.capacity())
{
getMembers()->m_queues.push_back(queueToAdd);

uint64_t currChunkHistorySize = getMembers()->m_sampleHistory.size();
uint64_t currChunkHistorySize = getMembers()->m_history.size();

// if the current history is large enough we send the requested number of chunks, else we send the
// total history
auto startIndex = (requestedHistory <= currChunkHistorySize) ? currChunkHistorySize - requestedHistory : 0u;
for (auto i = startIndex; i < currChunkHistorySize; ++i)
{
deliverToQueue(queueToAdd, getMembers()->m_sampleHistory[i]);
deliverToQueue(queueToAdd, getMembers()->m_history[i]);
}
}
else
@@ -81,37 +81,37 @@ inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::Mem
return true;
}

template <uint32_t MaxQueues, typename LockingPolicy>
template <typename ChunkDistributorDataType>
inline void
ChunkDistributor<MaxQueues, LockingPolicy>::removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept
ChunkDistributor<ChunkDistributorDataType>::removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

auto l_iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove);
if (l_iter != getMembers()->m_queues.end())
auto iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove);
if (iter != getMembers()->m_queues.end())
{
getMembers()->m_queues.erase(l_iter);
getMembers()->m_queues.erase(iter);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::removeAllQueues() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::removeAllQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

getMembers()->m_queues.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::hasStoredQueues() noexcept
template <typename ChunkDistributorDataType>
inline bool ChunkDistributor<ChunkDistributorDataType>::hasStoredQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

return !getMembers()->m_queues.empty();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

@@ -125,52 +125,52 @@ inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToAllStoredQueues
addToHistoryWithoutDelivery(chunk);
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToQueue(ChunkQueue::MemberType_t* const queue,
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::deliverToQueue(ChunkQueue::MemberType_t* const queue,
mepoo::SharedChunk chunk) noexcept
{
ChunkQueue(queue).push(chunk);
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

if (0 < getMembers()->m_historyCapacity)
{
if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity)
if (getMembers()->m_history.size() >= getMembers()->m_historyCapacity)
{
getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin());
getMembers()->m_history.erase(getMembers()->m_history.begin());
}
getMembers()->m_sampleHistory.push_back(chunk);
getMembers()->m_history.push_back(chunk);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistorySize() noexcept
template <typename ChunkDistributorDataType>
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::getHistorySize() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

return getMembers()->m_sampleHistory.size();
return getMembers()->m_history.size();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistoryCapacity() const noexcept
template <typename ChunkDistributorDataType>
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::getHistoryCapacity() const noexcept
{
return getMembers()->m_historyCapacity;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::clearHistory() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::clearHistory() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

getMembers()->m_sampleHistory.clear();
getMembers()->m_history.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::cleanup() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::cleanup() noexcept
{
if (getMembers()->tryLock())
{
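The history delivery performed in addQueue() above (deliver the requested number of newest chunks, or the whole history if it is smaller) can be illustrated with a small standalone function; plain integers stand in for the SharedChunks.

#include <cstdint>
#include <vector>

std::vector<uint64_t> historyToDeliver(const std::vector<uint64_t>& history, uint64_t requestedHistory)
{
    const uint64_t currChunkHistorySize = history.size();

    // if the current history is large enough we deliver the requested number of chunks,
    // else we deliver the total history
    const auto startIndex =
        (requestedHistory <= currChunkHistorySize) ? currChunkHistorySize - requestedHistory : 0u;

    std::vector<uint64_t> delivered;
    for (auto i = startIndex; i < currChunkHistorySize; ++i)
    {
        delivered.push_back(history[i]); // the real code calls deliverToQueue(queueToAdd, m_history[i])
    }
    return delivered;
}

For example, with a stored history of five chunks a requestedHistory of 2 delivers only the two newest ones, while a requestedHistory of 10 delivers all five.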
@@ -29,8 +29,6 @@ namespace iox
{
namespace popo
{
struct ChunkQueueData;

Comment on lines -32 to -33

Contributor: Nice catch!

class ThreadSafePolicy
{
public: // needs to be public since we want to use std::lock_guard
@@ -72,12 +70,12 @@ struct ChunkDistributorData : public LockingPolicy
using lockGuard_t = std::lock_guard<ChunkDistributorData<MaxQueues, LockingPolicy>>;

ChunkDistributorData(uint64_t historyCapacity = 0u) noexcept
: m_historyCapacity(algorithm::min(historyCapacity, MAX_SENDER_SAMPLE_HISTORY_CAPACITY))
: m_historyCapacity(algorithm::min(historyCapacity, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR))
{
if (m_historyCapacity != historyCapacity)
{
LogWarn() << "Chunk history too large, reducing from " << historyCapacity << " to "
<< MAX_SENDER_SAMPLE_HISTORY_CAPACITY;
<< MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR;
}
}

@@ -90,8 +88,8 @@ struct ChunkDistributorData : public LockingPolicy
/// When to store a SharedChunk and when the included ChunkManagement must be used?
/// If we would make the ChunkDistributor lock-free, can we then extend the UsedChunkList to
/// be like a ring buffer and use this for the history? This would be needed to be able to safely cleanup
using SampleHistoryContainer_t = cxx::vector<mepoo::SharedChunk, MAX_SENDER_SAMPLE_HISTORY_CAPACITY>;
SampleHistoryContainer_t m_sampleHistory;
using HistoryContainer_t = cxx::vector<mepoo::SharedChunk, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR>;
HistoryContainer_t m_history;
};

} // namespace popo
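The capacity clamping in the ChunkDistributorData constructor above boils down to the following sketch, with std::min and std::cout standing in for iceoryx's algorithm::min and LogWarn().

#include <algorithm>
#include <cstdint>
#include <iostream>

constexpr uint64_t MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR = 16u;

uint64_t clampHistoryCapacity(const uint64_t requestedCapacity)
{
    const uint64_t historyCapacity = std::min(requestedCapacity, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR);
    if (historyCapacity != requestedCapacity)
    {
        std::cout << "Chunk history too large, reducing from " << requestedCapacity << " to "
                  << historyCapacity << std::endl;
    }
    return historyCapacity;
}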
@@ -25,7 +25,8 @@ namespace popo
{
enum class ChunkQueueError
{
SEMAPHORE_ALREADY_SET
SEMAPHORE_ALREADY_SET,
QUEUE_OVERFLOW
};

/// @brief The ChunkQueue is the low layer building block to receive SharedChunks. It follows a first-in-first-out
@@ -49,8 +50,8 @@ class ChunkQueue
/// @brief push a new chunk to the chunk queue
/// @param[in] shared chunk object
/// @return if the value was pushed successfully into the chunk queue it returns
/// true, otherwise false
bool push(mepoo::SharedChunk chunk) noexcept;
/// success, otherwise a ChunkQueueError
cxx::expected<ChunkQueueError> push(mepoo::SharedChunk chunk) noexcept;

/// @brief pop a chunk from the chunk queue
/// @return optional for a shared chunk that is set if the queue is not empty
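The changed push() contract (report success or a ChunkQueueError instead of returning a bool) can be sketched with a self-contained toy queue; the simplified result struct below stands in for cxx::expected and is not the iceoryx type.

#include <cstddef>
#include <queue>

enum class ChunkQueueError
{
    SEMAPHORE_ALREADY_SET,
    QUEUE_OVERFLOW
};

struct Chunk
{
};

struct PushResult // simplified stand-in for cxx::expected<ChunkQueueError>
{
    bool hasError{false};
    ChunkQueueError error{ChunkQueueError::QUEUE_OVERFLOW};
};

class BoundedChunkQueueSketch
{
  public:
    explicit BoundedChunkQueueSketch(const std::size_t capacity) noexcept
        : m_capacity(capacity)
    {
    }

    PushResult push(const Chunk& chunk) noexcept
    {
        if (m_queue.size() >= m_capacity)
        {
            // the chunk is not enqueued; the caller decides how to react to the overflow
            return {true, ChunkQueueError::QUEUE_OVERFLOW};
        }
        m_queue.push(chunk);
        return {};
    }

  private:
    std::queue<Chunk> m_queue;
    std::size_t m_capacity;
};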
@@ -0,0 +1,75 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef IOX_POSH_POPO_CHUNK_RECEIVER_HPP_
#define IOX_POSH_POPO_CHUNK_RECEIVER_HPP_
Contributor: As we decided to switch to include guards (#30), we can keep them here. Isn't the #define IOX_CHUNK_RECEIVER_HPP enough to avoid ambiguity?

Contributor (Author): I thought I'd use them here since we decided to use them again. But we haven't defined a rule for them yet, have we?

Contributor: Nope, there's no rule yet. Let's define it with #30.

#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_receiver_data.hpp"
#include "iceoryx_posh/mepoo/chunk_header.hpp"
#include "iceoryx_utils/cxx/expected.hpp"
#include "iceoryx_utils/cxx/optional.hpp"

namespace iox
{
namespace popo
{
enum class ChunkReceiverError
{
TOO_MANY_CHUNKS_HELD_IN_PARALLEL
};

/// @brief The ChunkReceiver is a building block of the shared memory communication infrastructure. It extends
/// the functionality of a ChunkQueue with the ability to pass chunks to the user side (user process).
/// Together with the ChunkSender, they are the next abstraction layer on top of ChunkDistributor and ChunkQueue. The
/// ChunkReceiver holds the ownership of the SharedChunks and keeps track of which chunks are currently passed to the
/// user side.
class ChunkReceiver : public ChunkQueue
{
public:
using MemberType_t = ChunkReceiverData;

ChunkReceiver(MemberType_t* const chunkReceiverDataPtr) noexcept;

ChunkReceiver(const ChunkReceiver& other) = delete;
ChunkReceiver& operator=(const ChunkReceiver&) = delete;
ChunkReceiver(ChunkReceiver&& rhs) = default;
ChunkReceiver& operator=(ChunkReceiver&& rhs) = default;
~ChunkReceiver() = default;

/// @brief Tries to get the next received chunk. If there is a new one, the ChunkHeader of this new chunk is returned.
/// The ownership of the SharedChunk remains in the ChunkReceiver to be able to clean up if the user process
/// disappears
/// @return optional that has a new chunk header or no value if there are no new chunks in the underlying queue,
/// ChunkReceiverError on error
cxx::expected<cxx::optional<const mepoo::ChunkHeader*>, ChunkReceiverError> get() noexcept;
Contributor: Why the optional inside an expected? Isn't it enough to either provide a ChunkHeader pointer or a ChunkReceiverError?

Contributor (Author): No, if there is no new chunk, you will get an empty optional without an error. Like a pop on an empty FiFo.


/// @brief Release a chunk that was obtained with get
/// @param[in] chunkHeader, pointer to the ChunkHeader to free
void release(const mepoo::ChunkHeader* chunkHeader) noexcept;

/// @brief Release all the chunks that are currently held. Caution: Only call this if the user process is no longer
/// running, e.g. to clean up chunks that were held by a user process that died unexpectedly, avoiding lost
/// chunks in the system
void releaseAll() noexcept;

private:
const MemberType_t* getMembers() const noexcept;
MemberType_t* getMembers() noexcept;
};

} // namespace popo
} // namespace iox

#endif
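Putting the get()/release() contract and the review discussion above together, a possible polling loop on the user side could look like the sketch below. The accessor names on cxx::expected and cxx::optional (has_error(), value(), has_value()) and the header path are assumptions about the surrounding iceoryx API, not taken from this diff.

// header path assumed to sit next to chunk_queue.hpp, see the includes above
#include "iceoryx_posh/internal/popo/building_blocks/chunk_receiver.hpp"

void pollOnce(iox::popo::ChunkReceiver& receiver)
{
    auto getResult = receiver.get();
    if (getResult.has_error())
    {
        // ChunkReceiverError::TOO_MANY_CHUNKS_HELD_IN_PARALLEL:
        // release some of the held chunks before asking for new ones
        return;
    }

    auto& maybeChunkHeader = getResult.value();
    if (!maybeChunkHeader.has_value())
    {
        // no error and no value: the queue is simply empty (like a pop on an empty FiFo)
        return;
    }

    const iox::mepoo::ChunkHeader* chunkHeader = *maybeChunkHeader;
    // ... read the payload behind chunkHeader ...

    receiver.release(chunkHeader); // hand ownership bookkeeping back to the ChunkReceiver
}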