Skip to content

Commit

Permalink
thrift proxy: move more of Router into RequestOwner (envoyproxy#17376)
Browse files Browse the repository at this point in the history
This is PR 2/4 towards implementing shadow requests for thrift.

This moves all the stats into the common RequestOwner interface so that
they can be reused. The next, similar sized, PR will decouple
UpstreamRequest from the Router so that it can be reused by the upcoming
ShadowRequest class.

The final PR should introduce the ShadowWriter and ShadowRequest classes
and make use of RequestOwner to tie them together.

Risk Level: low, refactor
Testing: existing tests pass
Docs Changes: n/a
Release Notes: n/a
Signed-off-by: Raul Gutierrez Segales <[email protected]>
  • Loading branch information
Raúl Gutiérrez Segalés authored and Le Yao committed Sep 30, 2021
1 parent d50d575 commit 5bb9b06
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 81 deletions.
157 changes: 157 additions & 0 deletions source/extensions/filters/network/thrift_proxy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,37 @@ class Config {

using ConfigConstSharedPtr = std::shared_ptr<const Config>;

#define ALL_THRIFT_ROUTER_STATS(COUNTER, GAUGE, HISTOGRAM) \
COUNTER(route_missing) \
COUNTER(unknown_cluster) \
COUNTER(upstream_rq_maintenance_mode) \
COUNTER(no_healthy_upstream)

struct RouterStats {
ALL_THRIFT_ROUTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT)
};

/**
* This interface is used by an upstream request to communicate its state.
*/
class RequestOwner : public ProtocolConverter {
public:
RequestOwner(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope)
: cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)),
stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")),
symbol_table_(scope.symbolTable()),
upstream_rq_call_(stat_name_set_->add("thrift.upstream_rq_call")),
upstream_rq_oneway_(stat_name_set_->add("thrift.upstream_rq_oneway")),
upstream_rq_invalid_type_(stat_name_set_->add("thrift.upstream_rq_invalid_type")),
upstream_resp_reply_(stat_name_set_->add("thrift.upstream_resp_reply")),
upstream_resp_reply_success_(stat_name_set_->add("thrift.upstream_resp_success")),
upstream_resp_reply_error_(stat_name_set_->add("thrift.upstream_resp_error")),
upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")),
upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")),
upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")),
upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")),
upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")) {}
~RequestOwner() override = default;

/**
Expand Down Expand Up @@ -149,6 +175,137 @@ class RequestOwner : public ProtocolConverter {
* @param unit Unit the unit of the duration.
*/
virtual void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) PURE;

/**
* @return Upstream::ClusterManager& the cluster manager.
*/
Upstream::ClusterManager& clusterManager() { return cluster_manager_; }

/**
* Common stats.
*/
RouterStats& stats() { return stats_; }

/**
* Increment counter for received responses that are replies.
*/
void incResponseReply(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_});
}

/**
* Increment counter for request calls.
*/
void incRequestCall(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_call_});
}

/**
* Increment counter for requests that are one way only.
*/
void incRequestOneWay(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_oneway_});
}

/**
* Increment counter for requests that are invalid.
*/
void incRequestInvalid(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_invalid_type_});
}

/**
* Increment counter for received responses that are replies that are successful.
*/
void incResponseReplySuccess(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_success_});
}

/**
* Increment counter for received responses that are replies that are an error.
*/
void incResponseReplyError(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_error_});
}

/**
* Increment counter for received responses that are exceptions.
*/
void incResponseException(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_exception_});
}

/**
* Increment counter for received responses that are invalid.
*/
void incResponseInvalidType(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_invalid_type_});
}

/**
* Record a value for the request size histogram.
*/
void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) {
recordClusterScopeHistogram(cluster, {upstream_rq_size_}, Stats::Histogram::Unit::Bytes, value);
}

/**
* Record a value for the response size histogram.
*/
void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) {
recordClusterScopeHistogram(cluster, {upstream_resp_size_}, Stats::Histogram::Unit::Bytes,
value);
}

/**
* Records the duration of the request for a given cluster.
*
* @param cluster ClusterInfo the cluster to record the duration for.
* @param value uint64_t the value of the duration.
* @param unit Unit the unit of the duration.
*/
void recordClusterResponseDuration(const Upstream::ClusterInfo& cluster, uint64_t value,
Stats::Histogram::Unit unit) {
recordClusterScopeHistogram(cluster, {upstream_rq_time_}, unit, value);
}

private:
void incClusterScopeCounter(const Upstream::ClusterInfo& cluster,
const Stats::StatNameVec& names) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
cluster.statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc();
}

void recordClusterScopeHistogram(const Upstream::ClusterInfo& cluster,
const Stats::StatNameVec& names, Stats::Histogram::Unit unit,
uint64_t value) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
cluster.statsScope()
.histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit)
.recordValue(value);
}

RouterStats generateStats(const std::string& prefix, Stats::Scope& scope) {
return RouterStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
POOL_GAUGE_PREFIX(scope, prefix),
POOL_HISTOGRAM_PREFIX(scope, prefix))};
}

Upstream::ClusterManager& cluster_manager_;
RouterStats stats_;
Stats::StatNameSetPtr stat_name_set_;
Stats::SymbolTable& symbol_table_;
const Stats::StatName upstream_rq_call_;
const Stats::StatName upstream_rq_oneway_;
const Stats::StatName upstream_rq_invalid_type_;
const Stats::StatName upstream_resp_reply_;
const Stats::StatName upstream_resp_reply_success_;
const Stats::StatName upstream_resp_reply_error_;
const Stats::StatName upstream_resp_exception_;
const Stats::StatName upstream_resp_invalid_type_;
const Stats::StatName upstream_rq_time_;
const Stats::StatName upstream_rq_size_;
const Stats::StatName upstream_resp_size_;
};

} // namespace Router
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {
route_ = callbacks_->route();
if (!route_) {
ENVOY_STREAM_LOG(debug, "no route match for method '{}'", *callbacks_, metadata->methodName());
stats_.route_missing_.inc();
stats().route_missing_.inc();
callbacks_->sendLocalReply(
AppException(AppExceptionType::UnknownMethod,
fmt::format("no route for method '{}'", metadata->methodName())),
Expand All @@ -225,10 +225,10 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {
route_entry_ = route_->routeEntry();
const std::string& cluster_name = route_entry_->clusterName();

Upstream::ThreadLocalCluster* cluster = cluster_manager_.getThreadLocalCluster(cluster_name);
Upstream::ThreadLocalCluster* cluster = clusterManager().getThreadLocalCluster(cluster_name);
if (!cluster) {
ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, cluster_name);
stats_.unknown_cluster_.inc();
stats().unknown_cluster_.inc();
callbacks_->sendLocalReply(AppException(AppExceptionType::InternalError,
fmt::format("unknown cluster '{}'", cluster_name)),
true);
Expand All @@ -240,20 +240,20 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {
metadata->methodName());
switch (metadata->messageType()) {
case MessageType::Call:
incClusterScopeCounter({upstream_rq_call_});
incRequestCall(*cluster_);
break;

case MessageType::Oneway:
incClusterScopeCounter({upstream_rq_oneway_});
incRequestOneWay(*cluster_);
break;

default:
incClusterScopeCounter({upstream_rq_invalid_type_});
incRequestInvalid(*cluster_);
break;
}

if (cluster_->maintenanceMode()) {
stats_.upstream_rq_maintenance_mode_.inc();
stats().upstream_rq_maintenance_mode_.inc();
callbacks_->sendLocalReply(
AppException(AppExceptionType::InternalError,
fmt::format("maintenance mode for cluster '{}'", cluster_name)),
Expand Down Expand Up @@ -282,7 +282,7 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {

auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
if (!conn_pool_data) {
stats_.no_healthy_upstream_.inc();
stats().no_healthy_upstream_.inc();
callbacks_->sendLocalReply(
AppException(AppExceptionType::InternalError,
fmt::format("no healthy upstream for '{}'", cluster_name)),
Expand Down Expand Up @@ -316,7 +316,7 @@ FilterStatus Router::messageEnd() {
upstream_request_buffer_);

request_size_ += transport_buffer.length();
recordClusterScopeHistogram({upstream_rq_size_}, Stats::Histogram::Unit::Bytes, request_size_);
recordUpstreamRequestSize(*cluster_, request_size_);

upstream_request_->conn_data_->connection().write(transport_buffer, false);
upstream_request_->onRequestComplete();
Expand Down Expand Up @@ -355,31 +355,31 @@ void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
ThriftFilters::ResponseStatus status = callbacks_->upstreamData(data);
if (status == ThriftFilters::ResponseStatus::Complete) {
ENVOY_STREAM_LOG(debug, "response complete", *callbacks_);
recordClusterScopeHistogram({upstream_resp_size_}, Stats::Histogram::Unit::Bytes,
response_size_);

recordUpstreamResponseSize(*cluster_, response_size_);

switch (callbacks_->responseMetadata()->messageType()) {
case MessageType::Reply:
incClusterScopeCounter({upstream_resp_reply_});
incResponseReply(*cluster_);
if (callbacks_->responseSuccess()) {
upstream_request_->upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestSuccess);
incClusterScopeCounter({upstream_resp_reply_success_});
incResponseReplySuccess(*cluster_);
} else {
upstream_request_->upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestFailed);
incClusterScopeCounter({upstream_resp_reply_error_});
incResponseReplyError(*cluster_);
}
break;

case MessageType::Exception:
upstream_request_->upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestFailed);
incClusterScopeCounter({upstream_resp_exception_});
incResponseException(*cluster_);
break;

default:
incClusterScopeCounter({upstream_resp_invalid_type_});
incResponseInvalidType(*cluster_);
break;
}
upstream_request_->onResponseComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,6 @@ class RouteMatcher {
std::vector<RouteEntryImplBaseConstSharedPtr> routes_;
};

#define ALL_THRIFT_ROUTER_STATS(COUNTER, GAUGE, HISTOGRAM) \
COUNTER(route_missing) \
COUNTER(unknown_cluster) \
COUNTER(upstream_rq_maintenance_mode) \
COUNTER(no_healthy_upstream)

struct RouterStats {
ALL_THRIFT_ROUTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT)
};

class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
public Upstream::LoadBalancerContextBase,
public RequestOwner,
Expand All @@ -177,21 +167,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
public:
Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope)
: cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)),
stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")),
symbol_table_(scope.symbolTable()),
upstream_rq_call_(stat_name_set_->add("thrift.upstream_rq_call")),
upstream_rq_oneway_(stat_name_set_->add("thrift.upstream_rq_oneway")),
upstream_rq_invalid_type_(stat_name_set_->add("thrift.upstream_rq_invalid_type")),
upstream_resp_reply_(stat_name_set_->add("thrift.upstream_resp_reply")),
upstream_resp_reply_success_(stat_name_set_->add("thrift.upstream_resp_success")),
upstream_resp_reply_error_(stat_name_set_->add("thrift.upstream_resp_error")),
upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")),
upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")),
upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")),
upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")),
upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")),
passthrough_supported_(false) {}
: RequestOwner(cluster_manager, stat_prefix, scope), passthrough_supported_(false) {}

~Router() override = default;

Expand All @@ -211,7 +187,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
callbacks_->sendLocalReply(response, end_stream);
}
void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) override {
recordClusterScopeHistogram({upstream_rq_time_}, unit, value);
recordClusterResponseDuration(*cluster_, value, unit);
}

// RequestOwner::ProtocolConverter
Expand All @@ -223,10 +199,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
// Upstream::LoadBalancerContext
const Network::Connection* downstreamConnection() const override;
const Envoy::Router::MetadataMatchCriteria* metadataMatchCriteria() override {
if (route_entry_) {
return route_entry_->metadataMatchCriteria();
}
return nullptr;
return route_entry_ ? route_entry_->metadataMatchCriteria() : nullptr;
}

// Tcp::ConnectionPool::UpstreamCallbacks
Expand Down Expand Up @@ -280,42 +253,7 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
MonotonicTime downstream_request_complete_time_;
};

// Stats
void incClusterScopeCounter(const Stats::StatNameVec& names) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
cluster_->statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc();
}

void recordClusterScopeHistogram(const Stats::StatNameVec& names, Stats::Histogram::Unit unit,
uint64_t count) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
cluster_->statsScope()
.histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit)
.recordValue(count);
}

void cleanup();
RouterStats generateStats(const std::string& prefix, Stats::Scope& scope) {
return RouterStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
POOL_GAUGE_PREFIX(scope, prefix),
POOL_HISTOGRAM_PREFIX(scope, prefix))};
}

Upstream::ClusterManager& cluster_manager_;
RouterStats stats_;
Stats::StatNameSetPtr stat_name_set_;
Stats::SymbolTable& symbol_table_;
const Stats::StatName upstream_rq_call_;
const Stats::StatName upstream_rq_oneway_;
const Stats::StatName upstream_rq_invalid_type_;
const Stats::StatName upstream_resp_reply_;
const Stats::StatName upstream_resp_reply_success_;
const Stats::StatName upstream_resp_reply_error_;
const Stats::StatName upstream_resp_exception_;
const Stats::StatName upstream_resp_invalid_type_;
const Stats::StatName upstream_rq_time_;
const Stats::StatName upstream_rq_size_;
const Stats::StatName upstream_resp_size_;

ThriftFilters::DecoderFilterCallbacks* callbacks_{};
RouteConstSharedPtr route_{};
Expand Down

0 comments on commit 5bb9b06

Please sign in to comment.