Skip to content

Commit

Permalink
iox-eclipse-iceoryx#32 use the message queue from the posix wrapper i…
Browse files Browse the repository at this point in the history
…nstead of the posix mq_* calls
  • Loading branch information
elBoberido committed Feb 3, 2020
1 parent bc655d2 commit d07ca86
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class RouDiMultiProcess
bool m_killProcessesInDestructor;
std::atomic_bool m_runThreads;

const uint32_t m_MessageQueueTimeoutMilliseconds = 100;
const units::Duration m_messageQueueTimeout{units::Duration::milliseconds(static_cast<unsigned long long>(100))};

/// locks the socket for preventing multiple start of RouDi
RouDiLock m_roudilock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/runtime/message_queue_message.hpp"
#include "iceoryx_utils/internal/posix_wrapper/message_queue.hpp"
#include "iceoryx_utils/internal/units/duration.hpp"
#include "iceoryx_utils/platform/mqueue.hpp"
#include "iceoryx_utils/platform/stat.hpp"
Expand Down Expand Up @@ -117,13 +118,13 @@ class MqBase

/// @brief Tries to receive a message from the message queue within a
/// specified timeout. It stores the message in answer.
/// @param[in] timeout_ms Timeout in milliseconds.
/// @param[in] timeout for receiving a message.
/// @param[in] answer The answer of the message queue. If timedReceive
/// failed the content of answer is undefined.
/// @return If a valid message was received before the timeout occures
/// it returns true, otherwise false.
/// It also returns false if clock_gettime() failed
bool timedReceive(const uint32_t timeout_ms, MqMessage& answer) const noexcept;
bool timedReceive(const units::Duration timeout, MqMessage& answer) const noexcept;

/// @brief Tries to send the message specified in msg.
/// @param[in] msg Must be a valid message, if its an invalid message
Expand Down Expand Up @@ -187,11 +188,11 @@ class MqBase
// same InterfaceName are using the same message queue
MqBase(const std::string& InterfaceName, const long maxMessages, const long messageSize) noexcept;

MqBase(const MqBase&) = default;
MqBase(const MqBase&) = delete;
MqBase(MqBase&&) = default;
virtual ~MqBase() = default;

MqBase& operator=(const MqBase&) = default;
MqBase& operator=(const MqBase&) = delete;
MqBase& operator=(MqBase&&) = default;

/// @brief Set the content of answer from buffer.
Expand All @@ -202,22 +203,16 @@ class MqBase

/// @brief Opens a message queue with mq_open and default permissions
/// stored in m_perms and stores the descriptor in m_roudiMq
/// @param[in] name Unique identifier of the message queue
/// @param[in] oflag Flags that control the operation of the call.
/// They are defined in fcntl.h
/// @param[in] channelSide of the queue. SERVER will also destroy the message queue in the dTor, while CLIENT
/// keeps the message queue in the file system after the dTor is called
/// @return Returns true if a message queue could be opened, otherwise
/// false.
bool openMessageQueue(const std::string& name, const int oflag) noexcept;
bool openMessageQueue(const posix::IpcChannelSide channelSide) noexcept;

/// @brief Closes a message queue with mq_close
/// @return Returns true if the message queue could be closed, otherwise
/// false.
bool closeMessageQueue() const noexcept;

/// @brief Unlinks a message queue with mq_unlink
/// @return Returns true if the message queue could be unlinked,
/// otherwise false.
bool unlinkMessageQueue() const noexcept;
bool closeMessageQueue() noexcept;

/// @brief If a message queue was moved then m_interfaceName was cleared
/// and this object gave up the control of that specific
Expand All @@ -229,25 +224,14 @@ class MqBase
bool hasClosableMessageQueue() const noexcept;

protected:
static constexpr long MQ_FLAGS = 0; // ignored by mq_open
static constexpr long MQ_CUR_MSGS = 0; // ignored by mq_open
static constexpr long MAX_MESSAGE_LENGTH = 4096;
static constexpr int NULL_TERMINATOR_SIZE = 1;
/// @todo in QNX there are two addtional fields, mq_sendwait and mq_recvwait
/// mq_sendwait: number of processes waiting to send
/// mq_recvwait: number of processes waiting to receive

bool m_isInitialized{true};

// dont initialize m_attr with actual values here,
// fields have a different order in QNX,
// so we need to initialize by name (see ctor of MqBase)
struct mq_attr m_attr; // = {}; removed since they are initialized explicitly in MqBase (see above)
int m_perms{S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH};

int m_oflag{O_RDONLY};
mqd_t m_roudiMq{-1};
std::string m_interfaceName;
long m_maxMessageSize{0};
long m_maxMessages{0};
iox::posix::IpcChannelSide m_channelSide{posix::IpcChannelSide::CLIENT};
iox::posix::MessageQueue m_mq;
};

/// @brief Class for handling a message queue via mq_open and mq_close.
Expand Down Expand Up @@ -278,10 +262,8 @@ class MqInterfaceUser : public MqBase
/// @brief Since this object manages a system resource (message queue)
/// only the move constructor and assignment operator is
/// defined.
MqInterfaceUser(MqInterfaceUser&&) noexcept;
MqInterfaceUser& operator=(MqInterfaceUser&&) noexcept;

~MqInterfaceUser() noexcept;
MqInterfaceUser(MqInterfaceUser&&) = default;
MqInterfaceUser& operator=(MqInterfaceUser&&) = default;
};

/// @brief Class for handling a message queue via mq_open and mq_unlink
Expand Down Expand Up @@ -314,10 +296,12 @@ class MqInterfaceCreator : public MqBase
/// @brief Since this object manages a system resource (message queue)
/// only the move constructor and assignment operator is
/// defined.
MqInterfaceCreator(MqInterfaceCreator&&) noexcept;
MqInterfaceCreator& operator=(MqInterfaceCreator&&) noexcept;
MqInterfaceCreator(MqInterfaceCreator&&) = default;
MqInterfaceCreator& operator=(MqInterfaceCreator&&) = default;

~MqInterfaceCreator() noexcept;
private:
friend class MqRuntimeInterface;
void cleanupResource();
};

class MqRuntimeInterface
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_posh/source/roudi/roudi_multi_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void RouDiMultiProcess::mqThread()
runtime::MqMessage message;
/// @todo do we really need timedReceive? an alternative solution would be to close the message queue,
/// which also results in a return from mq_receive, and check the relevant errno and shutdown RouDi
if (!roudiMqInterface.timedReceive(m_MessageQueueTimeoutMilliseconds, message))
if (!roudiMqInterface.timedReceive(m_messageQueueTimeout, message))
{
// TODO: errorHandling
}
Expand Down
Loading

0 comments on commit d07ca86

Please sign in to comment.