diff --git a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp index 48c415a97e..77512c42df 100644 --- a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp @@ -84,6 +84,8 @@ namespace BloombergLP { namespace mqbnet { const char* TCPSessionFactory::k_CHANNEL_PROPERTY_PEER_IP = "tcp.peer.ip"; +const char* TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT = + "tcp.local.port"; const char* TCPSessionFactory::k_CHANNEL_PROPERTY_CHANNEL_ID = "channelpool.channel.id"; const char* TCPSessionFactory::k_CHANNEL_STATUS_CLOSE_REASON = @@ -155,7 +157,8 @@ void ntcChannelPreCreation( BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& operationHandle) { - ntsa::Endpoint peerEndpoint = channel->peerEndpoint(); + ntsa::Endpoint peerEndpoint = channel->peerEndpoint(); + ntsa::Endpoint sourceEndpoint = channel->sourceEndpoint(); if (peerEndpoint.isIp() && peerEndpoint.ip().host().isV4()) { channel->properties().set( @@ -163,6 +166,12 @@ void ntcChannelPreCreation( static_cast(peerEndpoint.ip().host().v4().value())); } + if (sourceEndpoint.isIp()) { + channel->properties().set( + TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT, + static_cast(sourceEndpoint.ip().port())); + } + channel->properties().set(TCPSessionFactory::k_CHANNEL_PROPERTY_CHANNEL_ID, channel->channelId()); } @@ -280,36 +289,31 @@ TCPSessionFactory::channelStatContextCreator( const bsl::shared_ptr& channel, const bsl::shared_ptr& handle) { - mwcst::StatContext* parent = 0; - int peerAddress; channel->properties().load(&peerAddress, k_CHANNEL_PROPERTY_PEER_IP); - ntsa::Ipv4Address ipv4Address(static_cast(peerAddress)); - ntsa::IpAddress ipAddress(ipv4Address); - if (!mwcio::ChannelUtil::isLocalHost(ipAddress)) { - parent = d_statController_p->channelsStatContext( - mqbstat::StatController::ChannelSelector::e_LOCAL); - } - else { - parent = d_statController_p->channelsStatContext( - mqbstat::StatController::ChannelSelector::e_REMOTE); - } - + ntsa::Ipv4Address ipv4Address(static_cast(peerAddress)); + ntsa::IpAddress ipAddress(ipv4Address); + mwcst::StatContext* parent = d_statController_p->channelsStatContext( + mwcio::ChannelUtil::isLocalHost(ipAddress) + ? mqbstat::StatController::ChannelSelector::e_LOCAL + : mqbstat::StatController::ChannelSelector::e_REMOTE); BSLS_ASSERT_SAFE(parent); - bsl::string name; - if (handle->options().is()) { - name = handle->options().the().endpoint(); - } - else { - name = channel->peerUri(); - } + bsl::string endpoint = + handle->options().is() + ? handle->options().the().endpoint() + : channel->peerUri(); - bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p); - mwcst::StatContextConfiguration statConfig(name, &localAllocator); + int localPort; + channel->properties().load( + &localPort, + TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT); - return parent->addSubcontext(statConfig); + bslmt::LockGuard guard(&d_mutex); // LOCK + return d_ports.addChannelContext(parent, + endpoint, + static_cast(localPort)); } void TCPSessionFactory::negotiate( @@ -732,6 +736,11 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr& channel, { --d_nbActiveChannels; + int port; + channel->properties().load( + &port, + TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT); + ChannelInfoSp channelInfo; { // Lookup the session and remove it from internal map @@ -742,6 +751,7 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr& channel, channelInfo = it->second; d_channels.erase(it); } + d_ports.onDeleteChannelContext(port); } // close mutex lock guard // UNLOCK if (!channelInfo) { @@ -938,6 +948,7 @@ TCPSessionFactory::TCPSessionFactory( , d_noSessionCondition(bsls::SystemClockType::e_MONOTONIC) , d_noClientCondition(bsls::SystemClockType::e_MONOTONIC) , d_channels(allocator) +, d_ports(allocator) , d_heartbeatSchedulerActive(false) , d_heartbeatChannels(allocator) , d_initialMissedHeartbeatCounter(calculateInitialMissedHbCounter(config)) @@ -1462,5 +1473,55 @@ bool TCPSessionFactory::isEndpointLoopback(const bslstl::StringRef& uri) const mwcio::ChannelUtil::isLocalHost(endpoint.host()); } +// ------------------------------------ +// class TCPSessionFactory::PortManager +// ------------------------------------ + +TCPSessionFactory::PortManager::PortManager(bslma::Allocator* allocator) +: d_portMap(allocator) +, d_allocator_p(allocator) +{ +} + +bslma::ManagedPtr +TCPSessionFactory::PortManager::addChannelContext(mwcst::StatContext* parent, + const bsl::string& endpoint, + bsl::uint16_t port) +{ + bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p); + mwcst::StatContextConfiguration statConfig(endpoint, &localAllocator); + + bslma::ManagedPtr channelStatContext; + + PortMap::iterator portIt = d_portMap.find(port); + + if (portIt != d_portMap.end()) { + channelStatContext = portIt->second.d_portContext->addSubcontext( + statConfig); + ++portIt->second.d_numChannels; + } + else { + mwcst::StatContextConfiguration portConfig( + static_cast(port), + &localAllocator); + bsl::shared_ptr portStatContext = + parent->addSubcontext( + portConfig.storeExpiredSubcontextValues(true)); + channelStatContext = portStatContext->addSubcontext(statConfig); + d_portMap.emplace(port, PortContext({portStatContext, 1})); + } + + return channelStatContext; +} + +void TCPSessionFactory::PortManager::onDeleteChannelContext(bsl::uint16_t port) +{ + // Lookup the port's StatContext and remove it from the internal containers + PortMap::iterator it = d_portMap.find(port); + if (it != d_portMap.end() && --it->second.d_numChannels == 0) { + d_portMap.erase(it); + } +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h index 08bafecadb..d3552c6f73 100644 --- a/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h +++ b/src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h @@ -137,6 +137,9 @@ class TCPSessionFactory { /// Name of a property set on the channel representing the peer's IP. static const char* k_CHANNEL_PROPERTY_PEER_IP; + /// Name of a property set on the channel representing the local port. + static const char* k_CHANNEL_PROPERTY_LOCAL_PORT; + /// Name of a property set on the channel representing the BTE channel /// id. static const char* k_CHANNEL_PROPERTY_CHANNEL_ID; @@ -214,6 +217,43 @@ class TCPSessionFactory { // scheduler thread. }; + /// This class provides mechanism to store a map of port stat contexts. + class PortManager { + public: + // PUBLIC TYPES + struct PortContext { + bsl::shared_ptr d_portContext; + bsl::size_t d_numChannels; + }; + typedef bsl::unordered_map PortMap; + + private: + // PRIVATE DATA + + /// A map of all ports + PortMap d_portMap; + + /// Allocator to use + bslma::Allocator* d_allocator_p; + + public: + // CREATORS + explicit PortManager(bslma::Allocator* allocator = 0); + + // PUBLIC METHODS + /// Create a sub context of the specified 'parent' with the specified + /// 'endpoint' as the StatContext's name. Increases the number of + /// channels on the specified 'port'. + bslma::ManagedPtr + addChannelContext(mwcst::StatContext* parent, + const bsl::string& endpoint, + bsl::uint16_t port); + + /// Handle the deletion of a StatContext associated with a channel + /// connected to the specified 'port'. + void onDeleteChannelContext(bsl::uint16_t port); + }; + typedef bsl::shared_ptr ChannelInfoSp; /// Map associating a `Channel` to its corresponding `ChannelInfo` (as @@ -322,6 +362,9 @@ class TCPSessionFactory { ChannelMap d_channels; // Map of all active channels + PortManager d_ports; + // Manager of all open ports + bool d_heartbeatSchedulerActive; // True if the recurring // heartbeat check event is diff --git a/src/groups/mwc/mwcio/mwcio_ntcchannel.cpp b/src/groups/mwc/mwcio/mwcio_ntcchannel.cpp index 28826882b9..84e369efe1 100644 --- a/src/groups/mwc/mwcio/mwcio_ntcchannel.cpp +++ b/src/groups/mwc/mwcio/mwcio_ntcchannel.cpp @@ -1384,12 +1384,14 @@ int NtcChannel::channelId() const ntsa::Endpoint NtcChannel::peerEndpoint() const { - if (d_streamSocket_sp) { - return d_streamSocket_sp->remoteEndpoint(); - } - else { - return ntsa::Endpoint(); - } + return d_streamSocket_sp ? d_streamSocket_sp->remoteEndpoint() + : ntsa::Endpoint(); +} + +ntsa::Endpoint NtcChannel::sourceEndpoint() const +{ + return d_streamSocket_sp ? d_streamSocket_sp->sourceEndpoint() + : ntsa::Endpoint(); } const bsl::string& NtcChannel::peerUri() const diff --git a/src/groups/mwc/mwcio/mwcio_ntcchannel.h b/src/groups/mwc/mwcio/mwcio_ntcchannel.h index ffebe4657f..0ccd613a34 100644 --- a/src/groups/mwc/mwcio/mwcio_ntcchannel.h +++ b/src/groups/mwc/mwcio/mwcio_ntcchannel.h @@ -414,9 +414,12 @@ class NtcChannel : public mwcio::Channel, /// Return the channel ID. int channelId() const; - /// Load into the specified `result` the endpoint of the peer. + /// Return the endpoint of the "remote" peer. ntsa::Endpoint peerEndpoint() const; + /// Return the endpoint of the "source" peer. + ntsa::Endpoint sourceEndpoint() const; + /// Return the URI of the "remote" end of this channel. It is up to the /// underlying implementation to define the format of the returned URI. const bsl::string& peerUri() const BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mwc/mwcio/mwcio_statchannel.cpp b/src/groups/mwc/mwcio/mwcio_statchannel.cpp index 55034b7deb..a57c6e3d59 100644 --- a/src/groups/mwc/mwcio/mwcio_statchannel.cpp +++ b/src/groups/mwc/mwcio/mwcio_statchannel.cpp @@ -81,11 +81,12 @@ StatChannel::StatChannel(const StatChannelConfig& config, { // PRECONDITIONS BSLS_ASSERT_SAFE(config.d_statContext_sp); + d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, 1); } StatChannel::~StatChannel() { - // NOTHING + d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, -1); } // MANIPULATORS diff --git a/src/groups/mwc/mwcio/mwcio_statchannel.h b/src/groups/mwc/mwcio/mwcio_statchannel.h index 7a96d2e961..3e0d777f8d 100644 --- a/src/groups/mwc/mwcio/mwcio_statchannel.h +++ b/src/groups/mwc/mwcio/mwcio_statchannel.h @@ -98,7 +98,7 @@ class StatChannel : public DecoratingChannelPartialImp { /// `mwcio::StatChannelFactory`). struct Stat { // TYPES - enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1 }; + enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1, e_CONNECTIONS = 2 }; }; private: diff --git a/src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp b/src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp index ac450c175b..53955e7244 100644 --- a/src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp +++ b/src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp @@ -220,6 +220,7 @@ StatChannelFactoryUtil::statContextConfiguration(const bsl::string& name, config.isTable(true); config.value("in_bytes") .value("out_bytes") + .value("connections") .storeExpiredSubcontextValues(true); if (historySize != -1) { @@ -268,6 +269,10 @@ void StatChannelFactoryUtil::initializeStatsTable( StatChannel::Stat::e_BYTES_OUT, mwcst::StatUtil::value, start); + schema.addColumn("connections", + StatChannel::Stat::e_CONNECTIONS, + mwcst::StatUtil::value, + start); if (!(end == mwcst::StatValue::SnapshotLocation())) { schema.addColumn("in_bytes_delta", @@ -280,6 +285,11 @@ void StatChannelFactoryUtil::initializeStatsTable( mwcst::StatUtil::valueDifference, start, end); + schema.addColumn("connections_delta", + StatChannel::Stat::e_CONNECTIONS, + mwcst::StatUtil::valueDifference, + start, + end); } // Configure records @@ -316,6 +326,14 @@ void StatChannelFactoryUtil::initializeStatsTable( .printAsMemory(); } tip->addColumn("out_bytes", "total").zeroString("").printAsMemory(); + + tip->setColumnGroup("Connections"); + if (!(end == mwcst::StatValue::SnapshotLocation())) { + tip->addColumn("connections_delta", "delta") + .zeroString("") + .setPrecision(0); + } + tip->addColumn("connections", "total").setPrecision(0); } bsls::Types::Int64 @@ -352,6 +370,12 @@ StatChannelFactoryUtil::getValue(const mwcst::StatContext& context, case Stat::e_BYTES_OUT_ABS: { return STAT_SINGLE(value, StatChannel::Stat::e_BYTES_OUT); } + case Stat::e_CONNECTIONS_DELTA: { + return STAT_RANGE(valueDifference, StatChannel::Stat::e_CONNECTIONS); + } + case Stat::e_CONNECTIONS_ABS: { + return STAT_SINGLE(value, StatChannel::Stat::e_CONNECTIONS); + } default: { BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat"); } diff --git a/src/groups/mwc/mwcio/mwcio_statchannelfactory.h b/src/groups/mwc/mwcio/mwcio_statchannelfactory.h index 5983bac1fe..e451b940b3 100644 --- a/src/groups/mwc/mwcio/mwcio_statchannelfactory.h +++ b/src/groups/mwc/mwcio/mwcio_statchannelfactory.h @@ -247,7 +247,9 @@ struct StatChannelFactoryUtil { e_BYTES_IN_DELTA, e_BYTES_IN_ABS, e_BYTES_OUT_DELTA, - e_BYTES_OUT_ABS + e_BYTES_OUT_ABS, + e_CONNECTIONS_DELTA, + e_CONNECTIONS_ABS }; }; diff --git a/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp b/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp index d82e0d5707..b7fa44559c 100644 --- a/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp +++ b/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp @@ -122,6 +122,12 @@ class Tagger { return *this; } + Tagger& setPort(const bslstl::StringRef& value) + { + labels["Port"] = value; + return *this; + } + // ACCESSORS ::prometheus::Labels& getLabels() { return labels; } }; @@ -445,12 +451,9 @@ void PrometheusStatConsumer::captureSystemStats() #undef COPY_METRIC - ::prometheus::Labels labels{{"DataType", "host-data"}}; - bslstl::StringRef instanceName = - mqbcfg::BrokerConfig::get().brokerInstanceName(); - if (!instanceName.empty()) { - labels.emplace("instanceName", instanceName); - } + Tagger tagger; + tagger.setInstance(mqbcfg::BrokerConfig::get().brokerInstanceName()) + .setDataType("host-data"); for (bsl::vector >::iterator it = datapoints.begin(); @@ -458,7 +461,7 @@ void PrometheusStatConsumer::captureSystemStats() ++it) { auto& gauge = ::prometheus::BuildGauge().Name(it->first).Register( *d_prometheusRegistry_p); - gauge.Add(labels).Set(it->second); + gauge.Add(tagger.getLabels()).Set(it->second); } // POSTCONDITIONS @@ -479,6 +482,10 @@ void PrometheusStatConsumer::captureNetworkStats() // NOTE: Should be StatController::k_CHANNEL_STAT_*, but can't due to // dependency limitations. + Tagger tagger; + tagger.setInstance(mqbcfg::BrokerConfig::get().brokerInstanceName()) + .setDataType("host-data"); + #define RETRIEVE_METRIC(TAIL, STAT, CONTEXT) \ datapoints.emplace_back("brkr_system_net_" TAIL, \ mwcio::StatChannelFactoryUtil::getValue( \ @@ -493,22 +500,56 @@ void PrometheusStatConsumer::captureNetworkStats() #undef RETRIEVE_METRIC - ::prometheus::Labels labels{{"DataType", "host-data"}}; - bslstl::StringRef instanceName = - mqbcfg::BrokerConfig::get().brokerInstanceName(); - if (!instanceName.empty()) { - labels.emplace("instanceName", instanceName); - } - for (bsl::vector >::iterator it = datapoints.begin(); it != datapoints.end(); ++it) { auto& counter = ::prometheus::BuildCounter().Name(it->first).Register( *d_prometheusRegistry_p); - counter.Add(labels).Increment(it->second); + counter.Add(tagger.getLabels()).Increment(it->second); } + auto reportConnections = [&](const bsl::string& metricName, + const mwcst::StatContext* context) { + // In order to eliminate possible duplication of port contexts + // aggregate them before posting + bsl::unordered_map > + portMap; + + mwcst::StatContextIterator it = context->subcontextIterator(); + for (; it; ++it) { + if (it->isDeleted()) { + // As we iterate over 'living' sub contexts in the begining and + // over deleted sub contexts in the end, we can just stop here. + break; + } + tagger.setPort(bsl::to_string(it->id())); + ::prometheus::BuildCounter() + .Name("brkr_system_net_" + metricName + "_delta") + .Register(*d_prometheusRegistry_p) + .Add(tagger.getLabels()) + .Increment(static_cast( + mwcio::StatChannelFactoryUtil::getValue( + *it, + d_snapshotId, + mwcio::StatChannelFactoryUtil::Stat:: + e_CONNECTIONS_DELTA))); + ::prometheus::BuildGauge() + .Name("brkr_system_net_" + metricName) + .Register(*d_prometheusRegistry_p) + .Add(tagger.getLabels()) + .Set(static_cast< + double>(mwcio::StatChannelFactoryUtil::getValue( + *it, + d_snapshotId, + mwcio::StatChannelFactoryUtil::Stat::e_CONNECTIONS_ABS))); + } + }; + + reportConnections("local_tcp_connections", localContext); + reportConnections("remote_tcp_connections", remoteContext); + // POSTCONDITIONS BSLS_ASSERT_SAFE(datapoints.size() == k_NUM_NETWORK_STATS); }