Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keeps track of the number of connected clients #448

Merged
merged 7 commits into from
Oct 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/openlcb/SimpleStack.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,15 @@ public:
{
/// @TODO (balazs.racz) make this more efficient by rendering to string
/// only once for all connections.
/// @TODO (balazs.racz) do not leak this.
new GcTcpHub(can_hub(), port);
gcHubServer_.reset(new GcTcpHub(can_hub(), port));
}

/// Retrieve the instance of the GridConnect Hub server, which was started
/// with start_tcp_hub_server().
/// @return the TCP hub server, or nullptr if no server was ever started.
GcTcpHub *get_tcp_hub_server()
{
return gcHubServer_.get();
}

/// Connects to a CAN hub using TCP with the gridconnect protocol.
Expand Down Expand Up @@ -473,6 +480,9 @@ private:
/// the CAN interface to function. Will be called exactly once by the
/// constructor of the base class.
std::unique_ptr<PhysicalIf> create_if(const openlcb::NodeID node_id);

/// Holds the ownership of the TCP hub server (if one was created).
std::unique_ptr<GcTcpHub> gcHubServer_;
};

class SimpleTcpStackBase : public SimpleStackBase
Expand Down
21 changes: 17 additions & 4 deletions src/utils/GcTcpHub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,30 @@
#include "nmranet_config.h"
#include "utils/GridConnectHub.hxx"

void GcTcpHub::OnNewConnection(int fd)
void GcTcpHub::on_new_connection(int fd)
{
const bool use_select =
(config_gridconnect_tcp_use_select() == CONSTANT_TRUE);
create_gc_port_for_can_hub(canHub_, fd, nullptr, use_select);
{
AtomicHolder h(this);
numClients_++;
}
create_gc_port_for_can_hub(canHub_, fd, this, use_select);
}

void GcTcpHub::notify()
{
AtomicHolder h(this);
if (numClients_)
{
numClients_--;
}
}

GcTcpHub::GcTcpHub(CanHubFlow *can_hub, int port)
: canHub_(can_hub)
, tcpListener_(port, std::bind(&GcTcpHub::OnNewConnection, this,
std::placeholders::_1))
, tcpListener_(port,
std::bind(&GcTcpHub::on_new_connection, this, std::placeholders::_1))
{
}

Expand Down
7 changes: 7 additions & 0 deletions src/utils/GcTcpHub.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected:
fprintf(stderr, "waiting for exiting.\r");
usleep(100000);
}
EXPECT_EQ(0U, tcpHub_.get_num_clients());
}

struct Client
Expand Down Expand Up @@ -154,6 +155,7 @@ protected:

TEST_F(GcTcpHubTest, CreateDestroy)
{
EXPECT_EQ(0u, tcpHub_.get_num_clients());
}

TEST_F(GcTcpHubTest, TwoClientsPingPong)
Expand All @@ -165,6 +167,9 @@ TEST_F(GcTcpHubTest, TwoClientsPingPong)
writeline(b.fd_, ":S001N01;");
EXPECT_EQ(":S001N01;", readline(a.fd_, ';'));
EXPECT_EQ(3U, can_hub0.size());

EXPECT_EQ(2u, tcpHub_.get_num_clients());

// Test writing outwards.
send_packet(":S002N0102;");
EXPECT_EQ(":S002N0102;", readline(a.fd_, ';'));
Expand All @@ -177,13 +182,15 @@ TEST_F(GcTcpHubTest, ClientCloseExpect)
unsigned can_hub_size = can_hub0.size();
LOG(INFO, "can hub: %p ", &can_hub0);
EXPECT_EQ(1U, can_hub_size);
EXPECT_EQ(0U, tcpHub_.get_num_clients());
{
Client a;
Client b;
expect_packet(":S001N01;");
writeline(b.fd_, ":S001N01;");
EXPECT_EQ(":S001N01;", readline(a.fd_, ';'));
EXPECT_EQ(can_hub_size + 2, can_hub0.size());
EXPECT_EQ(2U, tcpHub_.get_num_clients());
wait();
}
// Test writing outwards.
Expand Down
16 changes: 14 additions & 2 deletions src/utils/GcTcpHub.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ExecutorBase;
* format. Any new incoming connection will be wired into the same virtual CAN
* hub. All packets will be forwarded to every participant, without
* loopback. */
class GcTcpHub
class GcTcpHub : private Notifiable, private Atomic
{
public:
/// Constructor.
Expand All @@ -60,16 +60,28 @@ public:
return tcpListener_.is_started();
}

/// @return currently connected client count.
unsigned get_num_clients()
{
return numClients_;
}

private:
/// Callback when a new connection arrives.
///
/// @param fd filedes of the freshly established incoming connection.
///
void OnNewConnection(int fd);
void on_new_connection(int fd);

/// Error callback from the gridconnect socket. This is invoked when a
/// client disconnects.
void notify() override;

/// @param can_hub Which CAN-hub should we attach the TCP gridconnect hub
/// onto.
CanHubFlow *canHub_;
/// How many clients are connected right now.
unsigned numClients_ {0};
/// Helper object representing the listening on the socket.
SocketListener tcpListener_;
};
Expand Down