Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/discovery rework #28

Merged
merged 19 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#Folders
*.vscode
.idea
build
cmake-build-debug
cmake-build-release
thirdparty/Micro-CDR/CMakeFiles
include/rtps/config.h
CMakeFiles
CMakeFiles
27 changes: 22 additions & 5 deletions include/rtps/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,39 @@ class ThreadPool {
static void readCallback(void *arg, udp_pcb *pcb, pbuf *p,
const ip_addr_t *addr, Ip4Port_t port);

bool addBuiltinPort(const Ip4Port_t &port);

private:
receiveJumppad_fp m_receiveJumppad;
void *m_callee;
bool m_running = false;
std::array<sys_thread_t, Config::THREAD_POOL_NUM_WRITERS> m_writers;
std::array<sys_thread_t, Config::THREAD_POOL_NUM_READERS> m_readers;

std::array<Ip4Port_t, 2 * Config::MAX_NUM_PARTICIPANTS> m_builtinPorts;
size_t m_builtinPortsIdx = 0;

sys_sem_t m_readerNotificationSem;
sys_sem_t m_writerNotificationSem;

ThreadSafeCircularBuffer<Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH>
m_queueOutgoing;
ThreadSafeCircularBuffer<PacketInfo,
Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH>
m_queueIncoming;
void updateDiagnostics();

using BufferUsertrafficOutgoing = ThreadSafeCircularBuffer<
Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC>;
using BufferMetatrafficOutgoing = ThreadSafeCircularBuffer<
Writer *, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC>;
using BufferUsertrafficIncoming = ThreadSafeCircularBuffer<
PacketInfo, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC>;
using BufferMetatrafficIncoming = ThreadSafeCircularBuffer<
PacketInfo, Config::THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC>;

BufferUsertrafficOutgoing m_outgoingUserTraffic;
BufferMetatrafficOutgoing m_outgoingMetaTraffic;

BufferUsertrafficIncoming m_incomingUserTraffic;
BufferMetatrafficIncoming m_incomingMetaTraffic;

bool isBuiltinPort(const Ip4Port_t &port);
static void writerThreadFunction(void *arg);
static void readerThreadFunction(void *arg);
void doWriterWork();
Expand Down
14 changes: 13 additions & 1 deletion include/rtps/common/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Author: i11 - Embedded Software, RWTH Aachen University
#include <array>
#include <cstdint>
#include <initializer_list>
#include <limits>

// TODO subnamespaces
namespace rtps {
Expand Down Expand Up @@ -174,14 +175,25 @@ struct SequenceNumber_t {
return *this;
}

SequenceNumber_t &operator--() {
if (low == 0) {
--high;
low = std::numeric_limits<decltype(low)>::max();
} else {
--low;
}

return *this;
}

SequenceNumber_t operator++(int) {
SequenceNumber_t tmp(*this);
++*this;
return tmp;
}
};

#define SNS_MAX_NUM_BITS 32
#define SNS_MAX_NUM_BITS 256
#define SNS_NUM_BYTES (SNS_MAX_NUM_BITS / 8)
static_assert(!(SNS_MAX_NUM_BITS % 32) && SNS_MAX_NUM_BITS != 0,
"SNS_MAX_NUM_BITS must be multiple of 32");
Expand Down
9 changes: 4 additions & 5 deletions include/rtps/communication/PacketInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ This file is part of embeddedRTPS.
Author: i11 - Embedded Software, RWTH Aachen University
*/

// Copyright 2023 Apex.AI, Inc.
// All rights reserved.

#ifndef RTPS_PACKETINFO_H
#define RTPS_PACKETINFO_H

Expand All @@ -45,11 +48,7 @@ struct PacketInfo {
PacketInfo() = default;
~PacketInfo() = default;

PacketInfo &operator=(const PacketInfo &other) {
copyTriviallyCopyable(other);
this->buffer = other.buffer;
return *this;
}
PacketInfo &operator=(const PacketInfo &other) = delete;

PacketInfo &operator=(PacketInfo &&other) noexcept {
copyTriviallyCopyable(other);
Expand Down
45 changes: 24 additions & 21 deletions include/rtps/config_r5.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,51 @@ namespace rtps {
namespace Config {
const VendorId_t VENDOR_ID = {13, 37};
const std::array<uint8_t, 4> IP_ADDRESS = {
137, 226, 8, 70}; // Needs to be set in lwipcfg.h too.
const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12};
192, 168, 127, 9}; // Needs to be set in lwipcfg.h too.
const GuidPrefix_t BASE_GUID_PREFIX = GUID_RANDOM;

const uint8_t DOMAIN_ID = 0; // 230 possible with UDP
const uint8_t DOMAIN_ID = 0; // 230 possible with UDP
const uint8_t NUM_STATELESS_WRITERS = 64;
const uint8_t NUM_STATELESS_READERS = 64;
const uint8_t NUM_STATEFUL_READERS = 4;
const uint8_t NUM_STATEFUL_WRITERS = 4;
const uint8_t MAX_NUM_PARTICIPANTS = 1;
const uint8_t NUM_WRITERS_PER_PARTICIPANT =
64; // 3 will be reserved for SPDP & SEDP
64; // 3 will be reserved for SPDP & SEDP
const uint8_t NUM_READERS_PER_PARTICIPANT =
64; // 3 will be reserved for SPDP & SEDP
const uint8_t NUM_WRITER_PROXIES_PER_READER = 60;
const uint8_t NUM_READER_PROXIES_PER_WRITER = 60;
64; // 3 will be reserved for SPDP & SEDP
const uint8_t NUM_WRITER_PROXIES_PER_READER = 100;
const uint8_t NUM_READER_PROXIES_PER_WRITER = 100;

const uint8_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 150;
const uint8_t MAX_NUM_UNMATCHED_REMOTE_READERS = 150;
const uint32_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 400;
const uint32_t MAX_NUM_UNMATCHED_REMOTE_READERS = 400;

const uint8_t MAX_NUM_READER_CALLBACKS = 5;

const uint8_t HISTORY_SIZE_STATELESS = 64;
const uint8_t HISTORY_SIZE_STATEFUL = 64;
const uint8_t HISTORY_SIZE_STATEFUL = 100;

const uint8_t MAX_TYPENAME_LENGTH = 64;
const uint8_t MAX_TOPICNAME_LENGTH = 64;

const int HEARTBEAT_STACKSIZE = 1200; // byte
const int THREAD_POOL_WRITER_STACKSIZE = 1100; // byte
const int THREAD_POOL_READER_STACKSIZE = 3600; // byte
const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte
const int HEARTBEAT_STACKSIZE = 1200; // byte
const int THREAD_POOL_WRITER_STACKSIZE = 10000; // byte
const int THREAD_POOL_READER_STACKSIZE = 32000; // byte
const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte

const uint16_t SF_WRITER_HB_PERIOD_MS = 4000;
const uint16_t SPDP_RESEND_PERIOD_MS = 1000;
const uint8_t SPDP_WRITER_PRIO = 3;
const uint8_t SPDP_CYCLECOUNT_HEARTBEAT =
2; // Every X*SPDP_RESEND_PERIOD_MS, check for missing heartbeats
2; // Every X*SPDP_RESEND_PERIOD_MS, check for missing heartbeats
const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 100;
const uint8_t SPDP_MAX_NUM_LOCATORS = 1;
const Duration_t SPDP_DEFAULT_REMOTE_LEASE_DURATION = {
5, 0}; // Default lease duration for remote participants, usually
// overwritten by remote info
5, 0}; // Default lease duration for remote participants, usually
// overwritten by remote info
const Duration_t SPDP_MAX_REMOTE_LEASE_DURATION = {
180,
0}; // Absolute maximum lease duration, ignoring remote participant info
0}; // Absolute maximum lease duration, ignoring remote participant info

const int MAX_NUM_UDP_CONNECTIONS = 10;

Expand All @@ -64,12 +64,15 @@ const int THREAD_POOL_WRITER_PRIO = 3;
const int THREAD_POOL_READER_PRIO = 3;
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH = 10;

const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC = 30;
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC = 30;

constexpr int OVERALL_HEAP_SIZE =
THREAD_POOL_NUM_WRITERS * THREAD_POOL_WRITER_STACKSIZE +
THREAD_POOL_NUM_READERS * THREAD_POOL_READER_STACKSIZE +
MAX_NUM_PARTICIPANTS * SPDP_WRITER_STACKSIZE +
NUM_STATEFUL_WRITERS * HEARTBEAT_STACKSIZE;
} // namespace Config
} // namespace rtps
} // namespace Config
} // namespace rtps

#endif // RTPS_CONFIG_H
#endif // RTPS_CONFIG_H
57 changes: 36 additions & 21 deletions include/rtps/config_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,46 +37,61 @@ namespace rtps {
namespace Config {
const VendorId_t VENDOR_ID = {13, 37};
const std::array<uint8_t, 4> IP_ADDRESS = {
192, 168, 0, 66}; // Needs to be set in lwipcfg.h too.
const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12};
192, 168, 1, 103}; // Needs to be set in lwipcfg.h too.
const GuidPrefix_t BASE_GUID_PREFIX{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13};

const uint8_t DOMAIN_ID = 0; // 230 possible with UDP
const uint8_t NUM_STATELESS_WRITERS = 2;
const uint8_t NUM_STATELESS_READERS = 2;
const uint8_t NUM_STATELESS_WRITERS = 5;
const uint8_t NUM_STATELESS_READERS = 5;
const uint8_t NUM_STATEFUL_READERS = 2;
const uint8_t NUM_STATEFUL_WRITERS = 2;
const uint8_t MAX_NUM_PARTICIPANTS = 1;
const uint8_t NUM_WRITERS_PER_PARTICIPANT = 4;
const uint8_t NUM_READERS_PER_PARTICIPANT = 4;
const uint8_t NUM_WRITER_PROXIES_PER_READER = 3;
const uint8_t NUM_READER_PROXIES_PER_WRITER = 3;
const uint8_t NUM_WRITERS_PER_PARTICIPANT = 10;
const uint8_t NUM_READERS_PER_PARTICIPANT = 10;
const uint8_t NUM_WRITER_PROXIES_PER_READER = 6;
const uint8_t NUM_READER_PROXIES_PER_WRITER = 6;

const uint8_t HISTORY_SIZE = 10;
const uint8_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 50;
const uint8_t MAX_NUM_UNMATCHED_REMOTE_READERS = 50;

const uint8_t MAX_NUM_READER_CALLBACKS = 5;

const uint8_t MAX_TYPENAME_LENGTH = 20;
const uint8_t MAX_TOPICNAME_LENGTH = 20;

const uint8_t HISTORY_SIZE_STATELESS = 2;
const uint8_t HISTORY_SIZE_STATEFUL = 10;

const uint8_t MAX_TYPENAME_LENGTH = 64;
const uint8_t MAX_TOPICNAME_LENGTH = 64;

const int HEARTBEAT_STACKSIZE = 1200; // byte
const int THREAD_POOL_WRITER_STACKSIZE = 1100; // byte
const int THREAD_POOL_READER_STACKSIZE = 1600; // byte
const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte
const int THREAD_POOL_READER_STACKSIZE = 3000; // byte
const uint16_t SPDP_WRITER_STACKSIZE = 1000; // byte

const uint16_t SF_WRITER_HB_PERIOD_MS = 4000;
const uint16_t SPDP_RESEND_PERIOD_MS = 10000;
const uint16_t SPDP_RESEND_PERIOD_MS = 1000;
const uint8_t SPDP_CYCLECOUNT_HEARTBEAT =
2; // skip x SPDP rounds before checking liveliness
const uint8_t SPDP_WRITER_PRIO = 3;
const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 5;
const uint8_t SPDP_MAX_NUM_LOCATORS = 5;
const Duration_t SPDP_LEASE_DURATION = {100, 0};
const uint8_t SPDP_WRITER_PRIO = 24;
const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 10;
const uint8_t SPDP_MAX_NUM_LOCATORS = 1;
const Duration_t SPDP_DEFAULT_REMOTE_LEASE_DURATION = {
5, 0}; // Default lease duration for remote participants, usually
// overwritten by remote info
const Duration_t SPDP_MAX_REMOTE_LEASE_DURATION = {
180,
0}; // Absolute maximum lease duration, ignoring remote participant info

const Duration_t SPDP_LEASE_DURATION = {5, 0};

const int MAX_NUM_UDP_CONNECTIONS = 10;

const int THREAD_POOL_NUM_WRITERS = 1;
const int THREAD_POOL_NUM_READERS = 1;
const int THREAD_POOL_WRITER_PRIO = 3;
const int THREAD_POOL_READER_PRIO = 3;
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH = 10;
const int THREAD_POOL_WRITER_PRIO = 24;
const int THREAD_POOL_READER_PRIO = 24;
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_USERTRAFFIC = 60;
const int THREAD_POOL_WORKLOAD_QUEUE_LENGTH_METATRAFFIC = 60;

constexpr int OVERALL_HEAP_SIZE =
THREAD_POOL_NUM_WRITERS * THREAD_POOL_WRITER_STACKSIZE +
Expand Down
13 changes: 8 additions & 5 deletions include/rtps/discovery/SEDPAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class Reader;
class SEDPAgent {
public:
void init(Participant &part, const BuiltInEndpoints &endpoints);
void addWriter(Writer &writer);
void addReader(Reader &reader);
bool addWriter(Writer &writer);
bool addReader(Reader &reader);
bool deleteReader(Reader *reader);
bool deleteWriter(Writer *reader);

Expand All @@ -54,8 +54,10 @@ class SEDPAgent {
uint32_t getNumRemoteUnmatchedWriters();

protected: // For testing purposes
void handlePublisherReaderMessage(const TopicData &writerData);
void handleSubscriptionReaderMessage(const TopicData &writerData);
void handlePublisherReaderMessage(const TopicData &writerData,
const ReaderCacheChange &change);
void handleSubscriptionReaderMessage(const TopicData &writerData,
const ReaderCacheChange &change);

private:
Participant *m_part;
Expand All @@ -82,7 +84,8 @@ class SEDPAgent {
void addUnmatchedRemoteWriter(const TopicDataCompressed &writerData);
void addUnmatchedRemoteReader(const TopicDataCompressed &readerData);

void handleRemoteEndpointDeletion(const TopicData &topic);
void handleRemoteEndpointDeletion(const TopicData &topic,
const ReaderCacheChange &change);

void (*mfp_onNewPublisherCallback)(void *arg) = nullptr;
void *m_onNewPublisherArgs = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions include/rtps/discovery/SPDPAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Author: i11 - Embedded Software, RWTH Aachen University
if (true) { \
printf("[SPDP] "); \
printf(__VA_ARGS__); \
printf("\n"); \
printf("\r\n"); \
}
#else
#define SPDP_LOG(...) //
Expand All @@ -56,6 +56,7 @@ class SPDPAgent {
void init(Participant &participant, BuiltInEndpoints &endpoints);
void start();
void stop();
SemaphoreHandle_t m_mutex;

private:
Participant *mp_participant = nullptr;
Expand All @@ -67,7 +68,6 @@ class SPDPAgent {
ucdrBuffer m_microbuffer{};
uint8_t m_cycleHB = 0;

SemaphoreHandle_t m_mutex;
bool initialized = false;
static void receiveCallback(void *callee,
const ReaderCacheChange &cacheChange);
Expand Down
4 changes: 2 additions & 2 deletions include/rtps/discovery/TopicData.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ struct TopicData {
TopicData(Guid_t guid, ReliabilityKind_t reliability, FullLengthLocator loc)
: endpointGuid(guid), typeName{'\0'}, topicName{'\0'},
reliabilityKind(reliability),
durabilityKind(DurabilityKind_t::VOLATILE), unicastLocator(loc) {
}
durabilityKind(DurabilityKind_t::VOLATILE), unicastLocator(loc) {}


bool matchesTopicOf(const TopicData &other);

Expand Down
3 changes: 2 additions & 1 deletion include/rtps/entities/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ class Reader {
using dumpProxyCallback = void (*)(const Reader *reader, const WriterProxy &,
void *arg);

//! Dangerous, only
int dumpAllProxies(dumpProxyCallback target, void *arg);

virtual bool sendPreemptiveAckNack(const WriterProxy &writer);

protected:
void executeCallbacks(const ReaderCacheChange &cacheChange);
bool initMutex();
Expand Down
4 changes: 2 additions & 2 deletions include/rtps/entities/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ Author: i11 - Embedded Software, RWTH Aachen University
namespace rtps {
struct ReaderProxy {
Guid_t remoteReaderGuid;
Count_t ackNackCount = {0};
LocatorIPv4 remoteLocator;
bool is_reliable = false;
LocatorIPv4 remoteMulticastLocator;
bool useMulticast = false;
bool suppressUnicast = false;
bool unknown_eid = false;
Count_t ackNackCount = {0};
bool finalFlag = false;
SequenceNumber_t lastAckNackSequenceNumber = {0, 1};

ReaderProxy()
: remoteReaderGuid({GUIDPREFIX_UNKNOWN, ENTITYID_UNKNOWN}),
finalFlag(false){};
ackNackCount{0}, remoteLocator(LocatorIPv4()), finalFlag(false){};
ReaderProxy(const Guid_t &guid, const LocatorIPv4 &loc, bool reliable)
: remoteReaderGuid(guid), remoteLocator(loc),
is_reliable(reliable), ackNackCount{0}, finalFlag(false){};
Expand Down
2 changes: 2 additions & 0 deletions include/rtps/entities/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ template <class NetworkDriver> class StatefulReaderT final : public Reader {
bool onNewGapMessage(const SubmessageGap &msg,
const GuidPrefix_t &remotePrefix) override;

bool sendPreemptiveAckNack(const WriterProxy &writer) override;

private:
Ip4Port_t m_srcPort; // TODO intended for reuse but buffer not used as such
NetworkDriver *m_transport;
Expand Down
Loading