Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow programmatic configuration of unicast relays. #498

Merged
merged 9 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions include/gz/transport/Discovery.hh
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,42 @@ namespace gz
}
}

/// \brief Register a new relay address.
/// \param[in] _ip New IP address.
public: void AddRelayAddress(const std::string &_ip)
{
std::lock_guard<std::mutex> lock(this->mutex);
// Sanity check: Make sure that this IP address is not already saved.
for (auto const &addr : this->relayAddrs)
{
if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
return;
}

sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(_ip.c_str());
addr.sin_port = htons(static_cast<u_short>(this->port));

this->relayAddrs.push_back(addr);
}

// \brief Gets this instance's relay addresses.
// \return The list of relay addresses.
public: std::vector<std::string> RelayAddresses() const
{
std::vector<std::string> result;

std::lock_guard<std::mutex> lock(this->mutex);

for (auto const &addr : this->relayAddrs) {
result.push_back(inet_ntoa(addr.sin_addr));
}

return result;
}

/// \brief Broadcast periodic heartbeats.
private: void UpdateHeartbeat()
{
Expand Down Expand Up @@ -1254,6 +1290,8 @@ namespace gz
if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
{
// Send the discovery message to the unicast relays.
std::lock_guard<std::mutex> lock(this->mutex);

for (const auto &sockAddr : this->relayAddrs)
{
errno = 0;
Expand Down Expand Up @@ -1420,26 +1458,6 @@ namespace gz
return true;
}

/// \brief Register a new relay address.
/// \param[in] _ip New IP address.
private: void AddRelayAddress(const std::string &_ip)
{
// Sanity check: Make sure that this IP address is not already saved.
for (auto const &addr : this->relayAddrs)
{
if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
return;
}

sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(_ip.c_str());
addr.sin_port = htons(static_cast<u_short>(this->port));

this->relayAddrs.push_back(addr);
}

/// \brief Default activity interval value (ms.).
/// \sa ActivityInterval.
/// \sa SetActivityInterval.
Expand Down
13 changes: 13 additions & 0 deletions include/gz/transport/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,19 @@ namespace gz
public: std::optional<TopicStatistics> TopicStats(
const std::string &_topic) const;

/// \brief Adds a unicast relay IP. All nodes in this process will send
/// UDP unicast traffic to the address to connect networks when UDP
/// multicast traffic is not forwarded.
/// It's also possible to use the environment variable GZ_RELAY to add
/// relays.
/// \param[in] _relayAddress IPv4 address of the relay to add.
public: void AddGlobalRelay(const std::string& _relayAddress);

/// \brief Gets the relay addresses configured for all nodes in this
/// process.
/// \return The relay addresses.
public: std::vector<std::string> GlobalRelays() const;

/// \brief Get a pointer to the shared node (singleton shared by all the
/// nodes).
/// \return The pointer to the shared node.
Expand Down
13 changes: 13 additions & 0 deletions include/gz/transport/NodeShared.hh
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ namespace gz
public: std::optional<TopicStatistics> TopicStats(
const std::string &_topic) const;

/// \brief Adds a unicast relay IP. All nodes in this process will send
/// UDP unicast traffic to the address to connect networks when UDP
/// multicast traffic is not forwarded.
/// It's also possible to use the environment variable GZ_RELAY to add
/// relays.
/// \param[in] _relayAddress IPv4 address of the relay to add.
public: void AddGlobalRelay(const std::string& _relayAddress);

/// \brief Gets the relay addresses configured for all nodes in this
/// process.
/// \return The relay addresses.
public: std::vector<std::string> GlobalRelays() const;

/// \brief Constructor.
protected: NodeShared();

Expand Down
10 changes: 10 additions & 0 deletions src/Node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1139,3 +1139,13 @@ bool Node::RequestRaw(const std::string &_topic,
bool executed = this->Request(_topic, *req, _timeout, *res, _result);
return executed && res->SerializeToString(&_response);
}

/////////////////////////////////////////////////
void Node::AddGlobalRelay(const std::string& _relayAddress) {
Shared()->AddGlobalRelay(_relayAddress);
}

/////////////////////////////////////////////////
std::vector<std::string> Node::GlobalRelays() const {
return Shared()->GlobalRelays();
}
20 changes: 20 additions & 0 deletions src/NodeShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <shared_mutex> //NOLINT
#include <string>
#include <thread>
Expand Down Expand Up @@ -1941,3 +1942,22 @@ int NodeSharedPrivate::NonNegativeEnvVar(const std::string &_envVar,
}
return numVal;
}

void NodeShared::AddGlobalRelay(const std::string& _relayAddress) {
dataPtr->msgDiscovery->AddRelayAddress(_relayAddress);
dataPtr->srvDiscovery->AddRelayAddress(_relayAddress);
}

std::vector<std::string> NodeShared::GlobalRelays() const {
// Merge relays from message and service discovery. They should be identical
// since they're typically build from the same sources.
//
// This is confusing - do we want to add different handling here?
auto msgRelays = dataPtr->msgDiscovery->RelayAddresses();
std::set<std::string> msgRelaySet(msgRelays.cbegin(), msgRelays.cend());
auto srvRelays = dataPtr->srvDiscovery->RelayAddresses();
std::set<std::string> srvRelaySet(srvRelays.cbegin(), srvRelays.cend());
srvRelaySet.merge(msgRelaySet);

return std::vector<std::string>(srvRelaySet.cbegin(), srvRelaySet.cend());
}
25 changes: 25 additions & 0 deletions src/Node_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2341,6 +2341,31 @@ TEST(NodeTest, statistics)
EXPECT_EQ(std::nullopt, node.TopicStats("/test"));
}

//////////////////////////////////////////////////
/// \brief Test adding and querying relays
TEST(NodeTest, relay) {
transport::Node node;

// Make sure the relay we're about to add hasn't already been configured.
const std::vector<std::string> relaysBeforeAdd = node.GlobalRelays();
{
auto relaysBeforeIt = std::find_if(
relaysBeforeAdd.cbegin(), relaysBeforeAdd.cend(),
[](const std::string &relay) { return relay == "127.0.0.123"; });
ASSERT_EQ(relaysBeforeIt, relaysBeforeAdd.cend());
}
node.AddGlobalRelay("127.0.0.123");

const std::vector<std::string> relaysAfterAdd = node.GlobalRelays();
{
EXPECT_EQ(relaysAfterAdd.size(), relaysBeforeAdd.size() + 1);
auto relayIt = std::find_if(
relaysAfterAdd.cbegin(), relaysAfterAdd.cend(),
[](const std::string &relay) { return relay == "127.0.0.123"; });
EXPECT_NE(relayIt, relaysAfterAdd.cend());
}
}

//////////////////////////////////////////////////
int main(int argc, char **argv)
{
Expand Down