Skip to content

Commit

Permalink
iox-eclipse-iceoryx#1693 iox-eclipse-iceoryx#1963 support for iox::st…
Browse files Browse the repository at this point in the history
…ring in MessageQueue

By default, the MessageQueue class relies on std::string objects
to send and receive data, as defined by its interface. However, this
approach can potentially lead to dynamic memory allocation when
handling larger data payloads that exceed the stack space
optimization (SSO) limit.

To address this issue, an alternative API has been introduced that
enables data transmission and reception using iox::string.
This approach allows for direct data manipulation within stack
memory, effectively eliminating the need for dynamic memory
allocation.

Signed-off-by: Luca Bartoli <[email protected]>
  • Loading branch information
lucabart97 authored and elBoberido committed Feb 5, 2024
1 parent 92e1d8e commit de85d41
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 75 deletions.
2 changes: 1 addition & 1 deletion doc/website/release-notes/iceoryx-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
- Fast POD data in `iox::vector` [#2082](https://github.com/eclipse-iceoryx/iceoryx/issues/2082)
- MinGW support for Windows [#2150](https://github.com/eclipse-iceoryx/iceoryx/issues/2150)
- Add support for `iox::string` in `UnixDomainSocket` and created `unix_domain_socket.inl` [#2040](https://github.com/eclipse-iceoryx/iceoryx/issues/2040)
- Add support for `iox::string` in `MessageQueue` and created `message_queue.inl` [#1963](https://github.com/eclipse-iceoryx/iceoryx/issues/1963)
- Add support for `iox::string` in `NamedPipe` and created `named_pipe.inl` [#1693](https://github.com/eclipse-iceoryx/iceoryx/issues/1693)


**Bugfixes:**

- FreeBSD CI build is broken [\#1338](https://github.com/eclipse-iceoryx/iceoryx/issues/1338)
Expand Down
213 changes: 213 additions & 0 deletions iceoryx_hoofs/posix/ipc/include/iox/detail/message_queue.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2024, Eclipse Foundation and the iceoryx contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#ifndef IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_INL
#define IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_INL

#include "iox/duration.hpp"
#include "iox/message_queue.hpp"
#include "iox/not_null.hpp"
#include "iox/posix_call.hpp"

namespace iox
{
template <typename Type, MessageQueue::Termination Terminator>
expected<void, PosixIpcChannelError>
MessageQueue::timedSendImpl(not_null<const Type*> msg, uint64_t msgSize, const units::Duration& timeout) const noexcept
{
uint64_t msgSizeToSend = msgSize;
if constexpr (Terminator == Termination::NULL_TERMINATOR)
{
msgSizeToSend += NULL_TERMINATOR_SIZE;
}

if (msgSizeToSend > static_cast<uint64_t>(m_attributes.mq_msgsize))
{
IOX_LOG(ERROR, "the message which should be sent to the message queue '" << m_name << "' is too long");
return err(PosixIpcChannelError::MESSAGE_TOO_LONG);
}

timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch);
auto mqCall = IOX_POSIX_CALL(mq_timedsend)(m_mqDescriptor, msg, msgSizeToSend, 1U, &timeOut)
.failureReturnValue(ERROR_CODE)
// don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT
.ignoreErrnos(TIMEOUT_ERRNO)
.evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return ok();
}

template <typename Type, MessageQueue::Termination Terminator>
expected<void, PosixIpcChannelError> MessageQueue::sendImpl(not_null<const Type*> msg, uint64_t msgSize) const noexcept
{
uint64_t msgSizeToSend = msgSize;
if constexpr (Terminator == Termination::NULL_TERMINATOR)
{
msgSizeToSend += NULL_TERMINATOR_SIZE;
}

if (msgSizeToSend > static_cast<uint64_t>(m_attributes.mq_msgsize))
{
IOX_LOG(ERROR, "the message which should be sent to the message queue '" << m_name << "' is too long");
return err(PosixIpcChannelError::MESSAGE_TOO_LONG);
}

auto mqCall =
IOX_POSIX_CALL(mq_send)(m_mqDescriptor, msg, msgSizeToSend, 1U).failureReturnValue(ERROR_CODE).evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return ok();
}

template <typename Type, MessageQueue::Termination Terminator>
expected<uint64_t, PosixIpcChannelError>
MessageQueue::timedReceiveImpl(not_null<Type*> msg, uint64_t maxMsgSize, const units::Duration& timeout) const noexcept
{
timespec timeOut = timeout.timespec(units::TimeSpecReference::Epoch);
auto mqCall = IOX_POSIX_CALL(mq_timedreceive)(m_mqDescriptor, msg, maxMsgSize, nullptr, &timeOut)
.failureReturnValue(ERROR_CODE)
// don't use the suppressErrorMessagesForErrnos method since QNX used EINTR instead of ETIMEDOUT
.ignoreErrnos(TIMEOUT_ERRNO)
.evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return receiveVerification<Type, Terminator>(msg, static_cast<uint64_t>(mqCall->value));
}

template <typename Type, MessageQueue::Termination Terminator>
expected<uint64_t, PosixIpcChannelError> MessageQueue::receiveImpl(not_null<Type*> msg,
uint64_t maxMsgSize) const noexcept
{
auto mqCall =
IOX_POSIX_CALL(mq_receive)(m_mqDescriptor, msg, maxMsgSize, nullptr).failureReturnValue(ERROR_CODE).evaluate();

if (mqCall.has_error())
{
return err(errnoToEnum(mqCall.error().errnum));
}

if (mqCall->errnum == TIMEOUT_ERRNO)
{
return err(errnoToEnum(ETIMEDOUT));
}

return receiveVerification<Type, Terminator>(msg, static_cast<uint64_t>(mqCall->value));
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::send(const iox::string<N>& buf) const noexcept
{
return sendImpl<char, Termination::NULL_TERMINATOR>(buf.c_str(), buf.size());
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::timedSend(const iox::string<N>& buf,
const units::Duration& timeout) const noexcept
{
return timedSendImpl<char, Termination::NULL_TERMINATOR>(buf.c_str(), buf.size(), timeout);
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::receive(iox::string<N>& buf) const noexcept
{
static_assert(N <= MAX_MESSAGE_SIZE, "Size exceeds transmission limit!");

auto result = expected<uint64_t, PosixIpcChannelError>(in_place, uint64_t(0));
buf.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t {
result = receiveImpl<char, Termination::NULL_TERMINATOR>(str, info.total_size);
if (result.has_error())
{
return 0;
}
return result.value();
});
if (result.has_error())
{
return err(result.error());
}
return ok();
}

template <uint64_t N>
expected<void, PosixIpcChannelError> MessageQueue::timedReceive(iox::string<N>& buf,
const units::Duration& timeout) const noexcept
{
static_assert(N <= MAX_MESSAGE_SIZE, "Size exceeds transmission limit!");

auto result = expected<uint64_t, PosixIpcChannelError>(in_place, uint64_t(0));
buf.unsafe_raw_access([&](auto* str, const auto info) -> uint64_t {
result = timedReceiveImpl<char, Termination::NULL_TERMINATOR>(str, info.total_size, timeout);
if (result.has_error())
{
return 0;
}
return result.value();
});
if (result.has_error())
{
return err(result.error());
}
return ok();
}

template <typename Type, MessageQueue::Termination Terminator>
expected<uint64_t, PosixIpcChannelError> MessageQueue::receiveVerification(not_null<Type*> msg,
uint64_t msgLenght) const noexcept
{
if constexpr (Terminator == Termination::NULL_TERMINATOR)
{
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
if (msg[msgLenght - NULL_TERMINATOR_SIZE] != 0)
{
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
msg[0] = 0;
return err(PosixIpcChannelError::INTERNAL_LOGIC_ERROR);
}
return ok<uint64_t>(msgLenght - NULL_TERMINATOR_SIZE);
}

return ok<uint64_t>(msgLenght);
}
} // namespace iox

#endif
56 changes: 56 additions & 0 deletions iceoryx_hoofs/posix/ipc/include/iox/message_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "iox/builder.hpp"
#include "iox/duration.hpp"
#include "iox/expected.hpp"
#include "iox/not_null.hpp"
#include "iox/optional.hpp"
#include "iox/posix_ipc_channel.hpp"

Expand Down Expand Up @@ -58,6 +59,7 @@ class MessageQueue
static constexpr uint64_t MAX_NUMBER_OF_MESSAGES = 10;

using Builder_t = MessageQueueBuilder;
using Message_t = iox::string<MAX_MESSAGE_SIZE>;

MessageQueue() noexcept = delete;
MessageQueue(const MessageQueue& other) = delete;
Expand Down Expand Up @@ -88,6 +90,38 @@ class MessageQueue
expected<void, PosixIpcChannelError> timedSend(const std::string& msg,
const units::Duration& timeout) const noexcept;

/// @brief send a message using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to send
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> send(const iox::string<N>& buf) const noexcept;

/// @brief try to send a message for a given timeout duration using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to send
/// @param[in] timeout for the send operation
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> timedSend(const iox::string<N>& buf,
const units::Duration& timeout) const noexcept;

/// @brief receive message using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to receive
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> receive(iox::string<N>& buf) const noexcept;

/// @brief try to receive message for a given timeout duration using iox::string
/// @tparam N capacity of the iox::string
/// @param[in] buf data to receive
/// @param[in] timeout for the send operation
/// @return PosixIpcChannelError if error occured
template <uint64_t N>
expected<void, PosixIpcChannelError> timedReceive(iox::string<N>& buf,
const units::Duration& timeout) const noexcept;

static expected<bool, PosixIpcChannelError> isOutdated() noexcept;

private:
Expand All @@ -109,6 +143,26 @@ class MessageQueue
sanitizeIpcChannelName(const PosixIpcChannelName_t& name) noexcept;
expected<void, PosixIpcChannelError> destroy() noexcept;

enum class Termination
{
NONE,
NULL_TERMINATOR
};

template <typename Type, Termination Terminator>
expected<void, PosixIpcChannelError>
timedSendImpl(not_null<const Type*> msg, uint64_t msgSize, const units::Duration& timeout) const noexcept;
template <typename Type, Termination Terminator>
expected<uint64_t, PosixIpcChannelError>
timedReceiveImpl(not_null<Type*> msg, uint64_t maxMsgSize, const units::Duration& timeout) const noexcept;
template <typename Type, Termination Terminator>
expected<void, PosixIpcChannelError> sendImpl(not_null<const Type*> msg, uint64_t msgSize) const noexcept;
template <typename Type, Termination Terminator>
expected<uint64_t, PosixIpcChannelError> receiveImpl(not_null<Type*> msg, uint64_t maxMsgSize) const noexcept;
template <typename Type, Termination Terminator>
expected<uint64_t, PosixIpcChannelError> receiveVerification(not_null<Type*> msg,
uint64_t msgLenght) const noexcept;

private:
PosixIpcChannelName_t m_name;
mq_attr m_attributes{};
Expand Down Expand Up @@ -149,4 +203,6 @@ class MessageQueueBuilder

} // namespace iox

#include "detail/message_queue.inl"

#endif // IOX_HOOFS_POSIX_IPC_MESSAGE_QUEUE_HPP
Loading

0 comments on commit de85d41

Please sign in to comment.