Skip to content

Commit 866a71f

Browse files
Feature/discovery rework (#28)
* WIP cleaning up * linting * R5 config * STM32 config * removing printf * removing artifact * Update SPDPAgent.cpp * memory pool bug fix, fixes regarding robustness when removing remote participants * serialization fix for aurix/tasking compiler * some more discovery edge cases * Refactor PBufWrapper move/copy implementations PBuf reference counting is done inside LwIP. PBufWrapper should be transparent implementation without intrusive reference counting. This commit makes `PBufWrapper` class movable and non-copyable. Signed-off-by: Alexander Livenets <[email protected]> * state after unicaragil final event * Extending CacheChange API as changes to PBufWrapper caused compile issues in HistoryCacheWithDeletion * Updating default STM config --------- Signed-off-by: Alexandru Kampmann <[email protected]> Co-authored-by: Alexander Livenets <[email protected]>
1 parent 2e6ca4b commit 866a71f

39 files changed

+961
-329
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#Folders
2+
*.vscode
23
.idea
34
build
45
cmake-build-debug
56
cmake-build-release
67
thirdparty/Micro-CDR/CMakeFiles
78
include/rtps/config.h
8-
CMakeFiles
9+
CMakeFiles

include/rtps/ThreadPool.h

+22-5
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,39 @@ class ThreadPool {
5656
static void readCallback(void *arg, udp_pcb *pcb, pbuf *p,
5757
const ip_addr_t *addr, Ip4Port_t port);
5858

59+
bool addBuiltinPort(const Ip4Port_t &port);
60+
5961
private:
6062
receiveJumppad_fp m_receiveJumppad;
6163
void *m_callee;
6264
bool m_running = false;
6365
std::array<sys_thread_t, Config::THREAD_POOL_NUM_WRITERS> m_writers;
6466
std::array<sys_thread_t, Config::THREAD_POOL_NUM_READERS> m_readers;
6567

68+
std::array<Ip4Port_t, 2 * Config::MAX_NUM_PARTICIPANTS> m_builtinPorts;
69+
size_t m_builtinPortsIdx = 0;
70+
6671
sys_sem_t m_readerNotificationSem;
6772
sys_sem_t m_writerNotificationSem;
6873

69-
ThreadSafeCircularBuffer<Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH>
70-
m_queueOutgoing;
71-
ThreadSafeCircularBuffer<PacketInfo,
72-
Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH>
73-
m_queueIncoming;
74+
void updateDiagnostics();
75+
76+
using BufferUsertrafficOutgoing = ThreadSafeCircularBuffer<
77+
Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC>;
78+
using BufferMetatrafficOutgoing = ThreadSafeCircularBuffer<
79+
Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC>;
80+
using BufferUsertrafficIncoming = ThreadSafeCircularBuffer<
81+
PacketInfo, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC>;
82+
using BufferMetatrafficIncoming = ThreadSafeCircularBuffer<
83+
PacketInfo, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC>;
84+
85+
BufferUsertrafficOutgoing m_outgoingUserTraffic;
86+
BufferMetatrafficOutgoing m_outgoingMetaTraffic;
87+
88+
BufferUsertrafficIncoming m_incomingUserTraffic;
89+
BufferMetatrafficIncoming m_incomingMetaTraffic;
7490

91+
bool isBuiltinPort(const Ip4Port_t &port);
7592
static void writerThreadFunction(void *arg);
7693
static void readerThreadFunction(void *arg);
7794
void doWriterWork();

include/rtps/common/types.h

+13-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Author: i11 - Embedded Software, RWTH Aachen University
3030
#include <array>
3131
#include <cstdint>
3232
#include <initializer_list>
33+
#include <limits>
3334

3435
// TODO subnamespaces
3536
namespace rtps {
@@ -174,14 +175,25 @@ struct SequenceNumber_t {
174175
return *this;
175176
}
176177

178+
SequenceNumber_t &operator--() {
179+
if (low == 0) {
180+
--high;
181+
low = std::numeric_limits<decltype(low)>::max();
182+
} else {
183+
--low;
184+
}
185+
186+
return *this;
187+
}
188+
177189
SequenceNumber_t operator++(int) {
178190
SequenceNumber_t tmp(*this);
179191
++*this;
180192
return tmp;
181193
}
182194
};
183195

184-
#define SNS_MAX_NUM_BITS 32
196+
#define SNS_MAX_NUM_BITS 256
185197
#define SNS_NUM_BYTES (SNS_MAX_NUM_BITS / 8)
186198
static_assert(!(SNS_MAX_NUM_BITS % 32) && SNS_MAX_NUM_BITS != 0,
187199
"SNS_MAX_NUM_BITS must be multiple of 32");

include/rtps/communication/PacketInfo.h

+4-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ This file is part of embeddedRTPS.
2222
Author: i11 - Embedded Software, RWTH Aachen University
2323
*/
2424

25+
// Copyright 2023 Apex.AI, Inc.
26+
// All rights reserved.
27+
2528
#ifndef RTPS_PACKETINFO_H
2629
#define RTPS_PACKETINFO_H
2730

@@ -45,11 +48,7 @@ struct PacketInfo {
4548
PacketInfo() = default;
4649
~PacketInfo() = default;
4750

48-
PacketInfo &operator=(const PacketInfo &other) {
49-
copyTriviallyCopyable(other);
50-
this->buffer = other.buffer;
51-
return *this;
52-
}
51+
PacketInfo &operator=(const PacketInfo &other) = delete;
5352

5453
PacketInfo &operator=(PacketInfo &&other) noexcept {
5554
copyTriviallyCopyable(other);

include/rtps/config_r5.h

+24-21
Original file line numberDiff line numberDiff line change
@@ -10,51 +10,51 @@ namespace rtps {
1010
namespace Config {
1111
const VendorId_t VENDOR_ID = {13, 37};
1212
const std::array<uint8_t, 4> IP_ADDRESS = {
13-
137, 226, 8, 70}; // Needs to be set in lwipcfg.h too.
14-
const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12};
13+
192, 168, 127, 9}; // Needs to be set in lwipcfg.h too.
14+
const GuidPrefix_t BASE_GUID_PREFIX = GUID_RANDOM;
1515

16-
const uint8_t DOMAIN_ID = 0; // 230 possible with UDP
16+
const uint8_t DOMAIN_ID = 0; // 230 possible with UDP
1717
const uint8_t NUM_STATELESS_WRITERS = 64;
1818
const uint8_t NUM_STATELESS_READERS = 64;
1919
const uint8_t NUM_STATEFUL_READERS = 4;
2020
const uint8_t NUM_STATEFUL_WRITERS = 4;
2121
const uint8_t MAX_NUM_PARTICIPANTS = 1;
2222
const uint8_t NUM_WRITERS_PER_PARTICIPANT =
23-
64; // 3 will be reserved for SPDP & SEDP
23+
64; // 3 will be reserved for SPDP & SEDP
2424
const uint8_t NUM_READERS_PER_PARTICIPANT =
25-
64; // 3 will be reserved for SPDP & SEDP
26-
const uint8_t NUM_WRITER_PROXIES_PER_READER = 60;
27-
const uint8_t NUM_READER_PROXIES_PER_WRITER = 60;
25+
64; // 3 will be reserved for SPDP & SEDP
26+
const uint8_t NUM_WRITER_PROXIES_PER_READER = 100;
27+
const uint8_t NUM_READER_PROXIES_PER_WRITER = 100;
2828

29-
const uint8_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 150;
30-
const uint8_t MAX_NUM_UNMATCHED_REMOTE_READERS = 150;
29+
const uint32_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 400;
30+
const uint32_t MAX_NUM_UNMATCHED_REMOTE_READERS = 400;
3131

3232
const uint8_t MAX_NUM_READER_CALLBACKS = 5;
3333

3434
const uint8_t HISTORY_SIZE_STATELESS = 64;
35-
const uint8_t HISTORY_SIZE_STATEFUL = 64;
35+
const uint8_t HISTORY_SIZE_STATEFUL = 100;
3636

3737
const uint8_t MAX_TYPENAME_LENGTH = 64;
3838
const uint8_t MAX_TOPICNAME_LENGTH = 64;
3939

40-
const int HEARTBEAT_STACKSIZE = 1200; // byte
41-
const int THREAD_POOL_WRITER_STACKSIZE = 1100; // byte
42-
const int THREAD_POOL_READER_STACKSIZE = 3600; // byte
43-
const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte
40+
const int HEARTBEAT_STACKSIZE = 1200; // byte
41+
const int THREAD_POOL_WRITER_STACKSIZE = 10000; // byte
42+
const int THREAD_POOL_READER_STACKSIZE = 32000; // byte
43+
const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte
4444

4545
const uint16_t SF_WRITER_HB_PERIOD_MS = 4000;
4646
const uint16_t SPDP_RESEND_PERIOD_MS = 1000;
4747
const uint8_t SPDP_WRITER_PRIO = 3;
4848
const uint8_t SPDP_CYCLECOUNT_HEARTBEAT =
49-
2; // Every X*SPDP_RESEND_PERIOD_MS, check for missing heartbeats
49+
2; // Every X*SPDP_RESEND_PERIOD_MS, check for missing heartbeats
5050
const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 100;
5151
const uint8_t SPDP_MAX_NUM_LOCATORS = 1;
5252
const Duration_t SPDP_DEFAULT_REMOTE_LEASE_DURATION = {
53-
5, 0}; // Default lease duration for remote participants, usually
54-
// overwritten by remote info
53+
5, 0}; // Default lease duration for remote participants, usually
54+
// overwritten by remote info
5555
const Duration_t SPDP_MAX_REMOTE_LEASE_DURATION = {
5656
180,
57-
0}; // Absolute maximum lease duration, ignoring remote participant info
57+
0}; // Absolute maximum lease duration, ignoring remote participant info
5858

5959
const int MAX_NUM_UDP_CONNECTIONS = 10;
6060

@@ -64,12 +64,15 @@ const int THREAD_POOL_WRITER_PRIO = 3;
6464
const int THREAD_POOL_READER_PRIO = 3;
6565
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH = 10;
6666

67+
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC = 30;
68+
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC = 30;
69+
6770
constexpr int OVERALL_HEAP_SIZE =
6871
THREAD_POOL_NUM_WRITERS * THREAD_POOL_WRITER_STACKSIZE +
6972
THREAD_POOL_NUM_READERS * THREAD_POOL_READER_STACKSIZE +
7073
MAX_NUM_PARTICIPANTS * SPDP_WRITER_STACKSIZE +
7174
NUM_STATEFUL_WRITERS * HEARTBEAT_STACKSIZE;
72-
} // namespace Config
73-
} // namespace rtps
75+
} // namespace Config
76+
} // namespace rtps
7477

75-
#endif // RTPS_CONFIG_H
78+
#endif // RTPS_CONFIG_H

include/rtps/config_stm.h

+36-21
Original file line numberDiff line numberDiff line change
@@ -37,46 +37,61 @@ namespace rtps {
3737
namespace Config {
3838
const VendorId_t VENDOR_ID = {13, 37};
3939
const std::array<uint8_t, 4> IP_ADDRESS = {
40-
192, 168, 0, 66}; // Needs to be set in lwipcfg.h too.
41-
const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12};
40+
192, 168, 1, 103}; // Needs to be set in lwipcfg.h too.
41+
const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13};
4242

4343
const uint8_t DOMAIN_ID = 0; // 230 possible with UDP
44-
const uint8_t NUM_STATELESS_WRITERS = 2;
45-
const uint8_t NUM_STATELESS_READERS = 2;
44+
const uint8_t NUM_STATELESS_WRITERS = 5;
45+
const uint8_t NUM_STATELESS_READERS = 5;
4646
const uint8_t NUM_STATEFUL_READERS = 2;
4747
const uint8_t NUM_STATEFUL_WRITERS = 2;
4848
const uint8_t MAX_NUM_PARTICIPANTS = 1;
49-
const uint8_t NUM_WRITERS_PER_PARTICIPANT = 4;
50-
const uint8_t NUM_READERS_PER_PARTICIPANT = 4;
51-
const uint8_t NUM_WRITER_PROXIES_PER_READER = 3;
52-
const uint8_t NUM_READER_PROXIES_PER_WRITER = 3;
49+
const uint8_t NUM_WRITERS_PER_PARTICIPANT = 10;
50+
const uint8_t NUM_READERS_PER_PARTICIPANT = 10;
51+
const uint8_t NUM_WRITER_PROXIES_PER_READER = 6;
52+
const uint8_t NUM_READER_PROXIES_PER_WRITER = 6;
5353

54-
const uint8_t HISTORY_SIZE = 10;
54+
const uint8_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 50;
55+
const uint8_t MAX_NUM_UNMATCHED_REMOTE_READERS = 50;
56+
57+
const uint8_t MAX_NUM_READER_CALLBACKS = 5;
5558

56-
const uint8_t MAX_TYPENAME_LENGTH = 20;
57-
const uint8_t MAX_TOPICNAME_LENGTH = 20;
59+
60+
const uint8_t HISTORY_SIZE_STATELESS = 2;
61+
const uint8_t HISTORY_SIZE_STATEFUL = 10;
62+
63+
const uint8_t MAX_TYPENAME_LENGTH = 64;
64+
const uint8_t MAX_TOPICNAME_LENGTH = 64;
5865

5966
const int HEARTBEAT_STACKSIZE = 1200; // byte
6067
const int THREAD_POOL_WRITER_STACKSIZE = 1100; // byte
61-
const int THREAD_POOL_READER_STACKSIZE = 1600; // byte
62-
const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte
68+
const int THREAD_POOL_READER_STACKSIZE = 3000; // byte
69+
const uint16_t SPDP_WRITER_STACKSIZE = 1000; // byte
6370

6471
const uint16_t SF_WRITER_HB_PERIOD_MS = 4000;
65-
const uint16_t SPDP_RESEND_PERIOD_MS = 10000;
72+
const uint16_t SPDP_RESEND_PERIOD_MS = 1000;
6673
const uint8_t SPDP_CYCLECOUNT_HEARTBEAT =
6774
2; // skip x SPDP rounds before checking liveliness
68-
const uint8_t SPDP_WRITER_PRIO = 3;
69-
const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 5;
70-
const uint8_t SPDP_MAX_NUM_LOCATORS = 5;
71-
const Duration_t SPDP_LEASE_DURATION = {100, 0};
75+
const uint8_t SPDP_WRITER_PRIO = 24;
76+
const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 10;
77+
const uint8_t SPDP_MAX_NUM_LOCATORS = 1;
78+
const Duration_t SPDP_DEFAULT_REMOTE_LEASE_DURATION = {
79+
5, 0}; // Default lease duration for remote participants, usually
80+
// overwritten by remote info
81+
const Duration_t SPDP_MAX_REMOTE_LEASE_DURATION = {
82+
180,
83+
0}; // Absolute maximum lease duration, ignoring remote participant info
84+
85+
const Duration_t SPDP_LEASE_DURATION = {5, 0};
7286

7387
const int MAX_NUM_UDP_CONNECTIONS = 10;
7488

7589
const int THREAD_POOL_NUM_WRITERS = 1;
7690
const int THREAD_POOL_NUM_READERS = 1;
77-
const int THREAD_POOL_WRITER_PRIO = 3;
78-
const int THREAD_POOL_READER_PRIO = 3;
79-
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH = 10;
91+
const int THREAD_POOL_WRITER_PRIO = 24;
92+
const int THREAD_POOL_READER_PRIO = 24;
93+
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC = 60;
94+
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC = 60;
8095

8196
constexpr int OVERALL_HEAP_SIZE =
8297
THREAD_POOL_NUM_WRITERS * THREAD_POOL_WRITER_STACKSIZE +

include/rtps/discovery/SEDPAgent.h

+8-5
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class Reader;
3838
class SEDPAgent {
3939
public:
4040
void init(Participant &part, const BuiltInEndpoints &endpoints);
41-
void addWriter(Writer &writer);
42-
void addReader(Reader &reader);
41+
bool addWriter(Writer &writer);
42+
bool addReader(Reader &reader);
4343
bool deleteReader(Reader *reader);
4444
bool deleteWriter(Writer *reader);
4545

@@ -54,8 +54,10 @@ class SEDPAgent {
5454
uint32_t getNumRemoteUnmatchedWriters();
5555

5656
protected: // For testing purposes
57-
void handlePublisherReaderMessage(const TopicData &writerData);
58-
void handleSubscriptionReaderMessage(const TopicData &writerData);
57+
void handlePublisherReaderMessage(const TopicData &writerData,
58+
const ReaderCacheChange &change);
59+
void handleSubscriptionReaderMessage(const TopicData &writerData,
60+
const ReaderCacheChange &change);
5961

6062
private:
6163
Participant *m_part;
@@ -82,7 +84,8 @@ class SEDPAgent {
8284
void addUnmatchedRemoteWriter(const TopicDataCompressed &writerData);
8385
void addUnmatchedRemoteReader(const TopicDataCompressed &readerData);
8486

85-
void handleRemoteEndpointDeletion(const TopicData &topic);
87+
void handleRemoteEndpointDeletion(const TopicData &topic,
88+
const ReaderCacheChange &change);
8689

8790
void (*mfp_onNewPublisherCallback)(void *arg) = nullptr;
8891
void *m_onNewPublisherArgs = nullptr;

include/rtps/discovery/SPDPAgent.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ Author: i11 - Embedded Software, RWTH Aachen University
3939
if (true) { \
4040
printf("[SPDP] "); \
4141
printf(__VA_ARGS__); \
42-
printf("\n"); \
42+
printf("\r\n"); \
4343
}
4444
#else
4545
#define SPDP_LOG(...) //
@@ -56,6 +56,7 @@ class SPDPAgent {
5656
void init(Participant &participant, BuiltInEndpoints &endpoints);
5757
void start();
5858
void stop();
59+
SemaphoreHandle_t m_mutex;
5960

6061
private:
6162
Participant *mp_participant = nullptr;
@@ -67,7 +68,6 @@ class SPDPAgent {
6768
ucdrBuffer m_microbuffer{};
6869
uint8_t m_cycleHB = 0;
6970

70-
SemaphoreHandle_t m_mutex;
7171
bool initialized = false;
7272
static void receiveCallback(void *callee,
7373
const ReaderCacheChange &cacheChange);

include/rtps/discovery/TopicData.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ struct TopicData {
6969
TopicData(Guid_t guid, ReliabilityKind_t reliability, FullLengthLocator loc)
7070
: endpointGuid(guid), typeName{'\0'}, topicName{'\0'},
7171
reliabilityKind(reliability),
72-
durabilityKind(DurabilityKind_t::VOLATILE), unicastLocator(loc) {
73-
}
72+
durabilityKind(DurabilityKind_t::VOLATILE), unicastLocator(loc) {}
73+
7474

7575
bool matchesTopicOf(const TopicData &other);
7676

include/rtps/entities/Reader.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,10 @@ class Reader {
110110
using dumpProxyCallback = void (*)(const Reader *reader, const WriterProxy &,
111111
void *arg);
112112

113-
//! Dangerous, only
114113
int dumpAllProxies(dumpProxyCallback target, void *arg);
115114

115+
virtual bool sendPreemptiveAckNack(const WriterProxy &writer);
116+
116117
protected:
117118
void executeCallbacks(const ReaderCacheChange &cacheChange);
118119
bool initMutex();

include/rtps/entities/ReaderProxy.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,19 @@ Author: i11 - Embedded Software, RWTH Aachen University
3131
namespace rtps {
3232
struct ReaderProxy {
3333
Guid_t remoteReaderGuid;
34+
Count_t ackNackCount = {0};
3435
LocatorIPv4 remoteLocator;
3536
bool is_reliable = false;
3637
LocatorIPv4 remoteMulticastLocator;
3738
bool useMulticast = false;
3839
bool suppressUnicast = false;
3940
bool unknown_eid = false;
40-
Count_t ackNackCount = {0};
4141
bool finalFlag = false;
4242
SequenceNumber_t lastAckNackSequenceNumber = {0, 1};
4343

4444
ReaderProxy()
4545
: remoteReaderGuid({GUIDPREFIX_UNKNOWN, ENTITYID_UNKNOWN}),
46-
finalFlag(false){};
46+
ackNackCount{0}, remoteLocator(LocatorIPv4()), finalFlag(false){};
4747
ReaderProxy(const Guid_t &guid, const LocatorIPv4 &loc, bool reliable)
4848
: remoteReaderGuid(guid), remoteLocator(loc),
4949
is_reliable(reliable), ackNackCount{0}, finalFlag(false){};

include/rtps/entities/StatefulReader.h

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ template <class NetworkDriver> class StatefulReaderT final : public Reader {
4646
bool onNewGapMessage(const SubmessageGap &msg,
4747
const GuidPrefix_t &remotePrefix) override;
4848

49+
bool sendPreemptiveAckNack(const WriterProxy &writer) override;
50+
4951
private:
5052
Ip4Port_t m_srcPort; // TODO intended for reuse but buffer not used as such
5153
NetworkDriver *m_transport;

0 commit comments

Comments
 (0)