Skip to content

Commit

Permalink
eds: introducing EDS resources cache (#28079)
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <[email protected]>
  • Loading branch information
adisuissa authored Jun 28, 2023
1 parent bcad5b1 commit 057c80b
Show file tree
Hide file tree
Showing 7 changed files with 601 additions and 0 deletions.
8 changes: 8 additions & 0 deletions envoy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,11 @@ envoy_cc_library(
"@com_google_googleapis//google/rpc:status_cc_proto",
],
)

envoy_cc_library(
name = "eds_resources_cache_interface",
hdrs = ["eds_resources_cache.h"],
deps = [
"@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto",
],
)
114 changes: 114 additions & 0 deletions envoy/config/eds_resources_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#pragma once

#include "envoy/common/optref.h"
#include "envoy/common/pure.h"
#include "envoy/config/endpoint/v3/endpoint.pb.h"

#include "absl/strings/string_view.h"

namespace Envoy {
namespace Config {

// An interface for cached resource removed callback.
class EdsResourceRemovalCallback {
public:
virtual ~EdsResourceRemovalCallback() = default;

// Invoked when a cached resource is removed from the cache.
virtual void onCachedResourceRemoved(absl::string_view resource_name) PURE;
};

// Represents an xDS resources cache for EDS resources, and currently supports
// a single config-source (ADS). The motivation is that clusters that are
// updated (not added) during a CDS response will be able to use the current EDS
// configuration, thus avoiding the need of the xDS server to send an additional
// EDS response that is identical to what was already sent.
// However, using the cached EDS config is not always desired, for example when
// the cluster changes from non-TLS to TLS (see discussion in
// https://github.com/envoyproxy/envoy/issues/5168). Thus, this cache allows
// the EDS subscription to decide whether to use the cache or not.
//
// This cache will be instantiated once and owned by the ADS Mux, and passed to
// the EDS subscriptions.
//
// Resources lifetime in the cache is determined by the gRPC mux that adds/updates a
// resource when it receives its contents, and removes a resource when there is
// no longer interest in that resource.
// An EDS subscription may fetch a resource from the cache, and optionally
// install a callback to be triggered if the resource is removed from the cache.
// In addition, a resource in the cache may have an expiration timer if
// "endpoint_stale_after" (TTL) is set for that resource. Once the timer
// expires, the callbacks will be triggered to remove the resource.
class EdsResourcesCache {
public:
virtual ~EdsResourcesCache() = default;

/**
* Adds or updates a given resource name with its resource.
* Any callback that was previously assigned to the resource will be removed
* without any notification.
* @param resource_name the name of the resource to add/update.
* @param resource the contents of the resource.
*/
virtual void setResource(absl::string_view resource_name,
const envoy::config::endpoint::v3::ClusterLoadAssignment& resource) PURE;

/**
* Removes a resource from the resource cache given the resource name.
* The callbacks for the resource will be invoked, notifying that the resource
* is removed.
* @param resource_name the name of the resource that will be removed from
* the cache.
*/
virtual void removeResource(absl::string_view resource_name) PURE;

/**
* Retrieves a resource from the cache, and adds the given callback (if any)
* to the resource's removal list. if the resource is removed, all callbacks
* for that resource will be invoked.
* @param resource_name the name of the resource to fetch.
* @param removal_cb an optional callback that will be invoked if the resource is removed
* in the future. Note that updating the resource (`setResource()`) will also
* remove the callback. The caller of this function can also call
* `removeCallback()` to explicitly remove the callback. The callback
* is owned by the caller as it is part of the EDS subscription.
* @return A reference to the cluster load assignment resource, or nullopt if the
* resource doesn't exist.
*/
virtual OptRef<const envoy::config::endpoint::v3::ClusterLoadAssignment>
getResource(absl::string_view resource_name, EdsResourceRemovalCallback* removal_cb) PURE;

/**
* Removes a callback for a given resource name (if it was previously added).
* @param resource_name the name of the resource for which the callback should be removed.
* @param removal_cb a pointer to the callback that needs to be removed.
*/
virtual void removeCallback(absl::string_view resource_name,
EdsResourceRemovalCallback* removal_cb) PURE;

/**
* Sets an expiry timer for the given resource_name after the given ms milliseconds.
* Once the timer expires, the callbacks for that resource (if any) will be
* @param resource_name the name of the resource for which the timer should be added.
* @param ms the number of milliseconds until expiration.
*/
virtual void setExpiryTimer(absl::string_view resource_name, std::chrono::milliseconds ms) PURE;

/**
* Disables the expiration timer for the given resource_name.
* @param resource_name the name of the resource for which the timer should be disabled.
*/
virtual void disableExpiryTimer(absl::string_view resource_name) PURE;

/**
* Returns the number of items in the cache. Only used in tests.
* @return the number of items in the cache.
*/
virtual uint32_t cacheSizeForTest() const PURE;
};

using EdsResourcesCachePtr = std::unique_ptr<EdsResourcesCache>;
using EdsResourcesCacheOptRef = OptRef<EdsResourcesCache>;

} // namespace Config
} // namespace Envoy
11 changes: 11 additions & 0 deletions source/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,14 @@ envoy_cc_library(
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "eds_resources_cache_lib",
srcs = ["eds_resources_cache_impl.cc"],
hdrs = ["eds_resources_cache_impl.h"],
deps = [
"//envoy/config:eds_resources_cache_interface",
"//envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"

#include "source/common/common/logger.h"

namespace Envoy {
namespace Config {

void EdsResourcesCacheImpl::setResource(
absl::string_view resource_name,
const envoy::config::endpoint::v3::ClusterLoadAssignment& resource) {
resources_map_.insert_or_assign(resource_name, ResourceData(resource));
}

void EdsResourcesCacheImpl::removeResource(absl::string_view resource_name) {
if (const auto& resource_it = resources_map_.find(resource_name);
resource_it != resources_map_.end()) {
// Invoke the callbacks, and remove all watchers.
for (auto& removal_cb : resource_it->second.removal_cbs_) {
removal_cb->onCachedResourceRemoved(resource_name);
}
resource_it->second.removal_cbs_.clear();

// Remove the resource entry from the cache.
resources_map_.erase(resource_it);
}
// Remove the expiration timers (if any) as there's no longer interest in the resource.
expiry_timers_.erase(resource_name);
}

OptRef<const envoy::config::endpoint::v3::ClusterLoadAssignment>
EdsResourcesCacheImpl::getResource(absl::string_view resource_name,
EdsResourceRemovalCallback* removal_cb) {
if (const auto& resource_it = resources_map_.find(resource_name);
resource_it != resources_map_.end()) {
ENVOY_LOG_MISC(trace, "Returning resource {} from the xDS resource cache", resource_name);
// Add the removal callback to the list associated with the resource.
if (removal_cb != nullptr) {
resource_it->second.removal_cbs_.push_back(removal_cb);
}
return resource_it->second.resource_;
}
// The resource doesn't exist in the resource map.
return {};
}

void EdsResourcesCacheImpl::removeCallback(absl::string_view resource_name,
EdsResourceRemovalCallback* removal_cb) {
if (const auto& resource_it = resources_map_.find(resource_name);
resource_it != resources_map_.end()) {
ENVOY_LOG_MISC(trace, "Removing callback for resource {} from the xDS resource cache",
resource_name);
resource_it->second.removal_cbs_.erase(std::remove(resource_it->second.removal_cbs_.begin(),
resource_it->second.removal_cbs_.end(),
removal_cb));
}
}

uint32_t EdsResourcesCacheImpl::cacheSizeForTest() const { return resources_map_.size(); }

void EdsResourcesCacheImpl::setExpiryTimer(absl::string_view resource_name,
std::chrono::milliseconds ms) {
auto it = expiry_timers_.find(resource_name);
if (it == expiry_timers_.end()) {
// No timer for this resource, create one, and create a copy of resource_name that will outlive
// this function.
Event::TimerPtr resource_timeout =
dispatcher_.createTimer([this, str_resource_name = std::string(resource_name)]() -> void {
// On expiration the resource is removed (from the cache and from the watchers).
removeResource(str_resource_name);
});
it = expiry_timers_.emplace(resource_name, std::move(resource_timeout)).first;
}
(it->second)->enableTimer(ms);
}

void EdsResourcesCacheImpl::disableExpiryTimer(absl::string_view resource_name) {
auto it = expiry_timers_.find(resource_name);
if (it != expiry_timers_.end()) {
(it->second)->disableTimer();
// Remove the timer as it is no longer needed.
expiry_timers_.erase(it);
}
}

} // namespace Config
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <memory>

#include "envoy/config/eds_resources_cache.h"
#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"

#include "absl/container/flat_hash_map.h"

namespace Envoy {
namespace Config {

class EdsResourcesCacheImpl : public EdsResourcesCache {
public:
EdsResourcesCacheImpl(Event::Dispatcher& main_thread_dispatcher)
: dispatcher_(main_thread_dispatcher) {}

// EdsResourcesCache
void setResource(absl::string_view resource_name,
const envoy::config::endpoint::v3::ClusterLoadAssignment& resource) override;
void removeResource(absl::string_view resource_name) override;
OptRef<const envoy::config::endpoint::v3::ClusterLoadAssignment>
getResource(absl::string_view resource_name, EdsResourceRemovalCallback* removal_cb) override;
void removeCallback(absl::string_view resource_name,
EdsResourceRemovalCallback* removal_cb) override;
uint32_t cacheSizeForTest() const override;
void setExpiryTimer(absl::string_view resource_name, std::chrono::milliseconds ms) override;
void disableExpiryTimer(absl::string_view resource_name) override;

private:
// The value of the map, holds the resource and the removal callbacks.
struct ResourceData {
envoy::config::endpoint::v3::ClusterLoadAssignment resource_;
std::vector<EdsResourceRemovalCallback*> removal_cbs_;

ResourceData(const envoy::config::endpoint::v3::ClusterLoadAssignment& resource)
: resource_(resource) {}
};
// A map between a resource name and its ResourceData.
absl::flat_hash_map<std::string, ResourceData> resources_map_;
// The per-resource timeout timer to track when the resource should be removed.
absl::flat_hash_map<std::string, Event::TimerPtr> expiry_timers_;
Event::Dispatcher& dispatcher_;
};

} // namespace Config
} // namespace Envoy
11 changes: 11 additions & 0 deletions test/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,14 @@ envoy_cc_test(
"//source/extensions/config_subscription/grpc:xds_source_id_lib",
],
)

envoy_cc_test(
name = "eds_resources_cache_impl_test",
srcs = ["eds_resources_cache_impl_test.cc"],
deps = [
"//source/extensions/config_subscription/grpc:eds_resources_cache_lib",
"//test/mocks/event:event_mocks",
"//test/test_common:simulated_time_system_lib",
"//test/test_common:utility_lib",
],
)
Loading

0 comments on commit 057c80b

Please sign in to comment.