Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iox-#14 replace ChunkInfo with ChunkHeader members #438

Merged
merged 8 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ inline void Iceoryx2DDSGateway<channel_t, gateway_t>::forward(const channel_t& c
{
subscriber->take().and_then([&channel](popo::Sample<const void>& sample) {
auto dataWriter = channel.getExternalTerminal();
dataWriter->write(static_cast<const uint8_t*>(sample.get()), sample.getHeader()->m_info.m_payloadSize);
dataWriter->write(static_cast<const uint8_t*>(sample.get()), sample.getHeader()->m_payloadSize);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_dds/test/mocks/chunk_mock_dds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ChunkMockDDS
memset(m_rawMemory, 0xFF, Size);

m_chunkHeader = new (m_rawMemory) iox::mepoo::ChunkHeader();
m_chunkHeader->m_info.m_payloadSize = sizeof(T);
m_chunkHeader->m_payloadSize = sizeof(T);

// Set the value
auto payloadPtr = reinterpret_cast<T*>(m_rawMemory + sizeof(iox::mepoo::ChunkHeader));
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_dds/test/moduletests/test_iox_to_dds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ TEST_F(Iceoryx2DDSGatewayTest, ForwardsChunkFromSubscriberToDataWriter)
// Verify expected write to the data writer
auto mockWriter = createMockDDSTerminal(testService);
EXPECT_CALL(*mockWriter,
write(SafeMatcherCast<uint8_t*>(Pointee(Eq(42))), mockChunk.chunkHeader()->m_info.m_payloadSize))
write(SafeMatcherCast<uint8_t*>(Pointee(Eq(42))), mockChunk.chunkHeader()->m_payloadSize))
.Times(1);
stageMockDDSTerminal(std::move(mockWriter));

Expand Down
2 changes: 1 addition & 1 deletion iceoryx_examples/waitset/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void subscriberCallback(iox::popo::UntypedSubscriber* const subscriber)
{
subscriber->take().and_then([&](iox::popo::Sample<const void>& sample) {
std::cout << "subscriber: " << std::hex << subscriber << " length: " << std::dec
<< sample.getHeader()->m_info.m_payloadSize << " ptr: " << std::hex << sample.getHeader()->payload()
<< sample.getHeader()->m_payloadSize << " ptr: " << std::hex << sample.getHeader()->payload()
<< std::endl;
});
}
Expand Down
2 changes: 1 addition & 1 deletion iceoryx_examples/waitset/ice_waitset_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void subscriberCallback(iox::popo::UntypedSubscriber* const subscriber)
{
subscriber->take().and_then([&](iox::popo::Sample<const void>& sample) {
std::cout << "subscriber: " << std::hex << subscriber << " length: " << std::dec
<< sample.getHeader()->m_info.m_payloadSize << " ptr: " << std::hex << sample.getHeader()->payload()
<< sample.getHeader()->m_payloadSize << " ptr: " << std::hex << sample.getHeader()->payload()
<< std::endl;
});
}
Expand Down
12 changes: 12 additions & 0 deletions iceoryx_posh/include/iceoryx_posh/iceoryx_posh_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ using ConfigFilePathString_t = cxx::string<1024>;
using ProcessName_t = cxx::string<MAX_PROCESS_NAME_LENGTH>;
using NodeName_t = cxx::string<100>;

namespace mepoo
{
using SequenceNumber_t = std::uint64_t;
using BaseClock_t = std::chrono::steady_clock;

// use signed integer for duration;
// there is a bug in gcc 4.8 which leads to a wrong calcutated time
// when sleep_until() is used with a timepoint in the past
using DurationNs_t = std::chrono::duration<std::int64_t, std::nano>;
using TimePointNs_t = std::chrono::time_point<BaseClock_t, DurationNs_t>;
Comment on lines +202 to +208
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these could probably also fit into the plain iox namespace. It's a bit weird to have a DurationNs_t which is not based on cxx::Duration though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove them completely and should always use units::duration.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, no chrono based timepoints?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the issue in cxx::Duration with the rounding error resolved? For example when you use 1 ns then its represented as zero in the duration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not, it's still using a double. @elfenpiff will receive the honour I suppose :P

}

namespace runtime
{
// alias for IdString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ inline cxx::expected<ChunkManagement*, TypedMemPoolError> TypedMemPool<T>::acqui
}

new (chunkHeader) ChunkHeader();
chunkHeader->m_info.m_payloadSize = sizeof(T);
chunkHeader->m_info.m_usedSizeOfChunk = MemoryManager::sizeWithChunkHeaderStruct(sizeof(T));
chunkHeader->m_payloadSize = sizeof(T);

new (chunkManagement) ChunkManagement(chunkHeader, &m_memPool, &m_chunkManagementPool);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ ChunkSender<ChunkSenderDataType>::tryAllocate(const uint32_t payloadSize, const
const uint32_t neededChunkSize = getMembers()->m_memoryMgr->sizeWithChunkHeaderStruct(payloadSize);
budrus marked this conversation as resolved.
Show resolved Hide resolved

if (getMembers()->m_lastChunk && getMembers()->m_lastChunk.hasNoOtherOwners()
&& (getMembers()->m_lastChunk.getChunkHeader()->m_info.m_totalSizeOfChunk >= neededChunkSize))
&& (getMembers()->m_lastChunk.getChunkHeader()->m_chunkSize >= neededChunkSize))
{
if (getMembers()->m_chunksInUse.insert(getMembers()->m_lastChunk))
{
getMembers()->m_lastChunk.getChunkHeader()->m_info.m_payloadSize = payloadSize;
getMembers()->m_lastChunk.getChunkHeader()->m_info.m_usedSizeOfChunk = neededChunkSize;
getMembers()->m_lastChunk.getChunkHeader()->m_payloadSize = payloadSize;
return cxx::success<mepoo::ChunkHeader*>(getMembers()->m_lastChunk.getChunkHeader());
}
else
Expand Down Expand Up @@ -151,19 +150,7 @@ inline bool ChunkSender<ChunkSenderDataType>::getChunkReadyForSend(const mepoo::
{
if (getMembers()->m_chunksInUse.remove(chunkHeader, chunk))
{
auto& chunkInfo = chunk.getChunkHeader()->m_info;
if (!chunkInfo.m_externalSequenceNumber_bl)
{
// if the sequence number is NOT set by the user, we take the one from the chunk sender for the chunk info
chunkInfo.m_sequenceNumber = getMembers()->m_sequenceNumber;
getMembers()->m_sequenceNumber++;
}
else
{
// if the seqence number in the chunk info is set by the user, we still increment the internal one as this
// might be needed by midddleware spcific evaluation (like in introspection)
getMembers()->m_sequenceNumber++;
}
chunk.getChunkHeader()->m_sequenceNumber = getMembers()->m_sequenceNumber++;
return true;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct ChunkSenderData : public ChunkDistributorDataType
const relative_ptr<mepoo::MemoryManager> m_memoryMgr;
mepoo::MemoryInfo m_memoryInfo;
UsedChunkList<MaxChunksAllocatedSimultaneously> m_chunksInUse;
mepoo::SequenceNumberType m_sequenceNumber{0u};
mepoo::SequenceNumber_t m_sequenceNumber{0U};
mepoo::SharedChunk m_lastChunk{nullptr};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ class PortIntrospection
capro::ServiceDescription service;
NodeName_t node;

using TimePointNs = mepoo::TimePointNs;
using DurationNs = mepoo::DurationNs;
TimePointNs m_sequenceNumberTimestamp = TimePointNs(DurationNs(0));
mepoo::SequenceNumberType m_sequenceNumber{0};
using TimePointNs_t = mepoo::TimePointNs_t;
using DurationNs_t = mepoo::DurationNs_t;
TimePointNs_t m_sequenceNumberTimestamp {DurationNs_t(0)};
mepoo::SequenceNumber_t m_sequenceNumber{0U};

// map from indices to object pointers
std::map<int, ConnectionInfo*> connectionMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ class RouDiProcess
/// @return the session ID for this process
uint64_t getSessionId() noexcept;

void setTimestamp(const mepoo::TimePointNs timestamp) noexcept;
void setTimestamp(const mepoo::TimePointNs_t timestamp) noexcept;

mepoo::TimePointNs getTimestamp() noexcept;
mepoo::TimePointNs_t getTimestamp() noexcept;

mepoo::MemoryManager* getPayloadMemoryManager() const noexcept;
uint64_t getPayloadSegmentId() const noexcept;
Expand All @@ -79,7 +79,7 @@ class RouDiProcess
private:
int m_pid;
runtime::MqInterfaceUser m_mq;
mepoo::TimePointNs m_timestamp;
mepoo::TimePointNs_t m_timestamp;
mepoo::MemoryManager* m_payloadMemoryManager{nullptr};
bool m_isMonitored{true};
uint64_t m_payloadSegmentId;
Expand Down
8 changes: 4 additions & 4 deletions iceoryx_posh/include/iceoryx_posh/mepoo/chunk_header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/typed_unique_id.hpp"
#include "iceoryx_posh/mepoo/chunk_info.hpp"

#include <cstdint>

Expand All @@ -36,9 +35,6 @@ struct alignas(32) ChunkHeader
/// - semantic meaning of a member changes
static constexpr uint8_t CHUNK_HEADER_VERSION{1U};

/// @deprecated iox-#14
ChunkInfo m_info;

/// @brief The size of the whole chunk, including the header
uint32_t m_chunkSize{0U};

Expand Down Expand Up @@ -75,6 +71,10 @@ struct alignas(32) ChunkHeader
/// @param[in] payload is the pointer to the payload of the chunk
/// @return the pointer to the `ChunkHeader` or a `nullptr` if `payload` is a `nullptr`
static ChunkHeader* fromPayload(const void* const payload) noexcept;

/// @brief Calculates the used size of the chunk with the ChunkHeader, custom heander and payload
/// @return the used size of the chunk
uint32_t usedSizeOfChunk();
};

} // namespace mepoo
Expand Down
55 changes: 0 additions & 55 deletions iceoryx_posh/include/iceoryx_posh/mepoo/chunk_info.hpp

This file was deleted.

11 changes: 10 additions & 1 deletion iceoryx_posh/source/mepoo/chunk_header.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#include "iceoryx_posh/mepoo/chunk_header.hpp"
#include "iceoryx_posh/internal/mepoo/mem_pool.hpp"
#include "iceoryx_utils/cxx/helplets.hpp"

namespace iox
{
Expand Down Expand Up @@ -43,5 +43,14 @@ ChunkHeader* ChunkHeader::fromPayload(const void* const payload) noexcept
return reinterpret_cast<ChunkHeader*>(payloadAddress - *payloadOffset);
}

uint32_t ChunkHeader::usedSizeOfChunk()
{
auto usedSizeOfChunk = static_cast<uint64_t>(m_payloadOffset) + static_cast<uint64_t>(m_payloadSize);

cxx::Expects(usedSizeOfChunk <= m_chunkSize);
elBoberido marked this conversation as resolved.
Show resolved Hide resolved

return static_cast<uint32_t>(usedSizeOfChunk);
}

} // namespace mepoo
} // namespace iox
12 changes: 5 additions & 7 deletions iceoryx_posh/source/mepoo/memory_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,11 @@ SharedChunk MemoryManager::getChunk(const MaxSize_t f_size)
}
else
{
new (chunk) ChunkHeader();
static_cast<ChunkHeader*>(chunk)->m_info.m_payloadSize = f_size;
static_cast<ChunkHeader*>(chunk)->m_info.m_usedSizeOfChunk = adjustedSize;
static_cast<ChunkHeader*>(chunk)->m_info.m_totalSizeOfChunk = totalSizeOfAquiredChunk;
ChunkManagement* chunkManagement = static_cast<ChunkManagement*>(m_chunkManagementPool.front().getChunk());
new (chunkManagement)
ChunkManagement(static_cast<ChunkHeader*>(chunk), memPoolPointer, &m_chunkManagementPool.front());
auto chunkHeader = new (chunk) ChunkHeader();
chunkHeader->m_chunkSize = totalSizeOfAquiredChunk;
chunkHeader->m_payloadSize = f_size;
auto chunkManagement = new (m_chunkManagementPool.front().getChunk())
ChunkManagement(chunkHeader, memPoolPointer, &m_chunkManagementPool.front());
return SharedChunk(chunkManagement);
}
}
Expand Down
12 changes: 6 additions & 6 deletions iceoryx_posh/source/roudi/roudi_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ RouDiProcess::RouDiProcess(const ProcessName_t& name,
const uint64_t sessionId) noexcept
: m_pid(pid)
, m_mq(name)
, m_timestamp(mepoo::BaseClock::now())
, m_timestamp(mepoo::BaseClock_t::now())
, m_payloadMemoryManager(payloadMemoryManager)
, m_isMonitored(isMonitored)
, m_payloadSegmentId(payloadSegmentId)
Expand Down Expand Up @@ -70,12 +70,12 @@ uint64_t RouDiProcess::getSessionId() noexcept
return m_sessionId.load(std::memory_order_relaxed);
}

void RouDiProcess::setTimestamp(const mepoo::TimePointNs timestamp) noexcept
void RouDiProcess::setTimestamp(const mepoo::TimePointNs_t timestamp) noexcept
{
m_timestamp = timestamp;
}

mepoo::TimePointNs RouDiProcess::getTimestamp() noexcept
mepoo::TimePointNs_t RouDiProcess::getTimestamp() noexcept
{
return m_timestamp;
}
Expand Down Expand Up @@ -401,7 +401,7 @@ bool ProcessManager::addProcess(const ProcessName_t& name,
m_processList.back().sendToMQ(sendBuffer);

// set current timestamp again (already done in RouDiProcess's constructor
m_processList.back().setTimestamp(mepoo::BaseClock::now());
m_processList.back().setTimestamp(mepoo::BaseClock_t::now());

m_processIntrospection->addProcess(pid, ProcessName_t(cxx::TruncateToCapacity, name.c_str()));

Expand Down Expand Up @@ -453,7 +453,7 @@ void ProcessManager::updateLivelinessOfProcess(const ProcessName_t& name) noexce
if (nullptr != process)
{
// reset timestamp
process->setTimestamp(mepoo::BaseClock::now());
process->setTimestamp(mepoo::BaseClock_t::now());
}
else
{
Expand Down Expand Up @@ -768,7 +768,7 @@ void ProcessManager::monitorProcesses() noexcept
{
std::lock_guard<std::mutex> g(m_mutex);

auto currentTimestamp = mepoo::BaseClock::now();
auto currentTimestamp = mepoo::BaseClock_t::now();

auto processIterator = m_processList.begin();
while (processIterator != m_processList.end())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct ChunkDistributorConfig

struct ChunkQueueConfig
{
static constexpr uint64_t MAX_QUEUE_CAPACITY = NUM_CHUNKS_IN_POOL;
static constexpr uint64_t MAX_QUEUE_CAPACITY = iox::MAX_SUBSCRIBER_QUEUE_CAPACITY;
budrus marked this conversation as resolved.
Show resolved Hide resolved
};

using ChunkQueueData_t = ChunkQueueData<ChunkQueueConfig, ThreadSafePolicy>;
Expand Down
7 changes: 2 additions & 5 deletions iceoryx_posh/test/integrationtests/test_posh_mepoo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ using iox::mepoo::MePooConfig;
using iox::roudi::RouDiEnvironment;
using ::testing::Return;


using TimePointNs = iox::mepoo::TimePointNs;
using BaseClock = iox::mepoo::BaseClock;
using Timer = iox::posix::Timer;
using iox::posix::Timer;

class Mepoo_IntegrationTest : public Test
{
Expand Down Expand Up @@ -300,7 +297,7 @@ class Mepoo_IntegrationTest : public Test
{
publisherPort->tryAllocateChunk(topicSize).and_then([&](auto sample) {
new (sample->payload()) Topic;
sample->m_info.m_payloadSize = topicSize;
sample->m_payloadSize = topicSize;
publisherPort->sendChunk(sample);
m_roudiEnv->InterOpWait();
});
Expand Down
Loading