Skip to content

Commit

Permalink
grpc-mux: refactor c'tor arguments (#28781)
Browse files Browse the repository at this point in the history
Following #28273 this is a cleanup requested to pass c'tor parameters in a single struct.

Risk Level: low - refactor
Testing: N/A

Signed-off-by: Adi Suissa-Peleg <[email protected]>
  • Loading branch information
adisuissa authored Aug 8, 2023
1 parent b47a3bd commit 69595b4
Show file tree
Hide file tree
Showing 18 changed files with 437 additions and 348 deletions.
15 changes: 15 additions & 0 deletions source/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,26 @@ licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "grpc_mux_context_lib",
hdrs = ["grpc_mux_context.h"],
deps = [
"//envoy/config:custom_config_validators_interface",
"//envoy/config:eds_resources_cache_interface",
"//envoy/config:xds_config_tracker_interface",
"//envoy/config:xds_resources_delegate_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/config:utility_lib",
],
)

envoy_cc_library(
name = "grpc_mux_lib",
srcs = ["grpc_mux_impl.cc"],
hdrs = ["grpc_mux_impl.h"],
deps = [
":eds_resources_cache_lib",
":grpc_mux_context_lib",
":grpc_stream_lib",
":xds_source_id_lib",
"//envoy/config:custom_config_validators_interface",
Expand Down Expand Up @@ -47,6 +61,7 @@ envoy_cc_library(
deps = [
":delta_subscription_state_lib",
":eds_resources_cache_lib",
":grpc_mux_context_lib",
":grpc_stream_lib",
":pausable_ack_queue_lib",
":watch_map_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
SubscriptionFactory::RetryMaxDelayMs);

GrpcMuxContext grpc_mux_context{
/*async_client_=*/Config::Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true)
->createUncachedRawAsyncClient(),
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
/*rate_limit_settings_=*/Utility::parseRateLimitSettings(api_config_source),
/*scope_=*/data.scope_,
/*config_validators_=*/std::move(custom_config_validators),
/*xds_resources_delegate_=*/{},
/*xds_config_tracker_=*/data.xds_config_tracker_,
/*backoff_strategy_=*/std::move(backoff_strategy),
/*target_xds_authority_=*/"",
/*eds_resources_cache_=*/nullptr // No EDS resources cache needed from collections.
};
return std::make_unique<GrpcCollectionSubscriptionImpl>(
data.collection_locator_.value(),
std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(),
api_config_source, data.scope_, true)
->createUncachedRawAsyncClient(),
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
data.xds_config_tracker_,
// No EDS resources cache needed from collections.
/*eds_resources_cache=*/nullptr),
data.collection_locator_.value(), std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context),
data.callbacks_, data.resource_decoder_, data.stats_, data.dispatcher_,
Utility::configSourceInitialFetchTimeout(data.config_), /*is_aggregated=*/false,
data.options_);
Expand Down
36 changes: 36 additions & 0 deletions source/extensions/config_subscription/grpc/grpc_mux_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "envoy/common/backoff_strategy.h"
#include "envoy/config/custom_config_validators.h"
#include "envoy/config/eds_resources_cache.h"
#include "envoy/config/xds_config_tracker.h"
#include "envoy/config/xds_resources_delegate.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client.h"
#include "envoy/local_info/local_info.h"
#include "envoy/stats/scope.h"

#include "source/common/config/utility.h"

namespace Envoy {
namespace Config {

// Context (data) needed for creating a GrpcMux object.
// These are parameters needed for the creation of all GrpcMux objects.
struct GrpcMuxContext {
Grpc::RawAsyncClientPtr async_client_;
Event::Dispatcher& dispatcher_;
const Protobuf::MethodDescriptor& service_method_;
const LocalInfo::LocalInfo& local_info_;
const RateLimitSettings& rate_limit_settings_;
Stats::Scope& scope_;
CustomConfigValidatorsPtr config_validators_;
XdsResourcesDelegateOptRef xds_resources_delegate_;
XdsConfigTrackerOptRef xds_config_tracker_;
BackOffStrategyPtr backoff_strategy_;
const std::string& target_xds_authority_;
EdsResourcesCachePtr eds_resources_cache_;
};

} // namespace Config
} // namespace Envoy
61 changes: 34 additions & 27 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,24 @@ std::string convertToWildcard(const std::string& resource_name) {
}
} // namespace

GrpcMuxImpl::GrpcMuxImpl(
const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node,
CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy,
XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate,
EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority)
: grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope,
std::move(backoff_strategy), rate_limit_settings),
local_info_(local_info), skip_subsequent_node_(skip_subsequent_node),
config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker),
xds_resources_delegate_(xds_resources_delegate),
eds_resources_cache_(std::move(eds_resources_cache)),
target_xds_authority_(target_xds_authority), first_stream_request_(true),
dispatcher_(dispatcher),
dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback(
[this](absl::string_view resource_type_url) {
onDynamicContextUpdate(resource_type_url);
})) {
Config::Utility::checkLocalInfo("ads", local_info);
GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
: grpc_stream_(this, std::move(grpc_mux_context.async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
grpc_mux_context.rate_limit_settings_),
local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node),
config_validators_(std::move(grpc_mux_context.config_validators_)),
xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_),
eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)),
target_xds_authority_(grpc_mux_context.target_xds_authority_), first_stream_request_(true),
dispatcher_(grpc_mux_context.dispatcher_),
dynamic_update_callback_handle_(
grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
[this](absl::string_view resource_type_url) {
onDynamicContextUpdate(resource_type_url);
})) {
Config::Utility::checkLocalInfo("ads", local_info_);
AllMuxes::get().insert(this);
}

Expand Down Expand Up @@ -553,18 +551,27 @@ class GrpcMuxFactory : public MuxFactory {
const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override {
return std::make_shared<Config::GrpcMuxImpl>(
local_info, std::move(async_client), dispatcher,
GrpcMuxContext grpc_mux_context{
/*async_client_=*/std::move(async_client),
/*dispatcher_=*/dispatcher,
/*service_method_=*/
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
scope, Utility::parseRateLimitSettings(ads_config),
ads_config.set_node_on_first_message_only(), std::move(config_validators),
std::move(backoff_strategy), xds_config_tracker, xds_resources_delegate,
/*local_info_=*/local_info,
/*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
/*scope_=*/scope,
/*config_validators_=*/std::move(config_validators),
/*xds_resources_delegate_=*/xds_resources_delegate,
/*xds_config_tracker_=*/xds_config_tracker,
/*backoff_strategy_=*/std::move(backoff_strategy),
/*target_xds_authority_=*/Config::Utility::getGrpcControlPlane(ads_config).value_or(""),
/*eds_resources_cache_=*/
(use_eds_resources_cache &&
Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
: nullptr,
Config::Utility::getGrpcControlPlane(ads_config).value_or(""));
: nullptr};
return std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context,
ads_config.set_node_on_first_message_only());
}
};

Expand Down
9 changes: 2 additions & 7 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "source/common/config/utility.h"
#include "source/common/config/xds_context_params.h"
#include "source/common/config/xds_resource.h"
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
#include "source/extensions/config_subscription/grpc/grpc_stream.h"

#include "absl/container/node_hash_map.h"
Expand All @@ -40,13 +41,7 @@ class GrpcMuxImpl : public GrpcMux,
public GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>,
public Logger::Loggable<Logger::Id::config> {
public:
GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings,
bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate,
EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority);
GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node);

~GrpcMuxImpl() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "source/common/config/custom_config_validators_impl.h"
#include "source/common/config/type_to_endpoint.h"
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
Expand All @@ -24,29 +25,29 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
SubscriptionFactory::RetryMaxDelayMs);

GrpcMuxContext grpc_mux_context{
/*async_client_=*/Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(),
api_config_source, data.scope_, true)
->createUncachedRawAsyncClient(),
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/sotwGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
/*rate_limit_settings_=*/Utility::parseRateLimitSettings(api_config_source),
/*scope_=*/data.scope_,
/*config_validators_=*/std::move(custom_config_validators),
/*xds_resources_delegate_=*/data.xds_resources_delegate_,
/*xds_config_tracker_=*/data.xds_config_tracker_,
/*backoff_strategy_=*/std::move(backoff_strategy),
/*target_xds_authority_=*/control_plane_id,
/*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
};

if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxSotw>(
Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), api_config_source,
data.scope_, true)
->createUncachedRawAsyncClient(),
data.dispatcher_, sotwGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_,
/*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS.
control_plane_id);
grpc_mux_context, api_config_source.set_node_on_first_message_only());
} else {
mux = std::make_shared<Config::GrpcMuxImpl>(
data.local_info_,
Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), api_config_source,
data.scope_, true)
->createUncachedRawAsyncClient(),
data.dispatcher_, sotwGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source),
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_,
/*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS.
control_plane_id);
mux = std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context,
api_config_source.set_node_on_first_message_only());
}
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
Expand All @@ -67,27 +68,28 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti
api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
SubscriptionFactory::RetryMaxDelayMs);

GrpcMuxContext grpc_mux_context{
/*async_client_=*/Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(),
api_config_source, data.scope_, true)
->createUncachedRawAsyncClient(),
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
/*rate_limit_settings_=*/Utility::parseRateLimitSettings(api_config_source),
/*scope_=*/data.scope_,
/*config_validators_=*/std::move(custom_config_validators),
/*xds_resources_delegate_=*/{},
/*xds_config_tracker_=*/data.xds_config_tracker_,
/*backoff_strategy_=*/std::move(backoff_strategy),
/*target_xds_authority_=*/"",
/*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
};

if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxDelta>(
Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), api_config_source,
data.scope_, true)
->createUncachedRawAsyncClient(),
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_,
/*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS.
);
grpc_mux_context, api_config_source.set_node_on_first_message_only());
} else {
mux = std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(),
api_config_source, data.scope_, true)
->createUncachedRawAsyncClient(),
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_,
/*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS.
);
mux = std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
Expand Down
Loading

0 comments on commit 69595b4

Please sign in to comment.