diff --git a/.gitignore b/.gitignore index 87d425ce..6cba691b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,9 @@ #Folders +*.vscode .idea build cmake-build-debug cmake-build-release thirdparty/Micro-CDR/CMakeFiles include/rtps/config.h -CMakeFiles \ No newline at end of file +CMakeFiles diff --git a/include/rtps/ThreadPool.h b/include/rtps/ThreadPool.h index aafc59fa..9ef5ed76 100644 --- a/include/rtps/ThreadPool.h +++ b/include/rtps/ThreadPool.h @@ -56,6 +56,8 @@ class ThreadPool { static void readCallback(void *arg, udp_pcb *pcb, pbuf *p, const ip_addr_t *addr, Ip4Port_t port); + bool addBuiltinPort(const Ip4Port_t &port); + private: receiveJumppad_fp m_receiveJumppad; void *m_callee; @@ -63,15 +65,30 @@ class ThreadPool { std::array m_writers; std::array m_readers; + std::array m_builtinPorts; + size_t m_builtinPortsIdx = 0; + sys_sem_t m_readerNotificationSem; sys_sem_t m_writerNotificationSem; - ThreadSafeCircularBuffer - m_queueOutgoing; - ThreadSafeCircularBuffer - m_queueIncoming; + void updateDiagnostics(); + + using BufferUsertrafficOutgoing = ThreadSafeCircularBuffer< + Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC>; + using BufferMetatrafficOutgoing = ThreadSafeCircularBuffer< + Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC>; + using BufferUsertrafficIncoming = ThreadSafeCircularBuffer< + PacketInfo, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC>; + using BufferMetatrafficIncoming = ThreadSafeCircularBuffer< + PacketInfo, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC>; + + BufferUsertrafficOutgoing m_outgoingUserTraffic; + BufferMetatrafficOutgoing m_outgoingMetaTraffic; + + BufferUsertrafficIncoming m_incomingUserTraffic; + BufferMetatrafficIncoming m_incomingMetaTraffic; + bool isBuiltinPort(const Ip4Port_t &port); static void writerThreadFunction(void *arg); static void readerThreadFunction(void *arg); void doWriterWork(); diff --git a/include/rtps/common/types.h b/include/rtps/common/types.h index 2baf9f4e..35123c0d 100644 --- a/include/rtps/common/types.h +++ b/include/rtps/common/types.h @@ -30,6 +30,7 @@ Author: i11 - Embedded Software, RWTH Aachen University #include #include #include +#include // TODO subnamespaces namespace rtps { @@ -174,6 +175,17 @@ struct SequenceNumber_t { return *this; } + SequenceNumber_t &operator--() { + if (low == 0) { + --high; + low = std::numeric_limits::max(); + } else { + --low; + } + + return *this; + } + SequenceNumber_t operator++(int) { SequenceNumber_t tmp(*this); ++*this; @@ -181,7 +193,7 @@ struct SequenceNumber_t { } }; -#define SNS_MAX_NUM_BITS 32 +#define SNS_MAX_NUM_BITS 256 #define SNS_NUM_BYTES (SNS_MAX_NUM_BITS / 8) static_assert(!(SNS_MAX_NUM_BITS % 32) && SNS_MAX_NUM_BITS != 0, "SNS_MAX_NUM_BITS must be multiple of 32"); diff --git a/include/rtps/communication/PacketInfo.h b/include/rtps/communication/PacketInfo.h index 09cc50a0..240158a3 100644 --- a/include/rtps/communication/PacketInfo.h +++ b/include/rtps/communication/PacketInfo.h @@ -22,6 +22,9 @@ This file is part of embeddedRTPS. Author: i11 - Embedded Software, RWTH Aachen University */ +// Copyright 2023 Apex.AI, Inc. +// All rights reserved. + #ifndef RTPS_PACKETINFO_H #define RTPS_PACKETINFO_H @@ -45,11 +48,7 @@ struct PacketInfo { PacketInfo() = default; ~PacketInfo() = default; - PacketInfo &operator=(const PacketInfo &other) { - copyTriviallyCopyable(other); - this->buffer = other.buffer; - return *this; - } + PacketInfo &operator=(const PacketInfo &other) = delete; PacketInfo &operator=(PacketInfo &&other) noexcept { copyTriviallyCopyable(other); diff --git a/include/rtps/config_r5.h b/include/rtps/config_r5.h index feb90cbf..78f974c9 100644 --- a/include/rtps/config_r5.h +++ b/include/rtps/config_r5.h @@ -10,51 +10,51 @@ namespace rtps { namespace Config { const VendorId_t VENDOR_ID = {13, 37}; const std::array IP_ADDRESS = { - 137, 226, 8, 70}; // Needs to be set in lwipcfg.h too. -const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12}; + 192, 168, 127, 9}; // Needs to be set in lwipcfg.h too. +const GuidPrefix_t BASE_GUID_PREFIX = GUID_RANDOM; -const uint8_t DOMAIN_ID = 0; // 230 possible with UDP +const uint8_t DOMAIN_ID = 0; // 230 possible with UDP const uint8_t NUM_STATELESS_WRITERS = 64; const uint8_t NUM_STATELESS_READERS = 64; const uint8_t NUM_STATEFUL_READERS = 4; const uint8_t NUM_STATEFUL_WRITERS = 4; const uint8_t MAX_NUM_PARTICIPANTS = 1; const uint8_t NUM_WRITERS_PER_PARTICIPANT = - 64; // 3 will be reserved for SPDP & SEDP + 64; // 3 will be reserved for SPDP & SEDP const uint8_t NUM_READERS_PER_PARTICIPANT = - 64; // 3 will be reserved for SPDP & SEDP -const uint8_t NUM_WRITER_PROXIES_PER_READER = 60; -const uint8_t NUM_READER_PROXIES_PER_WRITER = 60; + 64; // 3 will be reserved for SPDP & SEDP +const uint8_t NUM_WRITER_PROXIES_PER_READER = 100; +const uint8_t NUM_READER_PROXIES_PER_WRITER = 100; -const uint8_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 150; -const uint8_t MAX_NUM_UNMATCHED_REMOTE_READERS = 150; +const uint32_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 400; +const uint32_t MAX_NUM_UNMATCHED_REMOTE_READERS = 400; const uint8_t MAX_NUM_READER_CALLBACKS = 5; const uint8_t HISTORY_SIZE_STATELESS = 64; -const uint8_t HISTORY_SIZE_STATEFUL = 64; +const uint8_t HISTORY_SIZE_STATEFUL = 100; const uint8_t MAX_TYPENAME_LENGTH = 64; const uint8_t MAX_TOPICNAME_LENGTH = 64; -const int HEARTBEAT_STACKSIZE = 1200; // byte -const int THREAD_POOL_WRITER_STACKSIZE = 1100; // byte -const int THREAD_POOL_READER_STACKSIZE = 3600; // byte -const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte +const int HEARTBEAT_STACKSIZE = 1200; // byte +const int THREAD_POOL_WRITER_STACKSIZE = 10000; // byte +const int THREAD_POOL_READER_STACKSIZE = 32000; // byte +const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte const uint16_t SF_WRITER_HB_PERIOD_MS = 4000; const uint16_t SPDP_RESEND_PERIOD_MS = 1000; const uint8_t SPDP_WRITER_PRIO = 3; const uint8_t SPDP_CYCLECOUNT_HEARTBEAT = - 2; // Every X*SPDP_RESEND_PERIOD_MS, check for missing heartbeats + 2; // Every X*SPDP_RESEND_PERIOD_MS, check for missing heartbeats const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 100; const uint8_t SPDP_MAX_NUM_LOCATORS = 1; const Duration_t SPDP_DEFAULT_REMOTE_LEASE_DURATION = { - 5, 0}; // Default lease duration for remote participants, usually - // overwritten by remote info + 5, 0}; // Default lease duration for remote participants, usually + // overwritten by remote info const Duration_t SPDP_MAX_REMOTE_LEASE_DURATION = { 180, - 0}; // Absolute maximum lease duration, ignoring remote participant info + 0}; // Absolute maximum lease duration, ignoring remote participant info const int MAX_NUM_UDP_CONNECTIONS = 10; @@ -64,12 +64,15 @@ const int THREAD_POOL_WRITER_PRIO = 3; const int THREAD_POOL_READER_PRIO = 3; const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH = 10; +const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC = 30; +const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC = 30; + constexpr int OVERALL_HEAP_SIZE = THREAD_POOL_NUM_WRITERS * THREAD_POOL_WRITER_STACKSIZE + THREAD_POOL_NUM_READERS * THREAD_POOL_READER_STACKSIZE + MAX_NUM_PARTICIPANTS * SPDP_WRITER_STACKSIZE + NUM_STATEFUL_WRITERS * HEARTBEAT_STACKSIZE; -} // namespace Config -} // namespace rtps +} // namespace Config +} // namespace rtps -#endif // RTPS_CONFIG_H +#endif // RTPS_CONFIG_H diff --git a/include/rtps/config_stm.h b/include/rtps/config_stm.h index 85e4c92d..b5b65137 100644 --- a/include/rtps/config_stm.h +++ b/include/rtps/config_stm.h @@ -37,46 +37,61 @@ namespace rtps { namespace Config { const VendorId_t VENDOR_ID = {13, 37}; const std::array IP_ADDRESS = { - 192, 168, 0, 66}; // Needs to be set in lwipcfg.h too. -const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12}; + 192, 168, 1, 103}; // Needs to be set in lwipcfg.h too. +const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13}; const uint8_t DOMAIN_ID = 0; // 230 possible with UDP -const uint8_t NUM_STATELESS_WRITERS = 2; -const uint8_t NUM_STATELESS_READERS = 2; +const uint8_t NUM_STATELESS_WRITERS = 5; +const uint8_t NUM_STATELESS_READERS = 5; const uint8_t NUM_STATEFUL_READERS = 2; const uint8_t NUM_STATEFUL_WRITERS = 2; const uint8_t MAX_NUM_PARTICIPANTS = 1; -const uint8_t NUM_WRITERS_PER_PARTICIPANT = 4; -const uint8_t NUM_READERS_PER_PARTICIPANT = 4; -const uint8_t NUM_WRITER_PROXIES_PER_READER = 3; -const uint8_t NUM_READER_PROXIES_PER_WRITER = 3; +const uint8_t NUM_WRITERS_PER_PARTICIPANT = 10; +const uint8_t NUM_READERS_PER_PARTICIPANT = 10; +const uint8_t NUM_WRITER_PROXIES_PER_READER = 6; +const uint8_t NUM_READER_PROXIES_PER_WRITER = 6; -const uint8_t HISTORY_SIZE = 10; +const uint8_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 50; +const uint8_t MAX_NUM_UNMATCHED_REMOTE_READERS = 50; + +const uint8_t MAX_NUM_READER_CALLBACKS = 5; -const uint8_t MAX_TYPENAME_LENGTH = 20; -const uint8_t MAX_TOPICNAME_LENGTH = 20; + +const uint8_t HISTORY_SIZE_STATELESS = 2; +const uint8_t HISTORY_SIZE_STATEFUL = 10; + +const uint8_t MAX_TYPENAME_LENGTH = 64; +const uint8_t MAX_TOPICNAME_LENGTH = 64; const int HEARTBEAT_STACKSIZE = 1200; // byte const int THREAD_POOL_WRITER_STACKSIZE = 1100; // byte -const int THREAD_POOL_READER_STACKSIZE = 1600; // byte -const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte +const int THREAD_POOL_READER_STACKSIZE = 3000; // byte +const uint16_t SPDP_WRITER_STACKSIZE = 1000; // byte const uint16_t SF_WRITER_HB_PERIOD_MS = 4000; -const uint16_t SPDP_RESEND_PERIOD_MS = 10000; +const uint16_t SPDP_RESEND_PERIOD_MS = 1000; const uint8_t SPDP_CYCLECOUNT_HEARTBEAT = 2; // skip x SPDP rounds before checking liveliness -const uint8_t SPDP_WRITER_PRIO = 3; -const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 5; -const uint8_t SPDP_MAX_NUM_LOCATORS = 5; -const Duration_t SPDP_LEASE_DURATION = {100, 0}; +const uint8_t SPDP_WRITER_PRIO = 24; +const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 10; +const uint8_t SPDP_MAX_NUM_LOCATORS = 1; +const Duration_t SPDP_DEFAULT_REMOTE_LEASE_DURATION = { + 5, 0}; // Default lease duration for remote participants, usually + // overwritten by remote info +const Duration_t SPDP_MAX_REMOTE_LEASE_DURATION = { + 180, + 0}; // Absolute maximum lease duration, ignoring remote participant info + +const Duration_t SPDP_LEASE_DURATION = {5, 0}; const int MAX_NUM_UDP_CONNECTIONS = 10; const int THREAD_POOL_NUM_WRITERS = 1; const int THREAD_POOL_NUM_READERS = 1; -const int THREAD_POOL_WRITER_PRIO = 3; -const int THREAD_POOL_READER_PRIO = 3; -const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH = 10; +const int THREAD_POOL_WRITER_PRIO = 24; +const int THREAD_POOL_READER_PRIO = 24; +const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC = 60; +const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC = 60; constexpr int OVERALL_HEAP_SIZE = THREAD_POOL_NUM_WRITERS * THREAD_POOL_WRITER_STACKSIZE + diff --git a/include/rtps/discovery/SEDPAgent.h b/include/rtps/discovery/SEDPAgent.h index c351358b..4b6928c9 100644 --- a/include/rtps/discovery/SEDPAgent.h +++ b/include/rtps/discovery/SEDPAgent.h @@ -38,8 +38,8 @@ class Reader; class SEDPAgent { public: void init(Participant &part, const BuiltInEndpoints &endpoints); - void addWriter(Writer &writer); - void addReader(Reader &reader); + bool addWriter(Writer &writer); + bool addReader(Reader &reader); bool deleteReader(Reader *reader); bool deleteWriter(Writer *reader); @@ -54,8 +54,10 @@ class SEDPAgent { uint32_t getNumRemoteUnmatchedWriters(); protected: // For testing purposes - void handlePublisherReaderMessage(const TopicData &writerData); - void handleSubscriptionReaderMessage(const TopicData &writerData); + void handlePublisherReaderMessage(const TopicData &writerData, + const ReaderCacheChange &change); + void handleSubscriptionReaderMessage(const TopicData &writerData, + const ReaderCacheChange &change); private: Participant *m_part; @@ -82,7 +84,8 @@ class SEDPAgent { void addUnmatchedRemoteWriter(const TopicDataCompressed &writerData); void addUnmatchedRemoteReader(const TopicDataCompressed &readerData); - void handleRemoteEndpointDeletion(const TopicData &topic); + void handleRemoteEndpointDeletion(const TopicData &topic, + const ReaderCacheChange &change); void (*mfp_onNewPublisherCallback)(void *arg) = nullptr; void *m_onNewPublisherArgs = nullptr; diff --git a/include/rtps/discovery/SPDPAgent.h b/include/rtps/discovery/SPDPAgent.h index a2960ff7..183fb22e 100644 --- a/include/rtps/discovery/SPDPAgent.h +++ b/include/rtps/discovery/SPDPAgent.h @@ -39,7 +39,7 @@ Author: i11 - Embedded Software, RWTH Aachen University if (true) { \ printf("[SPDP] "); \ printf(__VA_ARGS__); \ - printf("\n"); \ + printf("\r\n"); \ } #else #define SPDP_LOG(...) // @@ -56,6 +56,7 @@ class SPDPAgent { void init(Participant &participant, BuiltInEndpoints &endpoints); void start(); void stop(); + SemaphoreHandle_t m_mutex; private: Participant *mp_participant = nullptr; @@ -67,7 +68,6 @@ class SPDPAgent { ucdrBuffer m_microbuffer{}; uint8_t m_cycleHB = 0; - SemaphoreHandle_t m_mutex; bool initialized = false; static void receiveCallback(void *callee, const ReaderCacheChange &cacheChange); diff --git a/include/rtps/discovery/TopicData.h b/include/rtps/discovery/TopicData.h index 8f3eead5..24c6a2c1 100644 --- a/include/rtps/discovery/TopicData.h +++ b/include/rtps/discovery/TopicData.h @@ -69,8 +69,8 @@ struct TopicData { TopicData(Guid_t guid, ReliabilityKind_t reliability, FullLengthLocator loc) : endpointGuid(guid), typeName{'\0'}, topicName{'\0'}, reliabilityKind(reliability), - durabilityKind(DurabilityKind_t::VOLATILE), unicastLocator(loc) { - } + durabilityKind(DurabilityKind_t::VOLATILE), unicastLocator(loc) {} + bool matchesTopicOf(const TopicData &other); diff --git a/include/rtps/entities/Reader.h b/include/rtps/entities/Reader.h index 5a64695b..30cba503 100644 --- a/include/rtps/entities/Reader.h +++ b/include/rtps/entities/Reader.h @@ -110,9 +110,10 @@ class Reader { using dumpProxyCallback = void (*)(const Reader *reader, const WriterProxy &, void *arg); - //! Dangerous, only int dumpAllProxies(dumpProxyCallback target, void *arg); + virtual bool sendPreemptiveAckNack(const WriterProxy &writer); + protected: void executeCallbacks(const ReaderCacheChange &cacheChange); bool initMutex(); diff --git a/include/rtps/entities/ReaderProxy.h b/include/rtps/entities/ReaderProxy.h index dbdab1bd..f747200d 100644 --- a/include/rtps/entities/ReaderProxy.h +++ b/include/rtps/entities/ReaderProxy.h @@ -31,19 +31,19 @@ Author: i11 - Embedded Software, RWTH Aachen University namespace rtps { struct ReaderProxy { Guid_t remoteReaderGuid; + Count_t ackNackCount = {0}; LocatorIPv4 remoteLocator; bool is_reliable = false; LocatorIPv4 remoteMulticastLocator; bool useMulticast = false; bool suppressUnicast = false; bool unknown_eid = false; - Count_t ackNackCount = {0}; bool finalFlag = false; SequenceNumber_t lastAckNackSequenceNumber = {0, 1}; ReaderProxy() : remoteReaderGuid({GUIDPREFIX_UNKNOWN, ENTITYID_UNKNOWN}), - finalFlag(false){}; + ackNackCount{0}, remoteLocator(LocatorIPv4()), finalFlag(false){}; ReaderProxy(const Guid_t &guid, const LocatorIPv4 &loc, bool reliable) : remoteReaderGuid(guid), remoteLocator(loc), is_reliable(reliable), ackNackCount{0}, finalFlag(false){}; diff --git a/include/rtps/entities/StatefulReader.h b/include/rtps/entities/StatefulReader.h index 04a897e2..55497f94 100644 --- a/include/rtps/entities/StatefulReader.h +++ b/include/rtps/entities/StatefulReader.h @@ -46,6 +46,8 @@ template class StatefulReaderT final : public Reader { bool onNewGapMessage(const SubmessageGap &msg, const GuidPrefix_t &remotePrefix) override; + bool sendPreemptiveAckNack(const WriterProxy &writer) override; + private: Ip4Port_t m_srcPort; // TODO intended for reuse but buffer not used as such NetworkDriver *m_transport; diff --git a/include/rtps/entities/StatefulReader.tpp b/include/rtps/entities/StatefulReader.tpp index f463b2da..6d4e4ab4 100644 --- a/include/rtps/entities/StatefulReader.tpp +++ b/include/rtps/entities/StatefulReader.tpp @@ -26,6 +26,7 @@ Author: i11 - Embedded Software, RWTH Aachen University #include "lwip/tcpip.h" #include "rtps/entities/StatefulReader.h" #include "rtps/messages/MessageFactory.h" +#include "rtps/utils/Diagnostics.h" #include "rtps/utils/Lock.h" #include "rtps/utils/Log.h" @@ -35,7 +36,7 @@ Author: i11 - Embedded Software, RWTH Aachen University if (true) { \ printf("[StatefulReader %s] ", &m_attributes.topicName[0]); \ printf(__VA_ARGS__); \ - printf("\n"); \ + printf("\r\n"); \ } #else #define SFR_LOG(...) // @@ -71,9 +72,27 @@ void StatefulReaderT::newChange( for (auto &proxy : m_proxies) { if (proxy.remoteWriterGuid == cacheChange.writerGuid) { if (proxy.expectedSN == cacheChange.sn) { + SFR_LOG("Delivering SN %u.%u | ! GUID %u %u %u %u \r\n", + (int)cacheChange.sn.high, (int)cacheChange.sn.low, + cacheChange.writerGuid.prefix.id[0], + cacheChange.writerGuid.prefix.id[1], + cacheChange.writerGuid.prefix.id[2], + cacheChange.writerGuid.prefix.id[3]); executeCallbacks(cacheChange); ++proxy.expectedSN; + SFR_LOG("Done processing SN %u.%u\r\n", (int)cacheChange.sn.high, + (int)cacheChange.sn.low); return; + } else { + Diagnostics::StatefulReader::sfr_unexpected_sn++; + SFR_LOG( + "Unexpected SN %u.%u != %u.%u, dropping! GUID %u %u %u %u | \r\n", + (int)proxy.expectedSN.high, (int)proxy.expectedSN.low, + (int)cacheChange.sn.high, (int)cacheChange.sn.low, + cacheChange.writerGuid.prefix.id[0], + cacheChange.writerGuid.prefix.id[1], + cacheChange.writerGuid.prefix.id[2], + cacheChange.writerGuid.prefix.id[3]); } } } @@ -97,6 +116,7 @@ bool StatefulReaderT::onNewGapMessage( if (!m_is_initialized_) { return false; } + SFR_LOG("Processing gap message %u %u", msg.gapStart, msg.gapList.base); Guid_t writerProxyGuid; writerProxyGuid.prefix = remotePrefix; @@ -114,43 +134,76 @@ bool StatefulReaderT::onNewGapMessage( return false; } - // We have not seen all messages leading up to gap start -> do nothing + // Case 1: We are still waiting for messages before gapStart if (writer->expectedSN < msg.gapStart) { + PacketInfo info; + info.srcPort = m_srcPort; + info.destAddr = writer->remoteLocator.getIp4Address(); + info.destPort = writer->remoteLocator.port; + rtps::MessageFactory::addHeader(info.buffer, + m_attributes.endpointGuid.prefix); + SequenceNumber_t last_valid = msg.gapStart; + --last_valid; + auto missing_sns = writer->getMissing(writer->expectedSN, last_valid); + rtps::MessageFactory::addAckNack(info.buffer, msg.writerId, msg.readerId, + missing_sns, writer->getNextAckNackCount(), + false); + m_transport->sendPacket(info); return true; } - // Start from base and search for first unset bit - SequenceNumber_t first_valid = msg.gapList.base; - for (unsigned int i = 0; i < msg.gapList.numBits; i++, first_valid++) { - if (!msg.gapList.isSet(i)) { - break; + // Case 2: We are expecting a message between [gapStart; gapList.base -1] + // Advance expectedSN beyond gapList.base + if (writer->expectedSN < msg.gapList.base) { + auto before = writer->expectedSN; + writer->expectedSN = msg.gapList.base; + + // writer->expectedSN++; + + // Advance expectedSN to first unset bit + for (uint32_t bit = 0; bit < SNS_MAX_NUM_BITS; + writer->expectedSN++, bit++) { + if (!msg.gapList.isSet(bit)) { + break; + } } - } - if (first_valid < writer->expectedSN) { - SFR_LOG("GAP: Ignoring gap, we expect a message beyond the gap"); return true; - } - SFR_LOG("GAP: moving expected SN to %u\n", (int)first_valid.low); - writer->expectedSN = first_valid; + }else{ - // Send an ack nack message - PacketInfo info; - info.srcPort = m_srcPort; - info.destAddr = writer->remoteLocator.getIp4Address(); - info.destPort = writer->remoteLocator.port; - rtps::MessageFactory::addHeader(info.buffer, - m_attributes.endpointGuid.prefix); - SequenceNumberSet set; - set.numBits = 1; - set.base = writer->expectedSN; - set.bitMap[0] = uint32_t{1} << 31; - rtps::MessageFactory::addAckNack(info.buffer, msg.writerId, msg.readerId, set, - writer->getNextAckNackCount(), false); - m_transport->sendPacket(info); + // Case 3: We are expecting a sequence number beyond gap list base, + // check if we need to update expectedSN + auto i = msg.gapList.base; + for(uint32_t bit = 0; bit < SNS_MAX_NUM_BITS; i++, bit++){ + if(i < writer->expectedSN){ + continue; + } - return false; + if(msg.gapList.isSet(bit)){ + writer->expectedSN++; + }else{ + PacketInfo info; + info.srcPort = m_srcPort; + info.destAddr = writer->remoteLocator.getIp4Address(); + info.destPort = writer->remoteLocator.port; + rtps::MessageFactory::addHeader(info.buffer, + m_attributes.endpointGuid.prefix); + SequenceNumberSet set; + set.base = writer->expectedSN; + set.numBits = 1; + set.bitMap[0] = set.bitMap[0] |= uint32_t{1} << 31; + rtps::MessageFactory::addAckNack(info.buffer, msg.writerId, msg.readerId, + set, writer->getNextAckNackCount(), + false); + m_transport->sendPacket(info); + + return true; + } + } + + + } } template @@ -179,10 +232,9 @@ bool StatefulReaderT::onNewHeartbeat( return false; } - if (msg.count.value <= writer->hbCount.value) { - - SFR_LOG("Ignore heartbeat. Count too low.\n"); - return false; + if (writer->expectedSN < msg.firstSN) { + SFR_LOG("expectedSN < firstSN, advancing expectedSN"); + writer->expectedSN = msg.firstSN; } writer->hbCount.value = msg.count.value; @@ -190,11 +242,41 @@ bool StatefulReaderT::onNewHeartbeat( info.destPort = writer->remoteLocator.port; rtps::MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix); + auto missing_sns = writer->getMissing(msg.firstSN, msg.lastSN); + bool final_flag = (missing_sns.numBits == 0); rtps::MessageFactory::addAckNack(info.buffer, msg.writerId, msg.readerId, - writer->getMissing(msg.firstSN, msg.lastSN), - writer->getNextAckNackCount(), false); + missing_sns, writer->getNextAckNackCount(), + final_flag); + + SFR_LOG("Sending acknack base %u bits %u .\n", (int)missing_sns.base.low, + (int)missing_sns.numBits); + m_transport->sendPacket(info); + return true; +} + +template +bool StatefulReaderT::sendPreemptiveAckNack( + const WriterProxy &writer) { + Lock lock(m_proxies_mutex); + if (!m_is_initialized_) { + return false; + } + + PacketInfo info; + info.srcPort = m_attributes.unicastLocator.port; + info.destAddr = writer.remoteLocator.getIp4Address(); + info.destPort = writer.remoteLocator.port; + rtps::MessageFactory::addHeader(info.buffer, + m_attributes.endpointGuid.prefix); + SequenceNumberSet number_set; + number_set.base.high = 0; + number_set.base.low = 0; + number_set.numBits = 0; + rtps::MessageFactory::addAckNack( + info.buffer, writer.remoteWriterGuid.entityId, + m_attributes.endpointGuid.entityId, number_set, Count_t{1}, false); - SFR_LOG("Sending acknack.\n"); + SFR_LOG("Sending preemptive acknack.\n"); m_transport->sendPacket(info); return true; } diff --git a/include/rtps/entities/StatefulWriter.h b/include/rtps/entities/StatefulWriter.h index ad6ffc4c..60d71489 100644 --- a/include/rtps/entities/StatefulWriter.h +++ b/include/rtps/entities/StatefulWriter.h @@ -56,6 +56,17 @@ template class StatefulWriterT final : public Writer { NetworkDriver *m_transport; HistoryCacheWithDeletion m_history; + + /* + * Cache changes marked as disposeAfterWrite are retained for a short amount + * in case of retransmission The whole 'disposeAfterWrite' mechanisms only + * exists to allow for repeated creation and deletion of endpoints during + * operation. Otherwise the history will quickly reach its limits. Will be + * replaced with something more elegant in the future. + */ + ThreadSafeCircularBuffer m_disposeWithDelay; + void dropDisposeAfterWriteChanges(); + sys_thread_t m_heartbeatThread; Count_t m_hbCount{1}; @@ -68,7 +79,8 @@ template class StatefulWriterT final : public Writer { static void hbFunctionJumppad(void *thisPointer); void sendHeartBeatLoop(); void sendHeartBeat(); - void sendGap(const ReaderProxy &reader, const SequenceNumber_t &missingSN); + void sendGap(const ReaderProxy &reader, const SequenceNumber_t &firstMissing, + const SequenceNumber_t &nextValid); }; using StatefulWriter = StatefulWriterT; diff --git a/include/rtps/entities/StatefulWriter.tpp b/include/rtps/entities/StatefulWriter.tpp index 319d2780..399d7b7f 100644 --- a/include/rtps/entities/StatefulWriter.tpp +++ b/include/rtps/entities/StatefulWriter.tpp @@ -38,7 +38,7 @@ using rtps::StatefulWriterT; if (true) { \ printf("[Stateful Writer %s] ", this->m_attributes.topicName); \ printf(__VA_ARGS__); \ - printf("\n"); \ + printf("\r\n"); \ } #else #define SFW_LOG(...) // @@ -85,6 +85,8 @@ bool StatefulWriterT::init(TopicData attributes, m_history.clear(); m_hbCount = {1}; + m_disposeWithDelay.init(); + // Thread already exists, do not create new one (reusing slot case) m_is_initialized_ = true; @@ -135,11 +137,13 @@ const rtps::CacheChange *StatefulWriterT::newChange( if (m_history.isFull()) { // Right now we drop elements anyway because we cannot detect non-responding // readers yet. return nullptr; - SequenceNumber_t newMin = ++SequenceNumber_t(m_history.getSeqNumMin()); + SequenceNumber_t newMin = + ++SequenceNumber_t(m_history.getCurrentSeqNumMin()); if (m_nextSequenceNumberToSend < newMin) { m_nextSequenceNumberToSend = newMin; // Make sure we have the correct sn to send } + SFW_LOG("History full! Dropping changes %s.\r\n", this->m_attributes.topicName); } auto *result = @@ -158,14 +162,23 @@ template void StatefulWriterT::progress() { Lock{m_mutex}; CacheChange *next = m_history.getChangeBySN(m_nextSequenceNumberToSend); if (next != nullptr) { + uint32_t i = 0; for (const auto &proxy : m_proxies) { if (!m_enforceUnicast) { sendDataWRMulticast(proxy, next); } else { + i++; sendData(proxy, next); } } + SFW_LOG("Sending data with SN %u.%u", (int)m_nextSequenceNumberToSend.low, + (int)m_nextSequenceNumberToSend.high); + + if (next->disposeAfterWrite) { + SFW_LOG("Dispose after write msg sent to %u proxies\r\n", (int)i); + } + /* * Use case: deletion of local endpoints * -> send Data Message with Disposed Flag set @@ -174,15 +187,24 @@ template void StatefulWriterT::progress() { * -> onAckNack will send Gap Messages to skip deleted local endpoints * during SEDP */ - if (next->diposeAfterWrite) { - m_history.dropChange(next->sequenceNumber); + if (next->disposeAfterWrite) { + next->sentTickCount = xTaskGetTickCount(); + if (!m_disposeWithDelay.copyElementIntoBuffer(next->sequenceNumber)) { + SFW_LOG("Failed to enqueue dispose after write!"); + m_history.dropChange(next->sequenceNumber); + } else { + SFW_LOG("Delayed dispose scheduled for sn %u %u\r\n", + (int)next->sequenceNumber.high, (int)next->sequenceNumber.low); + } } + + ++m_nextSequenceNumberToSend; + sendHeartBeat(); + } else { SFW_LOG("Couldn't get a CacheChange with SN (%i,%u)\n", m_nextSequenceNumberToSend.high, m_nextSequenceNumberToSend.low); } - - ++m_nextSequenceNumberToSend; } template @@ -190,7 +212,7 @@ void StatefulWriterT::setAllChangesToUnsent() { INIT_GUARD() Lock lock(m_mutex); - m_nextSequenceNumberToSend = m_history.getSeqNumMin(); + m_nextSequenceNumberToSend = m_history.getCurrentSeqNumMin(); if (mp_threadPool != nullptr) { mp_threadPool->addWorkload(this); @@ -224,53 +246,83 @@ void StatefulWriterT::onNewAckNack( return; } - if (msg.count.value <= reader->ackNackCount.value) { + reader->ackNackCount = msg.count; + reader->finalFlag = msg.header.finalFlag(); + reader->lastAckNackSequenceNumber = msg.readerSNState.base; - SFW_LOG("Count too small. Dropping acknack.\n"); + rtps::SequenceNumber_t nextSN = msg.readerSNState.base; + // Preemptive ack nack + if (nextSN.low == 0 && nextSN.high == 0) { + sendHeartBeat(); return; } - reader->ackNackCount = msg.count; - reader->finalFlag = msg.header.finalFlag(); - reader->lastAckNackSequenceNumber = msg.readerSNState.base; + if (m_history.isEmpty()) { + // We have never sent anything -> heartbeat + if (m_history.getLastUsedSequenceNumber() == rtps::SequenceNumber_t{0, 0}) { + sendHeartBeat(); + } else { + // No data but we have sent something in the past -> GapStart = + // readerSNState.base, NextValid = lastUsedSequenceNumber+1 + rtps::SequenceNumber_t nextValid = m_history.getLastUsedSequenceNumber(); + ++nextValid; + sendGap(*reader, msg.readerSNState.base, nextValid); + } - // Send missing packets - SequenceNumber_t nextSN = msg.readerSNState.base; + return; + } - if (nextSN.low == 0 && nextSN.high == 0) { - SFW_LOG("Received preemptive acknack. Ignored.\n"); - } else { - SFW_LOG("Received non-preemptive acknack.\n"); + // Requesting smaller SN than minimum sequence number -> sendGap + if (msg.readerSNState.base < m_history.getCurrentSeqNumMin()) { + sendGap(*reader, msg.readerSNState.base, m_history.getCurrentSeqNumMin()); + return; } - for (uint32_t i = 0; i < msg.readerSNState.numBits; ++i, ++nextSN) { - if (msg.readerSNState.isSet(i)) { + SFW_LOG("Received non-preemptive acknack with %u bits set.\r\n", + msg.readerSNState.numBits); + for (uint32_t i = 0; i < msg.readerSNState.numBits && + nextSN <= m_history.getLastUsedSequenceNumber(); + ++i, ++nextSN) { - SFW_LOG("Send Packet on acknack.\n"); - const CacheChange *cache = m_history.getChangeBySN(nextSN); + if (msg.readerSNState.isSet(i)) { - // We should have this SN -> send GAP Message - if (cache == nullptr && m_history.isSNInRange(nextSN)) { - sendGap(*reader, nextSN); - continue; - } + SFW_LOG("Looking for change %u | Bit %u", nextSN.low, i); + const rtps::CacheChange *cache = m_history.getChangeBySN(nextSN); + // We still have the cache, send DATA if (cache != nullptr) { + if (cache->disposeAfterWrite) { + SFW_LOG("SERVING FROM DISPOSE AFTER WRITE CACHE\r\n"); + } sendData(*reader, cache); + } else { + SFW_LOG("> Change not found, search for next valid SN %u \r\n", + nextSN.low); + // Cache not found, look for next valid SN + rtps::SequenceNumber_t gapBegin = nextSN; + rtps::CacheChange *nextValidChange = nullptr; + uint32_t j = i + 1; + for (++nextSN; nextSN <= m_history.getLastUsedSequenceNumber(); + ++nextSN, ++j) { + nextValidChange = m_history.getChangeBySN(nextSN); + if (nextValidChange != nullptr) { + break; + } + } + if (nextValidChange == nullptr) { + sendGap(*reader, gapBegin, nextSN); + return; + } else { + sendGap(*reader, gapBegin, nextValidChange->sequenceNumber); + } + // sendData(nullptr, nextValidChange); + nextSN = nextValidChange->sequenceNumber; + --nextSN; + i = --j; } } } - // Check for sequence numbers after defined range - SequenceNumber_t maxSN; - { maxSN = m_history.getSeqNumMax(); } - while (nextSN <= maxSN) { - const CacheChange *cache = m_history.getChangeBySN(nextSN); - if (cache != nullptr) { - sendData(*reader, cache); - } - ++nextSN; - } } template @@ -310,7 +362,8 @@ bool StatefulWriterT::sendData(const ReaderProxy &reader, template void StatefulWriterT::sendGap( - const ReaderProxy &reader, const SequenceNumber_t &missingSN) { + const ReaderProxy &reader, const SequenceNumber_t &firstMissing, + const SequenceNumber_t &nextValid) { INIT_GUARD() // TODO smarter packaging e.g. by creating MessageStruct and serialize after // adjusting values Reusing the pbuf is not possible. See @@ -328,9 +381,9 @@ void StatefulWriterT::sendGap( info.destAddr = locator.getIp4Address(); info.destPort = (Ip4Port_t)locator.port; - MessageFactory::addSubmessageGap(info.buffer, - m_attributes.endpointGuid.entityId, - reader.remoteReaderGuid.entityId, missingSN); + MessageFactory::addSubmessageGap( + info.buffer, m_attributes.endpointGuid.entityId, + reader.remoteReaderGuid.entityId, firstMissing, nextValid); m_transport->sendPacket(info); } @@ -384,15 +437,61 @@ void StatefulWriterT::sendHeartBeatLoop() { m_thread_running = true; while (m_running) { sendHeartBeat(); + dropDisposeAfterWriteChanges(); + bool unconfirmed_changes = false; + for (auto it : m_proxies) { + if (it.lastAckNackSequenceNumber < m_nextSequenceNumberToSend) { + unconfirmed_changes = true; + break; + } + } + + // Temporarily increase HB frequency if there are unconfirmed remote changes + if (unconfirmed_changes) { + SFW_LOG("HB SPEEDUP!\r\n"); #ifdef OS_IS_FREERTOS - vTaskDelay(pdMS_TO_TICKS(Config::SF_WRITER_HB_PERIOD_MS)); + vTaskDelay(pdMS_TO_TICKS(Config::SF_WRITER_HB_PERIOD_MS / 4)); + } else { + vTaskDelay(pdMS_TO_TICKS(Config::SF_WRITER_HB_PERIOD_MS)); + } #else - sys_msleep(Config::SF_WRITER_HB_PERIOD_MS); + sys_msleep(Config::SF_WRITER_HB_PERIOD_MS / 4); + } else { + sys_msleep(Config::SF_WRITER_HB_PERIOD_MS); + } #endif } m_thread_running = false; } +template +void StatefulWriterT::dropDisposeAfterWriteChanges() { + SequenceNumber_t oldest_retained; + while (m_disposeWithDelay.peakFirst(oldest_retained)) { + + CacheChange *change = m_history.getChangeBySN(oldest_retained); + if (change == nullptr || !change->disposeAfterWrite) { + // Not in history anymore, drop + m_disposeWithDelay.moveFirstInto(oldest_retained); + return; + } + + auto age = (xTaskGetTickCount() - change->sentTickCount); + if (age > pdMS_TO_TICKS(4000)) { + m_history.dropChange(change->sequenceNumber); + SFW_LOG("Removing SN %u %u for good\r\n", + static_cast(oldest_retained.low), + static_cast(oldest_retained.high)); + SequenceNumber_t tmp; + m_disposeWithDelay.moveFirstInto(tmp); + + continue; + } else { + return; + } + } +} + template void StatefulWriterT::sendHeartBeat() { INIT_GUARD() @@ -409,25 +508,41 @@ void StatefulWriterT::sendHeartBeat() { SequenceNumber_t firstSN; SequenceNumber_t lastSN; + MessageFactory::addHeader(info.buffer, m_attributes.endpointGuid.prefix); + { Lock lock(m_mutex); - firstSN = m_history.getSeqNumMin(); - lastSN = m_history.getSeqNumMax(); - // Proxy has confirmed all sequence numbers and set final flag - if ((proxy.lastAckNackSequenceNumber > lastSN) && proxy.finalFlag && - proxy.ackNackCount.value > 0) { - continue; + if (!m_history.isEmpty()) { + firstSN = m_history.getCurrentSeqNumMin(); + lastSN = m_history.getCurrentSeqNumMax(); + + // Otherwise we may announce changes that have not been sent at least + // once! + if (lastSN > m_nextSequenceNumberToSend || + lastSN == m_nextSequenceNumberToSend) { + lastSN = m_nextSequenceNumberToSend; + --lastSN; + } + + // Proxy has confirmed all sequence numbers and set final flag + if ((proxy.lastAckNackSequenceNumber > lastSN) && proxy.finalFlag && + proxy.ackNackCount.value > 0) { + continue; + } + } else if (m_history.getLastUsedSequenceNumber() == + SequenceNumber_t{0, 0}) { + firstSN = SequenceNumber_t{0, 1}; + lastSN = SequenceNumber_t{0, 0}; + } else { + firstSN = SequenceNumber_t{0, 1}; + lastSN = m_history.getLastUsedSequenceNumber(); } } - if (firstSN == SEQUENCENUMBER_UNKNOWN || lastSN == SEQUENCENUMBER_UNKNOWN) { - if (strlen(&this->m_attributes.typeName[0]) != 0) { - SFW_LOG("Skipping heartbeat. No data.\n"); - } - return; - } + SFW_LOG("Sending HB with SN range [%u.%u;%u.%u]", firstSN.low, firstSN.high, + lastSN.low, lastSN.high); MessageFactory::addHeartbeat( info.buffer, m_attributes.endpointGuid.entityId, diff --git a/include/rtps/entities/StatelessWriter.tpp b/include/rtps/entities/StatelessWriter.tpp index 371bbc06..bb4982e0 100644 --- a/include/rtps/entities/StatelessWriter.tpp +++ b/include/rtps/entities/StatelessWriter.tpp @@ -115,6 +115,7 @@ const CacheChange *StatelessWriterT::newChange( m_nextSequenceNumberToSend = newMin; // Make sure we have the correct sn to send } + SLW_LOG("History is full, dropping oldest %s\r\n", this->m_attributes.topicName); } auto *result = m_history.addChange(data, size); @@ -215,10 +216,11 @@ void StatelessWriterT::progress() { info.destAddr = proxy.remoteLocator.getIp4Address(); info.destPort = (Ip4Port_t)proxy.remoteLocator.port; } - + m_transport->sendPacket(info); } } + m_history.removeUntilIncl(m_nextSequenceNumberToSend); ++m_nextSequenceNumberToSend; } diff --git a/include/rtps/entities/Writer.h b/include/rtps/entities/Writer.h index 1653a0d9..d1693b7b 100644 --- a/include/rtps/entities/Writer.h +++ b/include/rtps/entities/Writer.h @@ -79,6 +79,8 @@ class Writer { void setSEDPSequenceNumber(const SequenceNumber_t &sn); const SequenceNumber_t &getSEDPSequenceNumber(); + bool isBuiltinEndpoint(); + protected: SequenceNumber_t m_sedp_sequence_number; diff --git a/include/rtps/entities/WriterProxy.h b/include/rtps/entities/WriterProxy.h index 0c01c19a..4db9e529 100644 --- a/include/rtps/entities/WriterProxy.h +++ b/include/rtps/entities/WriterProxy.h @@ -46,16 +46,21 @@ struct WriterProxy { // For now, we don't store any packets, so we just request all starting from // the next expected - SequenceNumberSet getMissing(const SequenceNumber_t & /*firstAvail*/, + SequenceNumberSet getMissing(const SequenceNumber_t &firstAvail, const SequenceNumber_t &lastAvail) { SequenceNumberSet set; if (lastAvail < expectedSN) { set.base = expectedSN; set.numBits = 0; } else { - set.numBits = 1; set.base = expectedSN; - set.bitMap[0] = uint32_t{1} << 31; + SequenceNumber_t i; + uint32_t bit; + for (bit = 0, i = expectedSN; i <= lastAvail && bit < SNS_MAX_NUM_BITS; + i++, bit++) { + set.bitMap[0] |= uint32_t{1} << (31 - bit); + set.numBits++; + } } return set; diff --git a/include/rtps/messages/MessageFactory.h b/include/rtps/messages/MessageFactory.h index 0869c4fe..fae2f73f 100644 --- a/include/rtps/messages/MessageFactory.h +++ b/include/rtps/messages/MessageFactory.h @@ -22,6 +22,9 @@ This file is part of embeddedRTPS. Author: i11 - Embedded Software, RWTH Aachen University */ +// Copyright 2023 Apex.AI, Inc. +// All rights reserved. + #ifndef RTPS_MESSAGEFACTORY_H #define RTPS_MESSAGEFACTORY_H @@ -133,8 +136,7 @@ void addSubMessageData(Buffer &buffer, const Buffer &filledPayload, serializeMessage(buffer, msg); if (filledPayload.isValid()) { - Buffer shallowCopy = filledPayload; - buffer.append(std::move(shallowCopy)); + buffer.append(filledPayload); } } @@ -176,7 +178,8 @@ void addAckNack(Buffer &buffer, EntityId_t writerId, EntityId_t readerId, if (final_flag) { subMsg.header.flags |= FLAG_FINAL; // For now, we don't want any response } else { - subMsg.header.flags &= ~FLAG_FINAL; // For now, we don't want any response + subMsg.header.flags &= + ~FLAG_FINAL; // Send future heartbeats, even if no change occured } subMsg.header.octetsToNextHeader = SubmessageAckNack::getRawSize(readerSNState) - numBytesUntilEndOfLength; @@ -191,7 +194,8 @@ void addAckNack(Buffer &buffer, EntityId_t writerId, EntityId_t readerId, template void addSubmessageGap(Buffer &buffer, EntityId_t writerId, EntityId_t readerId, - SequenceNumber_t missingSN) { + const SequenceNumber_t &firstMissing, + const SequenceNumber_t &nextValid) { SubmessageGap subMsg; subMsg.header.submessageId = SubmessageKind::GAP; #if IS_LITTLE_ENDIAN @@ -203,8 +207,8 @@ void addSubmessageGap(Buffer &buffer, EntityId_t writerId, EntityId_t readerId, subMsg.writerId = writerId; subMsg.readerId = readerId; - subMsg.gapStart = missingSN; - subMsg.gapList.base = ++missingSN; + subMsg.gapStart = firstMissing; + subMsg.gapList.base = nextValid; subMsg.gapList.numBits = 0; serializeMessage(buffer, subMsg); diff --git a/include/rtps/storages/CacheChange.h b/include/rtps/storages/CacheChange.h index 23a70409..c4e3beed 100644 --- a/include/rtps/storages/CacheChange.h +++ b/include/rtps/storages/CacheChange.h @@ -32,9 +32,21 @@ namespace rtps { struct CacheChange { ChangeKind_t kind = ChangeKind_t::INVALID; bool inLineQoS = false; - bool diposeAfterWrite = false; + bool disposeAfterWrite = false; + TickType_t sentTickCount = 0; SequenceNumber_t sequenceNumber = SEQUENCENUMBER_UNKNOWN; - PBufWrapper data{}; + PBufWrapper data; + + CacheChange &operator=(const CacheChange &other) = delete; + + CacheChange &operator=(CacheChange &&other) noexcept { + kind = other.kind; + inLineQoS = other.inLineQoS; + disposeAfterWrite = other.disposeAfterWrite; + sentTickCount = other.sentTickCount; + sequenceNumber = other.sequenceNumber; + data = std::move(other.data); + } CacheChange() = default; CacheChange(ChangeKind_t kind, SequenceNumber_t sequenceNumber) @@ -44,8 +56,11 @@ struct CacheChange { kind = ChangeKind_t::INVALID; sequenceNumber = SEQUENCENUMBER_UNKNOWN; inLineQoS = false; - diposeAfterWrite = false; + disposeAfterWrite = false; + sentTickCount = 0; } + + bool isInitialized() { return (kind != ChangeKind_t::INVALID); } }; } // namespace rtps diff --git a/include/rtps/storages/HistoryCacheWithDeletion.h b/include/rtps/storages/HistoryCacheWithDeletion.h index ac1e436e..cb1c382f 100644 --- a/include/rtps/storages/HistoryCacheWithDeletion.h +++ b/include/rtps/storages/HistoryCacheWithDeletion.h @@ -40,6 +40,8 @@ template class HistoryCacheWithDeletion { public: HistoryCacheWithDeletion() = default; + uint32_t m_dispose_after_write_cnt = 0; + bool isFull() const { uint16_t it = m_head; incrementIterator(it); @@ -51,11 +53,15 @@ template class HistoryCacheWithDeletion { CacheChange change; change.kind = ChangeKind_t::ALIVE; change.inLineQoS = inLineQoS; - change.diposeAfterWrite = disposeAfterWrite; + change.disposeAfterWrite = disposeAfterWrite; change.data.reserve(size); change.data.append(data, size); change.sequenceNumber = ++m_lastUsedSequenceNumber; + if (disposeAfterWrite) { + m_dispose_after_write_cnt++; + } + CacheChange *place = &m_buffer[m_head]; incrementHead(); @@ -72,7 +78,7 @@ template class HistoryCacheWithDeletion { return; } - if (getSeqNumMax() <= sn) { // We won't overrun head + if (getCurrentSeqNumMax() <= sn) { // We won't overrun head m_head = m_tail; return; } @@ -82,13 +88,12 @@ template class HistoryCacheWithDeletion { } } - void dropOldest() { removeUntilIncl(getSeqNumMin()); } + void dropOldest() { removeUntilIncl(getCurrentSeqNumMin()); } bool dropChange(const SequenceNumber_t &sn) { uint16_t idx_to_clear; CacheChange *change; if (!getChangeBySN(sn, &change, idx_to_clear)) { - printf("History: couldn't find SN with = %u\n", (int)sn.low); return false; // sn does not exist, nothing to do } @@ -105,7 +110,7 @@ template class HistoryCacheWithDeletion { prev = m_buffer.size() - 1; } - m_buffer[idx_to_clear] = m_buffer[prev]; + m_buffer[idx_to_clear] = std::move(m_buffer[prev]); idx_to_clear = prev; } while (prev != m_tail); @@ -135,7 +140,9 @@ template class HistoryCacheWithDeletion { } } - const SequenceNumber_t &getSeqNumMin() const { + bool isEmpty() { return (m_head == m_tail); } + + const SequenceNumber_t &getCurrentSeqNumMin() const { if (m_head == m_tail) { return SEQUENCENUMBER_UNKNOWN; } else { @@ -143,7 +150,7 @@ template class HistoryCacheWithDeletion { } } - const SequenceNumber_t &getSeqNumMax() const { + const SequenceNumber_t &getCurrentSeqNumMax() const { if (m_head == m_tail) { return SEQUENCENUMBER_UNKNOWN; } else { @@ -151,6 +158,10 @@ template class HistoryCacheWithDeletion { } } + const SequenceNumber_t &getLastUsedSequenceNumber() { + return m_lastUsedSequenceNumber; + } + void clear() { m_head = 0; m_tail = 0; @@ -183,8 +194,11 @@ template class HistoryCacheWithDeletion { } #endif bool isSNInRange(const SequenceNumber_t &sn) { - SequenceNumber_t minSN = getSeqNumMin(); - if (sn < minSN || getSeqNumMax() < sn) { + if (isEmpty()) { + return false; + } + SequenceNumber_t minSN = getCurrentSeqNumMin(); + if (sn < minSN || getCurrentSeqNumMax() < sn) { return false; } return true; @@ -247,6 +261,9 @@ template class HistoryCacheWithDeletion { } inline void incrementTail() { + if (m_buffer[m_tail].disposeAfterWrite) { + m_dispose_after_write_cnt--; + } if (m_head != m_tail) { m_buffer[m_tail].reset(); incrementIterator(m_tail); diff --git a/include/rtps/storages/MemoryPool.h b/include/rtps/storages/MemoryPool.h index a7d500c7..c4bbc2c2 100644 --- a/include/rtps/storages/MemoryPool.h +++ b/include/rtps/storages/MemoryPool.h @@ -137,7 +137,7 @@ template class MemoryPool { bool retcode = false; for (auto it = begin(); it != end(); ++it) { if (jumppad(isCorrectElement, *it)) { - const uint8_t bucket = it.m_bit / uint32_t{8}; + const uint32_t bucket = it.m_bit / uint32_t{8}; const uint32_t pos = it.m_bit & uint32_t{ diff --git a/include/rtps/storages/PBufWrapper.h b/include/rtps/storages/PBufWrapper.h index 6e8339e5..7a5ff2a7 100644 --- a/include/rtps/storages/PBufWrapper.h +++ b/include/rtps/storages/PBufWrapper.h @@ -22,6 +22,9 @@ This file is part of embeddedRTPS. Author: i11 - Embedded Software, RWTH Aachen University */ +// Copyright 2023 Apex.AI, Inc. +// All rights reserved. + #ifndef RTPS_PBUFWRAPPER_H #define RTPS_PBUFWRAPPER_H @@ -38,28 +41,26 @@ struct PBufWrapper { explicit PBufWrapper(pbuf *bufferToWrap); explicit PBufWrapper(DataSize_t length); - // Shallow Copy. No copying of the underlying pbuf. Just another reference - // like a shared pointer. - PBufWrapper(const PBufWrapper &other); - PBufWrapper &operator=(const PBufWrapper &other); + PBufWrapper(const PBufWrapper &other) = delete; + PBufWrapper &operator=(const PBufWrapper &other) = delete; PBufWrapper(PBufWrapper &&other) noexcept; PBufWrapper &operator=(PBufWrapper &&other) noexcept; ~PBufWrapper(); - PBufWrapper deepCopy() const; - bool isValid() const; bool append(const uint8_t *data, DataSize_t length); /// Note that unused reserved memory is now part of the wrapper. New calls to /// append(uint8_t*[...]) will continue behind the appended wrapper - void append(PBufWrapper &&other); + void append(const PBufWrapper &other); bool reserve(DataSize_t length); + void destroy(); + /// After calling this function, data is added starting from the beginning /// again. It does not revert reserve. void reset(); diff --git a/include/rtps/storages/SimpleHistoryCache.h b/include/rtps/storages/SimpleHistoryCache.h index 25313dbc..ba6715eb 100644 --- a/include/rtps/storages/SimpleHistoryCache.h +++ b/include/rtps/storages/SimpleHistoryCache.h @@ -52,7 +52,7 @@ template class SimpleHistoryCache { CacheChange change; change.kind = ChangeKind_t::ALIVE; change.inLineQoS = inLineQoS; - change.diposeAfterWrite = disposeAfterWrite; + change.disposeAfterWrite = disposeAfterWrite; change.data.reserve(size); change.data.append(data, size); change.sequenceNumber = ++m_lastUsedSequenceNumber; @@ -78,7 +78,7 @@ template class SimpleHistoryCache { return; } - while (m_buffer[m_tail].sequenceNumber <= sn) { + while (m_buffer[m_tail].sequenceNumber <= sn && (m_head != m_tail)) { incrementTail(); } } diff --git a/include/rtps/storages/ThreadSafeCircularBuffer.h b/include/rtps/storages/ThreadSafeCircularBuffer.h index 5ea9eb55..35f2ba00 100644 --- a/include/rtps/storages/ThreadSafeCircularBuffer.h +++ b/include/rtps/storages/ThreadSafeCircularBuffer.h @@ -39,6 +39,7 @@ template class ThreadSafeCircularBuffer { bool init(); bool moveElementIntoBuffer(T &&elem); + bool copyElementIntoBuffer(const T &elem); /** * Removes the first into the given hull. Also moves responsibility for @@ -46,6 +47,10 @@ template class ThreadSafeCircularBuffer { * @return true if element was injected. False if no element was present. */ bool moveFirstInto(T &hull); + bool peakFirst(T &hull); + + uint32_t numElements(); + uint32_t insertionFailures(); void clear(); @@ -53,6 +58,8 @@ template class ThreadSafeCircularBuffer { std::array m_buffer{}; uint16_t m_head = 0; uint16_t m_tail = 0; + uint32_t m_num_elements = 0; + uint32_t m_insertion_failures = 0; static_assert(SIZE + 1 < std::numeric_limits::max(), "Iterator is large enough for given size"); diff --git a/include/rtps/storages/ThreadSafeCircularBuffer.tpp b/include/rtps/storages/ThreadSafeCircularBuffer.tpp index 41ce60ed..be1d7e84 100644 --- a/include/rtps/storages/ThreadSafeCircularBuffer.tpp +++ b/include/rtps/storages/ThreadSafeCircularBuffer.tpp @@ -42,6 +42,20 @@ bool ThreadSafeCircularBuffer::moveElementIntoBuffer(T &&elem) { incrementHead(); return true; } else { + m_insertion_failures++; + return false; + } +} + +template +bool ThreadSafeCircularBuffer::copyElementIntoBuffer(const T &elem) { + Lock lock(m_mutex); + if (!isFull()) { + m_buffer[m_head] = elem; + incrementHead(); + return true; + } else { + m_insertion_failures++; return false; } } @@ -58,10 +72,27 @@ bool ThreadSafeCircularBuffer::moveFirstInto(T &hull) { } } +template +bool ThreadSafeCircularBuffer::peakFirst(T &hull) { + Lock lock(m_mutex); + if (m_head != m_tail) { + hull = m_buffer[m_tail]; + return true; + } else { + return false; + } +} + +template +uint32_t ThreadSafeCircularBuffer::numElements() { + return m_num_elements; +} + template void ThreadSafeCircularBuffer::clear() { Lock lock(m_mutex); m_head = m_tail; + m_num_elements = 0; } template @@ -83,11 +114,13 @@ ThreadSafeCircularBuffer::incrementIterator(uint16_t &iterator) { template inline void ThreadSafeCircularBuffer::incrementTail() { incrementIterator(m_tail); + m_num_elements--; } template inline void ThreadSafeCircularBuffer::incrementHead() { incrementIterator(m_head); + m_num_elements++; if (m_head == m_tail) { incrementTail(); } diff --git a/include/rtps/utils/Diagnostics.h b/include/rtps/utils/Diagnostics.h new file mode 100644 index 00000000..fbc4aad9 --- /dev/null +++ b/include/rtps/utils/Diagnostics.h @@ -0,0 +1,85 @@ +/* +The MIT License +Copyright (c) 2019 Lehrstuhl Informatik 11 - RWTH Aachen University +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE + +This file is part of embeddedRTPS. + +Author: i11 - Embedded Software, RWTH Aachen University +*/ + +#ifndef RTPS_DIAGNOSTICS_H +#define RTPS_DIAGNOSTICS_H + +#include + +namespace rtps { +namespace Diagnostics { + +namespace ThreadPool { +extern uint32_t dropped_incoming_packets_usertraffic; +extern uint32_t dropped_incoming_packets_metatraffic; + +extern uint32_t dropped_outgoing_packets_usertraffic; +extern uint32_t dropped_outgoing_packets_metatraffic; + +extern uint32_t processed_incoming_metatraffic; +extern uint32_t processed_outgoing_metatraffic; +extern uint32_t processed_incoming_usertraffic; +extern uint32_t processed_outgoing_usertraffic; + +extern uint32_t max_ever_elements_outgoing_usertraffic_queue; +extern uint32_t max_ever_elements_incoming_usertraffic_queue; + +extern uint32_t max_ever_elements_outgoing_metatraffic_queue; +extern uint32_t max_ever_elements_incoming_metatraffic_queue; +} // namespace ThreadPool + +namespace StatefulReader { +extern uint32_t sfr_unexpected_sn; +extern uint32_t sfr_retransmit_requests; +} // namespace StatefulReader + +namespace Network { +extern uint32_t lwip_allocation_failures; +} + +namespace OS { +extern uint32_t current_free_heap_size; +} + +namespace SEDP { +extern uint32_t max_ever_remote_participants; +extern uint32_t current_remote_participants; + +extern uint32_t max_ever_matched_reader_proxies; +extern uint32_t current_max_matched_reader_proxies; + +extern uint32_t max_ever_matched_writer_proxies; +extern uint32_t current_max_matched_writer_proxies; + +extern uint32_t max_ever_unmatched_reader_proxies; +extern uint32_t current_max_unmatched_reader_proxies; + +extern uint32_t max_ever_unmatched_writer_proxies; +extern uint32_t current_max_unmatched_writer_proxies; +} // namespace SEDP + +} // namespace Diagnostics +} // namespace rtps + +#endif diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index a6edfe8f..aca048b9 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -25,7 +25,9 @@ Author: i11 - Embedded Software, RWTH Aachen University #include "rtps/ThreadPool.h" #include "lwip/tcpip.h" +#include "rtps/entities/Domain.h" #include "rtps/entities/Writer.h" +#include "rtps/utils/Diagnostics.h" #include "rtps/utils/Log.h" #include "rtps/utils/udpUtils.h" @@ -37,7 +39,7 @@ using rtps::ThreadPool; if (true) { \ printf("[ThreadPool] "); \ printf(__VA_ARGS__); \ - printf("\n"); \ + printf("\r\n"); \ } #else #define THREAD_POOL_LOG(...) // @@ -46,7 +48,8 @@ using rtps::ThreadPool; ThreadPool::ThreadPool(receiveJumppad_fp receiveCallback, void *callee) : m_receiveJumppad(receiveCallback), m_callee(callee) { - if (!m_queueOutgoing.init() || !m_queueIncoming.init()) { + if (!m_outgoingMetaTraffic.init() || !m_outgoingUserTraffic.init() || + !m_incomingMetaTraffic.init() || !m_incomingUserTraffic.init()) { return; } err_t inputErr = sys_sem_new(&m_readerNotificationSem, 0); @@ -71,6 +74,29 @@ ThreadPool::~ThreadPool() { } } +void ThreadPool::updateDiagnostics() { + + rtps::Diagnostics::ThreadPool::max_ever_elements_incoming_usertraffic_queue = + std::max(rtps::Diagnostics::ThreadPool:: + max_ever_elements_incoming_usertraffic_queue, + m_incomingUserTraffic.numElements()); + + rtps::Diagnostics::ThreadPool::max_ever_elements_outgoing_usertraffic_queue = + std::max(rtps::Diagnostics::ThreadPool:: + max_ever_elements_outgoing_usertraffic_queue, + m_outgoingUserTraffic.numElements()); + + rtps::Diagnostics::ThreadPool::max_ever_elements_incoming_metatraffic_queue = + std::max(rtps::Diagnostics::ThreadPool:: + max_ever_elements_incoming_metatraffic_queue, + m_incomingMetaTraffic.numElements()); + + rtps::Diagnostics::ThreadPool::max_ever_elements_outgoing_metatraffic_queue = + std::max(rtps::Diagnostics::ThreadPool:: + max_ever_elements_outgoing_metatraffic_queue, + m_outgoingMetaTraffic.numElements()); +} + bool ThreadPool::startThreads() { if (m_running) { return true; @@ -117,23 +143,71 @@ void ThreadPool::stopThreads() { } void ThreadPool::clearQueues() { - m_queueOutgoing.clear(); - m_queueIncoming.clear(); + m_outgoingMetaTraffic.clear(); + m_outgoingUserTraffic.clear(); + m_incomingMetaTraffic.clear(); + m_incomingUserTraffic.clear(); } bool ThreadPool::addWorkload(Writer *workload) { - bool res = m_queueOutgoing.moveElementIntoBuffer(std::move(workload)); + bool res = false; + if (workload->isBuiltinEndpoint()) { + res = m_outgoingMetaTraffic.moveElementIntoBuffer(std::move(workload)); + } else { + res = m_outgoingUserTraffic.moveElementIntoBuffer(std::move(workload)); + } if (res) { sys_sem_signal(&m_writerNotificationSem); + } else { + if(workload->isBuiltinEndpoint()){ + rtps::Diagnostics::ThreadPool::dropped_outgoing_packets_metatraffic++; + }else{ + rtps::Diagnostics::ThreadPool::dropped_outgoing_packets_usertraffic++; + } + THREAD_POOL_LOG("Failed to enqueue outgoing packet."); } return res; } +bool ThreadPool::addBuiltinPort(const Ip4Port_t &port) { + if (m_builtinPortsIdx == m_builtinPorts.size()) { + return false; + } + + // TODO: Does not allow for participant deletion! + m_builtinPorts[m_builtinPortsIdx] = port; + m_builtinPortsIdx++; + + return true; +} + +bool ThreadPool::isBuiltinPort(const Ip4Port_t &port) { + if (getBuiltInMulticastLocator().port == port) { + return true; + } + + for (unsigned int i = 0; i < m_builtinPortsIdx; i++) { + if (m_builtinPorts[i] == port) { + return true; + } + } + + return false; +} + bool ThreadPool::addNewPacket(PacketInfo &&packet) { - bool res = m_queueIncoming.moveElementIntoBuffer(std::move(packet)); + bool res = false; + if (isBuiltinPort(packet.destPort)) { + res = m_incomingMetaTraffic.moveElementIntoBuffer(std::move(packet)); + } else { + res = m_incomingUserTraffic.moveElementIntoBuffer(std::move(packet)); + } if (res) { sys_sem_signal(&m_readerNotificationSem); + } else { + THREAD_POOL_LOG("failed to enqueue packet for port %u", + static_cast(packet.destPort)); } return res; } @@ -152,19 +226,34 @@ void ThreadPool::writerThreadFunction(void *arg) { void ThreadPool::doWriterWork() { while (m_running) { - Writer *workload; - auto isWorkToDo = m_queueOutgoing.moveFirstInto(workload); - if (!isWorkToDo) { - sys_sem_wait(&m_writerNotificationSem); - continue; + Writer *workload_usertraffic = nullptr; + bool workload_usertraffic_available = m_outgoingUserTraffic.moveFirstInto(workload_usertraffic); + if (workload_usertraffic_available) { + workload_usertraffic->progress(); + Diagnostics::ThreadPool::processed_outgoing_usertraffic++; + } + + Writer *workload_metatraffic = nullptr; + bool workload_metatraffic_available = m_outgoingMetaTraffic.moveFirstInto(workload_metatraffic); + if (workload_metatraffic_available) { + workload_metatraffic->progress(); + Diagnostics::ThreadPool::processed_outgoing_metatraffic++; } - workload->progress(); + if (workload_usertraffic_available || workload_metatraffic_available) { + continue; + } else { + THREAD_POOL_LOG("WriterWorker | User = %u, Meta = %u\r\n", + static_cast(Diagnostics::ThreadPool::processed_outgoing_usertraffic), + static_cast(Diagnostics::ThreadPool::processed_outgoing_metatraffic)); + updateDiagnostics(); + sys_sem_wait(&m_writerNotificationSem); + } } } void ThreadPool::readCallback(void *args, udp_pcb *target, pbuf *pbuf, - const ip_addr_t * /*addr*/, Ip4Port_t port) { + const ip_addr_t *addr, Ip4Port_t port) { auto &pool = *static_cast(args); PacketInfo packet; @@ -185,6 +274,11 @@ void ThreadPool::readCallback(void *args, udp_pcb *target, pbuf *pbuf, if (!pool.addNewPacket(std::move(packet))) { THREAD_POOL_LOG("ThreadPool: dropped packet\n"); + if (pool.isBuiltinPort(port)) { + rtps::Diagnostics::ThreadPool::dropped_incoming_packets_metatraffic++; + } else { + rtps::Diagnostics::ThreadPool::dropped_incoming_packets_usertraffic++; + } } } @@ -200,16 +294,31 @@ void ThreadPool::readerThreadFunction(void *arg) { } void ThreadPool::doReaderWork() { - + uint32_t metatraffic = 0; + uint32_t usertraffic = 0; while (m_running) { - PacketInfo packet; - auto isWorkToDo = m_queueIncoming.moveFirstInto(packet); - if (!isWorkToDo) { - sys_sem_wait(&m_readerNotificationSem); - continue; + PacketInfo packet_user; + auto isUserWorkToDo = m_incomingUserTraffic.moveFirstInto(packet_user); + if (isUserWorkToDo) { + Diagnostics::ThreadPool::processed_incoming_usertraffic++; + m_receiveJumppad(m_callee, const_cast(packet_user)); } - m_receiveJumppad(m_callee, const_cast(packet)); + PacketInfo packet_meta; + auto isMetaWorkToDo = m_incomingMetaTraffic.moveFirstInto(packet_meta); + if (isMetaWorkToDo) { + Diagnostics::ThreadPool::processed_incoming_metatraffic++; + m_receiveJumppad(m_callee, const_cast(packet_meta)); + } + + if (isUserWorkToDo || isMetaWorkToDo) { + continue; + } + THREAD_POOL_LOG("ReaderWorker | User = %u, Meta = %u\r\n", + static_cast(usertraffic), + static_cast(metatraffic)); + updateDiagnostics(); + sys_sem_wait(&m_readerNotificationSem); } } diff --git a/src/communication/UdpDriver.cpp b/src/communication/UdpDriver.cpp index 7e0e402e..32c2452f 100644 --- a/src/communication/UdpDriver.cpp +++ b/src/communication/UdpDriver.cpp @@ -38,7 +38,7 @@ using rtps::UdpDriver; if (true) { \ printf("[UDP Driver] "); \ printf(__VA_ARGS__); \ - printf("\n"); \ + printf("\r\n"); \ } #else #define UDP_DRIVER_LOG(...) // diff --git a/src/discovery/ParticipantProxyData.cpp b/src/discovery/ParticipantProxyData.cpp index 46113607..5ba2b481 100644 --- a/src/discovery/ParticipantProxyData.cpp +++ b/src/discovery/ParticipantProxyData.cpp @@ -34,6 +34,7 @@ void ParticipantProxyData::reset() { m_guid = Guid_t{GUIDPREFIX_UNKNOWN, ENTITYID_UNKNOWN}; m_manualLivelinessCount = Count_t{1}; m_expectsInlineQos = false; + onAliveSignal(); for (int i = 0; i < Config::SPDP_MAX_NUM_LOCATORS; ++i) { m_metatrafficUnicastLocatorList[i].setInvalid(); m_metatrafficMulticastLocatorList[i].setInvalid(); @@ -173,12 +174,12 @@ bool ParticipantProxyData::readLocatorIntoList( if (ret && (full_length_locator.isSameSubnet() || full_length_locator.isMulticastAddress())) { proxy_locator = LocatorIPv4(full_length_locator); - SPDP_LOG("Adding locator: %u %u %u %u \n", + SPDP_LOG("Adding locator: %u %u %u %u", (int)proxy_locator.address[0], (int)proxy_locator.address[1], (int)proxy_locator.address[2], (int)proxy_locator.address[3]); return true; } else { - SPDP_LOG("Ignoring locator: %u %u %u %u \n", + SPDP_LOG("Ignoring locator: %u %u %u %u", (int)full_length_locator.address[12], (int)full_length_locator.address[13], (int)full_length_locator.address[14], diff --git a/src/discovery/SEDPAgent.cpp b/src/discovery/SEDPAgent.cpp index 7462f93a..49aa154f 100644 --- a/src/discovery/SEDPAgent.cpp +++ b/src/discovery/SEDPAgent.cpp @@ -38,7 +38,7 @@ using rtps::SEDPAgent; if (true) { \ printf("[SEDP] "); \ printf(__VA_ARGS__); \ - printf("\n"); \ + printf("\r\n"); \ } #else #define SEDP_LOG(...) // @@ -103,7 +103,7 @@ void SEDPAgent::handlePublisherReaderMessage(const ReaderCacheChange &change) { TopicData topicData; if (topicData.readFromUcdrBuffer(cdrBuffer)) { - handlePublisherReaderMessage(topicData); + handlePublisherReaderMessage(topicData, change); } } @@ -156,6 +156,7 @@ void SEDPAgent::removeUnmatchedEntity(const Guid_t &guid) { void SEDPAgent::removeUnmatchedEntitiesOfParticipant( const GuidPrefix_t &guidPrefix) { + Lock{m_mutex}; auto isElementToRemove = [&](const TopicDataCompressed &topicData) { return topicData.endpointGuid.prefix == guidPrefix; }; @@ -176,7 +177,8 @@ uint32_t SEDPAgent::getNumRemoteUnmatchedWriters() { return m_unmatchedRemoteWriters.getNumElements(); } -void SEDPAgent::handlePublisherReaderMessage(const TopicData &writerData) { +void SEDPAgent::handlePublisherReaderMessage(const TopicData &writerData, + const ReaderCacheChange &change) { // TODO Is it okay to add Endpoint if the respective participant is unknown // participant? if (!m_part->findRemoteParticipant(writerData.endpointGuid.prefix)) { @@ -184,7 +186,7 @@ void SEDPAgent::handlePublisherReaderMessage(const TopicData &writerData) { } if (writerData.isDisposedFlagSet() || writerData.isUnregisteredFlagSet()) { - handleRemoteEndpointDeletion(writerData); + handleRemoteEndpointDeletion(writerData, change); return; } @@ -236,11 +238,16 @@ void SEDPAgent::handleSubscriptionReaderMessage( TopicData topicData; if (topicData.readFromUcdrBuffer(cdrBuffer)) { - handleSubscriptionReaderMessage(topicData); + handleSubscriptionReaderMessage(topicData, change); } } -void SEDPAgent::handleRemoteEndpointDeletion(const TopicData &topic) { +void SEDPAgent::handleRemoteEndpointDeletion(const TopicData &topic, + const ReaderCacheChange &change) { + SEDP_LOG("Endpoint deletion message SN %u.%u GUID %u %u %u %u \r\n", + (int)change.sn.high, (int)change.sn.low, + change.writerGuid.prefix.id[0], change.writerGuid.prefix.id[1], + change.writerGuid.prefix.id[2], change.writerGuid.prefix.id[3]); if (!topic.entityIdFromKeyHashValid) { return; } @@ -256,13 +263,14 @@ void SEDPAgent::handleRemoteEndpointDeletion(const TopicData &topic) { removeUnmatchedEntity(guid); } -void SEDPAgent::handleSubscriptionReaderMessage(const TopicData &readerData) { +void SEDPAgent::handleSubscriptionReaderMessage( + const TopicData &readerData, const ReaderCacheChange &change) { if (!m_part->findRemoteParticipant(readerData.endpointGuid.prefix)) { return; } if (readerData.isDisposedFlagSet() || readerData.isUnregisteredFlagSet()) { - handleRemoteEndpointDeletion(readerData); + handleRemoteEndpointDeletion(readerData, change); return; } @@ -329,15 +337,15 @@ void SEDPAgent::tryMatchUnmatchedEndpoints() { } } -void SEDPAgent::addWriter(Writer &writer) { +bool SEDPAgent::addWriter(Writer &writer) { if (m_endpoints.sedpPubWriter == nullptr) { - return; + return true; } EntityKind_t writerKind = writer.m_attributes.endpointGuid.entityId.entityKind; if (writerKind == EntityKind_t::BUILD_IN_WRITER_WITH_KEY || writerKind == EntityKind_t::BUILD_IN_WRITER_WITHOUT_KEY) { - return; // No need to announce builtin endpoints + return true; // No need to announce builtin endpoints } Lock lock{m_mutex}; @@ -358,6 +366,7 @@ void SEDPAgent::addWriter(Writer &writer) { auto change = m_endpoints.sedpPubWriter->newChange( ChangeKind_t::ALIVE, m_buffer, ucdr_buffer_length(µbuffer)); writer.setSEDPSequenceNumber(change->sequenceNumber); + return (change != nullptr); #if SEDP_VERBOSE SEDP_LOG("Added new change to sedpPubWriter.\n"); #endif @@ -382,14 +391,8 @@ bool SEDPAgent::announceEndpointDeletion(A *local_endpoint, ucdr_serialize_array_uint8_t( µbuffer, local_endpoint->m_attributes.endpointGuid.prefix.id.data(), sizeof(GuidPrefix_t::id)); - ucdr_serialize_array_uint8_t( - µbuffer, - local_endpoint->m_attributes.endpointGuid.entityId.entityKey.data(), - sizeof(EntityId_t::entityKey)); - ucdr_serialize_uint8_t( - µbuffer, - static_cast( - local_endpoint->m_attributes.endpointGuid.entityId.entityKind)); + ucdr_serialize_array_uint8_t(µbuffer, local_endpoint->m_attributes.endpointGuid.entityId.entityKey.data(), 3); + ucdr_serialize_uint8_t(µbuffer, static_cast(local_endpoint->m_attributes.endpointGuid.entityId.entityKind)); ucdr_serialize_uint16_t(µbuffer, ParameterId::PID_STATUS_INFO); ucdr_serialize_uint16_t(µbuffer, static_cast(4)); @@ -409,6 +412,8 @@ bool SEDPAgent::announceEndpointDeletion(A *local_endpoint, auto ret = sedp_endpoint->newChange(ChangeKind_t::ALIVE, m_buffer, ucdr_buffer_length(µbuffer), true, true); + SEDP_LOG("Annoucing endpoint delete, SN = %u.%u\r\n", + (int)ret->sequenceNumber.low, (int)ret->sequenceNumber.high); return (ret != nullptr); } @@ -448,6 +453,8 @@ bool SEDPAgent::deleteReader(Reader *reader) { return false; } + // Move all matched proxies of this endpoint to the list of unmatched + // endpoints reader->dumpAllProxies(SEDPAgent::jumppadTakeProxyOfDisposedReader, this); return true; @@ -465,21 +472,23 @@ bool SEDPAgent::deleteWriter(Writer *writer) { return false; } + // Move all matched proxies of this endpoint to the list of unmatched + // endpoints writer->dumpAllProxies(SEDPAgent::jumppadTakeProxyOfDisposedWriter, this); return true; } -void SEDPAgent::addReader(Reader &reader) { +bool SEDPAgent::addReader(Reader &reader) { if (m_endpoints.sedpSubWriter == nullptr) { - return; + return true; } EntityKind_t readerKind = reader.m_attributes.endpointGuid.entityId.entityKind; if (readerKind == EntityKind_t::BUILD_IN_READER_WITH_KEY || readerKind == EntityKind_t::BUILD_IN_READER_WITHOUT_KEY) { - return; // No need to announce builtin endpoints + return true; // No need to announce builtin endpoints } Lock lock{m_mutex}; @@ -500,6 +509,7 @@ void SEDPAgent::addReader(Reader &reader) { auto change = m_endpoints.sedpSubWriter->newChange( ChangeKind_t::ALIVE, m_buffer, ucdr_buffer_length(µbuffer)); reader.setSEDPSequenceNumber(change->sequenceNumber); + return (change != nullptr); #if SEDP_VERBOSE SEDP_LOG("Added new change to sedpSubWriter.\n"); #endif diff --git a/src/discovery/SPDPAgent.cpp b/src/discovery/SPDPAgent.cpp index 2476f1ea..dabc5fc4 100644 --- a/src/discovery/SPDPAgent.cpp +++ b/src/discovery/SPDPAgent.cpp @@ -57,8 +57,9 @@ void SPDPAgent::start() { return; } m_running = true; - sys_thread_new("SPDPThread", runBroadcast, this, - Config::SPDP_WRITER_STACKSIZE, Config::SPDP_WRITER_PRIO); + auto t = + sys_thread_new("SPDPThread", runBroadcast, this, + Config::SPDP_WRITER_STACKSIZE, Config::SPDP_WRITER_PRIO); } void SPDPAgent::stop() { m_running = false; } @@ -145,12 +146,12 @@ void SPDPAgent::processProxyData() { return; // Our own packet } + SPDP_LOG("Message from GUID = %u %u %u %u", m_proxyDataBuffer.m_guid.prefix.id[4], m_proxyDataBuffer.m_guid.prefix.id[5], m_proxyDataBuffer.m_guid.prefix.id[6], m_proxyDataBuffer.m_guid.prefix.id[7]); const rtps::ParticipantProxyData *remote_part; remote_part = mp_participant->findRemoteParticipant(m_proxyDataBuffer.m_guid.prefix); if (remote_part != nullptr) { - SPDP_LOG("Not adding remote participant guid.prefix = %u \n", - (unsigned int)Guid_t::sum(remote_part->m_guid)); + SPDP_LOG("Not adding this participant"); mp_participant->refreshRemoteParticipantLiveliness( m_proxyDataBuffer.m_guid.prefix); return; // Already in our list @@ -196,23 +197,6 @@ bool SPDPAgent::addProxiesForBuiltInEndpoints() { ip4_addr_t ip4addr = locator->getIp4Address(); const char *addr = ip4addr_ntoa(&ip4addr); #endif - SPDP_LOG("Adding IPv4 Locator %s\n", addr); - - if (m_proxyDataBuffer.hasPublicationWriter()) { - const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix, - ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER}, - *locator, - true}; - m_buildInEndpoints.sedpPubReader->addNewMatchedWriter(proxy); - } - - if (m_proxyDataBuffer.hasSubscriptionWriter()) { - const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix, - ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER}, - *locator, - true}; - m_buildInEndpoints.sedpSubReader->addNewMatchedWriter(proxy); - } if (m_proxyDataBuffer.hasPublicationReader()) { const ReaderProxy proxy{{m_proxyDataBuffer.m_guid.prefix, @@ -230,6 +214,24 @@ bool SPDPAgent::addProxiesForBuiltInEndpoints() { m_buildInEndpoints.sedpSubWriter->addNewMatchedReader(proxy); } + if (m_proxyDataBuffer.hasPublicationWriter()) { + const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix, + ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER}, + *locator, + true}; + m_buildInEndpoints.sedpPubReader->addNewMatchedWriter(proxy); + m_buildInEndpoints.sedpPubReader->sendPreemptiveAckNack(proxy); + } + + if (m_proxyDataBuffer.hasSubscriptionWriter()) { + const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix, + ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER}, + *locator, + true}; + m_buildInEndpoints.sedpSubReader->addNewMatchedWriter(proxy); + m_buildInEndpoints.sedpPubReader->sendPreemptiveAckNack(proxy); + } + return true; } diff --git a/src/entities/Domain.cpp b/src/entities/Domain.cpp index 4a5c9e3d..c77a05d8 100644 --- a/src/entities/Domain.cpp +++ b/src/entities/Domain.cpp @@ -229,6 +229,7 @@ void Domain::createBuiltinWritersAndReaders(Participant &part) { void Domain::registerPort(const Participant &part) { m_transport.createUdpConnection(getUserUnicastPort(part.m_participantId)); m_transport.createUdpConnection(getBuiltInUnicastPort(part.m_participantId)); + m_threadPool.addBuiltinPort(getBuiltInUnicastPort(part.m_participantId)); } void Domain::registerMulticastPort(FullLengthLocator mcastLocator) { @@ -468,6 +469,9 @@ rtps::Reader *Domain::createReader(Participant &part, const char *topicName, bool rtps::Domain::deleteReader(Participant &part, Reader *reader) { Lock{m_mutex}; + if(reader == nullptr || !reader->isInitialized()){ + return false; + } if (!part.deleteReader(reader)) { return false; } @@ -478,6 +482,9 @@ bool rtps::Domain::deleteReader(Participant &part, Reader *reader) { bool rtps::Domain::deleteWriter(Participant &part, Writer *writer) { Lock{m_mutex}; + if(writer == nullptr || !writer->isInitialized()){ + return false; + } if (!part.deleteWriter(writer)) { return false; } @@ -488,21 +495,23 @@ bool rtps::Domain::deleteWriter(Participant &part, Writer *writer) { void rtps::Domain::printInfo() { for (unsigned int i = 0; i < m_participants.size(); i++) { - printf("Participant %u\n", i); + DOMAIN_LOG("Participant %u\r\n", i); m_participants[i].printInfo(); } } rtps::GuidPrefix_t Domain::generateGuidPrefix(ParticipantId_t id) const { GuidPrefix_t prefix; - if(Config::BASE_GUID_PREFIX == GUID_RANDOM){ - for (unsigned int i = 0; i < rtps::Config::BASE_GUID_PREFIX.id.size(); i++) { - prefix.id[i] = rand(); - } - }else{ - for (unsigned int i = 0; i < rtps::Config::BASE_GUID_PREFIX.id.size(); i++) { - prefix.id[i] = Config::BASE_GUID_PREFIX.id[i]; - } + if (Config::BASE_GUID_PREFIX == GUID_RANDOM) { + for (unsigned int i = 0; i < rtps::Config::BASE_GUID_PREFIX.id.size(); + i++) { + prefix.id[i] = rand(); + } + } else { + for (unsigned int i = 0; i < rtps::Config::BASE_GUID_PREFIX.id.size(); + i++) { + prefix.id[i] = Config::BASE_GUID_PREFIX.id[i]; + } } return prefix; } diff --git a/src/entities/Participant.cpp b/src/entities/Participant.cpp index 36c7537e..d85ab229 100644 --- a/src/entities/Participant.cpp +++ b/src/entities/Participant.cpp @@ -34,7 +34,7 @@ Author: i11 - Embedded Software, RWTH Aachen University if (true) { \ printf("[Participant] "); \ printf(__VA_ARGS__); \ - printf("\n"); \ + printf("\r\n"); \ } #else #define PARTICIPANT_LOG(...) // @@ -403,92 +403,118 @@ rtps::MessageReceiver *Participant::getMessageReceiver() { return &m_receiver; } bool Participant::checkAndResetHeartbeats() { Lock{m_mutex}; - PARTICIPANT_LOG("Have %u remote participants\n", + Lock{m_spdpAgent.m_mutex}; + PARTICIPANT_LOG("Have %u remote participants", (unsigned int)m_remoteParticipants.getNumElements()); PARTICIPANT_LOG( - "Unmatched remote writers/readers, %u / %u\n", + "Unmatched remote writers/readers, %u / %u", static_cast(m_sedpAgent.getNumRemoteUnmatchedWriters()), static_cast(m_sedpAgent.getNumRemoteUnmatchedReaders())); for (auto &remote : m_remoteParticipants) { - PARTICIPANT_LOG("remote participant age = %u\n", - (unsigned int)remote.getAliveSignalAgeInMilliseconds()); + PARTICIPANT_LOG("Remote GUID = %u %u %u %u | Age = %u [ms]", + remote.m_guid.prefix.id[4], remote.m_guid.prefix.id[5], remote.m_guid.prefix.id[6], remote.m_guid.prefix.id[7], (unsigned int)remote.getAliveSignalAgeInMilliseconds() ); if (remote.isAlive()) { - PARTICIPANT_LOG("remote participant is alive\n"); continue; } - PARTICIPANT_LOG("removing remote participant\n"); + PARTICIPANT_LOG("removing remote participant"); bool success = removeRemoteParticipant(remote.m_guid.prefix); if (!success) { return false; + }else{ + return true; } } return true; } void Participant::printInfo() { + + uint32_t max_reader_proxies = 0; for (unsigned int i = 0; i < m_readers.size(); i++) { if (m_readers[i] != nullptr && m_readers[i]->isInitialized()) { if (m_hasBuilInEndpoints && i < 3) { +#ifdef PARTICIPANT_PRINTINFO_LONG if (m_readers[i]->m_attributes.endpointGuid.entityId == ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER) { - printf("Reader %u: SPDP BUILTIN READER | Remote Proxies = %u \n ", i, - static_cast(m_readers[i]->getProxiesCount())); + printf("Reader %u: SPDP BUILTIN READER | Remote Proxies = %u \r\n ", + i, static_cast(m_readers[i]->getProxiesCount())); } if (m_readers[i]->m_attributes.endpointGuid.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER) { - printf("Reader %u: SEDP PUBLICATION READER | Remote Proxies = %u \n ", - i, static_cast(m_readers[i]->getProxiesCount())); + printf( + "Reader %u: SEDP PUBLICATION READER | Remote Proxies = %u \r\n ", + i, static_cast(m_readers[i]->getProxiesCount())); } if (m_readers[i]->m_attributes.endpointGuid.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER) { - printf("Reader %u: SEDP SUBSCRIPTION READER | Remote Proxies = %u \n", - i, static_cast(m_readers[i]->getProxiesCount())); + printf( + "Reader %u: SEDP SUBSCRIPTION READER | Remote Proxies = %u \r\n", + i, static_cast(m_readers[i]->getProxiesCount())); } +#endif continue; } + + max_reader_proxies = + std::max(max_reader_proxies, m_readers[i]->getProxiesCount()); +#ifdef PARTICIPANT_PRINTINFO_LONG printf("Reader %u: Topic = %s | Type = %s | Remote Proxies = %u | SEDP " - "SN = %u \n", + "SN = %u \r\n ", i, m_readers[i]->m_attributes.topicName, m_readers[i]->m_attributes.typeName, static_cast(m_readers[i]->getProxiesCount()), static_cast(m_readers[i]->getSEDPSequenceNumber().low)); +#endif } } + uint32_t max_writer_proxies = 0; for (unsigned int i = 0; i < m_writers.size(); i++) { + if (m_hasBuilInEndpoints && i < 3) { +#ifdef PARTICIPANT_PRINTINFO_LONG if (m_writers[i]->m_attributes.endpointGuid.entityId == ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) { - printf("Writer %u: SPDP WRITER | Remote Proxies = %u \n ", i, + printf("Writer %u: SPDP WRITER | Remote Proxies = %u \r\n ", i, static_cast(m_writers[i]->getProxiesCount())); } if (m_writers[i]->m_attributes.endpointGuid.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) { - printf("Writer %u: SEDP PUBLICATION WRITER | Remote Proxies = %u \n", - i, static_cast(m_writers[i]->getProxiesCount())); + printf( + "Writer %u: SEDP PUBLICATION WRITER | Remote Proxies = %u \r\n ", + i, static_cast(m_writers[i]->getProxiesCount())); } if (m_writers[i]->m_attributes.endpointGuid.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) { - printf("Writer %u: SEDP SUBSCRIPTION WRITER | Remote Proxies = %u \n", - i, static_cast(m_writers[i]->getProxiesCount())); + printf( + "Writer %u: SEDP SUBSCRIPTION WRITER | Remote Proxies = %u \r\n ", + i, static_cast(m_writers[i]->getProxiesCount())); } +#endif continue; } + if (m_writers[i] != nullptr && m_writers[i]->isInitialized()) { + max_writer_proxies = + std::max(max_writer_proxies, m_writers[i]->getProxiesCount()); +#ifdef PARTICIPANT_PRINTINFO_LONG printf("Writer %u: Topic = %s | Type = %s | Remote Proxies = %u | SEDP " - "SN = %u \n", + "SN = %u \r\n ", i, m_writers[i]->m_attributes.topicName, m_writers[i]->m_attributes.typeName, static_cast(m_writers[i]->getProxiesCount()), static_cast(m_writers[i]->getSEDPSequenceNumber().low)); +#endif } } - printf("Unmatched Remote Readers = %u\n", + printf("Max Writer Proxies %u \r\n ", max_writer_proxies); + printf("Max Reader Proxies %u \r\n ", max_reader_proxies); + printf("Unmatched Remote Readers = %u\r\n", static_cast(m_sedpAgent.getNumRemoteUnmatchedReaders())); - printf("Unmatched Remote Writers = %u\n", + printf("Unmatched Remote Writers = %u \r\n ", static_cast(m_sedpAgent.getNumRemoteUnmatchedWriters())); - printf("Remote Participants = %u\n", + printf("Remote Participants = %u \r\n ", static_cast(m_remoteParticipants.getNumElements())); } @@ -510,5 +536,7 @@ void Participant::addBuiltInEndpoints(BuiltInEndpoints &endpoints) { } void Participant::newMessage(const uint8_t *data, DataSize_t size) { - m_receiver.processMessage(data, size); + if (!m_receiver.processMessage(data, size)) { + PARTICIPANT_LOG("MESSAGE PROCESSING FAILE \r\n"); + } } diff --git a/src/entities/Reader.cpp b/src/entities/Reader.cpp index e7d0fd01..58ab92d5 100644 --- a/src/entities/Reader.cpp +++ b/src/entities/Reader.cpp @@ -158,3 +158,7 @@ int rtps::Reader::dumpAllProxies(dumpProxyCallback target, void *arg) { } return dump_count; } + +bool rtps::Reader::sendPreemptiveAckNack(const WriterProxy &writer) { + return true; +} \ No newline at end of file diff --git a/src/entities/Writer.cpp b/src/entities/Writer.cpp index a9fec89b..da07719c 100644 --- a/src/entities/Writer.cpp +++ b/src/entities/Writer.cpp @@ -110,6 +110,13 @@ void rtps::Writer::removeAllProxiesOfParticipant( resetSendOptions(); } +bool rtps::Writer::isBuiltinEndpoint() { + return !(m_attributes.endpointGuid.entityId.entityKind == + EntityKind_t::USER_DEFINED_WRITER_WITHOUT_KEY || + m_attributes.endpointGuid.entityId.entityKind == + EntityKind_t::USER_DEFINED_WRITER_WITH_KEY); +} + bool rtps::Writer::isIrrelevant(ChangeKind_t kind) const { // Right now we only allow alive changes // return kind == ChangeKind_t::INVALID || (m_topicKind == TopicKind_t::NO_KEY diff --git a/src/messages/MessageTypes.cpp b/src/messages/MessageTypes.cpp index 0012b62c..059d50b2 100644 --- a/src/messages/MessageTypes.cpp +++ b/src/messages/MessageTypes.cpp @@ -209,7 +209,7 @@ bool rtps::deserializeMessage(const MessageProcessingInfo &info, doCopyAndMoveOn(reinterpret_cast(&msg.gapStart.low), currentPos, sizeof(msg.gapStart.low)); - size_t num_bitfields = + size_t num_bitfields = msg.header.octetsToNextHeader - 4 - 4 - 8 - 8 - 4; remainingSizeAtBeginning - (currentPos - info.getPointerToCurrentPos()); deserializeSNS(currentPos, msg.gapList, num_bitfields); diff --git a/src/storages/PBufWrapper.cpp b/src/storages/PBufWrapper.cpp index a99550c0..c183f049 100644 --- a/src/storages/PBufWrapper.cpp +++ b/src/storages/PBufWrapper.cpp @@ -22,6 +22,9 @@ This file is part of embeddedRTPS. Author: i11 - Embedded Software, RWTH Aachen University */ +// Copyright 2023 Apex.AI, Inc. +// All rights reserved. + #include "rtps/storages/PBufWrapper.h" #include "rtps/utils/Log.h" @@ -51,24 +54,11 @@ PBufWrapper::PBufWrapper(DataSize_t length) } } -// TODO: Uses copy assignment. Improvement possible -PBufWrapper::PBufWrapper(const PBufWrapper &other) { *this = other; } - // TODO: Uses move assignment. Improvement possible PBufWrapper::PBufWrapper(PBufWrapper &&other) noexcept { *this = std::move(other); } -PBufWrapper &PBufWrapper::operator=(const PBufWrapper &other) { - copySimpleMembersAndResetBuffer(other); - - if (other.firstElement != nullptr) { - pbuf_ref(other.firstElement); - } - firstElement = other.firstElement; - return *this; -} - PBufWrapper &PBufWrapper::operator=(PBufWrapper &&other) noexcept { copySimpleMembersAndResetBuffer(other); @@ -88,26 +78,17 @@ void PBufWrapper::copySimpleMembersAndResetBuffer(const PBufWrapper &other) { } } -PBufWrapper::~PBufWrapper() { +void PBufWrapper::destroy() +{ if (firstElement != nullptr) { pbuf_free(firstElement); + firstElement = nullptr; } + m_freeSpace = 0; } -PBufWrapper PBufWrapper::deepCopy() const { - PBufWrapper clone; - clone.copySimpleMembersAndResetBuffer(*this); - - // Decided not to use pbuf_clone because it prevents const - clone.firstElement = pbuf_alloc(m_layer, this->firstElement->tot_len, m_type); - if (clone.firstElement != nullptr) { - if (pbuf_copy(clone.firstElement, this->firstElement) != ERR_OK) { - PBUF_WRAP_LOG("PBufWrapper::deepCopy: Copy of pbuf failed"); - } - } else { - clone.m_freeSpace = 0; - } - return clone; +PBufWrapper::~PBufWrapper() { + destroy(); } bool PBufWrapper::isValid() const { return firstElement != nullptr; } @@ -131,29 +112,24 @@ bool PBufWrapper::append(const uint8_t *data, DataSize_t length) { if (err != ERR_OK) { return false; } - m_freeSpace -= length; return true; } -void PBufWrapper::append(PBufWrapper &&other) { - if (this == &other) { - return; - } +void PBufWrapper::append(const PBufWrapper &other) { if (this->firstElement == nullptr) { - *this = std::move(other); + m_freeSpace = other.m_freeSpace; + this->firstElement = other.firstElement; + pbuf_ref(this->firstElement); return; } - m_freeSpace = other.m_freeSpace; - pbuf *const newElement = other.firstElement; - pbuf_cat(this->firstElement, newElement); - - other.firstElement = nullptr; + m_freeSpace += other.m_freeSpace; + pbuf_chain(this->firstElement, other.firstElement); } bool PBufWrapper::reserve(DataSize_t length) { - auto additionalAllocation = length - m_freeSpace; + int16_t additionalAllocation = length - m_freeSpace; if (additionalAllocation <= 0) { return true; } diff --git a/src/utils/Diagnostics.cpp b/src/utils/Diagnostics.cpp new file mode 100644 index 00000000..2f50f0c1 --- /dev/null +++ b/src/utils/Diagnostics.cpp @@ -0,0 +1,53 @@ +#include + +namespace rtps { +namespace Diagnostics { + +namespace ThreadPool { +uint32_t dropped_incoming_packets_usertraffic = 0; +uint32_t dropped_incoming_packets_metatraffic = 0; + +uint32_t dropped_outgoing_packets_usertraffic = 0; +uint32_t dropped_outgoing_packets_metatraffic = 0; + +uint32_t processed_incoming_metatraffic = 0; +uint32_t processed_outgoing_metatraffic = 0; +uint32_t processed_incoming_usertraffic = 0; +uint32_t processed_outgoing_usertraffic = 0; + +uint32_t max_ever_elements_outgoing_usertraffic_queue; +uint32_t max_ever_elements_incoming_usertraffic_queue; + +uint32_t max_ever_elements_outgoing_metatraffic_queue; +uint32_t max_ever_elements_incoming_metatraffic_queue; + +} // namespace ThreadPool + +namespace StatefulReader { +uint32_t sfr_unexpected_sn; +uint32_t sfr_retransmit_requests; +} // namespace StatefulReader + +namespace Network { +uint32_t lwip_allocation_failures; +} + +namespace SEDP { +uint32_t max_ever_remote_participants; +uint32_t current_remote_participants; + +uint32_t max_ever_matched_reader_proxies; +uint32_t current_max_matched_reader_proxies; + +uint32_t max_ever_matched_writer_proxies; +uint32_t current_max_matched_writer_proxies; + +uint32_t max_ever_unmatched_reader_proxies; +uint32_t current_max_unmatched_reader_proxies; + +uint32_t max_ever_unmatched_writer_proxies; +uint32_t current_max_unmatched_writer_proxies; +} // namespace SEDP + +} // namespace Diagnostics +} // namespace rtps