Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LEDS: integrating with EDS #18195

Merged
merged 11 commits into from
Oct 1, 2021
2 changes: 2 additions & 0 deletions source/common/config/protobuf_link_hacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/service/cluster/v3/cds.pb.h"
#include "envoy/service/discovery/v3/ads.pb.h"
#include "envoy/service/endpoint/v3/eds.pb.h"
#include "envoy/service/endpoint/v3/leds.pb.h"
#include "envoy/service/extension/v3/config_discovery.pb.h"
#include "envoy/service/health/v3/hds.pb.h"
#include "envoy/service/listener/v3/lds.pb.h"
Expand All @@ -24,6 +25,7 @@ const envoy::service::listener::v3::LdsDummy _lds_dummy_v3;
const envoy::service::route::v3::RdsDummy _rds_dummy_v3;
const envoy::service::cluster::v3::CdsDummy _cds_dummy_v3;
const envoy::service::endpoint::v3::EdsDummy _eds_dummy_v3;
const envoy::service::endpoint::v3::LedsDummy _leds_dummy_v3;
const envoy::service::route::v3::SrdsDummy _srds_dummy_v3;
const envoy::service::extension::v3::EcdsDummy _ecds_dummy_v3;
const envoy::service::runtime::v3::RtdsDummy _rtds_dummy_v3;
Expand Down
67 changes: 43 additions & 24 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,36 +115,55 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
fmt::format("xdstp:// type does not match {} in {}", resource_type,
Config::XdsResourceIdentifier::encodeUrl(collection_locator)));
}
const envoy::config::core::v3::ApiConfigSource& api_config_source = config.api_config_source();
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source);
switch (config.config_source_specifier_case()) {
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kApiConfigSource: {
const envoy::config::core::v3::ApiConfigSource& api_config_source =
config.api_config_source();
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source);

SubscriptionOptions options;
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
switch (api_config_source.api_type()) {
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
const std::string type_url = TypeUtil::descriptorFullNameToTypeUrl(resource_type);
return std::make_unique<GrpcCollectionSubscriptionImpl>(
collection_locator,
std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_),
callbacks, resource_decoder, stats, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), false, options);
SubscriptionOptions options;
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
switch (api_config_source.api_type()) {
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
const std::string type_url = TypeUtil::descriptorFullNameToTypeUrl(resource_type);
return std::make_unique<GrpcCollectionSubscriptionImpl>(
collection_locator,
std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_),
callbacks, resource_decoder, stats, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), false, options);
}
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC: {
return std::make_unique<GrpcCollectionSubscriptionImpl>(
collection_locator, cm_.adsMux(), callbacks, resource_decoder, stats, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), false, options);
}
default:
throw EnvoyException(fmt::format("Unknown xdstp:// transport API type in {}",
api_config_source.DebugString()));
}
}
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC: {
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kAds: {
// TODO(adisuissa): verify that the ADS is set up in delta-xDS mode.
SubscriptionOptions options;
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
return std::make_unique<GrpcCollectionSubscriptionImpl>(
collection_locator, cm_.adsMux(), callbacks, resource_decoder, stats, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), false, options);
Utility::configSourceInitialFetchTimeout(config), true, options);
}
default:
throw EnvoyException(fmt::format("Unknown xdstp:// transport API type in {}",
api_config_source.DebugString()));
throw EnvoyException("Missing or not supported config source specifier in "
adisuissa marked this conversation as resolved.
Show resolved Hide resolved
"envoy::config::core::v3::ConfigSource for a collection. Only ADS and "
"gRPC in delta-xDS mode are supported.");
}
}
default:
Expand Down
1 change: 1 addition & 0 deletions source/common/config/type_to_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ TypeUrlToV3ServiceMap* buildTypeUrlToServiceMap() {
"envoy.service.secret.v3.SecretDiscoveryService",
"envoy.service.cluster.v3.ClusterDiscoveryService",
"envoy.service.endpoint.v3.EndpointDiscoveryService",
"envoy.service.endpoint.v3.LocalityEndpointDiscoveryService",
"envoy.service.listener.v3.ListenerDiscoveryService",
"envoy.service.runtime.v3.RuntimeDiscoveryService",
"envoy.service.extension.v3.ExtensionConfigDiscoveryService",
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ envoy_cc_library(
hdrs = ["eds.h"],
deps = [
":cluster_factory_lib",
":leds_lib",
":upstream_includes",
"//envoy/config:grpc_mux_interface",
"//envoy/config:subscription_factory_interface",
Expand Down
9 changes: 6 additions & 3 deletions source/common/upstream/cds_api_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ std::vector<std::string>
CdsApiHelper::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
Config::ScopedResume maybe_resume_eds;
Config::ScopedResume maybe_resume_eds_leds;
if (cm_.adsMux()) {
const auto type_url = Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>();
maybe_resume_eds = cm_.adsMux()->pause(type_url);
// A cluster update pauses sending EDS and LEDS requests.
const auto eds_type_url =
Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>();
const auto leds_type_url = Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>();
maybe_resume_eds_leds = cm_.adsMux()->pause({eds_type_url, leds_type_url});
htuch marked this conversation as resolved.
Show resolved Hide resolved
}

ENVOY_LOG(info, "{}: add {} cluster(s), remove {} cluster(s)", name_, added_resources.size(),
Expand Down
7 changes: 4 additions & 3 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
// If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
// should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
// avoid double pause ClusterLoadAssignment.
Config::ScopedResume maybe_resume_eds;
Config::ScopedResume maybe_resume_eds_leds;
if (cm_.adsMux()) {
const auto type_url =
const auto eds_type_url =
Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>();
maybe_resume_eds = cm_.adsMux()->pause(type_url);
const auto leds_type_url = Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>();
maybe_resume_eds_leds = cm_.adsMux()->pause({eds_type_url, leds_type_url});
}
initializeSecondaryClusters();
}
Expand Down
130 changes: 115 additions & 15 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ EdsClusterImpl::EdsClusterImpl(
added_via_api, factory_context.mainThreadDispatcher().timeSource()),
Envoy::Config::SubscriptionBase<envoy::config::endpoint::v3::ClusterLoadAssignment>(
factory_context.messageValidationVisitor(), "cluster_name"),
local_info_(factory_context.localInfo()),
factory_context_(factory_context), local_info_(factory_context.localInfo()),
cluster_name_(cluster.eds_cluster_config().service_name().empty()
? cluster.name()
: cluster.eds_cluster_config().service_name()) {
Expand All @@ -46,22 +46,30 @@ void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}); }
void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) {
absl::flat_hash_set<std::string> all_new_hosts;
PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb);
for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) {
for (const auto& locality_lb_endpoint : parent_.cluster_load_assignment_.endpoints()) {
parent_.validateEndpointsForZoneAwareRouting(locality_lb_endpoint);

priority_state_manager.initializePriorityFor(locality_lb_endpoint);

for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address());
// When the configuration contains duplicate hosts, only the first one will be retained.
if (all_new_hosts.count(address->asString()) > 0) {
continue;
if (locality_lb_endpoint.has_leds_cluster_locality_config()) {
// The locality uses LEDS, fetch its dynamic data, which must be ready, or otherwise
// the batchUpdate method should not have been called.
const auto& leds_config = locality_lb_endpoint.leds_cluster_locality_config();

// The batchUpdate call must be performed after all the endpoints of all localities
// were received.
ASSERT(parent_.leds_localities_.find(leds_config) != parent_.leds_localities_.end() &&
parent_.leds_localities_[leds_config]->isUpdated());
for (const auto& [_, lb_endpoint] :
parent_.leds_localities_[leds_config]->getEndpointsMap()) {
updateLocalityEndpoints(lb_endpoint, locality_lb_endpoint, priority_state_manager,
all_new_hosts);
}
} else {
for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
updateLocalityEndpoints(lb_endpoint, locality_lb_endpoint, priority_state_manager,
all_new_hosts);
}

priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address,
locality_lb_endpoint, lb_endpoint,
parent_.time_source_);
all_new_hosts.emplace(address->asString());
}
}

Expand All @@ -73,8 +81,9 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
HostMapConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap();
ASSERT(all_hosts != nullptr);

const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
cluster_load_assignment_.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor);
const uint32_t overprovisioning_factor =
PROTOBUF_GET_WRAPPED_OR_DEFAULT(parent_.cluster_load_assignment_.policy(),
overprovisioning_factor, kDefaultOverProvisioningFactor);

LocalityWeightsMap empty_locality_map;

Expand Down Expand Up @@ -118,6 +127,23 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
parent_.onPreInitComplete();
}

// Registers one endpoint of a locality with the priority state manager, keyed by its resolved
// address. `all_new_hosts` holds the string form of every address registered so far; an endpoint
// whose address is already present is ignored, otherwise its address is added to the set.
void EdsClusterImpl::BatchUpdateHelper::updateLocalityEndpoints(
    const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint,
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
    PriorityStateManager& priority_state_manager, absl::flat_hash_set<std::string>& all_new_hosts) {
  const auto resolved_address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address());
  const auto host_key = resolved_address->asString();

  // Duplicate hosts in the configuration are dropped: only the first occurrence is retained.
  const bool already_registered = all_new_hosts.find(host_key) != all_new_hosts.end();
  if (!already_registered) {
    priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(),
                                                   resolved_address, locality_lb_endpoint,
                                                   lb_endpoint, parent_.time_source_);
    all_new_hosts.emplace(host_key);
  }
}

void EdsClusterImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string&) {
if (!validateUpdateSize(resources.size())) {
Expand All @@ -130,6 +156,17 @@ void EdsClusterImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef
throw EnvoyException(fmt::format("Unexpected EDS cluster (expecting {}): {}", cluster_name_,
cluster_load_assignment.cluster_name()));
}
// Validate that each locality doesn't have both LEDS and endpoints defined.
// TODO(adisuissa): This is only needed for the API v3 support. In future major versions
// the oneof definition will take care of it.
for (const auto& locality : cluster_load_assignment.endpoints()) {
if (locality.has_leds_cluster_locality_config() && locality.lb_endpoints_size() > 0) {
throw EnvoyException(fmt::format(
adisuissa marked this conversation as resolved.
Show resolved Hide resolved
"A ClusterLoadAssignment for cluster {} cannot include both LEDS (resource: {}) and a "
"list of endpoints.",
cluster_name_, locality.leds_cluster_locality_config().leds_collection_name()));
}
}

// Disable timer (if enabled) as we have received new assignment.
if (assignment_timeout_->enabled()) {
Expand All @@ -144,7 +181,60 @@ void EdsClusterImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef
assignment_timeout_->enableTimer(std::chrono::milliseconds(stale_after_ms));
}

BatchUpdateHelper helper(*this, cluster_load_assignment);
// Pause LEDS messages until the EDS config is finished processing.
Config::ScopedResume maybe_resume_leds;
if (factory_context_.clusterManager().adsMux()) {
const auto type_url = Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>();
maybe_resume_leds = factory_context_.clusterManager().adsMux()->pause(type_url);
}

// Compare the current set of LEDS localities (localities using LEDS) to the one received in the
// update. A LEDS locality can either be added, removed, or kept. If it is added we add a
// subscription to it, and if it is removed we delete the subscription.
LedsConfigSet cla_leds_configs;

for (const auto& locality : cluster_load_assignment.endpoints()) {
if (locality.has_leds_cluster_locality_config()) {
cla_leds_configs.emplace(locality.leds_cluster_locality_config());
}
}

// Remove the LEDS localities that are not needed anymore.
absl::erase_if(leds_localities_, [&cla_leds_configs](const auto& item) {
auto const& [leds_config, _] = item;
// Returns true if the leds_config isn't in the cla_leds_configs
return cla_leds_configs.find(leds_config) == cla_leds_configs.end();
});

// Add all the LEDS localities that are new.
for (const auto& leds_config : cla_leds_configs) {
if (leds_localities_.find(leds_config) == leds_localities_.end()) {
ENVOY_LOG(trace, "Found new LEDS config in EDS onConfigUpdate() for cluster {}: {}",
cluster_name_, leds_config.DebugString());

// Create a new LEDS subscription and add it to the subscriptions map.
LedsSubscriptionPtr leds_locality_subscription = std::make_unique<LedsSubscription>(
leds_config, cluster_name_, factory_context_, info_->statsScope(), [&]() {
// Called upon an update to the locality.
if (validateAllLedsUpdated()) {
BatchUpdateHelper helper(*this);
priority_set_.batchHostUpdate(helper);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the invariant that this is called only once per update and for the last LEDS subscription to update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is called once per LEDS update, and should cover the following cases:

  1. Before all the locality endpoints of a specific EDS cluster are ready (in this case the validateAllLedsUpdated() check fails, and no actual update occurs).
  2. Whenever a new update for some locality arrives.

}
});
leds_localities_.emplace(leds_config, std::move(leds_locality_subscription));
}
}

// TODO(adisuissa): optimize for no-copy, if possible.
cluster_load_assignment_ = cluster_load_assignment;

// If all the LEDS localities are updated, the EDS update can occur. If not, then when the last
// LEDS locality will be updated, it will trigger the EDS update helper.
if (!validateAllLedsUpdated()) {
return;
}

BatchUpdateHelper helper(*this);
priority_set_.batchHostUpdate(helper);
}

Expand Down Expand Up @@ -291,6 +381,16 @@ EdsClusterFactory::createClusterImpl(
nullptr);
}

// Returns true iff every subscription stored in leds_localities_ reports isUpdated(). With no
// LEDS-based localities at all, the result is true.
bool EdsClusterImpl::validateAllLedsUpdated() const {
  auto locality_it = leds_localities_.begin();
  // Skip over the subscriptions that are updated; stop at the first one that is not.
  while (locality_it != leds_localities_.end() && locality_it->second->isUpdated()) {
    ++locality_it;
  }
  // Reaching the end means no non-updated subscription was encountered.
  return locality_it == leds_localities_.end();
}

/**
* Static registration for the Eds cluster factory. @see RegisterFactory.
*/
Expand Down
27 changes: 22 additions & 5 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "source/common/config/subscription_base.h"
#include "source/common/upstream/cluster_factory_impl.h"
#include "source/common/upstream/leds.h"
#include "source/common/upstream/upstream_impl.h"

namespace Envoy {
Expand Down Expand Up @@ -60,27 +61,43 @@ class EdsClusterImpl
void startPreInit() override;
void onAssignmentTimeout();

// Returns true iff all the LEDS based localities were updated.
bool validateAllLedsUpdated() const;

// Batch host-update callback for an EDS cluster. An instance is constructed by the owning
// EdsClusterImpl and handed to priority_set_.batchHostUpdate(), which is expected to invoke
// batchUpdate() (declared below).
class BatchUpdateHelper : public PrioritySet::BatchUpdateCb {
public:
BatchUpdateHelper(
EdsClusterImpl& parent,
const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment)
: parent_(parent), cluster_load_assignment_(cluster_load_assignment) {}
// NOTE(review): this single-argument constructor is not marked `explicit`, so it permits an
// implicit conversion from EdsClusterImpl& — consider `explicit` unless that is intended.
BatchUpdateHelper(EdsClusterImpl& parent) : parent_(parent) {}

// Upstream::PrioritySet::BatchUpdateCb
void batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) override;

private:
// Registers `lb_endpoint` (belonging to `locality_lb_endpoint`) with `priority_state_manager`,
// unless the string form of its resolved address is already in `all_new_hosts`; on registration
// the address is added to `all_new_hosts`.
void updateLocalityEndpoints(
const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint,
const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
PriorityStateManager& priority_state_manager,
absl::flat_hash_set<std::string>& all_new_hosts);

// The EDS cluster this helper operates on; held by reference, so the cluster must outlive the
// helper.
EdsClusterImpl& parent_;
const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment_;
};

Config::SubscriptionPtr subscription_;
Server::Configuration::TransportSocketFactoryContextImpl factory_context_;
const LocalInfo::LocalInfo& local_info_;
const std::string cluster_name_;
std::vector<LocalityWeightsMap> locality_weights_map_;
Event::TimerPtr assignment_timeout_;
InitializePhase initialize_phase_;
using LedsConfigSet = absl::flat_hash_set<envoy::config::endpoint::v3::LedsClusterLocalityConfig,
MessageUtil, MessageUtil>;
using LedsConfigMap = absl::flat_hash_map<envoy::config::endpoint::v3::LedsClusterLocalityConfig,
LedsSubscriptionPtr, MessageUtil, MessageUtil>;
// Maps between a LEDS configuration (ConfigSource + collection name) to the locality endpoints
// data.
LedsConfigMap leds_localities_;
// TODO(adisuissa): Avoid saving the entire cluster load assignment, only the
// relevant parts of the config for each locality.
envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment_;
};

using EdsClusterImplSharedPtr = std::shared_ptr<EdsClusterImpl>;
Expand Down
Loading