Commit

Merge pull request #81 from budrus/iox-#25-chunk-sender-and-receiver-building-blocks

Iox #25 chunk sender building block
budrus authored Apr 6, 2020
2 parents 80c5ae5 + 0f6e9e3 commit f777d22
Showing 17 changed files with 1,348 additions and 306 deletions.
3 changes: 1 addition & 2 deletions iceoryx_posh/CMakeLists.txt
@@ -72,8 +72,7 @@ add_library(iceoryx_posh
source/popo/sender_port.cpp
source/popo/sender_port_data.cpp
source/popo/receiver_handler.cpp
source/popo/building_blocks/chunk_queue.cpp
source/popo/building_blocks/chunk_distributor.cpp
source/popo/building_blocks/chunk_queue.cpp
source/runtime/message_queue_interface.cpp
source/runtime/message_queue_message.cpp
source/runtime/port_config_info.cpp
2 changes: 1 addition & 1 deletion iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp
@@ -53,7 +53,7 @@ constexpr uint32_t MAX_PORT_NUMBER = 1024u;
#endif
constexpr uint32_t MAX_INTERFACE_NUMBER = 4u;
constexpr uint32_t MAX_RECEIVERS_PER_SENDERPORT = 256u;
constexpr uint32_t MAX_SAMPLE_ALLOCATE_PER_SENDER = 8u;
constexpr uint32_t MAX_CHUNKS_ALLOCATE_PER_SENDER = 8u;
constexpr uint64_t MAX_SENDER_SAMPLE_HISTORY_CAPACITY = 16u;
constexpr uint32_t MAX_RECEIVER_QUEUE_CAPACITY = 256u;
constexpr uint32_t MAX_INTERFACE_CAPRO_FIFO_SIZE = MAX_PORT_NUMBER;
iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp
@@ -28,10 +28,25 @@ namespace popo
/// allows providing a newly added queue a number of the last chunks to start from. This is needed for the
/// functionality known as latched topic in ROS or field in ara::com. A ChunkDistributor is used to build elements
/// of higher abstraction layers that also do memory management and provide an API towards the real user
///
/// About Concurrency:
/// This ChunkDistributor can be used with different LockingPolicies for different scenarios.
/// When different threads operate on it (e.g. the application sends chunks and RouDi adds and removes queues),
/// a locking policy must be used that ensures consistent data in the ChunkDistributorData.
/// @todo There are currently some challenges:
/// For the stored queues and the history, containers are used which are not thread safe. Therefore we use an
/// inter-process mutex. But this can lead to deadlocks if a user process gets terminated while one of its threads
/// is in the ChunkDistributor and holds a lock. An easier setup would be if changing the queues by a middleware
/// thread and sending chunks by the user process did not interleave, i.e. there is no concurrent access to the
/// containers; then memory synchronization would be sufficient.
/// The cleanup() call is the biggest challenge. It is used to free chunks that are still held by a not properly
/// terminated user application. Even if accesses from middleware and user threads do not overlap, the history
/// container to clean up could be in an inconsistent state because the application was hard-terminated while
/// changing it. We would need a container that, like the UsedChunkList, is robust against such
/// inconsistencies... a perfect job for our future selves
template <uint32_t MaxQueues, typename LockingPolicy>
class ChunkDistributor
{
public:
using MemberType_t = ChunkDistributorData;
using MemberType_t = ChunkDistributorData<MaxQueues, LockingPolicy>;

ChunkDistributor(MemberType_t* const chunkDistrubutorDataPtr) noexcept;

@@ -82,11 +97,14 @@ class ChunkDistributor

/// @brief Get the capacity of the chunk history
/// @return chunk history capacity
uint64_t getHistoryCapacity() noexcept;
uint64_t getHistoryCapacity() const noexcept;

/// @brief Clears the chunk history
void clearHistory() noexcept;

/// @brief Clean up the used shared memory chunks
void cleanup() noexcept;

protected:
const MemberType_t* getMembers() const noexcept;
MemberType_t* getMembers() noexcept;
@@ -97,3 +115,5 @@ class ChunkDistributor

} // namespace popo
} // namespace iox

#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl"
iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl
@@ -0,0 +1,193 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


namespace iox
{
namespace popo
{
template <uint32_t MaxQueues, typename LockingPolicy>
inline ChunkDistributor<MaxQueues, LockingPolicy>::ChunkDistributor(
MemberType_t* const chunkDistrubutorDataPtr) noexcept
: m_chunkDistrubutorDataPtr(chunkDistrubutorDataPtr)
{
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline const typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() const noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline typename ChunkDistributor<MaxQueues, LockingPolicy>::MemberType_t*
ChunkDistributor<MaxQueues, LockingPolicy>::getMembers() noexcept
{
return m_chunkDistrubutorDataPtr;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::addQueue(ChunkQueue::MemberType_t* const queueToAdd,
uint64_t requestedHistory) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

if (nullptr == queueToAdd)
{
return false;
}

auto l_alreadyKnownReceiver =
std::find_if(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
[&](ChunkQueue::MemberType_t* const queue) { return queue == queueToAdd; });

// check if the queue is not already in the list
if (l_alreadyKnownReceiver == getMembers()->m_queues.end())
{
if (getMembers()->m_queues.size() < getMembers()->m_queues.capacity())
{
getMembers()->m_queues.push_back(queueToAdd);

uint64_t currChunkHistorySize = getMembers()->m_sampleHistory.size();

// if the current history is large enough we send the requested number of chunks, else we send the
// total history
auto startIndex = (requestedHistory <= currChunkHistorySize) ? currChunkHistorySize - requestedHistory : 0u;
for (auto i = startIndex; i < currChunkHistorySize; ++i)
{
deliverToQueue(queueToAdd, getMembers()->m_sampleHistory[i]);
}
}
else
{
errorHandler(Error::kPOPO__CHUNK_DISTRIBUTOR_OVERFLOW_OF_QUEUE_CONTAINER, nullptr, ErrorLevel::SEVERE);
return false;
}
}

return true;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void
ChunkDistributor<MaxQueues, LockingPolicy>::removeQueue(ChunkQueue::MemberType_t* const queueToRemove) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

auto l_iter = std::find(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), queueToRemove);
if (l_iter != getMembers()->m_queues.end())
{
getMembers()->m_queues.erase(l_iter);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::removeAllQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

getMembers()->m_queues.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline bool ChunkDistributor<MaxQueues, LockingPolicy>::hasStoredQueues() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

return !getMembers()->m_queues.empty();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

// send to all the queues
for (auto& queue : getMembers()->m_queues)
{
deliverToQueue(queue, chunk);
}

// update the history
addToHistoryWithoutDelivery(chunk);
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::deliverToQueue(ChunkQueue::MemberType_t* const queue,
mepoo::SharedChunk chunk) noexcept
{
ChunkQueue(queue).push(chunk);
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

if (0 < getMembers()->m_historyCapacity)
{
if (getMembers()->m_sampleHistory.size() >= getMembers()->m_historyCapacity)
{
getMembers()->m_sampleHistory.erase(getMembers()->m_sampleHistory.begin());
}
getMembers()->m_sampleHistory.push_back(chunk);
}
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistorySize() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

return getMembers()->m_sampleHistory.size();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline uint64_t ChunkDistributor<MaxQueues, LockingPolicy>::getHistoryCapacity() const noexcept
{
return getMembers()->m_historyCapacity;
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::clearHistory() noexcept
{
typename MemberType_t::lockGuard_t lock(*getMembers());

getMembers()->m_sampleHistory.clear();
}

template <uint32_t MaxQueues, typename LockingPolicy>
inline void ChunkDistributor<MaxQueues, LockingPolicy>::cleanup() noexcept
{
if (getMembers()->tryLock())
{
clearHistory();
getMembers()->unlock();
}
else
{
/// @todo currently we have a deadlock / mutex destroy vulnerability if the ThreadSafePolicy is used
/// and a sending application dies while holding the lock for sending. If the RouDi daemon wants to
/// clean up or does discovery changes, we get a deadlock or an exception when destroying the mutex.
/// As long as we don't have a multi-threaded lock-free ChunkDistributor or another concept, we die here
errorHandler(Error::kPOPO__CHUNK_DISTRIBUTOR_CLEANUP_DEADLOCK_BECAUSE_BAD_APPLICATION_TERMINATION,
nullptr,
ErrorLevel::FATAL);
}
}

} // namespace popo
} // namespace iox
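To make the late-join replay in addQueue() above concrete, here is a small self-contained sketch (not iceoryx code) of the same start-index computation: a newly added queue receives at most the requested number of chunks, counted from the newest end of the history.

#include <cstdint>
#include <iostream>

// mirrors: auto startIndex = (requestedHistory <= currChunkHistorySize)
//                                ? currChunkHistorySize - requestedHistory : 0u;
uint64_t historyStartIndex(uint64_t requestedHistory, uint64_t currentHistorySize)
{
    // enough history stored: skip the older chunks and replay only the newest 'requestedHistory' ones;
    // otherwise replay the whole history from index 0
    return (requestedHistory <= currentHistorySize) ? currentHistorySize - requestedHistory : 0u;
}

int main()
{
    std::cout << historyStartIndex(2u, 5u) << '\n'; // 3 -> indices 3 and 4 are replayed
    std::cout << historyStartIndex(8u, 1u) << '\n'; // 0 -> the single stored chunk is replayed
}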
iceoryx_posh/include/iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp
@@ -20,18 +20,58 @@
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue.hpp"
#include "iceoryx_utils/cxx/algorithm.hpp"
#include "iceoryx_utils/cxx/vector.hpp"
#include "iceoryx_utils/internal/posix_wrapper/mutex.hpp"

#include <cstdint>
#include <mutex>

namespace iox
{
namespace popo
{
struct ChunkQueueData;

struct ChunkDistributorData
class ThreadSafePolicy
{
ChunkDistributorData(uint64_t historyCapacity = MAX_SENDER_SAMPLE_HISTORY_CAPACITY) noexcept
public: // needs to be public since we want to use std::lock_guard
void lock()
{
m_mutex.lock();
}
void unlock()
{
m_mutex.unlock();
}
bool tryLock()
{
return m_mutex.try_lock();
}

private:
posix::mutex m_mutex{true}; // recursive lock
};

class SingleThreadedPolicy
{
public: // needs to be public since we want to use std::lock_guard
void lock()
{
}
void unlock()
{
}
bool tryLock()
{
return true;
}
};

template <uint32_t MaxQueues, typename LockingPolicy>
struct ChunkDistributorData : public LockingPolicy
{
using lockGuard_t = std::lock_guard<ChunkDistributorData<MaxQueues, LockingPolicy>>;

ChunkDistributorData(uint64_t historyCapacity = 0u) noexcept
: m_historyCapacity(algorithm::min(historyCapacity, MAX_SENDER_SAMPLE_HISTORY_CAPACITY))
{
if (m_historyCapacity != historyCapacity)
@@ -43,10 +83,13 @@ struct ChunkDistributorData

const uint64_t m_historyCapacity;

using QueueContainer_t = cxx::vector<ChunkQueue::MemberType_t*, MAX_RECEIVERS_PER_SENDERPORT>;
using QueueContainer_t = cxx::vector<ChunkQueue::MemberType_t*, MaxQueues>;
QueueContainer_t m_queues;

/// @todo using ChunkManagement instead of SheredChunk as in UsedChunkList?
/// @todo using ChunkManagement instead of SharedChunk as in UsedChunkList?
/// When to store a SharedChunk and when the included ChunkManagement must be used?
/// If we made the ChunkDistributor lock-free, could we then extend the UsedChunkList to
/// be like a ring buffer and use it for the history? This would be needed to be able to safely clean up
using SampleHistoryContainer_t = cxx::vector<mepoo::SharedChunk, MAX_SENDER_SAMPLE_HISTORY_CAPACITY>;
SampleHistoryContainer_t m_sampleHistory;
};
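The data struct above inherits from its LockingPolicy, so a ChunkDistributorData object is itself BasicLockable (lock()/unlock()) and can be handed directly to std::lock_guard, while SingleThreadedPolicy compiles the locking away. A stripped-down, standalone sketch of that pattern follows; the names are toys, with std::recursive_mutex standing in for the recursive posix::mutex.

#include <mutex>

// minimal stand-ins for the two policies above
class ToyThreadSafePolicy
{
  public:
    void lock() { m_mutex.lock(); }
    void unlock() { m_mutex.unlock(); }
    bool tryLock() { return m_mutex.try_lock(); }

  private:
    std::recursive_mutex m_mutex; // plays the role of the recursive posix::mutex
};

class ToySingleThreadedPolicy
{
  public:
    void lock() {}
    void unlock() {}
    bool tryLock() { return true; }
};

// the data inherits its locking behaviour and therefore satisfies BasicLockable,
// which is all std::lock_guard requires
template <typename LockingPolicy>
struct ToyData : public LockingPolicy
{
    int m_value{0};
};

template <typename LockingPolicy>
void increment(ToyData<LockingPolicy>& data)
{
    // locks the data object itself; with ToySingleThreadedPolicy this is a no-op
    std::lock_guard<ToyData<LockingPolicy>> guard(data);
    ++data.m_value;
}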