Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start storing actual deltas in edge #742

Merged
merged 18 commits into from
Feb 18, 2025
17 changes: 12 additions & 5 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use unleash_types::client_features::ClientFeatures;
use unleash_yggdrasil::{EngineState, UpdateMessage};

use crate::cli::RedisMode;
use crate::delta_cache::DeltaCache;
use crate::feature_cache::FeatureCache;
use crate::http::refresher::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode};
use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation};
Expand All @@ -30,6 +31,7 @@ use crate::{
type CacheContainer = (
Arc<DashMap<String, EdgeToken>>,
Arc<FeatureCache>,
Arc<DashMap<String, DeltaCache>>,
Arc<DashMap<String, EngineState>>,
);
type EdgeInfo = (
Expand All @@ -42,16 +44,19 @@ type EdgeInfo = (
fn build_caches() -> CacheContainer {
let token_cache: DashMap<String, EdgeToken> = DashMap::default();
let features_cache: DashMap<String, ClientFeatures> = DashMap::default();
let delta_cache: DashMap<String, DeltaCache> = DashMap::default();
let engine_cache: DashMap<String, EngineState> = DashMap::default();
(
Arc::new(token_cache),
Arc::new(FeatureCache::new(features_cache)),
Arc::new(delta_cache),
Arc::new(engine_cache),
)
}

async fn hydrate_from_persistent_storage(cache: CacheContainer, storage: Arc<dyn EdgePersistence>) {
let (token_cache, features_cache, engine_cache) = cache;
let (token_cache, features_cache, _delta_cache, engine_cache) = cache;
// TODO: do we need to hydrate from persistant storage for delta?
let tokens = storage.load_tokens().await.unwrap_or_else(|error| {
warn!("Failed to load tokens from cache {error:?}");
vec![]
Expand Down Expand Up @@ -84,7 +89,7 @@ pub(crate) fn build_offline_mode(
client_tokens: Vec<String>,
frontend_tokens: Vec<String>,
) -> EdgeResult<CacheContainer> {
let (token_cache, features_cache, engine_cache) = build_caches();
let (token_cache, features_cache, _delta_cache, engine_cache) = build_caches();

let edge_tokens: Vec<EdgeToken> = tokens
.iter()
Expand Down Expand Up @@ -135,7 +140,7 @@ pub(crate) fn build_offline_mode(
client_features.clone(),
)
}
Ok((token_cache, features_cache, engine_cache))
Ok((token_cache, features_cache, _delta_cache, engine_cache))
}

fn build_offline(offline_args: OfflineArgs) -> EdgeResult<CacheContainer> {
Expand Down Expand Up @@ -235,7 +240,7 @@ async fn build_edge(
));
}

let (token_cache, feature_cache, engine_cache) = build_caches();
let (token_cache, feature_cache, delta_cache, engine_cache) = build_caches();

let persistence = get_data_source(args).await;

Expand Down Expand Up @@ -276,6 +281,7 @@ async fn build_edge(
let feature_refresher = Arc::new(FeatureRefresher::new(
unleash_client,
feature_cache.clone(),
delta_cache.clone(),
engine_cache.clone(),
persistence.clone(),
feature_config,
Expand All @@ -287,6 +293,7 @@ async fn build_edge(
(
token_cache.clone(),
feature_cache.clone(),
delta_cache.clone(),
engine_cache.clone(),
),
persistence,
Expand All @@ -307,7 +314,7 @@ async fn build_edge(
.await;
}
Ok((
(token_cache, feature_cache, engine_cache),
(token_cache, feature_cache, delta_cache, engine_cache),
Some(token_validator),
Some(feature_refresher),
persistence,
Expand Down
8 changes: 7 additions & 1 deletion server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,13 @@ pub struct EdgeArgs {
pub delta: bool,

/// If set to true, it compares features payload with delta payload and logs diff. This flag is for internal testing only. Do not turn this on for production configurations
#[clap(long, env, default_value_t = false, conflicts_with = "delta", hide = true)]
#[clap(
long,
env,
default_value_t = false,
conflicts_with = "delta",
hide = true
)]
pub delta_diff: bool,

/// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream
Expand Down
54 changes: 52 additions & 2 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::cli::{EdgeArgs, EdgeMode};
use crate::delta_cache::DeltaCache;
use crate::error::EdgeError;
use crate::feature_cache::FeatureCache;
use crate::filters::{
filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
filter_client_features, filter_delta_events, name_match_filter, name_prefix_filter,
project_filter, FeatureFilterSet,
};
use crate::http::broadcaster::Broadcaster;
use crate::http::refresher::delta_refresher::Environment;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::tokens::cache_key;
Expand All @@ -15,7 +18,7 @@ use actix_web::web::{self, Data, Json, Query};
use actix_web::Responder;
use actix_web::{get, post, HttpRequest, HttpResponse};
use dashmap::DashMap;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta};
use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};

#[utoipa::path(
Expand All @@ -41,6 +44,17 @@ pub async fn get_features(
resolve_features(edge_token, features_cache, token_cache, filter_query, req).await
}

#[get("/delta")]
pub async fn get_delta(
edge_token: EdgeToken,
delta_cache: Data<DashMap<String, DeltaCache>>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeJsonResult<ClientFeaturesDelta> {
resolve_delta(edge_token, delta_cache, token_cache, filter_query, req).await
}

#[get("/streaming")]
pub async fn stream_features(
edge_token: EdgeToken,
Expand Down Expand Up @@ -147,6 +161,30 @@ async fn resolve_features(
..client_features
}))
}

async fn resolve_delta(
edge_token: EdgeToken,
delta_cache: Data<DashMap<Environment, DeltaCache>>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeJsonResult<ClientFeaturesDelta> {
let (validated_token, filter_set, ..) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
let delta = match req.app_data::<Data<FeatureRefresher>>() {
Some(refresher) => {
refresher
.delta_events_for_filter(validated_token.clone(), &filter_set)
.await
}
None => delta_cache
.get(&cache_key(&validated_token))
.map(|cache| filter_delta_events(cache.value(), &filter_set))
.ok_or(EdgeError::ClientCacheError),
}?;

Ok(Json(delta))
}
#[utoipa::path(
context_path = "/api/client",
params(("feature_name" = String, Path,)),
Expand Down Expand Up @@ -278,6 +316,7 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
crate::middleware::validate_token::validate_token,
))
.service(get_features)
.service(get_delta)
.service(get_feature)
.service(register)
.service(metrics)
Expand Down Expand Up @@ -688,11 +727,13 @@ mod tests {
token.token_type = Some(TokenType::Client);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_delta_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(token.token.clone(), token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_delta_cache,
upstream_engine_cache,
)
.await;
Expand Down Expand Up @@ -727,11 +768,13 @@ mod tests {
frontend_token.token_type = Some(TokenType::Frontend);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_delta_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_delta_cache,
upstream_engine_cache,
)
.await;
Expand Down Expand Up @@ -993,10 +1036,12 @@ mod tests {
async fn calling_client_features_endpoint_with_new_token_hydrates_from_upstream_when_dynamic() {
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_delta_cache: Arc<DashMap<String, DeltaCache>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_delta_cache.clone(),
upstream_engine_cache.clone(),
)
.await;
Expand All @@ -1017,6 +1062,7 @@ mod tests {
unleash_client: unleash_client.clone(),
tokens_to_refresh: Arc::new(Default::default()),
features_cache: features_cache.clone(),
delta_cache: Arc::new(Default::default()),
engine_cache: engine_cache.clone(),
refresh_interval: Duration::seconds(6000),
persistence: None,
Expand Down Expand Up @@ -1057,10 +1103,12 @@ mod tests {
async fn calling_client_features_endpoint_with_new_token_does_not_hydrate_when_strict() {
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_delta_cache: Arc<DashMap<String, DeltaCache>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_delta_cache.clone(),
upstream_engine_cache.clone(),
)
.await;
Expand Down Expand Up @@ -1179,10 +1227,12 @@ mod tests {
{
let upstream_features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_delta_cache: Arc<DashMap<String, DeltaCache>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_delta_cache.clone(),
upstream_engine_cache.clone(),
)
.await;
Expand Down
Loading
Loading