OriginalDstCluster: Use ThreadAwareLoadBalancer (#7820)
Signed-off-by: Jarno Rajahalme <[email protected]>
jrajahalme authored and mattklein123 committed Aug 8, 2019
1 parent c482279 commit e03936e
Showing 8 changed files with 151 additions and 182 deletions.
7 changes: 6 additions & 1 deletion api/envoy/api/v2/cds.proto
@@ -160,7 +160,12 @@ message Cluster {
// Refer to the :ref:`original destination load balancing
// policy<arch_overview_load_balancing_types_original_destination>`
// for an explanation.
ORIGINAL_DST_LB = 4;
//
// .. attention::
//
// **This load balancing policy is deprecated**. Use CLUSTER_PROVIDED instead.
//
ORIGINAL_DST_LB = 4 [deprecated = true];

// Refer to the :ref:`Maglev load balancing policy<arch_overview_load_balancing_types_maglev>`
// for an explanation.
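For configurations that still select ORIGINAL_DST_LB, the migration is simply to keep the original destination discovery type and switch the policy to CLUSTER_PROVIDED. A minimal editorial sketch using the generated C++ proto API for cds.proto (the include path and setter names are the usual protoc-generated ones, shown for illustration only and not part of this commit):

#include "envoy/api/v2/cds.pb.h" // generated from api/envoy/api/v2/cds.proto

// Editorial sketch, not part of this commit: an original destination cluster
// configured with CLUSTER_PROVIDED instead of the deprecated ORIGINAL_DST_LB.
envoy::api::v2::Cluster makeOriginalDstCluster() {
  envoy::api::v2::Cluster cluster;
  cluster.set_name("passthrough");
  cluster.set_type(envoy::api::v2::Cluster::ORIGINAL_DST);
  cluster.set_lb_policy(envoy::api::v2::Cluster::CLUSTER_PROVIDED); // was ORIGINAL_DST_LB
  return cluster;
}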
2 changes: 1 addition & 1 deletion docs/root/intro/deprecated.rst
@@ -12,7 +12,7 @@ Deprecated items below are listed in chronological order.

Version 1.12.0 (pending)
========================

* The ORIGINAL_DST_LB :ref:`load balancing policy <envoy_api_field_Cluster.lb_policy>` is deprecated, use CLUSTER_PROVIDED policy instead when configuring an :ref:`original destination cluster <envoy_api_field_Cluster.type>`.

Version 1.11.0 (July 11, 2019)
==============================
10 changes: 2 additions & 8 deletions source/common/upstream/cluster_manager_impl.cc
@@ -1114,18 +1114,12 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
}
case LoadBalancerType::ClusterProvided:
case LoadBalancerType::RingHash:
case LoadBalancerType::Maglev: {
case LoadBalancerType::Maglev:
case LoadBalancerType::OriginalDst: {
ASSERT(lb_factory_ != nullptr);
lb_ = lb_factory_->create();
break;
}
case LoadBalancerType::OriginalDst: {
ASSERT(lb_factory_ == nullptr);
lb_ = std::make_unique<OriginalDstCluster::LoadBalancer>(
priority_set_, parent.parent_.active_clusters_.at(cluster->name())->cluster_,
cluster->lbOriginalDstConfig());
break;
}
}
}
}
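With this change the per-worker ClusterEntry no longer special-cases OriginalDst: it takes the same lb_factory_->create() path as ClusterProvided, RingHash, and Maglev. For orientation, a simplified editorial sketch of the two interfaces that path relies on (modeled on include/envoy/upstream/load_balancer.h; abbreviated and not part of this diff):

// Editorial sketch, not part of this commit: the factory/load-balancer split that
// lets the cluster manager treat OriginalDst like any other thread-aware LB.
class LoadBalancerFactory {
public:
  virtual ~LoadBalancerFactory() = default;
  // Called on each worker thread to build that worker's load balancer.
  virtual LoadBalancerPtr create() = 0;
};

class ThreadAwareLoadBalancer {
public:
  virtual ~ThreadAwareLoadBalancer() = default;
  // Shared factory that the per-worker ClusterEntry stores as lb_factory_.
  virtual LoadBalancerFactorySharedPtr factory() = 0;
  // Runs once on the main thread before workers start creating load balancers.
  virtual void initialize() = 0;
};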
152 changes: 68 additions & 84 deletions source/common/upstream/original_dst_cluster.cc
@@ -16,40 +16,11 @@
namespace Envoy {
namespace Upstream {

// Static cast below is guaranteed to succeed, as code instantiating the cluster
// configuration, that is run prior to this code, checks that an OriginalDstCluster is
// always configured with an OriginalDstCluster::LoadBalancer, and that an
// OriginalDstCluster::LoadBalancer is never configured with any other type of cluster,
// and throws an exception otherwise.

OriginalDstCluster::LoadBalancer::LoadBalancer(
PrioritySet& priority_set, ClusterSharedPtr& parent,
const absl::optional<envoy::api::v2::Cluster::OriginalDstLbConfig>& config)
: priority_set_(priority_set), parent_(std::static_pointer_cast<OriginalDstCluster>(parent)),
info_(parent->info()), use_http_header_(config ? config.value().use_http_header() : false) {
// priority_set_ is initially empty.
priority_set_.addMemberUpdateCb(
[this](const HostVector& hosts_added, const HostVector& hosts_removed) -> void {
// Update the hosts map
// TODO(ramaraochavali): use cluster stats and move the log lines to debug.
for (const HostSharedPtr& host : hosts_removed) {
ENVOY_LOG(debug, "Removing host {}.", host->address()->asString());
host_map_.remove(host);
}
for (const HostSharedPtr& host : hosts_added) {
if (host_map_.insert(host)) {
ENVOY_LOG(debug, "Adding host {}.", host->address()->asString());
}
}
});
}

HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) {
if (context) {

// Check if override host header is present, if yes use it otherwise check local address.
Network::Address::InstanceConstSharedPtr dst_host = nullptr;
if (use_http_header_) {
if (parent_->use_http_header_) {
dst_host = requestOverrideHost(context);
}
if (dst_host == nullptr) {
@@ -63,10 +34,10 @@ HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerCont

if (dst_host) {
const Network::Address::Instance& dst_addr = *dst_host.get();

// Check if a host with the destination address is already in the host set.
HostSharedPtr host = host_map_.find(dst_addr);
if (host) {
auto it = host_map_->find(dst_addr.asString());
if (it != host_map_->end()) {
HostSharedPtr host(it->second); // takes a reference
ENVOY_LOG(debug, "Using existing host {}.", host->address()->asString());
host->used(true); // Mark as used.
return host;
@@ -77,29 +48,24 @@ HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerCont
Network::Address::InstanceConstSharedPtr host_ip_port(
Network::Utility::copyInternetAddressAndPort(*dst_ip));
// Create a host we can use immediately.
host.reset(
new HostImpl(info_, info_->name() + dst_addr.asString(), std::move(host_ip_port),
envoy::api::v2::core::Metadata::default_instance(), 1,
envoy::api::v2::core::Locality().default_instance(),
envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance(),
0, envoy::api::v2::core::HealthStatus::UNKNOWN));

auto info = parent_->info();
HostSharedPtr host(std::make_shared<HostImpl>(
info, info->name() + dst_addr.asString(), std::move(host_ip_port),
envoy::api::v2::core::Metadata::default_instance(), 1,
envoy::api::v2::core::Locality().default_instance(),
envoy::api::v2::endpoint::Endpoint::HealthCheckConfig().default_instance(), 0,
envoy::api::v2::core::HealthStatus::UNKNOWN));
ENVOY_LOG(debug, "Created host {}.", host->address()->asString());
// Add the new host to the map. We just failed to find it in
// our local map above, so insert without checking (2nd arg == false).
host_map_.insert(host, false);

if (std::shared_ptr<OriginalDstCluster> parent = parent_.lock()) {
// lambda cannot capture a member by value.
std::weak_ptr<OriginalDstCluster> post_parent = parent_;
parent->dispatcher_.post([post_parent, host]() mutable {
// The main cluster may have disappeared while this post was queued.
if (std::shared_ptr<OriginalDstCluster> parent = post_parent.lock()) {
parent->addHost(host);
}
});
}

// Tell the cluster about the new host
// lambda cannot capture a member by value.
std::weak_ptr<OriginalDstCluster> post_parent = parent_;
parent_->dispatcher_.post([post_parent, host]() mutable {
// The main cluster may have disappeared while this post was queued.
if (std::shared_ptr<OriginalDstCluster> parent = post_parent.lock()) {
parent->addHost(host);
}
});
return host;
} else {
ENVOY_LOG(debug, "Failed to create host for {}.", dst_addr.asString());
@@ -126,7 +92,7 @@ OriginalDstCluster::LoadBalancer::requestOverrideHost(LoadBalancerContext* conte
ENVOY_LOG(debug, "Using request override host {}.", request_override_host);
} catch (const Envoy::EnvoyException& e) {
ENVOY_LOG(debug, "original_dst_load_balancer: invalid override header value. {}", e.what());
info_->stats().original_dst_host_invalid_.inc();
parent_->info()->stats().original_dst_host_invalid_.inc();
}
}
return request_host;
@@ -140,7 +106,11 @@ OriginalDstCluster::OriginalDstCluster(
dispatcher_(factory_context.dispatcher()),
cleanup_interval_ms_(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, cleanup_interval, 5000))),
cleanup_timer_(dispatcher_.createTimer([this]() -> void { cleanup(); })) {
cleanup_timer_(dispatcher_.createTimer([this]() -> void { cleanup(); })),
use_http_header_(info_->lbOriginalDstConfig()
? info_->lbOriginalDstConfig().value().use_http_header()
: false),
host_map_(std::make_shared<HostMap>()) {
// TODO(dio): Remove hosts check once the hosts field is removed.
if (config.has_load_assignment() || !config.hosts().empty()) {
throw EnvoyException("ORIGINAL_DST clusters must have no load assignment or hosts configured");
@@ -149,41 +119,53 @@ OriginalDstCluster::OriginalDstCluster(
}

void OriginalDstCluster::addHost(HostSharedPtr& host) {
// Given the current config, only EDS clusters support multiple priorities.
ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
const auto& first_host_set = priority_set_.getOrCreateHostSet(0);
HostVectorSharedPtr new_hosts(new HostVector(first_host_set.hosts()));
new_hosts->emplace_back(host);
priority_set_.updateHosts(0,
HostSetImpl::partitionHosts(new_hosts, HostsPerLocalityImpl::empty()),
{}, {std::move(host)}, {}, absl::nullopt);
HostMapSharedPtr new_host_map = std::make_shared<HostMap>(*getCurrentHostMap());
auto pair = new_host_map->emplace(host->address()->asString(), host);
bool added = pair.second;
if (added) {
ENVOY_LOG(debug, "addHost() adding {}", host->address()->asString());
setHostMap(new_host_map);
// Given the current config, only EDS clusters support multiple priorities.
ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
const auto& first_host_set = priority_set_.getOrCreateHostSet(0);
HostVectorSharedPtr all_hosts(new HostVector(first_host_set.hosts()));
all_hosts->emplace_back(host);
priority_set_.updateHosts(0,
HostSetImpl::partitionHosts(all_hosts, HostsPerLocalityImpl::empty()),
{}, {std::move(host)}, {}, absl::nullopt);
}
}

void OriginalDstCluster::cleanup() {
HostVectorSharedPtr new_hosts(new HostVector);
HostVectorSharedPtr keeping_hosts(new HostVector);
HostVector to_be_removed;
// Given the current config, only EDS clusters support multiple priorities.
ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
const auto& host_set = priority_set_.getOrCreateHostSet(0);
ENVOY_LOG(trace, "Stale original dst hosts cleanup triggered.");
if (!host_set.hosts().empty()) {
ENVOY_LOG(debug, "Cleaning up stale original dst hosts.");
for (const HostSharedPtr& host : host_set.hosts()) {
auto host_map = getCurrentHostMap();
if (!host_map->empty()) {
ENVOY_LOG(trace, "Cleaning up stale original dst hosts.");
for (const auto& pair : *host_map) {
const std::string& addr = pair.first;
const HostSharedPtr& host = pair.second;
if (host->used()) {
ENVOY_LOG(debug, "Keeping active host {}.", host->address()->asString());
new_hosts->emplace_back(host);
ENVOY_LOG(trace, "Keeping active host {}.", addr);
keeping_hosts->emplace_back(host);
host->used(false); // Mark to be removed during the next round.
} else {
ENVOY_LOG(debug, "Removing stale host {}.", host->address()->asString());
ENVOY_LOG(trace, "Removing stale host {}.", addr);
to_be_removed.emplace_back(host);
}
}
}

if (!to_be_removed.empty()) {
priority_set_.updateHosts(0,
HostSetImpl::partitionHosts(new_hosts, HostsPerLocalityImpl::empty()),
{}, {}, to_be_removed, absl::nullopt);
HostMapSharedPtr new_host_map = std::make_shared<HostMap>(*host_map);
for (const HostSharedPtr& host : to_be_removed) {
new_host_map->erase(host->address()->asString());
}
setHostMap(new_host_map);
priority_set_.updateHosts(
0, HostSetImpl::partitionHosts(keeping_hosts, HostsPerLocalityImpl::empty()), {}, {},
to_be_removed, absl::nullopt);
}

cleanup_timer_->enableTimer(cleanup_interval_ms_);
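A consequence of the mark-and-sweep pass above, unchanged in spirit from the old host-set walk: a host is removed only after it has gone a full cleanup interval (5000 ms by default, per cleanup_interval_ms_) without being marked used, so an idle host can survive up to roughly two intervals after its last request before it is erased from both the shared map and the priority set.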
@@ -194,10 +176,11 @@ OriginalDstClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
if (cluster.lb_policy() != envoy::api::v2::Cluster::ORIGINAL_DST_LB) {
if (cluster.lb_policy() != envoy::api::v2::Cluster::ORIGINAL_DST_LB &&
cluster.lb_policy() != envoy::api::v2::Cluster::CLUSTER_PROVIDED) {
throw EnvoyException(fmt::format(
"cluster: LB policy {} is not valid for Cluster type {}. Only 'original_dst_lb' "
"is allowed with cluster type 'original_dst'",
"cluster: LB policy {} is not valid for Cluster type {}. Only 'CLUSTER_PROVIDED' or "
"'ORIGINAL_DST_LB' is allowed with cluster type 'ORIGINAL_DST'",
envoy::api::v2::Cluster_LbPolicy_Name(cluster.lb_policy()),
envoy::api::v2::Cluster_DiscoveryType_Name(cluster.type())));
}
@@ -209,14 +192,15 @@ OriginalDstClusterFactory::createClusterImpl(
// TODO(mattklein123): The original DST load balancer type should be deprecated and instead
// the cluster should directly supply the load balancer. This will remove
// a special case and allow this cluster to be compiled out as an extension.
return std::make_pair(
auto new_cluster =
std::make_shared<OriginalDstCluster>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi()),
nullptr);
std::move(stats_scope), context.addedViaApi());
auto lb = std::make_unique<OriginalDstCluster::ThreadAwareLoadBalancer>(new_cluster);
return std::make_pair(new_cluster, std::move(lb));
}

/**
* Static registration for the strict dns cluster factory. @see RegisterFactory.
* Static registration for the original dst cluster factory. @see RegisterFactory.
*/
REGISTER_FACTORY(OriginalDstClusterFactory, ClusterFactory);
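This factory change closes the loop with the cluster manager edit above: createClusterImpl() previously paired the cluster with a null load balancer slot, and it now returns both the OriginalDstCluster and its ThreadAwareLoadBalancer, which is where the lb_factory_ consumed by ClusterEntry ultimately comes from. The validation is also relaxed so that either the deprecated ORIGINAL_DST_LB or the new CLUSTER_PROVIDED policy is accepted for the original destination cluster type.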

(Diffs for the remaining changed files in this commit did not load on this page.)
