diff --git a/source/extensions/filters/network/thrift_proxy/router/router.h b/source/extensions/filters/network/thrift_proxy/router/router.h index 50135fe02340..603a08c4c015 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router.h +++ b/source/extensions/filters/network/thrift_proxy/router/router.h @@ -88,11 +88,37 @@ class Config { using ConfigConstSharedPtr = std::shared_ptr; +#define ALL_THRIFT_ROUTER_STATS(COUNTER, GAUGE, HISTOGRAM) \ + COUNTER(route_missing) \ + COUNTER(unknown_cluster) \ + COUNTER(upstream_rq_maintenance_mode) \ + COUNTER(no_healthy_upstream) + +struct RouterStats { + ALL_THRIFT_ROUTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT) +}; + /** * This interface is used by an upstream request to communicate its state. */ class RequestOwner : public ProtocolConverter { public: + RequestOwner(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix, + Stats::Scope& scope) + : cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)), + stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")), + symbol_table_(scope.symbolTable()), + upstream_rq_call_(stat_name_set_->add("thrift.upstream_rq_call")), + upstream_rq_oneway_(stat_name_set_->add("thrift.upstream_rq_oneway")), + upstream_rq_invalid_type_(stat_name_set_->add("thrift.upstream_rq_invalid_type")), + upstream_resp_reply_(stat_name_set_->add("thrift.upstream_resp_reply")), + upstream_resp_reply_success_(stat_name_set_->add("thrift.upstream_resp_success")), + upstream_resp_reply_error_(stat_name_set_->add("thrift.upstream_resp_error")), + upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")), + upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")), + upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")), + upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")), + upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")) {} ~RequestOwner() override = default; /** @@ -149,6 +175,137 @@ class RequestOwner : public ProtocolConverter { * @param unit Unit the unit of the duration. */ virtual void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) PURE; + + /** + * @return Upstream::ClusterManager& the cluster manager. + */ + Upstream::ClusterManager& clusterManager() { return cluster_manager_; } + + /** + * Common stats. + */ + RouterStats& stats() { return stats_; } + + /** + * Increment counter for received responses that are replies. + */ + void incResponseReply(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_resp_reply_}); + } + + /** + * Increment counter for request calls. + */ + void incRequestCall(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_rq_call_}); + } + + /** + * Increment counter for requests that are one way only. + */ + void incRequestOneWay(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_rq_oneway_}); + } + + /** + * Increment counter for requests that are invalid. + */ + void incRequestInvalid(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_rq_invalid_type_}); + } + + /** + * Increment counter for received responses that are replies that are successful. + */ + void incResponseReplySuccess(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_resp_reply_success_}); + } + + /** + * Increment counter for received responses that are replies that are an error. + */ + void incResponseReplyError(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_resp_reply_error_}); + } + + /** + * Increment counter for received responses that are exceptions. + */ + void incResponseException(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_resp_exception_}); + } + + /** + * Increment counter for received responses that are invalid. + */ + void incResponseInvalidType(const Upstream::ClusterInfo& cluster) { + incClusterScopeCounter(cluster, {upstream_resp_invalid_type_}); + } + + /** + * Record a value for the request size histogram. + */ + void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) { + recordClusterScopeHistogram(cluster, {upstream_rq_size_}, Stats::Histogram::Unit::Bytes, value); + } + + /** + * Record a value for the response size histogram. + */ + void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) { + recordClusterScopeHistogram(cluster, {upstream_resp_size_}, Stats::Histogram::Unit::Bytes, + value); + } + + /** + * Records the duration of the request for a given cluster. + * + * @param cluster ClusterInfo the cluster to record the duration for. + * @param value uint64_t the value of the duration. + * @param unit Unit the unit of the duration. + */ + void recordClusterResponseDuration(const Upstream::ClusterInfo& cluster, uint64_t value, + Stats::Histogram::Unit unit) { + recordClusterScopeHistogram(cluster, {upstream_rq_time_}, unit, value); + } + +private: + void incClusterScopeCounter(const Upstream::ClusterInfo& cluster, + const Stats::StatNameVec& names) const { + const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names); + cluster.statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc(); + } + + void recordClusterScopeHistogram(const Upstream::ClusterInfo& cluster, + const Stats::StatNameVec& names, Stats::Histogram::Unit unit, + uint64_t value) const { + const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names); + cluster.statsScope() + .histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit) + .recordValue(value); + } + + RouterStats generateStats(const std::string& prefix, Stats::Scope& scope) { + return RouterStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix), + POOL_GAUGE_PREFIX(scope, prefix), + POOL_HISTOGRAM_PREFIX(scope, prefix))}; + } + + Upstream::ClusterManager& cluster_manager_; + RouterStats stats_; + Stats::StatNameSetPtr stat_name_set_; + Stats::SymbolTable& symbol_table_; + const Stats::StatName upstream_rq_call_; + const Stats::StatName upstream_rq_oneway_; + const Stats::StatName upstream_rq_invalid_type_; + const Stats::StatName upstream_resp_reply_; + const Stats::StatName upstream_resp_reply_success_; + const Stats::StatName upstream_resp_reply_error_; + const Stats::StatName upstream_resp_exception_; + const Stats::StatName upstream_resp_invalid_type_; + const Stats::StatName upstream_rq_time_; + const Stats::StatName upstream_rq_size_; + const Stats::StatName upstream_resp_size_; }; } // namespace Router diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 66acf6ca0786..77e2e1897030 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -214,7 +214,7 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { route_ = callbacks_->route(); if (!route_) { ENVOY_STREAM_LOG(debug, "no route match for method '{}'", *callbacks_, metadata->methodName()); - stats_.route_missing_.inc(); + stats().route_missing_.inc(); callbacks_->sendLocalReply( AppException(AppExceptionType::UnknownMethod, fmt::format("no route for method '{}'", metadata->methodName())), @@ -225,10 +225,10 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { route_entry_ = route_->routeEntry(); const std::string& cluster_name = route_entry_->clusterName(); - Upstream::ThreadLocalCluster* cluster = cluster_manager_.getThreadLocalCluster(cluster_name); + Upstream::ThreadLocalCluster* cluster = clusterManager().getThreadLocalCluster(cluster_name); if (!cluster) { ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, cluster_name); - stats_.unknown_cluster_.inc(); + stats().unknown_cluster_.inc(); callbacks_->sendLocalReply(AppException(AppExceptionType::InternalError, fmt::format("unknown cluster '{}'", cluster_name)), true); @@ -240,20 +240,20 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { metadata->methodName()); switch (metadata->messageType()) { case MessageType::Call: - incClusterScopeCounter({upstream_rq_call_}); + incRequestCall(*cluster_); break; case MessageType::Oneway: - incClusterScopeCounter({upstream_rq_oneway_}); + incRequestOneWay(*cluster_); break; default: - incClusterScopeCounter({upstream_rq_invalid_type_}); + incRequestInvalid(*cluster_); break; } if (cluster_->maintenanceMode()) { - stats_.upstream_rq_maintenance_mode_.inc(); + stats().upstream_rq_maintenance_mode_.inc(); callbacks_->sendLocalReply( AppException(AppExceptionType::InternalError, fmt::format("maintenance mode for cluster '{}'", cluster_name)), @@ -282,7 +282,7 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); if (!conn_pool_data) { - stats_.no_healthy_upstream_.inc(); + stats().no_healthy_upstream_.inc(); callbacks_->sendLocalReply( AppException(AppExceptionType::InternalError, fmt::format("no healthy upstream for '{}'", cluster_name)), @@ -316,7 +316,7 @@ FilterStatus Router::messageEnd() { upstream_request_buffer_); request_size_ += transport_buffer.length(); - recordClusterScopeHistogram({upstream_rq_size_}, Stats::Histogram::Unit::Bytes, request_size_); + recordUpstreamRequestSize(*cluster_, request_size_); upstream_request_->conn_data_->connection().write(transport_buffer, false); upstream_request_->onRequestComplete(); @@ -355,31 +355,31 @@ void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) { ThriftFilters::ResponseStatus status = callbacks_->upstreamData(data); if (status == ThriftFilters::ResponseStatus::Complete) { ENVOY_STREAM_LOG(debug, "response complete", *callbacks_); - recordClusterScopeHistogram({upstream_resp_size_}, Stats::Histogram::Unit::Bytes, - response_size_); + + recordUpstreamResponseSize(*cluster_, response_size_); switch (callbacks_->responseMetadata()->messageType()) { case MessageType::Reply: - incClusterScopeCounter({upstream_resp_reply_}); + incResponseReply(*cluster_); if (callbacks_->responseSuccess()) { upstream_request_->upstream_host_->outlierDetector().putResult( Upstream::Outlier::Result::ExtOriginRequestSuccess); - incClusterScopeCounter({upstream_resp_reply_success_}); + incResponseReplySuccess(*cluster_); } else { upstream_request_->upstream_host_->outlierDetector().putResult( Upstream::Outlier::Result::ExtOriginRequestFailed); - incClusterScopeCounter({upstream_resp_reply_error_}); + incResponseReplyError(*cluster_); } break; case MessageType::Exception: upstream_request_->upstream_host_->outlierDetector().putResult( Upstream::Outlier::Result::ExtOriginRequestFailed); - incClusterScopeCounter({upstream_resp_exception_}); + incResponseException(*cluster_); break; default: - incClusterScopeCounter({upstream_resp_invalid_type_}); + incResponseInvalidType(*cluster_); break; } upstream_request_->onResponseComplete(); diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index ab916d5bc29e..502a2659cff6 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -159,16 +159,6 @@ class RouteMatcher { std::vector routes_; }; -#define ALL_THRIFT_ROUTER_STATS(COUNTER, GAUGE, HISTOGRAM) \ - COUNTER(route_missing) \ - COUNTER(unknown_cluster) \ - COUNTER(upstream_rq_maintenance_mode) \ - COUNTER(no_healthy_upstream) - -struct RouterStats { - ALL_THRIFT_ROUTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT) -}; - class Router : public Tcp::ConnectionPool::UpstreamCallbacks, public Upstream::LoadBalancerContextBase, public RequestOwner, @@ -177,21 +167,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, public: Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix, Stats::Scope& scope) - : cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)), - stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")), - symbol_table_(scope.symbolTable()), - upstream_rq_call_(stat_name_set_->add("thrift.upstream_rq_call")), - upstream_rq_oneway_(stat_name_set_->add("thrift.upstream_rq_oneway")), - upstream_rq_invalid_type_(stat_name_set_->add("thrift.upstream_rq_invalid_type")), - upstream_resp_reply_(stat_name_set_->add("thrift.upstream_resp_reply")), - upstream_resp_reply_success_(stat_name_set_->add("thrift.upstream_resp_success")), - upstream_resp_reply_error_(stat_name_set_->add("thrift.upstream_resp_error")), - upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")), - upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")), - upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")), - upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")), - upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")), - passthrough_supported_(false) {} + : RequestOwner(cluster_manager, stat_prefix, scope), passthrough_supported_(false) {} ~Router() override = default; @@ -211,7 +187,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, callbacks_->sendLocalReply(response, end_stream); } void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) override { - recordClusterScopeHistogram({upstream_rq_time_}, unit, value); + recordClusterResponseDuration(*cluster_, value, unit); } // RequestOwner::ProtocolConverter @@ -223,10 +199,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, // Upstream::LoadBalancerContext const Network::Connection* downstreamConnection() const override; const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override { - if (route_entry_) { - return route_entry_->metadataMatchCriteria(); - } - return nullptr; + return route_entry_ ? route_entry_->metadataMatchCriteria() : nullptr; } // Tcp::ConnectionPool::UpstreamCallbacks @@ -280,42 +253,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, MonotonicTime downstream_request_complete_time_; }; - // Stats - void incClusterScopeCounter(const Stats::StatNameVec& names) const { - const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names); - cluster_->statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc(); - } - - void recordClusterScopeHistogram(const Stats::StatNameVec& names, Stats::Histogram::Unit unit, - uint64_t count) const { - const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names); - cluster_->statsScope() - .histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit) - .recordValue(count); - } - void cleanup(); - RouterStats generateStats(const std::string& prefix, Stats::Scope& scope) { - return RouterStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix), - POOL_GAUGE_PREFIX(scope, prefix), - POOL_HISTOGRAM_PREFIX(scope, prefix))}; - } - - Upstream::ClusterManager& cluster_manager_; - RouterStats stats_; - Stats::StatNameSetPtr stat_name_set_; - Stats::SymbolTable& symbol_table_; - const Stats::StatName upstream_rq_call_; - const Stats::StatName upstream_rq_oneway_; - const Stats::StatName upstream_rq_invalid_type_; - const Stats::StatName upstream_resp_reply_; - const Stats::StatName upstream_resp_reply_success_; - const Stats::StatName upstream_resp_reply_error_; - const Stats::StatName upstream_resp_exception_; - const Stats::StatName upstream_resp_invalid_type_; - const Stats::StatName upstream_rq_time_; - const Stats::StatName upstream_rq_size_; - const Stats::StatName upstream_resp_size_; ThriftFilters::DecoderFilterCallbacks* callbacks_{}; RouteConstSharedPtr route_{};