From bab6933035e9396260afda6f4382ae46a9085329 Mon Sep 17 00:00:00 2001 From: Adi Suissa-Peleg Date: Wed, 2 Aug 2023 14:15:47 +0000 Subject: [PATCH] grpc-mux: refactor c'tor arguments Signed-off-by: Adi Suissa-Peleg --- .../extensions/config_subscription/grpc/BUILD | 15 +++ .../grpc_collection_subscription_factory.cc | 28 ++-- .../grpc/grpc_mux_context.h | 36 ++++++ .../config_subscription/grpc/grpc_mux_impl.cc | 61 +++++---- .../config_subscription/grpc/grpc_mux_impl.h | 9 +- .../grpc/grpc_subscription_factory.cc | 78 +++++------ .../grpc/new_grpc_mux_impl.cc | 50 +++++--- .../grpc/new_grpc_mux_impl.h | 9 +- .../config_subscription/grpc/xds_mux/BUILD | 1 + .../grpc/xds_mux/grpc_mux_impl.cc | 121 +++++++++--------- .../grpc/xds_mux/grpc_mux_impl.h | 28 +--- .../config/grpc_subscription_test_harness.h | 31 ++--- .../extensions/clusters/eds/eds_speed_test.cc | 34 ++--- .../grpc/delta_subscription_impl_test.cc | 27 ++-- .../grpc/delta_subscription_test_harness.h | 27 ++-- .../grpc/grpc_mux_impl_test.cc | 96 +++++++------- .../grpc/new_grpc_mux_impl_test.cc | 34 ++--- .../grpc/xds_grpc_mux_impl_test.cc | 100 ++++++++++----- 18 files changed, 437 insertions(+), 348 deletions(-) create mode 100644 source/extensions/config_subscription/grpc/grpc_mux_context.h diff --git a/source/extensions/config_subscription/grpc/BUILD b/source/extensions/config_subscription/grpc/BUILD index a703ad258a90..4cbf37e0b9cd 100644 --- a/source/extensions/config_subscription/grpc/BUILD +++ b/source/extensions/config_subscription/grpc/BUILD @@ -9,12 +9,26 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() +envoy_cc_library( + name = "grpc_mux_context_lib", + hdrs = ["grpc_mux_context.h"], + deps = [ + "//envoy/config:custom_config_validators_interface", + "//envoy/config:eds_resources_cache_interface", + "//envoy/config:xds_config_tracker_interface", + "//envoy/config:xds_resources_delegate_interface", + "//envoy/upstream:cluster_manager_interface", + "//source/common/config:utility_lib", + ], +) + envoy_cc_library( name = "grpc_mux_lib", srcs = ["grpc_mux_impl.cc"], hdrs = ["grpc_mux_impl.h"], deps = [ ":eds_resources_cache_lib", + ":grpc_mux_context_lib", ":grpc_stream_lib", ":xds_source_id_lib", "//envoy/config:custom_config_validators_interface", @@ -47,6 +61,7 @@ envoy_cc_library( deps = [ ":delta_subscription_state_lib", ":eds_resources_cache_lib", + ":grpc_mux_context_lib", ":grpc_stream_lib", ":pausable_ack_queue_lib", ":watch_map_lib", diff --git a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc index 25c11bb02b58..bfd2c706070c 100644 --- a/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.cc @@ -22,18 +22,24 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create( api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/Config::Utility::factoryForGrpcApiConfigSource( + data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true) + ->createUncachedRawAsyncClient(), + /*dispatcher_=*/data.dispatcher_, + /*service_method_=*/deltaGrpcMethod(data.type_url_), + /*local_info_=*/data.local_info_, + /*rate_limit_settings_=*/Utility::parseRateLimitSettings(api_config_source), + /*scope_=*/data.scope_, + /*config_validators_=*/std::move(custom_config_validators), + /*xds_resources_delegate_=*/{}, + /*xds_config_tracker_=*/data.xds_config_tracker_, + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr // No EDS resources cache needed from collections. + }; return std::make_unique( - data.collection_locator_.value(), - std::make_shared( - Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), - api_config_source, data.scope_, true) - ->createUncachedRawAsyncClient(), - data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, - Utility::parseRateLimitSettings(api_config_source), data.local_info_, - std::move(custom_config_validators), std::move(backoff_strategy), - data.xds_config_tracker_, - // No EDS resources cache needed from collections. - /*eds_resources_cache=*/nullptr), + data.collection_locator_.value(), std::make_shared(grpc_mux_context), data.callbacks_, data.resource_decoder_, data.stats_, data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_), /*is_aggregated=*/false, data.options_); diff --git a/source/extensions/config_subscription/grpc/grpc_mux_context.h b/source/extensions/config_subscription/grpc/grpc_mux_context.h new file mode 100644 index 000000000000..254cb18b9534 --- /dev/null +++ b/source/extensions/config_subscription/grpc/grpc_mux_context.h @@ -0,0 +1,36 @@ +#pragma once + +#include "envoy/common/backoff_strategy.h" +#include "envoy/config/custom_config_validators.h" +#include "envoy/config/eds_resources_cache.h" +#include "envoy/config/xds_config_tracker.h" +#include "envoy/config/xds_resources_delegate.h" +#include "envoy/event/dispatcher.h" +#include "envoy/grpc/async_client.h" +#include "envoy/local_info/local_info.h" +#include "envoy/stats/scope.h" + +#include "source/common/config/utility.h" + +namespace Envoy { +namespace Config { + +// Context (data) needed for creating a GrpcMux object. +// These are parameters needed for the creation of all GrpcMux objects. +struct GrpcMuxContext { + Grpc::RawAsyncClientPtr async_client_; + Event::Dispatcher& dispatcher_; + const Protobuf::MethodDescriptor& service_method_; + const LocalInfo::LocalInfo& local_info_; + const RateLimitSettings& rate_limit_settings_; + Stats::Scope& scope_; + CustomConfigValidatorsPtr config_validators_; + XdsResourcesDelegateOptRef xds_resources_delegate_; + XdsConfigTrackerOptRef xds_config_tracker_; + BackOffStrategyPtr backoff_strategy_; + const std::string& target_xds_authority_; + EdsResourcesCachePtr eds_resources_cache_; +}; + +} // namespace Config +} // namespace Envoy diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc index 72a9a41a33ae..640609f5af04 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc @@ -57,26 +57,24 @@ std::string convertToWildcard(const std::string& resource_name) { } } // namespace -GrpcMuxImpl::GrpcMuxImpl( - const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client, - Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node, - CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, - EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) - : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, - std::move(backoff_strategy), rate_limit_settings), - local_info_(local_info), skip_subsequent_node_(skip_subsequent_node), - config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), - xds_resources_delegate_(xds_resources_delegate), - eds_resources_cache_(std::move(eds_resources_cache)), - target_xds_authority_(target_xds_authority), first_stream_request_(true), - dispatcher_(dispatcher), - dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback( - [this](absl::string_view resource_type_url) { - onDynamicContextUpdate(resource_type_url); - })) { - Config::Utility::checkLocalInfo("ads", local_info); +GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) + : grpc_stream_(this, std::move(grpc_mux_context.async_client_), + grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, + grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_), + local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node), + config_validators_(std::move(grpc_mux_context.config_validators_)), + xds_config_tracker_(grpc_mux_context.xds_config_tracker_), + xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_), + eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)), + target_xds_authority_(grpc_mux_context.target_xds_authority_), first_stream_request_(true), + dispatcher_(grpc_mux_context.dispatcher_), + dynamic_update_callback_handle_( + grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback( + [this](absl::string_view resource_type_url) { + onDynamicContextUpdate(resource_type_url); + })) { + Config::Utility::checkLocalInfo("ads", local_info_); AllMuxes::get().insert(this); } @@ -553,18 +551,27 @@ class GrpcMuxFactory : public MuxFactory { const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override { - return std::make_shared( - local_info, std::move(async_client), dispatcher, + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::move(async_client), + /*dispatcher_=*/dispatcher, + /*service_method_=*/ *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), - scope, Utility::parseRateLimitSettings(ads_config), - ads_config.set_node_on_first_message_only(), std::move(config_validators), - std::move(backoff_strategy), xds_config_tracker, xds_resources_delegate, + /*local_info_=*/local_info, + /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config), + /*scope_=*/scope, + /*config_validators_=*/std::move(config_validators), + /*xds_resources_delegate_=*/xds_resources_delegate, + /*xds_config_tracker_=*/xds_config_tracker, + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/Config::Utility::getGrpcControlPlane(ads_config).value_or(""), + /*eds_resources_cache_=*/ (use_eds_resources_cache && Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) ? std::make_unique(dispatcher) - : nullptr, - Config::Utility::getGrpcControlPlane(ads_config).value_or("")); + : nullptr}; + return std::make_shared(grpc_mux_context, + ads_config.set_node_on_first_message_only()); } }; diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/grpc_mux_impl.h index a018dd0a57e1..1b683ad6c765 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.h @@ -26,6 +26,7 @@ #include "source/common/config/utility.h" #include "source/common/config/xds_context_params.h" #include "source/common/config/xds_resource.h" +#include "source/extensions/config_subscription/grpc/grpc_mux_context.h" #include "source/extensions/config_subscription/grpc/grpc_stream.h" #include "absl/container/node_hash_map.h" @@ -40,13 +41,7 @@ class GrpcMuxImpl : public GrpcMux, public GrpcStreamCallbacks, public Logger::Loggable { public: - GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client, - Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, - bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate, - EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority); + GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node); ~GrpcMuxImpl() override; diff --git a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc index 435df4f3a1ef..722bdf5b689e 100644 --- a/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc +++ b/source/extensions/config_subscription/grpc/grpc_subscription_factory.cc @@ -2,6 +2,7 @@ #include "source/common/config/custom_config_validators_impl.h" #include "source/common/config/type_to_endpoint.h" +#include "source/extensions/config_subscription/grpc/grpc_mux_context.h" #include "source/extensions/config_subscription/grpc/grpc_mux_impl.h" #include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h" #include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h" @@ -24,29 +25,29 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), + api_config_source, data.scope_, true) + ->createUncachedRawAsyncClient(), + /*dispatcher_=*/data.dispatcher_, + /*service_method_=*/sotwGrpcMethod(data.type_url_), + /*local_info_=*/data.local_info_, + /*rate_limit_settings_=*/Utility::parseRateLimitSettings(api_config_source), + /*scope_=*/data.scope_, + /*config_validators_=*/std::move(custom_config_validators), + /*xds_resources_delegate_=*/data.xds_resources_delegate_, + /*xds_config_tracker_=*/data.xds_config_tracker_, + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/control_plane_id, + /*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS. + }; + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) { mux = std::make_shared( - Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), api_config_source, - data.scope_, true) - ->createUncachedRawAsyncClient(), - data.dispatcher_, sotwGrpcMethod(data.type_url_), data.scope_, - Utility::parseRateLimitSettings(api_config_source), data.local_info_, - api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_, - /*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS. - control_plane_id); + grpc_mux_context, api_config_source.set_node_on_first_message_only()); } else { - mux = std::make_shared( - data.local_info_, - Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), api_config_source, - data.scope_, true) - ->createUncachedRawAsyncClient(), - data.dispatcher_, sotwGrpcMethod(data.type_url_), data.scope_, - Utility::parseRateLimitSettings(api_config_source), - api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_, - /*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS. - control_plane_id); + mux = std::make_shared(grpc_mux_context, + api_config_source.set_node_on_first_message_only()); } return std::make_unique( std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_, @@ -67,27 +68,28 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), + api_config_source, data.scope_, true) + ->createUncachedRawAsyncClient(), + /*dispatcher_=*/data.dispatcher_, + /*service_method_=*/deltaGrpcMethod(data.type_url_), + /*local_info_=*/data.local_info_, + /*rate_limit_settings_=*/Utility::parseRateLimitSettings(api_config_source), + /*scope_=*/data.scope_, + /*config_validators_=*/std::move(custom_config_validators), + /*xds_resources_delegate_=*/{}, + /*xds_config_tracker_=*/data.xds_config_tracker_, + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS. + }; + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) { mux = std::make_shared( - Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), api_config_source, - data.scope_, true) - ->createUncachedRawAsyncClient(), - data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, - Utility::parseRateLimitSettings(api_config_source), data.local_info_, - api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - std::move(backoff_strategy), data.xds_config_tracker_, - /*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS. - ); + grpc_mux_context, api_config_source.set_node_on_first_message_only()); } else { - mux = std::make_shared( - Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(), - api_config_source, data.scope_, true) - ->createUncachedRawAsyncClient(), - data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_, - Utility::parseRateLimitSettings(api_config_source), data.local_info_, - std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_, - /*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS. - ); + mux = std::make_shared(grpc_mux_context); } return std::make_unique( std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_, diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc index 8507e3b525fd..5a45f97abd8f 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc @@ -35,21 +35,21 @@ class AllMuxesState { using AllMuxes = ThreadSafeSingleton; } // namespace -NewGrpcMuxImpl::NewGrpcMuxImpl( - Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, - CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, EdsResourcesCachePtr eds_resources_cache) - : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, - std::move(backoff_strategy), rate_limit_settings), - local_info_(local_info), config_validators_(std::move(config_validators)), - dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback( - [this](absl::string_view resource_type_url) { - onDynamicContextUpdate(resource_type_url); - })), - dispatcher_(dispatcher), xds_config_tracker_(xds_config_tracker), - eds_resources_cache_(std::move(eds_resources_cache)) { +NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context) + : grpc_stream_(this, std::move(grpc_mux_context.async_client_), + grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, + grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_), + local_info_(grpc_mux_context.local_info_), + config_validators_(std::move(grpc_mux_context.config_validators_)), + dynamic_update_callback_handle_( + grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback( + [this](absl::string_view resource_type_url) { + onDynamicContextUpdate(resource_type_url); + })), + dispatcher_(grpc_mux_context.dispatcher_), + xds_config_tracker_(grpc_mux_context.xds_config_tracker_), + eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)) { AllMuxes::get().insert(this); } @@ -348,16 +348,26 @@ class NewGrpcMuxFactory : public MuxFactory { const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, OptRef, bool use_eds_resources_cache) override { - return std::make_shared( - std::move(async_client), dispatcher, + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::move(async_client), + /*dispatcher_=*/dispatcher, + /*service_method_=*/ *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"), - scope, Utility::parseRateLimitSettings(ads_config), local_info, - std::move(config_validators), std::move(backoff_strategy), xds_config_tracker, + /*local_info_=*/local_info, + /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config), + /*scope_=*/scope, + /*config_validators_=*/std::move(config_validators), + /*xds_resources_delegate_=*/absl::nullopt, + /*xds_config_tracker_=*/xds_config_tracker, + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/ (use_eds_resources_cache && Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) ? std::make_unique(dispatcher) - : nullptr); + : nullptr}; + return std::make_shared(grpc_mux_context); } }; diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h index 2c82453a7488..421cbd9ae05b 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h @@ -16,6 +16,7 @@ #include "source/common/grpc/common.h" #include "source/common/runtime/runtime_features.h" #include "source/extensions/config_subscription/grpc/delta_subscription_state.h" +#include "source/extensions/config_subscription/grpc/grpc_mux_context.h" #include "source/extensions/config_subscription/grpc/grpc_stream.h" #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h" #include "source/extensions/config_subscription/grpc/watch_map.h" @@ -33,13 +34,7 @@ class NewGrpcMuxImpl public GrpcStreamCallbacks, Logger::Loggable { public: - NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, - const LocalInfo::LocalInfo& local_info, - CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, - EdsResourcesCachePtr eds_resources_cache); + NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context); ~NewGrpcMuxImpl() override; diff --git a/source/extensions/config_subscription/grpc/xds_mux/BUILD b/source/extensions/config_subscription/grpc/xds_mux/BUILD index 42b2d7b63336..a9e7f75fd70a 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/BUILD +++ b/source/extensions/config_subscription/grpc/xds_mux/BUILD @@ -69,6 +69,7 @@ envoy_cc_extension( "//source/common/config:xds_resource_lib", "//source/common/memory:utils_lib", "//source/extensions/config_subscription/grpc:eds_resources_cache_lib", + "//source/extensions/config_subscription/grpc:grpc_mux_context_lib", "//source/extensions/config_subscription/grpc:grpc_stream_lib", "//source/extensions/config_subscription/grpc:pausable_ack_queue_lib", "//source/extensions/config_subscription/grpc:watch_map_lib", diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc index 890f5c6f5a90..e13695971fde 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc @@ -38,27 +38,25 @@ using AllMuxes = ThreadSafeSingleton; } // namespace template -GrpcMuxImpl::GrpcMuxImpl( - std::unique_ptr subscription_state_factory, bool skip_subsequent_node, - const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr&& async_client, - Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, - CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, - EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority) - : grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope, - std::move(backoff_strategy), rate_limit_settings), +GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr subscription_state_factory, + GrpcMuxContext& grpc_mux_content, bool skip_subsequent_node) + : grpc_stream_(this, std::move(grpc_mux_content.async_client_), + grpc_mux_content.service_method_, grpc_mux_content.dispatcher_, + grpc_mux_content.scope_, std::move(grpc_mux_content.backoff_strategy_), + grpc_mux_content.rate_limit_settings_), subscription_state_factory_(std::move(subscription_state_factory)), - skip_subsequent_node_(skip_subsequent_node), local_info_(local_info), - dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback( - [this](absl::string_view resource_type_url) { - onDynamicContextUpdate(resource_type_url); - })), - config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), - xds_resources_delegate_(xds_resources_delegate), - eds_resources_cache_(std::move(eds_resources_cache)), - target_xds_authority_(target_xds_authority) { - Config::Utility::checkLocalInfo("ads", local_info); + skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_content.local_info_), + dynamic_update_callback_handle_( + grpc_mux_content.local_info_.contextProvider().addDynamicContextUpdateCallback( + [this](absl::string_view resource_type_url) { + onDynamicContextUpdate(resource_type_url); + })), + config_validators_(std::move(grpc_mux_content.config_validators_)), + xds_config_tracker_(grpc_mux_content.xds_config_tracker_), + xds_resources_delegate_(grpc_mux_content.xds_resources_delegate_), + eds_resources_cache_(std::move(grpc_mux_content.eds_resources_cache_)), + target_xds_authority_(grpc_mux_content.target_xds_authority_) { + Config::Utility::checkLocalInfo("ads", grpc_mux_content.local_info_); AllMuxes::get().insert(this); } @@ -372,18 +370,9 @@ template class GrpcMuxImpl; // Delta- and SotW-specific concrete subclasses: -GrpcMuxDelta::GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, - const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node, - CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, - EdsResourcesCachePtr eds_resources_cache) - : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, - local_info, std::move(async_client), dispatcher, service_method, scope, - rate_limit_settings, std::move(config_validators), std::move(backoff_strategy), - xds_config_tracker, absl::nullopt, std::move(eds_resources_cache)) {} +GrpcMuxDelta::GrpcMuxDelta(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) + : GrpcMuxImpl(std::make_unique(grpc_mux_context.dispatcher_), + grpc_mux_context, skip_subsequent_node) {} // GrpcStreamCallbacks for GrpcMuxDelta void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url, @@ -396,21 +385,9 @@ void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url, } } -GrpcMuxSotw::GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, - const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node, - CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate, - EdsResourcesCachePtr eds_resources_cache, - const std::string& target_xds_authority) - : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, - local_info, std::move(async_client), dispatcher, service_method, scope, - rate_limit_settings, std::move(config_validators), std::move(backoff_strategy), - xds_config_tracker, xds_resources_delegate, std::move(eds_resources_cache), - target_xds_authority) {} +GrpcMuxSotw::GrpcMuxSotw(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) + : GrpcMuxImpl(std::make_unique(grpc_mux_context.dispatcher_), + grpc_mux_context, skip_subsequent_node) {} Config::GrpcMuxWatchPtr NullGrpcMuxImpl::addWatch(const std::string&, const absl::flat_hash_set&, @@ -431,18 +408,27 @@ class DeltaGrpcMuxFactory : public MuxFactory { const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override { - return std::make_shared( - std::move(async_client), dispatcher, + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::move(async_client), + /*dispatcher_=*/dispatcher, + /*service_method_=*/ *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v3.AggregatedDiscoveryService." - "DeltaAggregatedResources"), - scope, Envoy::Config::Utility::parseRateLimitSettings(ads_config), local_info, - ads_config.set_node_on_first_message_only(), std::move(config_validators), - std::move(backoff_strategy), xds_config_tracker, + "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"), + /*local_info_=*/local_info, + /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config), + /*scope_=*/scope, + /*config_validators_=*/std::move(config_validators), + /*xds_resources_delegate_=*/absl::nullopt, + /*xds_config_tracker_=*/xds_config_tracker, + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/ (use_eds_resources_cache && Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) ? std::make_unique(dispatcher) - : nullptr); + : nullptr}; + return std::make_shared(grpc_mux_context, + ads_config.set_node_on_first_message_only()); } }; @@ -457,18 +443,27 @@ class SotwGrpcMuxFactory : public MuxFactory { const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override { - return std::make_shared( - std::move(async_client), dispatcher, + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::move(async_client), + /*dispatcher_=*/dispatcher, + /*service_method_=*/ *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v3.AggregatedDiscoveryService." - "StreamAggregatedResources"), - scope, Envoy::Config::Utility::parseRateLimitSettings(ads_config), local_info, - ads_config.set_node_on_first_message_only(), std::move(config_validators), - std::move(backoff_strategy), xds_config_tracker, absl::nullopt, + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), + /*local_info_=*/local_info, + /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config), + /*scope_=*/scope, + /*config_validators_=*/std::move(config_validators), + /*xds_resources_delegate_=*/absl::nullopt, + /*xds_config_tracker_=*/xds_config_tracker, + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/ (use_eds_resources_cache && Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")) ? std::make_unique(dispatcher) - : nullptr); + : nullptr}; + return std::make_shared(grpc_mux_context, + ads_config.set_node_on_first_message_only()); } }; diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h index 67746534b092..f7063dc76f7c 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h @@ -21,6 +21,7 @@ #include "source/common/common/utility.h" #include "source/common/config/api_version.h" #include "source/common/grpc/common.h" +#include "source/extensions/config_subscription/grpc/grpc_mux_context.h" #include "source/extensions/config_subscription/grpc/grpc_stream.h" #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h" #include "source/extensions/config_subscription/grpc/watch_map.h" @@ -59,15 +60,8 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, public ShutdownableMux, Logger::Loggable { public: - GrpcMuxImpl(std::unique_ptr subscription_state_factory, bool skip_subsequent_node, - const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr&& async_client, - Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, - CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy, - XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, - EdsResourcesCachePtr eds_resources_cache = nullptr, - const std::string& target_xds_authority = ""); + GrpcMuxImpl(std::unique_ptr subscription_state_factory, GrpcMuxContext& grpc_mux_context, + bool skip_subsequent_node); ~GrpcMuxImpl() override; @@ -226,12 +220,7 @@ class GrpcMuxDelta : public GrpcMuxImpl { public: - GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, - bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - EdsResourcesCachePtr eds_resources_cache); + GrpcMuxDelta(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node); // GrpcStreamCallbacks void requestOnDemandUpdate(const std::string& type_url, @@ -242,14 +231,7 @@ class GrpcMuxSotw : public GrpcMuxImpl { public: - GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, - bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, - BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker, - XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, - EdsResourcesCachePtr eds_resources_cache = nullptr, - const std::string& target_xds_authority = ""); + GrpcMuxSotw(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node); // GrpcStreamCallbacks void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index c478901eca49..6467981a8510 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -59,23 +59,24 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { auto backoff_strategy = std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/*method_descriptor_, + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_store_.rootScope(), + /*config_validators_=*/std::move(config_validators_), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; + if (should_use_unified_) { - mux_ = std::make_shared( - std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, - *stats_store_.rootScope(), rate_limit_settings_, local_info_, true, - std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - /*eds_resources_cache=*/nullptr); + mux_ = std::make_shared(grpc_mux_context, true); } else { - mux_ = std::make_shared( - local_info_, std::unique_ptr(async_client_), dispatcher_, - *method_descriptor_, *stats_store_.rootScope(), rate_limit_settings_, true, - std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - /*eds_resources_cache=*/nullptr, - /*target_xds_authority=*/""); + mux_ = std::make_shared(grpc_mux_context, true); } subscription_ = std::make_unique( mux_, callbacks_, resource_decoder_, stats_, Config::TypeUrl::get().ClusterLoadAssignment, diff --git a/test/extensions/clusters/eds/eds_speed_test.cc b/test/extensions/clusters/eds/eds_speed_test.cc index 68c076f7b6f2..c5b10480216f 100644 --- a/test/extensions/clusters/eds/eds_speed_test.cc +++ b/test/extensions/clusters/eds/eds_speed_test.cc @@ -52,25 +52,25 @@ class EdsSpeedTest { auto backoff_strategy = std::make_unique( Config::SubscriptionFactory::RetryInitialDelayMs, Config::SubscriptionFactory::RetryMaxDelayMs, random_); + Config::GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/server_context_.dispatcher_, + /*service_method_=*/ + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/{}, + /*scope_=*/scope_, + /*config_validators_=*/std::move(config_validators_), + /*xds_resources_delegate_=*/Config::XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/Config::XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; if (use_unified_mux_) { - grpc_mux_.reset(new Config::XdsMux::GrpcMuxSotw( - std::unique_ptr(async_client_), server_context_.dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), - scope_, {}, local_info_, true, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/{}, /*eds_resources_cache=*/nullptr)); + grpc_mux_.reset(new Config::XdsMux::GrpcMuxSotw(grpc_mux_context, true)); } else { - grpc_mux_.reset(new Config::GrpcMuxImpl( - local_info_, std::unique_ptr(async_client_), - server_context_.dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), - scope_, {}, true, std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/Config::XdsResourcesDelegateOptRef(), - /*eds_resources_cache=*/nullptr, - /*target_xds_authority=*/"")); + grpc_mux_.reset(new Config::GrpcMuxImpl(grpc_mux_context, true)); } resetCluster(R"EOF( name: name diff --git a/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc b/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc index 14a7760b6501..c74e53cd5b27 100644 --- a/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc +++ b/test/extensions/config_subscription/grpc/delta_subscription_impl_test.cc @@ -154,20 +154,23 @@ TEST_P(DeltaSubscriptionNoGrpcStreamTest, NoGrpcStream) { auto backoff_strategy = std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client), + /*dispatcher_=*/dispatcher, + /*service_method_=*/*method_descriptor, + /*local_info_=*/local_info, + /*rate_limit_settings_=*/rate_limit_settings, + /*scope_=*/*stats_store.rootScope(), + /*config_validators_=*/std::make_unique>(), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; if (GetParam() == LegacyOrUnified::Unified) { - xds_context = std::make_shared( - std::unique_ptr(async_client), dispatcher, *method_descriptor, - *stats_store.rootScope(), rate_limit_settings, local_info, false, - std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*eds_resources_cache=*/nullptr); + xds_context = std::make_shared(grpc_mux_context, false); } else { - xds_context = std::make_shared( - std::unique_ptr(async_client), dispatcher, *method_descriptor, - *stats_store.rootScope(), rate_limit_settings, local_info, - std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*eds_resources_cache=*/nullptr); + xds_context = std::make_shared(grpc_mux_context); } GrpcSubscriptionImplPtr subscription = std::make_unique( diff --git a/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h b/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h index a597f4f7b2dc..95ae79c6f042 100644 --- a/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h +++ b/test/extensions/config_subscription/grpc/delta_subscription_test_harness.h @@ -50,20 +50,23 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { EXPECT_CALL(dispatcher_, createTimer_(_)).Times(2); auto backoff_strategy = std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/*method_descriptor_, + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_store_.rootScope(), + /*config_validators_=*/std::make_unique>(), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; if (should_use_unified_) { - xds_context_ = std::make_shared( - std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, - *stats_store_.rootScope(), rate_limit_settings_, local_info_, false, - std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*eds_resources_cache=*/nullptr); + xds_context_ = std::make_shared(grpc_mux_context, false); } else { - xds_context_ = std::make_shared( - std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, - *stats_store_.rootScope(), rate_limit_settings_, local_info_, - std::make_unique>(), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*eds_resources_cache=*/nullptr); + xds_context_ = std::make_shared(grpc_mux_context); } subscription_ = std::make_unique( xds_context_, callbacks_, resource_decoder_, stats_, diff --git a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc index abe52c8c7098..4375be1c1c0e 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc @@ -61,32 +61,28 @@ class GrpcMuxImplTestBase : public testing::Test { {} - void setup() { - grpc_mux_ = std::make_unique( - local_info_, std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, true, std::move(config_validators_), - std::make_unique( - SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, - random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); - } + void setup() { setup(rate_limit_settings_); } void setup(const RateLimitSettings& custom_rate_limit_settings) { - grpc_mux_ = std::make_unique( - local_info_, std::unique_ptr(async_client_), dispatcher_, + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), custom_rate_limit_settings, true, std::move(config_validators_), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/custom_rate_limit_settings, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::move(config_validators_), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/ std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/std::unique_ptr(eds_resources_cache_)}; + grpc_mux_ = std::make_unique(grpc_mux_context, true); } void expectSendMessage(const std::string& type_url, @@ -955,40 +951,50 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeRejectsResources) { TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { EXPECT_CALL(local_info_, clusterName()).WillOnce(ReturnRef(EMPTY_STRING)); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::make_unique>(), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; EXPECT_THROW_WITH_MESSAGE( - GrpcMuxImpl( - local_info_, std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, true, - std::make_unique>(), - std::make_unique( - SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, - random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""), - EnvoyException, + GrpcMuxImpl(grpc_mux_context, true), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); } TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { EXPECT_CALL(local_info_, nodeName()).WillOnce(ReturnRef(EMPTY_STRING)); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::make_unique>(), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; EXPECT_THROW_WITH_MESSAGE( - GrpcMuxImpl( - local_info_, std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, true, - std::make_unique>(), - std::make_unique( - SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, - random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - /*eds_resources_cache=*/nullptr, /*target_xds_authority=*/""), - EnvoyException, + GrpcMuxImpl(grpc_mux_context, true), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); } diff --git a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc index ebec8abfc6c4..f7ee28901f64 100644 --- a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc @@ -63,26 +63,26 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { void setup() { auto backoff_strategy = std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_); - + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::move(config_validators_), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/std::move(backoff_strategy), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/std::unique_ptr(eds_resources_cache_)}; if (isUnifiedMuxTest()) { - grpc_mux_ = std::make_unique( - std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, local_info_, false, - std::move(config_validators_), std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - std::unique_ptr(eds_resources_cache_)); + grpc_mux_ = std::make_unique(grpc_mux_context, false); return; } - grpc_mux_ = std::make_unique( - std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, local_info_, std::move(config_validators_), - std::move(backoff_strategy), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - std::unique_ptr(eds_resources_cache_)); + grpc_mux_ = std::make_unique(grpc_mux_context); } void expectSendMessage(const std::string& type_url, diff --git a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc index 0ecb2e8cad5e..78c3df708dc4 100644 --- a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc @@ -62,18 +62,25 @@ class GrpcMuxImplTestBase : public testing::Test { void setup() { setup(rate_limit_settings_); } void setup(const RateLimitSettings& custom_rate_limit_settings) { - grpc_mux_ = std::make_unique( - std::unique_ptr(async_client_), dispatcher_, + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), custom_rate_limit_settings, local_info_, true, - std::move(config_validators_), + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/custom_rate_limit_settings, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::move(config_validators_), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/ std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef(), - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), - std::unique_ptr(eds_resources_cache_), /*target_xds_authority=*/""); + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/std::unique_ptr(eds_resources_cache_)}; + grpc_mux_ = std::make_unique(grpc_mux_context, true); } void expectSendMessage(const std::string& type_url, @@ -891,33 +898,50 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsResources) { TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { EXPECT_CALL(local_info_, clusterName()).WillOnce(ReturnRef(EMPTY_STRING)); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::make_unique>(), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; EXPECT_THROW_WITH_MESSAGE( - XdsMux::GrpcMuxSotw( - std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, local_info_, true, - std::make_unique>(), - std::make_unique( - SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, - random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()), - EnvoyException, + XdsMux::GrpcMuxSotw(grpc_mux_context, true), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); } TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { EXPECT_CALL(local_info_, nodeName()).WillOnce(ReturnRef(EMPTY_STRING)); + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(async_client_), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::make_unique>(), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; EXPECT_THROW_WITH_MESSAGE( - XdsMux::GrpcMuxSotw( - std::unique_ptr(async_client_), dispatcher_, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, local_info_, true, - std::make_unique>(), nullptr, - /*xds_config_tracker=*/XdsConfigTrackerOptRef()), - EnvoyException, + XdsMux::GrpcMuxSotw(grpc_mux_context, true), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); } @@ -1023,16 +1047,24 @@ TEST_F(GrpcMuxImplTest, DynamicContextParameters) { TEST_F(GrpcMuxImplTest, AllMuxesStateTest) { setup(); - auto grpc_mux_1 = std::make_unique( - std::unique_ptr(), dispatcher_, + GrpcMuxContext grpc_mux_context{ + /*async_client_=*/std::unique_ptr(), + /*dispatcher_=*/dispatcher_, + /*service_method_=*/ *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - *stats_.rootScope(), rate_limit_settings_, local_info_, true, - std::make_unique>(), + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), + /*local_info_=*/local_info_, + /*rate_limit_settings_=*/rate_limit_settings_, + /*scope_=*/*stats_.rootScope(), + /*config_validators_=*/std::make_unique>(), + /*xds_resources_delegate_=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker_=*/XdsConfigTrackerOptRef(), + /*backoff_strategy_=*/ std::make_unique( SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), - /*xds_config_tracker=*/XdsConfigTrackerOptRef()); - + /*target_xds_authority_=*/"", + /*eds_resources_cache_=*/nullptr}; + auto grpc_mux_1 = std::make_unique(grpc_mux_context, true); Config::XdsMux::GrpcMuxSotw::shutdownAll(); EXPECT_TRUE(grpc_mux_->isShutdown());