
Commit

Merge remote-tracking branch 'michael/iox-#25-chunk-receiver-building-block' into iox-eclipse-iceoryx#25-communication-setup-with-chunk-building-blocks-integration-test
mossmaurice committed Apr 20, 2020
2 parents ffbff44 + 472aa97 commit 7dcc0e2
Showing 24 changed files with 889 additions and 292 deletions.
4 changes: 3 additions & 1 deletion iceoryx_posh/CMakeLists.txt
@@ -72,7 +72,9 @@ add_library(iceoryx_posh
source/popo/sender_port.cpp
source/popo/sender_port_data.cpp
source/popo/receiver_handler.cpp
source/popo/building_blocks/chunk_queue.cpp
source/popo/building_blocks/chunk_queue_pusher.cpp
source/popo/building_blocks/chunk_queue_popper.cpp
source/popo/building_blocks/chunk_receiver.cpp
source/runtime/message_queue_interface.cpp
source/runtime/message_queue_message.cpp
source/runtime/port_config_info.cpp
12 changes: 9 additions & 3 deletions iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp
@@ -54,8 +54,15 @@ constexpr uint32_t MAX_PORT_NUMBER = 1024u;
constexpr uint32_t MAX_INTERFACE_NUMBER = 4u;
constexpr uint32_t MAX_RECEIVERS_PER_SENDERPORT = 256u;
constexpr uint32_t MAX_CHUNKS_ALLOCATE_PER_SENDER = 8u;
constexpr uint64_t MAX_SENDER_SAMPLE_HISTORY_CAPACITY = 16u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = 256u;
constexpr uint64_t MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR = 16u;
constexpr uint32_t MAX_CHUNKS_HELD_PER_RECEIVER = 256u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER;
/// With MAX_RECEIVER_QUEUE_CAPACITY = MAX_CHUNKS_HELD_PER_RECEIVER we couple the maximum number of chunks a user is
/// allowed to hold to the maximum queue capacity.
/// This allows a polling user to replace all the held chunks in one execution with new ones
/// from a completely filled queue. Seen the other way round, once we have a contract with the user
/// regarding how many chunks they are allowed to hold, the queue does not need to be any bigger: we
/// can provide this number of newest chunks, since the user would not be allowed to hold more anyway.
constexpr uint32_t MAX_INTERFACE_CAPRO_FIFO_SIZE = MAX_PORT_NUMBER;
constexpr uint32_t MAX_APPLICATION_CAPRO_FIFO_SIZE = 128u;

@@ -119,4 +126,3 @@ using FindServiceHandler = std::function<void(InstanceContainer&, FindServiceHan
} // namespace runtime

} // namespace iox

@@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#ifndef IOX_POPO_CHUNK_DISTRIBUTOR_HPP_
#define IOX_POPO_CHUNK_DISTRIBUTOR_HPP_

#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp"

namespace iox
{
namespace popo
{
/// @brief The ChunkDistributor is the low layer building block to send SharedChunks to a dynamic number of ChunkQueues.
/// Together with the ChunkQueue, the ChunkDistributor builds the infrastructure to exchange memory chunks between
/// Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure to exchange memory chunks between
/// different data producers and consumers that could be located in different processes. Besides a modifiable container
/// of ChunkQueues to which a SharedChunk can be delivered, it holds a configurable history of the last sent chunks. This
/// allows providing a newly added queue with a number of last chunks to start from. This is needed for functionality
@@ -31,22 +33,26 @@ namespace popo
///
/// About Concurrency:
/// This ChunkDistributor can be used with different LockingPolicies for different scenarios
/// When different threads operate on it (e.g. application sends chunks and RouDi adds and removes queues),
/// a locking policy must be used that ensures consistent data in the ChunkDistributorData.
/// When different threads operate on it (e.g. application sends chunks and RouDi adds and removes queues),
/// a locking policy must be used that ensures consistent data in the ChunkDistributorData.
/// @todo There are currently some challenges:
/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an inter-process
/// mutex. But this can lead to deadlocks if a user process gets terminated while one of its threads is in the ChunkDistributor
/// and holds a lock. An easier setup would be if changing the queues by a middleware thread and sending chunks by the user process
/// would not interleave. I.e. there is no concurrent access to the containers. Then a memory synchronization would be sufficient
/// The cleanup() call is the biggest challenge. This is used to free chunks that are still held by a not properly terminated
/// user application. Even if access from middleware and user threads do not overlap, the history container to cleanup could be
/// in an inconsistent state as the application was hard terminated while changing it. We would need a container like the
/// UsedChunkList to have one that is robust against such inconsistencies.... A perfect job for our future selves
template <uint32_t MaxQueues, typename LockingPolicy>
/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an
/// inter-process mutex. But this can lead to deadlocks if a user process gets terminated while one of its
/// threads is in the ChunkDistributor and holds a lock. An easier setup would be if changing the queues
/// by a middleware thread and sending chunks by the user process would not interleave. I.e. there is no concurrent
/// access to the containers. Then a memory synchronization would be sufficient.
/// The cleanup() call is the biggest challenge. It is used to free chunks that are still held by a user application
/// that was not terminated properly. Even if access from middleware and user threads does not overlap, the history
/// container to clean up could be in an inconsistent state because the application was hard-terminated while changing it.
/// We would need a container that, like the UsedChunkList, is robust against such inconsistencies...
/// a perfect job for our future selves.
template <typename ChunkDistributorDataType>
class ChunkDistributor
{
public:
using MemberType_t = ChunkDistributorData<MaxQueues, LockingPolicy>;
using MemberType_t = ChunkDistributorDataType;
using ChunkQueueData_t = typename ChunkDistributorDataType::ChunkQueueData_t;
using ChunkQueuePusher_t = typename ChunkDistributorDataType::ChunkQueuePusher_t;

ChunkDistributor(MemberType_t* const chunkDistrubutorDataPtr) noexcept;

@@ -62,11 +68,11 @@ class ChunkDistributor
/// @param[in] requestedHistory number of last chunks from the history to send if available. If the stored history is
/// smaller, only the available chunks are provided
/// @return true on success otherwise false
bool addQueue(ChunkQueue::MemberType_t* const queueToAdd, uint64_t requestedHistory = 0) noexcept;
bool addQueue(ChunkQueueData_t* const queueToAdd, uint64_t requestedHistory = 0) noexcept;

/// @brief Remove a queue from the internal list of chunk queues
/// @param[in] chunk queue to remove from the list
void removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept;
void removeQueue(ChunkQueueData_t* const queueToRemove) noexcept;

/// @brief Delete all the stored chunk queues
void removeAllQueues() noexcept;
@@ -84,7 +90,7 @@
/// history
/// @param[in] chunk queue to which this chunk shall be delivered
/// @param[in] shared chunk to be delivered
void deliverToQueue(ChunkQueue::MemberType_t* const queue, mepoo::SharedChunk chunk) noexcept;
void deliverToQueue(ChunkQueueData_t* const queue, mepoo::SharedChunk chunk) noexcept;

/// @brief Update the chunk history but do not deliver the chunk to any chunk queue. E.g. use case is to update a
/// non offered field in ara
@@ -117,3 +123,5 @@ class ChunkDistributor
} // namespace iox

#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl"

#endif
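An aside, not part of the diff: a compact, self-contained sketch of the behaviour described above, where a distributor keeps a bounded history of the last sent chunks and hands a requested number of them to a late-joining queue. Plain std::vector and a callback stand in for cxx::vector and ChunkQueuePusher; MiniDistributor and every other name here are illustrative only.

#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

using Chunk = int; // stand-in for a delivered chunk

class MiniDistributor
{
  public:
    explicit MiniDistributor(uint64_t historyCapacity)
        : m_historyCapacity(historyCapacity)
    {
    }

    // deliver the last `requestedHistory` chunks to the new queue, or the whole
    // history if fewer chunks are stored
    void addQueue(std::function<void(Chunk)> queue, uint64_t requestedHistory)
    {
        const uint64_t currChunkHistorySize = m_history.size();
        const uint64_t startIndex =
            (requestedHistory <= currChunkHistorySize) ? currChunkHistorySize - requestedHistory : 0u;
        for (uint64_t i = startIndex; i < currChunkHistorySize; ++i)
        {
            queue(m_history[i]);
        }
        m_queues.push_back(queue);
    }

    void deliverToAllStoredQueues(Chunk chunk)
    {
        for (auto& queue : m_queues)
        {
            queue(chunk);
        }
        // keep a bounded history of the last sent chunks
        if (m_historyCapacity > 0u)
        {
            if (m_history.size() >= m_historyCapacity)
            {
                m_history.erase(m_history.begin());
            }
            m_history.push_back(chunk);
        }
    }

  private:
    uint64_t m_historyCapacity;
    std::vector<std::function<void(Chunk)>> m_queues;
    std::vector<Chunk> m_history;
};

int main()
{
    MiniDistributor distributor(3u); // history capacity of 3
    for (Chunk c = 1; c <= 5; ++c)
    {
        distributor.deliverToAllStoredQueues(c); // no queues yet, only the history fills up
    }
    // a late joiner asking for 2 chunks of history receives 4 and 5
    distributor.addQueue([](Chunk c) { std::cout << "received " << c << "\n"; }, 2u);
    return 0;
}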
@@ -17,58 +17,57 @@ namespace iox
{
namespace popo
{
template <uint32_t MaxQueues, typename LockingPolicy>
inline ChunkDistributor<MaxQueues, LockingPolicy>::ChunkDistributor(
template <typename ChunkDistributorDataType>
inline ChunkDistributor<ChunkDistributorDataType>::ChunkDistributor(
MemberType_t* const chunkDistrubutorDataPtr) noexcept
: m_chunkDistrubutorDataPtr(chunkDistrubutorDataPtr)
{
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline const typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() const noexcept
template <typename ChunkDistributorDataType>
inline const typename ChunkDistributor<ChunkDistributorDataType>::MemberType_t*
ChunkDistributor<ChunkDistributorDataType>::getMembers() const noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() noexcept
template <typename ChunkDistributorDataType>
inline typename ChunkDistributor<ChunkDistributorDataType>::MemberType_t*
ChunkDistributor<ChunkDistributorDataType>::getMembers() noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::MemberType_t* const queueToAdd,
template <typename ChunkDistributorDataType>
inline bool ChunkDistributor<ChunkDistributorDataType>::addQueue(ChunkQueueData_t* const queueToAdd,
uint64_t requestedHistory) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

if (nullptr == queueToAdd)
{
return false;
}

auto l_alreadyKnownReceiver =
std::find_if(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
[&](ChunkQueue::MemberType_t* const queue) { return queue == queueToAdd; });
auto alreadyKnownReceiver = std::find_if(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
[&](ChunkQueueData_t* const queue) { return queue == queueToAdd; });

// check if the queue is not already in the list
if (l_alreadyKnownReceiver == getMembers()->m_queues.end())
if (alreadyKnownReceiver == getMembers()->m_queues.end())
{
if (getMembers()->m_queues.size() < getMembers()->m_queues.capacity())
{
getMembers()->m_queues.push_back(queueToAdd);

uint64_t currChunkHistorySize = getMembers()->m_sampleHistory.size();
uint64_t currChunkHistorySize = getMembers()->m_history.size();

// if the current history is large enough we send the requested number of chunks, else we send the
// total history
auto startIndex = (requestedHistory <= currChunkHistorySize) ? currChunkHistorySize - requestedHistory : 0u;
for (auto i = startIndex; i < currChunkHistorySize; ++i)
{
deliverToQueue(queueToAdd, getMembers()->m_sampleHistory[i]);
deliverToQueue(queueToAdd, getMembers()->m_history[i]);
}
}
else
@@ -81,39 +80,38 @@ inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::Mem
return true;
}
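For intuition, a quick worked example of the startIndex arithmetic above (values illustrative only):

// history = [A, B, C, D, E], currChunkHistorySize = 5
// requestedHistory = 2 -> startIndex = 5 - 2 = 3, so D and E (the two newest chunks) are delivered
// requestedHistory = 8 -> 8 > 5, so startIndex = 0 and the complete history A..E is delivered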

template <uint32_t MaxQueues, typename LockingPolicy>
inline void
ChunkDistributor<MaxQueues, LockingPolicy>::removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::removeQueue(ChunkQueueData_t* const queueToRemove) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

auto l_iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove);
if (l_iter != getMembers()->m_queues.end())
auto iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove);
if (iter != getMembers()->m_queues.end())
{
getMembers()->m_queues.erase(l_iter);
getMembers()->m_queues.erase(iter);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::removeAllQueues() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::removeAllQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

getMembers()->m_queues.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::hasStoredQueues() noexcept
template <typename ChunkDistributorDataType>
inline bool ChunkDistributor<ChunkDistributorDataType>::hasStoredQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

return !getMembers()->m_queues.empty();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

// send to all the queues
for (auto& queue : getMembers()->m_queues)
@@ -125,52 +123,52 @@ inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToAllStoredQueues
addToHistoryWithoutDelivery(chunk);
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToQueue(ChunkQueue::MemberType_t* const queue,
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::deliverToQueue(ChunkQueueData_t* const queue,
mepoo::SharedChunk chunk) noexcept
{
ChunkQueue(queue).push(chunk);
ChunkQueuePusher_t(queue).push(chunk);
}
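As background, a hedged sketch of the pattern used in the call above, where the queue state lives in a plain data struct and a pusher/popper handle is constructed on the fly around a pointer to it, as in ChunkQueuePusher_t(queue).push(chunk). This is not the iceoryx implementation; QueueData, Pusher and Popper are invented for the example.

#include <cstdint>
#include <deque>
#include <iostream>

// shared state; the equivalent ChunkQueueData is designed to be shared between processes
struct QueueData
{
    std::deque<uint64_t> fifo;
};

// write-side handle, comparable in spirit to ChunkQueuePusher
class Pusher
{
  public:
    explicit Pusher(QueueData* data) : m_data(data) {}
    void push(uint64_t chunk) { m_data->fifo.push_back(chunk); }

  private:
    QueueData* m_data;
};

// read-side handle, comparable in spirit to ChunkQueuePopper
class Popper
{
  public:
    explicit Popper(QueueData* data) : m_data(data) {}
    bool tryPop(uint64_t& chunk)
    {
        if (m_data->fifo.empty()) { return false; }
        chunk = m_data->fifo.front();
        m_data->fifo.pop_front();
        return true;
    }

  private:
    QueueData* m_data;
};

int main()
{
    QueueData data;          // one shared data block ...
    Pusher(&data).push(42u); // ... a producer-side view constructed on the fly
    uint64_t chunk{0u};
    if (Popper(&data).tryPop(chunk)) // ... and a consumer-side view on the same data
    {
        std::cout << "popped chunk " << chunk << "\n";
    }
    return 0;
}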

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

if (0 < getMembers()->m_historyCapacity)
{
if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity)
if (getMembers()->m_history.size() >= getMembers()->m_historyCapacity)
{
getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin());
getMembers()->m_history.erase(getMembers()->m_history.begin());
}
getMembers()->m_sampleHistory.push_back(chunk);
getMembers()->m_history.push_back(chunk);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistorySize() noexcept
template <typename ChunkDistributorDataType>
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::getHistorySize() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

return getMembers()->m_sampleHistory.size();
return getMembers()->m_history.size();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistoryCapacity() const noexcept
template <typename ChunkDistributorDataType>
inline uint64_t ChunkDistributor<ChunkDistributorDataType>::getHistoryCapacity() const noexcept
{
return getMembers()->m_historyCapacity;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::clearHistory() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::clearHistory() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());
typename MemberType_t::LockGuard_t lock(*getMembers());

getMembers()->m_sampleHistory.clear();
getMembers()->m_history.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::cleanup() noexcept
template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::cleanup() noexcept
{
if (getMembers()->tryLock())
{
@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#ifndef IOX_POPO_CHUNK_DISTRIBUTOR_DATA_HPP_
#define IOX_POPO_CHUNK_DISTRIBUTOR_DATA_HPP_

#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/log/posh_logging.hpp"
#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp"
#include "iceoryx_utils/cxx/algorithm.hpp"
#include "iceoryx_utils/cxx/vector.hpp"
#include "iceoryx_utils/internal/posix_wrapper/mutex.hpp"
@@ -29,8 +30,6 @@ namespace iox
{
namespace popo
{
struct ChunkQueueData;

class ThreadSafePolicy
{
public: // needs to be public since we want to use std::lock_guard
@@ -66,33 +65,37 @@ class SingleThreadedPolicy
}
};

template <uint32_t MaxQueues, typename LockingPolicy>
template <uint32_t MaxQueues, typename LockingPolicy, typename ChunkQueuePusherType = ChunkQueuePusher>
struct ChunkDistributorData : public LockingPolicy
{
using lockGuard_t = std::lock_guard<ChunkDistributorData<MaxQueues, LockingPolicy>>;
using LockGuard_t = std::lock_guard<ChunkDistributorData<MaxQueues, LockingPolicy, ChunkQueuePusherType>>;
using ChunkQueuePusher_t = ChunkQueuePusherType;
using ChunkQueueData_t = typename ChunkQueuePusherType::MemberType_t;

ChunkDistributorData(uint64_t historyCapacity = 0u) noexcept
: m_historyCapacity(algorithm::min(historyCapacity, MAX_SENDER_SAMPLE_HISTORY_CAPACITY))
: m_historyCapacity(algorithm::min(historyCapacity, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR))
{
if (m_historyCapacity != historyCapacity)
{
LogWarn() << "Chunk history too large, reducing from " << historyCapacity << " to "
<< MAX_SENDER_SAMPLE_HISTORY_CAPACITY;
<< MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR;
}
}

const uint64_t m_historyCapacity;

using QueueContainer_t = cxx::vector<ChunkQueue::MemberType_t*, MaxQueues>;
using QueueContainer_t = cxx::vector<ChunkQueueData_t*, MaxQueues>;
QueueContainer_t m_queues;

/// @todo using ChunkManagement instead of SharedChunk as in UsedChunkList?
/// When to store a SharedChunk and when the included ChunkManagement must be used?
/// If we made the ChunkDistributor lock-free, could we then extend the UsedChunkList to
/// be like a ring buffer and use this for the history? This would be needed to be able to safely clean up
using SampleHistoryContainer_t = cxx::vector<mepoo::SharedChunk, MAX_SENDER_SAMPLE_HISTORY_CAPACITY>;
SampleHistoryContainer_t m_sampleHistory;
using HistoryContainer_t = cxx::vector<mepoo::SharedChunk, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR>;
HistoryContainer_t m_history;
};

} // namespace popo
} // namespace iox

#endif
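For context, a hedged sketch of the locking-policy pattern used by ChunkDistributorData: the data struct inherits its policy, so the object itself satisfies BasicLockable and can be handed straight to std::lock_guard (which is why the policy's members must be public). The std::mutex is only a stand-in here; the comments above indicate the real ThreadSafePolicy is built around an inter-process mutex.

#include <iostream>
#include <mutex>

// policy that really locks
class ThreadSafePolicy
{
  public:
    void lock() { m_mutex.lock(); }
    void unlock() { m_mutex.unlock(); }
    bool tryLock() { return m_mutex.try_lock(); }

  private:
    std::mutex m_mutex;
};

// policy that does nothing, for single-threaded use
class SingleThreadedPolicy
{
  public:
    void lock() {}
    void unlock() {}
    bool tryLock() { return true; }
};

// the data struct inherits its locking policy, so it can be locked directly
template <typename LockingPolicy>
struct Data : public LockingPolicy
{
    using LockGuard_t = std::lock_guard<Data<LockingPolicy>>;
    int value{0};
};

int main()
{
    Data<ThreadSafePolicy> data;
    {
        Data<ThreadSafePolicy>::LockGuard_t guard(data); // locks via the inherited policy
        data.value = 42;
    } // unlocks when the guard goes out of scope
    std::cout << data.value << "\n";
    return 0;
}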