Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eds: Adding eds caching support to grpc-mux #28273

Merged
merged 9 commits into from
Aug 2, 2023
1 change: 1 addition & 0 deletions envoy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ envoy_cc_library(
name = "grpc_mux_interface",
hdrs = ["grpc_mux.h"],
deps = [
":eds_resources_cache_interface",
":subscription_interface",
"//envoy/stats:stats_macros",
"//source/common/common:cleanup_lib",
Expand Down
7 changes: 7 additions & 0 deletions envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/common/exception.h"
#include "envoy/common/pure.h"
#include "envoy/config/eds_resources_cache.h"
#include "envoy/config/subscription.h"
#include "envoy/stats/stats_macros.h"

Expand Down Expand Up @@ -105,6 +106,12 @@ class GrpcMux {

virtual void requestOnDemandUpdate(const std::string& type_url,
const absl::flat_hash_set<std::string>& for_update) PURE;

/**
* Returns an EdsResourcesCache for this GrpcMux if there is one.
* @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux.
*/
virtual EdsResourcesCacheOptRef edsResourcesCache() PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
2 changes: 1 addition & 1 deletion envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class MuxFactory : public Config::UntypedFactory {
const LocalInfo::LocalInfo& local_info,
std::unique_ptr<CustomConfigValidators>&& config_validators,
BackOffStrategyPtr&& backoff_strategy, OptRef<XdsConfigTracker> xds_config_tracker,
OptRef<XdsResourcesDelegate> xds_resources_delegate) PURE;
OptRef<XdsResourcesDelegate> xds_resources_delegate, bool use_eds_resources_cache) PURE;
};

} // namespace Config
Expand Down
2 changes: 2 additions & 0 deletions source/common/config/null_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class NullGrpcMuxImpl : public GrpcMux,
ENVOY_BUG(false, "unexpected request for on demand update");
}

EdsResourcesCacheOptRef edsResourcesCache() override { return absl::nullopt; }

void onWriteable() override {}
void onStreamEstablished() override {}
void onEstablishmentFailure() override {}
Expand Down
3 changes: 3 additions & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_enable_include_histograms);
FALSE_RUNTIME_GUARD(envoy_reloadable_features_refresh_rtt_after_request);
// TODO(danzh) false deprecate it once QUICHE has its own enable/disable flag.
FALSE_RUNTIME_GUARD(envoy_reloadable_features_quic_reject_all);
// TODO(adisuissa) this will be enabled by default once the work on the feature is
// done in EdsClusterImpl.
FALSE_RUNTIME_GUARD(envoy_restart_features_use_eds_cache_for_ads);
htuch marked this conversation as resolved.
Show resolved Hide resolved

// Block of non-boolean flags. Use of int flags is deprecated. Do not add more.
ABSL_FLAG(uint64_t, re2_max_program_size_error_level, 100, ""); // NOLINT
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ ClusterManagerImpl::ClusterManagerImpl(
->createUncachedRawAsyncClient(),
main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(),
local_info, std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {});
makeOptRefFromPtr(xds_config_tracker_.get()), {}, false);
} else {
Config::Utility::checkTransportVersion(dyn_resources.ads_config());
auto xds_delegate_opt_ref = makeOptRefFromPtr(xds_resources_delegate_.get());
Expand All @@ -439,7 +439,7 @@ ClusterManagerImpl::ClusterManagerImpl(
->createUncachedRawAsyncClient(),
main_thread_dispatcher, random_, *stats_.rootScope(), dyn_resources.ads_config(),
local_info, std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref);
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, false);
}
} else {
ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
Expand Down
6 changes: 6 additions & 0 deletions source/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ envoy_cc_library(
srcs = ["grpc_mux_impl.cc"],
hdrs = ["grpc_mux_impl.h"],
deps = [
":eds_resources_cache_lib",
":grpc_stream_lib",
":xds_source_id_lib",
"//envoy/config:custom_config_validators_interface",
Expand All @@ -34,6 +35,7 @@ envoy_cc_library(
"//source/common/memory:utils_lib",
"//source/common/protobuf",
"@com_google_absl//absl/container:btree",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand All @@ -44,6 +46,7 @@ envoy_cc_library(
hdrs = ["new_grpc_mux_impl.h"],
deps = [
":delta_subscription_state_lib",
":eds_resources_cache_lib",
":grpc_stream_lib",
":pausable_ack_queue_lib",
":watch_map_lib",
Expand All @@ -54,6 +57,7 @@ envoy_cc_library(
"//source/common/config:xds_context_params_lib",
"//source/common/config:xds_resource_lib",
"//source/common/memory:utils_lib",
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
Expand All @@ -63,6 +67,7 @@ envoy_cc_library(
srcs = ["grpc_subscription_impl.cc"],
hdrs = ["grpc_subscription_impl.h"],
deps = [
":eds_resources_cache_lib",
":grpc_mux_lib",
":new_grpc_mux_lib",
"//envoy/config:subscription_interface",
Expand Down Expand Up @@ -210,6 +215,7 @@ envoy_cc_library(
"//source/common/common:minimal_logger_lib",
"//source/common/common:utility_lib",
"//source/common/config:decoded_resource_lib",
"//source/common/config:resource_name_lib",
"//source/common/config:utility_lib",
"//source/common/config:xds_resource_lib",
"//source/common/protobuf",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
data.xds_config_tracker_),
data.xds_config_tracker_,
// No EDS resources cache needed from collections.
/*eds_resources_cache=*/nullptr),
data.callbacks_, data.resource_decoder_, data.stats_, data.dispatcher_,
Utility::configSourceInitialFetchTimeout(data.config_), /*is_aggregated=*/false,
data.options_);
Expand Down
50 changes: 37 additions & 13 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "source/common/config/utility.h"
#include "source/common/memory/utils.h"
#include "source/common/protobuf/protobuf.h"
#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
#include "source/extensions/config_subscription/grpc/xds_source_id.h"

#include "absl/container/btree_map.h"
Expand Down Expand Up @@ -56,21 +57,21 @@ std::string convertToWildcard(const std::string& resource_name) {
}
} // namespace

GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node,
CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr backoff_strategy,
XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate,
const std::string& target_xds_authority)
GrpcMuxImpl::GrpcMuxImpl(
const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node,
CustomConfigValidatorsPtr&& config_validators, BackOffStrategyPtr backoff_strategy,
XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate,
EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority)
: grpc_stream_(this, std::move(async_client), service_method, dispatcher, scope,
std::move(backoff_strategy), rate_limit_settings),
local_info_(local_info), skip_subsequent_node_(skip_subsequent_node),
config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker),
xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority),
first_stream_request_(true), dispatcher_(dispatcher),
xds_resources_delegate_(xds_resources_delegate),
eds_resources_cache_(std::move(eds_resources_cache)),
target_xds_authority_(target_xds_authority), first_stream_request_(true),
dispatcher_(dispatcher),
dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback(
[this](absl::string_view resource_type_url) {
onDynamicContextUpdate(resource_type_url);
Expand Down Expand Up @@ -187,8 +188,14 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder,
const SubscriptionOptions& options) {
// Resource cache is only used for EDS resources.
EdsResourcesCacheOptRef resources_cache{absl::nullopt};
if (eds_resources_cache_ &&
(type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
}
auto watch = std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url,
*this, options, local_info_);
*this, options, local_info_, resources_cache);
ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);

// Lazily kick off the requests based on first subscription. This has the
Expand Down Expand Up @@ -412,6 +419,19 @@ void GrpcMuxImpl::processDiscoveryResources(const std::vector<DecodedResourcePtr
// updates in the message for EDS/RDS.
if (!found_resources.empty()) {
watch->callbacks_.onConfigUpdate(found_resources, version_info);
// Resource cache is only used for EDS resources.
htuch marked this conversation as resolved.
Show resolved Hide resolved
if (eds_resources_cache_ &&
(type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
for (const auto& resource : found_resources) {
const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
resource.get().resource());
eds_resources_cache_->setResource(resource.get().name(), cluster_load_assignment);
}
// No need to remove resources from the cache, as currently only non-collection
// subscriptions are supported, and these resources are removed in the call
// to updateWatchInterest().
}
}
}

Expand Down Expand Up @@ -532,14 +552,18 @@ class GrpcMuxFactory : public MuxFactory {
const envoy::config::core::v3::ApiConfigSource& ads_config,
const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate) override {
XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override {
return std::make_shared<Config::GrpcMuxImpl>(
local_info, std::move(async_client), dispatcher,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
scope, Utility::parseRateLimitSettings(ads_config),
ads_config.set_node_on_first_message_only(), std::move(config_validators),
std::move(backoff_strategy), xds_config_tracker, xds_resources_delegate,
(use_eds_resources_cache &&
Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
: nullptr,
Config::Utility::getGrpcControlPlane(ads_config).value_or(""));
}
};
Expand Down
44 changes: 40 additions & 4 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/common/random_generator.h"
#include "envoy/common/time.h"
#include "envoy/config/custom_config_validators.h"
#include "envoy/config/endpoint/v3/endpoint.pb.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/config/subscription.h"
#include "envoy/config/xds_config_tracker.h"
Expand All @@ -20,6 +21,7 @@
#include "source/common/common/logger.h"
#include "source/common/common/utility.h"
#include "source/common/config/api_version.h"
#include "source/common/config/resource_name.h"
#include "source/common/config/ttl.h"
#include "source/common/config/utility.h"
#include "source/common/config/xds_context_params.h"
Expand All @@ -44,7 +46,7 @@ class GrpcMuxImpl : public GrpcMux,
bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators,
BackOffStrategyPtr backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
XdsResourcesDelegateOptRef xds_resources_delegate,
const std::string& target_xds_authority);
EdsResourcesCachePtr eds_resources_cache, const std::string& target_xds_authority);

~GrpcMuxImpl() override;

Expand Down Expand Up @@ -72,6 +74,10 @@ class GrpcMuxImpl : public GrpcMux,
void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
}

EdsResourcesCacheOptRef edsResourcesCache() override {
return makeOptRefFromPtr(eds_resources_cache_.get());
}

void handleDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);

Expand Down Expand Up @@ -99,17 +105,29 @@ class GrpcMuxImpl : public GrpcMux,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const std::string& type_url,
GrpcMuxImpl& parent, const SubscriptionOptions& options,
const LocalInfo::LocalInfo& local_info)
const LocalInfo::LocalInfo& local_info,
EdsResourcesCacheOptRef eds_resources_cache)
: callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url),
parent_(parent), subscription_options_(options), local_info_(local_info),
watches_(parent.apiStateFor(type_url).watches_) {
watches_(parent.apiStateFor(type_url).watches_),
eds_resources_cache_(eds_resources_cache) {
updateResources(resources);
// If eds resources cache is provided, then the type must be ClusterLoadAssignment.
ASSERT(
!eds_resources_cache_.has_value() ||
(type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()));
}

~GrpcMuxWatchImpl() override {
watches_.erase(iter_);
if (!resources_.empty()) {
parent_.queueDiscoveryRequest(type_url_);
if (eds_resources_cache_.has_value()) {
// All resources are to be removed, so remove them from the cache.
for (const auto& resource_name : resources_) {
eds_resources_cache_->removeResource(resource_name);
htuch marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

Expand All @@ -131,7 +149,12 @@ class GrpcMuxImpl : public GrpcMux,

private:
void updateResources(const absl::flat_hash_set<std::string>& resources) {
resources_.clear();
// Finding the list of removed resources by keeping the current resources
// set until the end the function and computing the diff.
// Temporarily keep the resources prior to the update to find which ones
// were removed.
std::set<std::string> previous_resources;
htuch marked this conversation as resolved.
Show resolved Hide resolved
previous_resources.swap(resources_);
std::transform(
resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()),
[this](const std::string& resource_name) -> std::string {
Expand All @@ -148,6 +171,16 @@ class GrpcMuxImpl : public GrpcMux,
}
return resource_name;
});
if (eds_resources_cache_.has_value()) {
// Compute the removed resources and remove them from the cache.
std::vector<std::string> removed_resources;
std::set_difference(previous_resources.begin(), previous_resources.end(),
resources_.begin(), resources_.end(),
std::back_inserter(removed_resources));
for (const auto& resource_name : removed_resources) {
eds_resources_cache_->removeResource(resource_name);
}
}
// move this watch to the beginning of the list
iter_ = watches_.emplace(watches_.begin(), this);
}
Expand All @@ -157,6 +190,8 @@ class GrpcMuxImpl : public GrpcMux,
const LocalInfo::LocalInfo& local_info_;
WatchList& watches_;
WatchList::iterator iter_;
// Optional cache for the specific ClusterLoadAssignments of this watch.
EdsResourcesCacheOptRef eds_resources_cache_;
};

// Per muxed API state.
Expand Down Expand Up @@ -212,6 +247,7 @@ class GrpcMuxImpl : public GrpcMux,
CustomConfigValidatorsPtr config_validators_;
XdsConfigTrackerOptRef xds_config_tracker_;
XdsResourcesDelegateOptRef xds_resources_delegate_;
EdsResourcesCachePtr eds_resources_cache_;
const std::string target_xds_authority_;
bool first_stream_request_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_,
/*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS.
control_plane_id);
} else {
mux = std::make_shared<Config::GrpcMuxImpl>(
Expand All @@ -44,6 +45,7 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
Utility::parseRateLimitSettings(api_config_source),
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_, data.xds_resources_delegate_,
/*std::move(data.eds_resources_cache_)*/ nullptr, // EDS cache is only used for ADS.
control_plane_id);
}
return std::make_unique<GrpcSubscriptionImpl>(
Expand Down Expand Up @@ -73,15 +75,19 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
std::move(backoff_strategy), data.xds_config_tracker_);
std::move(backoff_strategy), data.xds_config_tracker_,
/*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS.
);
} else {
mux = std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(data.cm_.grpcAsyncClientManager(),
api_config_source, data.scope_, true)
->createUncachedRawAsyncClient(),
data.dispatcher_, deltaGrpcMethod(data.type_url_), data.scope_,
Utility::parseRateLimitSettings(api_config_source), data.local_info_,
std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_);
std::move(custom_config_validators), std::move(backoff_strategy), data.xds_config_tracker_,
/*std::move(data.eds_resources_cache_)*/ nullptr // EDS cache is only used for ADS.
);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
Expand Down
Loading