Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iox #25 chunk receiver building block #84

Merged
4 changes: 3 additions & 1 deletion iceoryx_posh/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ add_library(iceoryx_posh
source/popo/sender_port.cpp
source/popo/sender_port_data.cpp
source/popo/receiver_handler.cpp
source/popo/building_blocks/chunk_queue.cpp
source/popo/building_blocks/chunk_queue_pusher.cpp
source/popo/building_blocks/chunk_queue_popper.cpp
Comment on lines +75 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to have two interfaces!

source/popo/building_blocks/chunk_receiver.cpp
source/runtime/message_queue_interface.cpp
source/runtime/message_queue_message.cpp
source/runtime/port_config_info.cpp
Expand Down
12 changes: 9 additions & 3 deletions iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ constexpr uint32_t MAX_PORT_NUMBER = 1024u;
constexpr uint32_t MAX_INTERFACE_NUMBER = 4u;
constexpr uint32_t MAX_RECEIVERS_PER_SENDERPORT = 256u;
constexpr uint32_t MAX_CHUNKS_ALLOCATE_PER_SENDER = 8u;
constexpr uint64_t MAX_SENDER_SAMPLE_HISTORY_CAPACITY = 16u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = 256u;
constexpr uint64_t MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR = 16u;
constexpr uint32_t MAX_CHUNKS_HELD_PER_RECEIVER = 256u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER;
/// With MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER we couple the maximum number of chunks a user is
/// allowed to hold with the maximum queue capacity.
/// This allows that a polling user can replace all the held chunks in one execution with all new ones
/// from a completely filled queue. Or the other way round, when we have a contract with the user
/// regarding how many chunks they are allowed to hold, then the queue size needs not be bigger. We
/// can provide this number of newest chunks, more the user would not be allowed to hold anyway
constexpr uint32_t MAX_INTERFACE_CAPRO_FIFO_SIZE = MAX_PORT_NUMBER;
constexpr uint32_t MAX_APPLICATION_CAPRO_FIFO_SIZE = 128u;

Expand Down Expand Up @@ -119,4 +126,3 @@ using FindServiceHandler = std::function<void(InstanceContainer&, FindServiceHan
} // namespace runtime

} // namespace iox

Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#ifndef IOX_POPO_CHUNK_DISTRIBUTOR_HPP_
#define IOX_POPO_CHUNK_DISTRIBUTOR_HPP_

#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp"

namespace iox
{
namespace popo
{
/// @brief The ChunkDistributor is the low layer building block to send SharedChunks to a dynamic number of ChunkQueus.
/// Together with the ChunkQueue, the ChunkDistributor builds the infrastructure to exchange memory chunks between
/// Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure to exchange memory chunks between
/// different data producers and consumers that could be located in different processes. Besides a modifiable container
/// of ChunkQueues to which a SharedChunk can be deliverd, it holds a configurable history of last sent chunks. This
/// allows to provide a newly added queue a number of last chunks to start from. This is needed for functionality
Expand All @@ -31,22 +33,26 @@ namespace popo
///
/// About Concurrency:
/// This ChunkDistributor can be used with different LockingPolicies for different scenarios
/// When different threads operate on it (e.g. application sends chunks and RouDi adds and removes queues),
/// a locking policy must be used that ensures consistent data in the ChunkDistributorData.
Comment on lines -34 to -35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep fighting the trailing whitespaces :-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't your IDE automatically remove trailing whitespaces ... me starting IDE war and running away ;)

/// When different threads operate on it (e.g. application sends chunks and RouDi adds and removes queues),
/// a locking policy must be used that ensures consistent data in the ChunkDistributorData.
/// @todo There are currently some challenge:
/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an inter-process
/// mutex. But this can lead to deadlocks if a user process gets terminated while one of its threads is in the ChunkDistributor
/// and holds a lock. An easier setup would be if changing the queues by a middleware thread and sending chunks by the user process
/// would not interleave. I.e. there is no concurrent access to the containers. Then a memory synchronization would be sufficient
/// The cleanup() call is the biggest challenge. This is used to free chunks that are still held by a not properly terminated
/// user application. Even if access from middleware and user threads do not overlap, the history container to cleanup could be
/// in an inconsistent state as the application was hard terminated while changing it. We would need a container like the
/// UsedChunkList to have one that is robust against such inconsistencies.... A perfect job for our future selves
template <uint32_t MaxQueues, typename LockingPolicy>
/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an
/// inter-process mutex. But this can lead to deadlocks if a user process gets terminated while one of its
/// threads is in the ChunkDistributor and holds a lock. An easier setup would be if changing the queues
/// by a middleware thread and sending chunks by the user process would not interleave. I.e. there is no concurrent
/// access to the containers. Then a memory synchronization would be sufficient.
/// The cleanup() call is the biggest challenge. This is used to free chunks that are still held by a not properly
/// terminated user application. Even if access from middleware and user threads do not overlap, the history
/// container to cleanup could be in an inconsistent state as the application was hard terminated while changing it.
/// We would need a container like the UsedChunkList to have one that is robust against such inconsistencies....
/// A perfect job for our future selves
template <typename ChunkDistributorDataType>
class ChunkDistributor
{
public:
using MemberType_t = ChunkDistributorData<MaxQueues, LockingPolicy>;
using MemberType_t = ChunkDistributorDataType;
using ChunkQueueData_t = typename ChunkDistributorDataType::ChunkQueueData_t;
using ChunkQueuePusher_t = typename ChunkDistributorDataType::ChunkQueuePusher_t;

ChunkDistributor(MemberType_t* const chunkDistrubutorDataPtr) noexcept;

Expand All @@ -62,11 +68,11 @@ class ChunkDistributor
/// @param[in] requestedHistory number of last chunks from history to send if available. If history size is smaller
/// then the available history size chunks are provided
/// @return true on success otherwise false
bool addQueue(ChunkQueue::MemberType_t* const queueToAdd, uint64_t requestedHistory = 0) noexcept;
bool addQueue(ChunkQueueData_t* const queueToAdd, uint64_t requestedHistory = 0) noexcept;

/// @brief Remove a queue from the internal list of chunk queues
/// @param[in] chunk queue to remove from the list
void removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept;
void removeQueue(ChunkQueueData_t* const queueToRemove) noexcept;

/// @brief Delete all the stored chunk queues
void removeAllQueues() noexcept;
Expand All @@ -84,7 +90,7 @@ class ChunkDistributor
/// history
/// @param[in] chunk queue to which this chunk shall be delivered
/// @param[in] shared chunk to be delivered
void deliverToQueue(ChunkQueue::MemberType_t* const queue, mepoo::SharedChunk chunk) noexcept;
void deliverToQueue(ChunkQueueData_t* const queue, mepoo::SharedChunk chunk) noexcept;

/// @brief Update the chunk history but do not deliver the chunk to any chunk queue. E.g. use case is to to update a
/// non offered field in ara
Expand Down Expand Up @@ -117,3 +123,5 @@ class ChunkDistributor
} // namespace iox

#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl"

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,57 @@ namespace iox
{
namespace popo
{
template <uint32_t MaxQueues, typename LockingPolicy>
inline ChunkDistributor<MaxQueues, LockingPolicy>::ChunkDistributor(
template <typename ChunkDistributorDataType>
inline ChunkDistributor<ChunkDistributorDataType>::ChunkDistributor(
MemberType_t* const chunkDistrubutorDataPtr) noexcept
: m_chunkDistrubutorDataPtr(chunkDistrubutorDataPtr)
{
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline const typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() const noexcept
template <typename ChunkDistributorDataType>
inline const typename ChunkDistributor<ChunkDistributorDataType>::MemberType_t*
ChunkDistributor<ChunkDistributorDataType>::getMembers() const noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() noexcept
template <typename ChunkDistributorDataType>
inline typename ChunkDistributor<ChunkDistributorDataType>::MemberType_t*
ChunkDistributor<ChunkDistributorDataType>::getMembers() noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::MemberType_t* const queueToAdd,
template <typename ChunkDistributorDataType>
inline bool ChunkDistributor<ChunkDistributorDataType>::addQueue(ChunkQueueData_t* const queueToAdd,
uint64_t requestedHistory) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

if (nullptr == queueToAdd)
{
return false;
}

auto l_alreadyKnownReceiver =
std::find_if(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
[&](ChunkQueue::MemberType_t* const queue) { return queue == queueToAdd; });
auto alreadyKnownReceiver = std::find_if(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
[&](ChunkQueueData_t* const queue) { return queue == queueToAdd; });

// check if the queue is not already in the list
if (l_alreadyKnownReceiver == getMembers()->m_queues.end())
if (alreadyKnownReceiver == getMembers()->m_queues.end())
{
if (getMembers()->m_queues.size() < getMembers()->m_queues.capacity())
{
getMembers()->m_queues.push_back(queueToAdd);

uint64_t currChunkHistorySize = getMembers()->m_sampleHistory.size();
uint64_t currChunkHistorySize = getMembers()->m_history.size();

// if the current history is large enough we send the requested number of chunks, else we send the
// total history
auto startIndex = (requestedHistory <= currChunkHistorySize) ? currChunkHistorySize - requestedHistory : 0u;
for (auto i = startIndex; i < currChunkHistorySize; ++i)
{
deliverToQueue(queueToAdd, getMembers()->m_sampleHistory[i]);
deliverToQueue(queueToAdd, getMembers()->m_history[i]);
}
}
else
Expand All @@ -81,39 +80,38 @@ inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::Mem
return true;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void
ChunkDistributor<MaxQueues, LockingPolicy>::removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::removeQueue(ChunkQueueData_t* const queueToRemove) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

auto l_iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove);
if (l_iter != getMembers()->m_queues.end())
auto iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove);
if (iter != getMembers()->m_queues.end())
{
getMembers()->m_queues.erase(l_iter);
getMembers()->m_queues.erase(iter);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::removeAllQueues() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::removeAllQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

getMembers()->m_queues.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::hasStoredQueues() noexcept
template <typename ChunkDistributorDataType>
inline bool ChunkDistributor<ChunkDistributorDataType>::hasStoredQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

return !getMembers()->m_queues.empty();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

// send to all the queues
for (auto& queue : getMembers()->m_queues)
Expand All @@ -125,52 +123,52 @@ inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToAllStoredQueues
addToHistoryWithoutDelivery(chunk);
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToQueue(ChunkQueue::MemberType_t* const queue,
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::deliverToQueue(ChunkQueueData_t* const queue,
mepoo::SharedChunk chunk) noexcept
{
ChunkQueue(queue).push(chunk);
ChunkQueuePusher_t(queue).push(chunk);
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

if (0 < getMembers()->m_historyCapacity)
{
if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity)
if (getMembers()->m_history.size() >= getMembers()->m_historyCapacity)
{
getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin());
getMembers()->m_history.erase(getMembers()->m_history.begin());
}
getMembers()->m_sampleHistory.push_back(chunk);
getMembers()->m_history.push_back(chunk);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistorySize() noexcept
template <typename ChunkDistributorDataType>
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::getHistorySize() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

return getMembers()->m_sampleHistory.size();
return getMembers()->m_history.size();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistoryCapacity() const noexcept
template <typename ChunkDistributorDataType>
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::getHistoryCapacity() const noexcept
{
return getMembers()->m_historyCapacity;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::clearHistory() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::clearHistory() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

getMembers()->m_sampleHistory.clear();
getMembers()->m_history.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::cleanup() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::cleanup() noexcept
{
if (getMembers()->tryLock())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#ifndef IOX_POPO_CHUNK_DISTRIBUTOR_DATA_HPP_
#define IOX_POPO_CHUNK_DISTRIBUTOR_DATA_HPP_

#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/log/posh_logging.hpp"
#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp"
#include "iceoryx_utils/cxx/algorithm.hpp"
#include "iceoryx_utils/cxx/vector.hpp"
#include "iceoryx_utils/internal/posix_wrapper/mutex.hpp"
Expand All @@ -29,8 +30,6 @@ namespace iox
{
namespace popo
{
struct ChunkQueueData;

Comment on lines -32 to -33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

class ThreadSafePolicy
{
public: // needs to be public since we want to use std::lock_guard
Expand Down Expand Up @@ -66,33 +65,37 @@ class SingleThreadedPolicy
}
};

template <uint32_t MaxQueues, typename LockingPolicy>
template <uint32_t MaxQueues, typename LockingPolicy, typename ChunkQueuePusherType = ChunkQueuePusher>
struct ChunkDistributorData : public LockingPolicy
{
using lockGuard_t = std::lock_guard<ChunkDistributorData<MaxQueues, LockingPolicy>>;
using LockGuard_t = std::lock_guard<ChunkDistributorData<MaxQueues, LockingPolicy, ChunkQueuePusherType>>;
using ChunkQueuePusher_t = ChunkQueuePusherType;
using ChunkQueueData_t = typename ChunkQueuePusherType::MemberType_t;

ChunkDistributorData(uint64_t historyCapacity = 0u) noexcept
: m_historyCapacity(algorithm::min(historyCapacity, MAX_SENDER_SAMPLE_HISTORY_CAPACITY))
: m_historyCapacity(algorithm::min(historyCapacity, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR))
{
if (m_historyCapacity != historyCapacity)
{
LogWarn() << "Chunk history too large, reducing from " << historyCapacity << " to "
<< MAX_SENDER_SAMPLE_HISTORY_CAPACITY;
<< MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR;
}
}

const uint64_t m_historyCapacity;

using QueueContainer_t = cxx::vector<ChunkQueue::MemberType_t*, MaxQueues>;
using QueueContainer_t = cxx::vector<ChunkQueueData_t*, MaxQueues>;
QueueContainer_t m_queues;

/// @todo using ChunkManagement instead of SharedChunk as in UsedChunkList?
/// When to store a SharedChunk and when the included ChunkManagement must be used?
/// If we would make the ChunkDistributor lock-free, can we than extend the UsedChunkList to
/// be like a ring buffer and use this for the history? This would be needed to be able to safely cleanup
using SampleHistoryContainer_t = cxx::vector<mepoo::SharedChunk, MAX_SENDER_SAMPLE_HISTORY_CAPACITY>;
SampleHistoryContainer_t m_sampleHistory;
using HistoryContainer_t = cxx::vector<mepoo::SharedChunk, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR>;
HistoryContainer_t m_history;
};

} // namespace popo
} // namespace iox

#endif
Loading