Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream: subsetting load balancer [rough draft] #1735

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def envoy_api_deps(skip_targets):
native.git_repository(
name = "envoy_api",
remote = REPO_LOCATIONS["envoy_api"],
commit = "9be6aff6da46e024af56cce20cb5d5d3184f19c5",
commit = "d8ac22d4ff167a1f44229264bfa8d0f9abc3608f",
)
api_bind_targets = [
"address",
Expand Down
2 changes: 1 addition & 1 deletion bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
REPO_LOCATIONS = {
"jinja2": "https://github.com/pallets/jinja.git",
"grpc_transcoding": "https://github.com/grpc-ecosystem/grpc-httpjson-transcoding.git",
"envoy_api": "https://github.com/envoyproxy/data-plane-api.git",
"envoy_api": "https://github.com/turbinelabs/data-plane-api.git",
"protobuf" :"https://github.com/htuch/protobuf.git",
"markupsafe": "https://github.com/pallets/markupsafe.git",
}
3 changes: 3 additions & 0 deletions include/envoy/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ envoy_cc_library(
hdrs = ["router.h"],
deps = [
"//include/envoy/common:optional",
"//include/envoy/http:access_log_interface",
"//include/envoy/http:codec_interface",
"//include/envoy/http:header_map_interface",
"//include/envoy/tracing:http_tracer_interface",
"//include/envoy/upstream:resource_manager_interface",
"//source/common/protobuf",
],
)

Expand Down
38 changes: 38 additions & 0 deletions include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/resource_manager.h"

#include "common/protobuf/protobuf.h"

namespace Envoy {
namespace Router {

Expand Down Expand Up @@ -224,6 +226,40 @@ class HashPolicy {
virtual Optional<uint64_t> generateHash(const Http::HeaderMap& headers) const PURE;
};

class MetadataMatch {
public:
virtual ~MetadataMatch() {}

/*
* @return const std::string& the name of the metadata key
*/
virtual const std::string& name() const PURE;

/*
* @return const ProtobufWkt::Value& the value for the metadata key
*/
virtual const ProtobufWkt::Value& value() const PURE;

/*
* @return uint64_t a precomputed hash of the value
*/
virtual uint64_t valueHash() const PURE;
};

typedef std::shared_ptr<const MetadataMatch> MetadataMatchConstSharedPtr;

class MetadataMatches {
public:
virtual ~MetadataMatches() {}

/*
* @return std::vector<MetadataMatchConstSharedPtr>& a vector of
* metadata to be matched against upstream endpoints when load
* balancing, sorted lexically by name.
*/
virtual const std::vector<MetadataMatchConstSharedPtr>& metadataMatches() const PURE;
};

/**
* An individual resolved route entry.
*/
Expand Down Expand Up @@ -305,6 +341,8 @@ class RouteEntry {
*/
virtual bool useWebSocket() const PURE;

virtual const MetadataMatches* metadataMatches() const PURE;

/**
* @return const std::multimap<std::string, std::string> the opaque configuration associated
* with the route
Expand Down
9 changes: 8 additions & 1 deletion include/envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,19 @@ envoy_cc_library(
envoy_cc_library(
name = "load_balancer_interface",
hdrs = ["load_balancer.h"],
deps = [":upstream_interface"],
deps = [
":upstream_interface",
"//include/envoy/router:router_interface",
],
)

envoy_cc_library(
name = "load_balancer_type_interface",
hdrs = ["load_balancer_type.h"],
external_deps = ["envoy_cds"],
deps = [
"//source/common/protobuf",
],
)

envoy_cc_library(
Expand Down
3 changes: 3 additions & 0 deletions include/envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <memory>

#include "envoy/common/pure.h"
#include "envoy/router/router.h"
#include "envoy/upstream/upstream.h"

namespace Envoy {
Expand All @@ -22,6 +23,8 @@ class LoadBalancerContext {
*/
virtual Optional<uint64_t> hashKey() const PURE;

virtual const Router::MetadataMatches* metadataMatches() const PURE;

/**
* @return const Network::Connection* the incoming connection or nullptr to use during load
* balancing.
Expand Down
26 changes: 26 additions & 0 deletions include/envoy/upstream/load_balancer_type.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#pragma once

#include <string>
#include <vector>

#include "common/protobuf/protobuf.h"

#include "api/cds.pb.h"

namespace Envoy {
namespace Upstream {

Expand All @@ -8,5 +15,24 @@ namespace Upstream {
*/
enum class LoadBalancerType { RoundRobin, LeastRequest, Random, RingHash, OriginalDst };

class LoadBalancerSubsetInfo {
public:
virtual ~LoadBalancerSubsetInfo() {}

virtual bool isEnabled() const PURE;

virtual envoy::api::v2::Cluster::LbSubsetConfig::LbSubsetFallbackPolicy
fallbackPolicy() const PURE;

virtual const ProtobufWkt::Struct& defaultSubset() const PURE;

/*
* @return const std:vector<std:vector<std::string>>& a vector of
* keys used to define load balancer subsets. The inner vector is
* lexically sorted.
*/
virtual const std::vector<std::vector<std::string>>& subsetKeys() const PURE;
};

} // namespace Upstream
} // namespace Envoy
2 changes: 2 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ class ClusterInfo {
* @return a source address to bind to or nullptr if no bind need occur.
*/
virtual const Network::Address::InstanceConstSharedPtr& sourceAddress() const PURE;

virtual const LoadBalancerSubsetInfo& lbSubsetInfo() const PURE;
};

typedef std::shared_ptr<const ClusterInfo> ClusterInfoConstSharedPtr;
Expand Down
1 change: 1 addition & 0 deletions source/common/filter/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ envoy_cc_library(
"//include/envoy/event:timer_interface",
"//include/envoy/network:connection_interface",
"//include/envoy/network:filter_interface",
"//include/envoy/router:router_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//include/envoy/upstream:cluster_manager_interface",
Expand Down
1 change: 1 addition & 0 deletions source/common/filter/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class TcpProxy : public Network::ReadFilter,

// Upstream::LoadBalancerContext
Optional<uint64_t> hashKey() const override { return {}; }
const Router::MetadataMatches* metadataMatches() const override { return nullptr; }
const Network::Connection* downstreamConnection() const override {
return &read_callbacks_->connection();
}
Expand Down
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
void finalizeRequestHeaders(Http::HeaderMap&,
const Http::AccessLog::RequestInfo&) const override {}
const Router::HashPolicy* hashPolicy() const override { return nullptr; }
const Router::MetadataMatches* metadataMatches() const override { return nullptr; }
Upstream::ResourcePriority priority() const override {
return Upstream::ResourcePriority::Default;
}
Expand Down
30 changes: 30 additions & 0 deletions source/common/protobuf/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,34 @@ void MessageUtil::jsonConvert(const Protobuf::Message& source, Protobuf::Message
MessageUtil::loadFromJson(json, dest);
}

bool ValueUtil::equal(const ProtobufWkt::Value& v1, const ProtobufWkt::Value& v2) {
ProtobufWkt::Value::KindCase kind = v1.kind_case();
if (kind != v2.kind_case()) {
return false;
}

switch (kind) {
case ProtobufWkt::Value::kNullValue:
return true;

case ProtobufWkt::Value::kNumberValue:
return v1.number_value() == v2.number_value();

case ProtobufWkt::Value::kStringValue:
return v1.string_value() == v2.string_value();

case ProtobufWkt::Value::kBoolValue:
return v1.bool_value() == v2.bool_value();

case ProtobufWkt::Value::kStructValue:
return ProtobufUtil::MessageDifferencer::Equals(v1.struct_value(), v2.struct_value());

case ProtobufWkt::Value::kListValue:
return ProtobufUtil::MessageDifferencer::Equals(v1.list_value(), v2.list_value());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just use MessageDifferencer::Equals on v1 and v2?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattklein123 voiced concern about using protobuf reflection (which is what the MessageDifferencer seems to use, in my casual inspection of its implementation). The equality check is used in the load balancer path (albeit only after a hash comparison).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to be swayed here, but IMO we should try to make this path as fast as possible and we should avoid reflection if possible in the fast path. I'm not super familiar with this stuff. This function basically does not use reflection unless it involves an embedded struct or list?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this implementation only uses reflection for embedded lists and structs. That said, I believe I can write a version that doesn't use reflection at all, since the values in the list/struct are just ProtobufWkt::Value instances.


default:
RELEASE_ASSERT(false);
}
}

} // namespace Envoy
7 changes: 7 additions & 0 deletions source/common/protobuf/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,11 @@ class MessageUtil {
}
};

class ValueUtil {
public:
static std::size_t hash(const ProtobufWkt::Value& value) { return MessageUtil::hash(value); }

static bool equal(const ProtobufWkt::Value& v1, const ProtobufWkt::Value& v2);
};

} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/redis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ envoy_cc_library(
deps = [
":codec_lib",
"//include/envoy/redis:conn_pool_interface",
"//include/envoy/router:router_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:buffer_lib",
Expand Down
1 change: 1 addition & 0 deletions source/common/redis/conn_pool_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class InstanceImpl : public Instance {
// TODO(danielhochman): convert to HashUtil::xxHash64 when we have a migration strategy.
// Upstream::LoadBalancerContext
Optional<uint64_t> hashKey() const override { return hash_key_; }
const Router::MetadataMatches* metadataMatches() const override { return nullptr; }
const Network::Connection* downstreamConnection() const override { return nullptr; }

const Optional<uint64_t> hash_key_;
Expand Down
57 changes: 54 additions & 3 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "common/router/config_impl.h"

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <map>
Expand Down Expand Up @@ -89,6 +90,37 @@ Optional<uint64_t> HashPolicyImpl::generateHash(const Http::HeaderMap& headers)
return hash;
}

std::vector<MetadataMatchConstSharedPtr>
MetadataMatchesImpl::extractMetadataMatches(const MetadataMatchesImpl* parent,
const ProtobufWkt::Struct& matches) {
std::vector<MetadataMatchConstSharedPtr> v;
std::unordered_map<std::string, std::size_t> existing;

if (parent) {
for (const auto& it : parent->metadata_matches_) {
v.emplace_back(it);
existing.emplace(it->name(), v.size());
}
}

// Add values from matches, replacing key/values copied from parent.
for (const auto it : matches.fields()) {
const auto index_it = existing.find(it.first);
if (index_it != existing.end()) {
v[index_it->second] = std::make_shared<MetadataMatchImpl>(it.first, it.second);
} else {
v.emplace_back(std::make_shared<MetadataMatchImpl>(it.first, it.second));
}
}

std::sort(v.begin(), v.end(),
[](const MetadataMatchConstSharedPtr& a, const MetadataMatchConstSharedPtr& b) -> bool {
return a->name() < b->name();
});

return v;
}

DecoratorImpl::DecoratorImpl(const envoy::api::v2::Decorator& decorator)
: operation_(decorator.operation()) {}

Expand Down Expand Up @@ -116,6 +148,14 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost,
priority_(ConfigUtility::parsePriority(route.route().priority())),
request_headers_parser_(RequestHeaderParser::parse(route.route().request_headers_to_add())),
opaque_config_(parseOpaqueConfig(route)), decorator_(parseDecorator(route)) {
if (route.route().has_metadata_match()) {
const auto filter_it = route.route().metadata_match().filter_metadata().find(
Envoy::Config::MetadataFilters::get().ENVOY_LB);
if (filter_it != route.route().metadata_match().filter_metadata().end()) {
metadata_matches_.reset(new MetadataMatchesImpl(nullptr, filter_it->second));
}
}

// If this is a weighted_cluster, we create N internal route entries
// (called WeightedClusterEntry), such that each object is a simple
// single cluster, pointing back to the parent.
Expand All @@ -125,9 +165,20 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost,

for (const auto& cluster : route.route().weighted_clusters().clusters()) {
const std::string& cluster_name = cluster.name();
std::unique_ptr<WeightedClusterEntry> cluster_entry(
new WeightedClusterEntry(this, runtime_key_prefix + "." + cluster_name, loader_,
cluster_name, PROTOBUF_GET_WRAPPED_REQUIRED(cluster, weight)));

MetadataMatchesImplConstPtr cluster_metadata_matches;
if (cluster.has_metadata_match()) {
const auto filter_it = cluster.metadata_match().filter_metadata().find(
Envoy::Config::MetadataFilters::get().ENVOY_LB);
if (filter_it != cluster.metadata_match().filter_metadata().end()) {
cluster_metadata_matches.reset(
new MetadataMatchesImpl(metadata_matches_.get(), filter_it->second));
}
}

std::unique_ptr<WeightedClusterEntry> cluster_entry(new WeightedClusterEntry(
this, runtime_key_prefix + "." + cluster_name, loader_, cluster_name,
PROTOBUF_GET_WRAPPED_REQUIRED(cluster, weight), std::move(cluster_metadata_matches)));
weighted_clusters_.emplace_back(std::move(cluster_entry));
total_weight += weighted_clusters_.back()->clusterWeight();
}
Expand Down
Loading