Skip to content

Commit

Permalink
Feat[MQB, MWC]: report the number of tcp connections (#384)
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Pryakhin <[email protected]>
  • Loading branch information
waldgange authored Sep 19, 2024
1 parent 3f35614 commit 6044740
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 49 deletions.
109 changes: 85 additions & 24 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ namespace BloombergLP {
namespace mqbnet {

const char* TCPSessionFactory::k_CHANNEL_PROPERTY_PEER_IP = "tcp.peer.ip";
const char* TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT =
"tcp.local.port";
const char* TCPSessionFactory::k_CHANNEL_PROPERTY_CHANNEL_ID =
"channelpool.channel.id";
const char* TCPSessionFactory::k_CHANNEL_STATUS_CLOSE_REASON =
Expand Down Expand Up @@ -155,14 +157,21 @@ void ntcChannelPreCreation(
BSLS_ANNOTATION_UNUSED const
bsl::shared_ptr<mwcio::ChannelFactory::OpHandle>& operationHandle)
{
ntsa::Endpoint peerEndpoint = channel->peerEndpoint();
ntsa::Endpoint peerEndpoint = channel->peerEndpoint();
ntsa::Endpoint sourceEndpoint = channel->sourceEndpoint();

if (peerEndpoint.isIp() && peerEndpoint.ip().host().isV4()) {
channel->properties().set(
TCPSessionFactory::k_CHANNEL_PROPERTY_PEER_IP,
static_cast<int>(peerEndpoint.ip().host().v4().value()));
}

if (sourceEndpoint.isIp()) {
channel->properties().set(
TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT,
static_cast<int>(sourceEndpoint.ip().port()));
}

channel->properties().set(TCPSessionFactory::k_CHANNEL_PROPERTY_CHANNEL_ID,
channel->channelId());
}
Expand Down Expand Up @@ -280,36 +289,31 @@ TCPSessionFactory::channelStatContextCreator(
const bsl::shared_ptr<mwcio::Channel>& channel,
const bsl::shared_ptr<mwcio::StatChannelFactoryHandle>& handle)
{
mwcst::StatContext* parent = 0;

int peerAddress;
channel->properties().load(&peerAddress, k_CHANNEL_PROPERTY_PEER_IP);

ntsa::Ipv4Address ipv4Address(static_cast<bsl::uint32_t>(peerAddress));
ntsa::IpAddress ipAddress(ipv4Address);
if (!mwcio::ChannelUtil::isLocalHost(ipAddress)) {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_LOCAL);
}
else {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_REMOTE);
}

ntsa::Ipv4Address ipv4Address(static_cast<bsl::uint32_t>(peerAddress));
ntsa::IpAddress ipAddress(ipv4Address);
mwcst::StatContext* parent = d_statController_p->channelsStatContext(
mwcio::ChannelUtil::isLocalHost(ipAddress)
? mqbstat::StatController::ChannelSelector::e_LOCAL
: mqbstat::StatController::ChannelSelector::e_REMOTE);
BSLS_ASSERT_SAFE(parent);

bsl::string name;
if (handle->options().is<mwcio::ConnectOptions>()) {
name = handle->options().the<mwcio::ConnectOptions>().endpoint();
}
else {
name = channel->peerUri();
}
bsl::string endpoint =
handle->options().is<mwcio::ConnectOptions>()
? handle->options().the<mwcio::ConnectOptions>().endpoint()
: channel->peerUri();

bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(name, &localAllocator);
int localPort;
channel->properties().load(
&localPort,
TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT);

return parent->addSubcontext(statConfig);
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK
return d_ports.addChannelContext(parent,
endpoint,
static_cast<bsl::uint16_t>(localPort));
}

void TCPSessionFactory::negotiate(
Expand Down Expand Up @@ -732,6 +736,11 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr<mwcio::Channel>& channel,
{
--d_nbActiveChannels;

int port;
channel->properties().load(
&port,
TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT);

ChannelInfoSp channelInfo;
{
// Lookup the session and remove it from internal map
Expand All @@ -742,6 +751,7 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr<mwcio::Channel>& channel,
channelInfo = it->second;
d_channels.erase(it);
}
d_ports.onDeleteChannelContext(port);
} // close mutex lock guard // UNLOCK

if (!channelInfo) {
Expand Down Expand Up @@ -938,6 +948,7 @@ TCPSessionFactory::TCPSessionFactory(
, d_noSessionCondition(bsls::SystemClockType::e_MONOTONIC)
, d_noClientCondition(bsls::SystemClockType::e_MONOTONIC)
, d_channels(allocator)
, d_ports(allocator)
, d_heartbeatSchedulerActive(false)
, d_heartbeatChannels(allocator)
, d_initialMissedHeartbeatCounter(calculateInitialMissedHbCounter(config))
Expand Down Expand Up @@ -1462,5 +1473,55 @@ bool TCPSessionFactory::isEndpointLoopback(const bslstl::StringRef& uri) const
mwcio::ChannelUtil::isLocalHost(endpoint.host());
}

// ------------------------------------
// class TCPSessionFactory::PortManager
// ------------------------------------

TCPSessionFactory::PortManager::PortManager(bslma::Allocator* allocator)
: d_portMap(allocator)
, d_allocator_p(allocator)
{
}

bslma::ManagedPtr<mwcst::StatContext>
TCPSessionFactory::PortManager::addChannelContext(mwcst::StatContext* parent,
const bsl::string& endpoint,
bsl::uint16_t port)
{
bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(endpoint, &localAllocator);

bslma::ManagedPtr<mwcst::StatContext> channelStatContext;

PortMap::iterator portIt = d_portMap.find(port);

if (portIt != d_portMap.end()) {
channelStatContext = portIt->second.d_portContext->addSubcontext(
statConfig);
++portIt->second.d_numChannels;
}
else {
mwcst::StatContextConfiguration portConfig(
static_cast<bsls::Types::Int64>(port),
&localAllocator);
bsl::shared_ptr<mwcst::StatContext> portStatContext =
parent->addSubcontext(
portConfig.storeExpiredSubcontextValues(true));
channelStatContext = portStatContext->addSubcontext(statConfig);
d_portMap.emplace(port, PortContext({portStatContext, 1}));
}

return channelStatContext;
}

void TCPSessionFactory::PortManager::onDeleteChannelContext(bsl::uint16_t port)
{
// Lookup the port's StatContext and remove it from the internal containers
PortMap::iterator it = d_portMap.find(port);
if (it != d_portMap.end() && --it->second.d_numChannels == 0) {
d_portMap.erase(it);
}
}

} // close package namespace
} // close enterprise namespace
43 changes: 43 additions & 0 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class TCPSessionFactory {
/// Name of a property set on the channel representing the peer's IP.
static const char* k_CHANNEL_PROPERTY_PEER_IP;

/// Name of a property set on the channel representing the local port.
static const char* k_CHANNEL_PROPERTY_LOCAL_PORT;

/// Name of a property set on the channel representing the BTE channel
/// id.
static const char* k_CHANNEL_PROPERTY_CHANNEL_ID;
Expand Down Expand Up @@ -214,6 +217,43 @@ class TCPSessionFactory {
// scheduler thread.
};

/// This class provides mechanism to store a map of port stat contexts.
class PortManager {
public:
// PUBLIC TYPES
struct PortContext {
bsl::shared_ptr<mwcst::StatContext> d_portContext;
bsl::size_t d_numChannels;
};
typedef bsl::unordered_map<bsl::uint16_t, PortContext> PortMap;

private:
// PRIVATE DATA

/// A map of all ports
PortMap d_portMap;

/// Allocator to use
bslma::Allocator* d_allocator_p;

public:
// CREATORS
explicit PortManager(bslma::Allocator* allocator = 0);

// PUBLIC METHODS
/// Create a sub context of the specified 'parent' with the specified
/// 'endpoint' as the StatContext's name. Increases the number of
/// channels on the specified 'port'.
bslma::ManagedPtr<mwcst::StatContext>
addChannelContext(mwcst::StatContext* parent,
const bsl::string& endpoint,
bsl::uint16_t port);

/// Handle the deletion of a StatContext associated with a channel
/// connected to the specified 'port'.
void onDeleteChannelContext(bsl::uint16_t port);
};

typedef bsl::shared_ptr<ChannelInfo> ChannelInfoSp;

/// Map associating a `Channel` to its corresponding `ChannelInfo` (as
Expand Down Expand Up @@ -322,6 +362,9 @@ class TCPSessionFactory {
ChannelMap d_channels;
// Map of all active channels

PortManager d_ports;
// Manager of all open ports

bool d_heartbeatSchedulerActive;
// True if the recurring
// heartbeat check event is
Expand Down
14 changes: 8 additions & 6 deletions src/groups/mwc/mwcio/mwcio_ntcchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1384,12 +1384,14 @@ int NtcChannel::channelId() const

ntsa::Endpoint NtcChannel::peerEndpoint() const
{
if (d_streamSocket_sp) {
return d_streamSocket_sp->remoteEndpoint();
}
else {
return ntsa::Endpoint();
}
return d_streamSocket_sp ? d_streamSocket_sp->remoteEndpoint()
: ntsa::Endpoint();
}

ntsa::Endpoint NtcChannel::sourceEndpoint() const
{
return d_streamSocket_sp ? d_streamSocket_sp->sourceEndpoint()
: ntsa::Endpoint();
}

const bsl::string& NtcChannel::peerUri() const
Expand Down
5 changes: 4 additions & 1 deletion src/groups/mwc/mwcio/mwcio_ntcchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,12 @@ class NtcChannel : public mwcio::Channel,
/// Return the channel ID.
int channelId() const;

/// Load into the specified `result` the endpoint of the peer.
/// Return the endpoint of the "remote" peer.
ntsa::Endpoint peerEndpoint() const;

/// Return the endpoint of the "source" peer.
ntsa::Endpoint sourceEndpoint() const;

/// Return the URI of the "remote" end of this channel. It is up to the
/// underlying implementation to define the format of the returned URI.
const bsl::string& peerUri() const BSLS_KEYWORD_OVERRIDE;
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ StatChannel::StatChannel(const StatChannelConfig& config,
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(config.d_statContext_sp);
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, 1);
}

StatChannel::~StatChannel()
{
// NOTHING
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, -1);
}

// MANIPULATORS
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class StatChannel : public DecoratingChannelPartialImp {
/// `mwcio::StatChannelFactory`).
struct Stat {
// TYPES
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1 };
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1, e_CONNECTIONS = 2 };
};

private:
Expand Down
24 changes: 24 additions & 0 deletions src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ StatChannelFactoryUtil::statContextConfiguration(const bsl::string& name,
config.isTable(true);
config.value("in_bytes")
.value("out_bytes")
.value("connections")
.storeExpiredSubcontextValues(true);

if (historySize != -1) {
Expand Down Expand Up @@ -268,6 +269,10 @@ void StatChannelFactoryUtil::initializeStatsTable(
StatChannel::Stat::e_BYTES_OUT,
mwcst::StatUtil::value,
start);
schema.addColumn("connections",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::value,
start);

if (!(end == mwcst::StatValue::SnapshotLocation())) {
schema.addColumn("in_bytes_delta",
Expand All @@ -280,6 +285,11 @@ void StatChannelFactoryUtil::initializeStatsTable(
mwcst::StatUtil::valueDifference,
start,
end);
schema.addColumn("connections_delta",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::valueDifference,
start,
end);
}

// Configure records
Expand Down Expand Up @@ -316,6 +326,14 @@ void StatChannelFactoryUtil::initializeStatsTable(
.printAsMemory();
}
tip->addColumn("out_bytes", "total").zeroString("").printAsMemory();

tip->setColumnGroup("Connections");
if (!(end == mwcst::StatValue::SnapshotLocation())) {
tip->addColumn("connections_delta", "delta")
.zeroString("")
.setPrecision(0);
}
tip->addColumn("connections", "total").setPrecision(0);
}

bsls::Types::Int64
Expand Down Expand Up @@ -352,6 +370,12 @@ StatChannelFactoryUtil::getValue(const mwcst::StatContext& context,
case Stat::e_BYTES_OUT_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_BYTES_OUT);
}
case Stat::e_CONNECTIONS_DELTA: {
return STAT_RANGE(valueDifference, StatChannel::Stat::e_CONNECTIONS);
}
case Stat::e_CONNECTIONS_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_CONNECTIONS);
}
default: {
BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat");
}
Expand Down
4 changes: 3 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannelfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ struct StatChannelFactoryUtil {
e_BYTES_IN_DELTA,
e_BYTES_IN_ABS,
e_BYTES_OUT_DELTA,
e_BYTES_OUT_ABS
e_BYTES_OUT_ABS,
e_CONNECTIONS_DELTA,
e_CONNECTIONS_ABS
};
};

Expand Down
Loading

0 comments on commit 6044740

Please sign in to comment.