From a6927a55b3a8562f6890fc9e1570af270160071a Mon Sep 17 00:00:00 2001 From: sjaanus Date: Mon, 17 Feb 2025 12:40:49 +0200 Subject: [PATCH 01/14] feat: delta endpoint in edge --- server/src/builder.rs | 17 +++++++++++----- server/src/client_api.rs | 40 +++++++++++++++++++++++++++++++++++++ server/src/feature_cache.rs | 1 + server/src/frontend_api.rs | 32 ++++++++++++++--------------- server/src/main.rs | 2 +- 5 files changed, 70 insertions(+), 22 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index d2d44cfe..7a5ea524 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -26,10 +26,12 @@ use crate::{ http::{refresher::feature_refresher::FeatureRefresher, unleash_client::UnleashClient}, types::{EdgeResult, EdgeToken, TokenType}, }; +use crate::delta_cache::DeltaCache; type CacheContainer = ( Arc>, Arc, + Arc>, Arc>, ); type EdgeInfo = ( @@ -42,16 +44,19 @@ type EdgeInfo = ( fn build_caches() -> CacheContainer { let token_cache: DashMap = DashMap::default(); let features_cache: DashMap = DashMap::default(); + let delta_cache: DashMap = DashMap::default(); let engine_cache: DashMap = DashMap::default(); ( Arc::new(token_cache), Arc::new(FeatureCache::new(features_cache)), + Arc::new(delta_cache), Arc::new(engine_cache), ) } async fn hydrate_from_persistent_storage(cache: CacheContainer, storage: Arc) { - let (token_cache, features_cache, engine_cache) = cache; + let (token_cache, features_cache, delta_cache, engine_cache, ) = cache; + // TODO: do we need to hydrate from persistant storage for delta? let tokens = storage.load_tokens().await.unwrap_or_else(|error| { warn!("Failed to load tokens from cache {error:?}"); vec![] @@ -84,7 +89,7 @@ pub(crate) fn build_offline_mode( client_tokens: Vec, frontend_tokens: Vec, ) -> EdgeResult { - let (token_cache, features_cache, engine_cache) = build_caches(); + let (token_cache, features_cache, delta_cache, engine_cache) = build_caches(); let edge_tokens: Vec = tokens .iter() @@ -135,7 +140,8 @@ pub(crate) fn build_offline_mode( client_features.clone(), ) } - Ok((token_cache, features_cache, engine_cache)) + // TODO: possibly need to resolve delta cache for offline mode? + Ok((token_cache, features_cache, delta_cache, engine_cache)) } fn build_offline(offline_args: OfflineArgs) -> EdgeResult { @@ -235,7 +241,7 @@ async fn build_edge( )); } - let (token_cache, feature_cache, engine_cache) = build_caches(); + let (token_cache, feature_cache, delta_cache, engine_cache ) = build_caches(); let persistence = get_data_source(args).await; @@ -287,6 +293,7 @@ async fn build_edge( ( token_cache.clone(), feature_cache.clone(), + delta_cache.clone(), engine_cache.clone(), ), persistence, @@ -307,7 +314,7 @@ async fn build_edge( .await; } Ok(( - (token_cache, feature_cache, engine_cache), + (token_cache, feature_cache, delta_cache, engine_cache), Some(token_validator), Some(feature_refresher), persistence, diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 8b331110..d53866e4 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -17,6 +17,7 @@ use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; use unleash_types::client_features::{ClientFeature, ClientFeatures}; use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia}; +use crate::delta_cache::DeltaCache; #[utoipa::path( context_path = "/api/client", @@ -41,6 +42,17 @@ pub async fn get_features( resolve_features(edge_token, features_cache, token_cache, filter_query, req).await } +#[get("/delta")] +pub async fn get_delta( + edge_token: EdgeToken, + delta_cache: Data>, + token_cache: Data>, + filter_query: Query, + req: HttpRequest, +) -> EdgeJsonResult { + resolve_delta(edge_token, delta_cache, token_cache, filter_query, req).await +} + #[get("/streaming")] pub async fn stream_features( edge_token: EdgeToken, @@ -147,6 +159,34 @@ async fn resolve_features( ..client_features })) } + +async fn resolve_delta( + edge_token: EdgeToken, + delta_cache: Data, + token_cache: Data>, + filter_query: Query, + req: HttpRequest, +) -> EdgeJsonResult { + let (validated_token, filter_set, query) = + get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; + + let client_features = match req.app_data::>() { + Some(refresher) => { + refresher + .features_for_filter(validated_token.clone(), &filter_set) + .await + } + None => delta_cache + .get(&cache_key(&validated_token)) + .map(|client_features| filter_client_features(&client_features, &filter_set)) + .ok_or(EdgeError::ClientCacheError), + }?; + + Ok(Json(ClientFeatures { + query: Some(query), + ..client_features + })) +} #[utoipa::path( context_path = "/api/client", params(("feature_name" = String, Path,)), diff --git a/server/src/feature_cache.rs b/server/src/feature_cache.rs index 16f28cf7..d589791f 100644 --- a/server/src/feature_cache.rs +++ b/server/src/feature_cache.rs @@ -20,6 +20,7 @@ pub struct FeatureCache { update_sender: broadcast::Sender, } + impl FeatureCache { pub fn new(features: DashMap) -> Self { let (tx, _rx) = tokio::sync::broadcast::channel::(16); diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index 57b26957..95a60814 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -917,7 +917,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_post_requests_resolves_context_values_correctly() { - let (token_cache, features_cache, engine_cache) = build_offline_mode( + let (token_cache, features_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_user_id_of_seven(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -969,7 +969,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_context_values_correctly() { - let (feature_cache, token_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_user_id_of_seven(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1019,7 +1019,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_top_level_properties_correctly() { - let (feature_cache, token_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_test_property_to_be_42(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1079,7 +1079,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_post_requests_resolves_top_level_properties_correctly() { - let (feature_cache, token_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_test_property_to_be_42(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1137,7 +1137,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_context_values_correctly_with_enabled_filter() { - let (token_cache, features_cache, engine_cache) = build_offline_mode( + let (token_cache, features_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1256,7 +1256,7 @@ mod tests { #[tokio::test] async fn when_running_in_offline_mode_with_proxy_key_should_not_filter_features() { let client_features = client_features_with_constraint_requiring_user_id_of_seven(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["secret-123".to_string()], vec![], @@ -1292,7 +1292,7 @@ mod tests { #[tokio::test] async fn frontend_api_filters_evaluated_toggles_to_tokens_access() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], vec![], @@ -1382,7 +1382,7 @@ mod tests { #[tokio::test] async fn using_a_string_for_properties_gives_400() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features, vec!["dx:development.secret123".to_string()], vec![], @@ -1413,7 +1413,7 @@ mod tests { #[tokio::test] async fn can_get_single_feature() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], vec![], @@ -1443,7 +1443,7 @@ mod tests { #[tokio::test] async fn can_get_single_feature_with_top_level_properties() { - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_test_property_to_be_42(), vec!["*:development.secret123".to_string()], vec![], @@ -1473,7 +1473,7 @@ mod tests { #[tokio::test] async fn trying_to_evaluate_feature_you_do_not_have_access_to_will_give_not_found() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], vec![], @@ -1506,7 +1506,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1548,7 +1548,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1589,7 +1589,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/ip_address_feature.json"); let auth_key = "gard:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1629,7 +1629,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/ip_address_feature.json"); let auth_key = "gard:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1671,7 +1671,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], diff --git a/server/src/main.rs b/server/src/main.rs index b48e21a0..3979b477 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -71,7 +71,7 @@ async fn main() -> Result<(), anyhow::Error> { let internal_backstage_args = args.internal_backstage.clone(); let ( - (token_cache, features_cache, engine_cache), + (token_cache, features_cache, delta_cache, engine_cache), token_validator, feature_refresher, persistence, From 4c979b2d459ffeeb2747f3848c9fd381fbe271d3 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Mon, 17 Feb 2025 12:49:24 +0200 Subject: [PATCH 02/14] Update --- server/src/client_api.rs | 4 +-- server/src/frontend_api.rs | 32 +++++++++--------- server/src/http/refresher/delta_refresher.rs | 33 +++++++++++++++++-- .../src/http/refresher/feature_refresher.rs | 1 + 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index d53866e4..f547a69e 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -162,7 +162,7 @@ async fn resolve_features( async fn resolve_delta( edge_token: EdgeToken, - delta_cache: Data, + delta_cache: Data>, token_cache: Data>, filter_query: Query, req: HttpRequest, @@ -172,7 +172,7 @@ async fn resolve_delta( let client_features = match req.app_data::>() { Some(refresher) => { - refresher + refresher. .features_for_filter(validated_token.clone(), &filter_set) .await } diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index 95a60814..23564fe8 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -917,7 +917,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_post_requests_resolves_context_values_correctly() { - let (token_cache, features_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, features_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_user_id_of_seven(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -969,7 +969,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_context_values_correctly() { - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_user_id_of_seven(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1019,7 +1019,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_top_level_properties_correctly() { - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_test_property_to_be_42(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1079,7 +1079,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_post_requests_resolves_top_level_properties_correctly() { - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_test_property_to_be_42(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1137,7 +1137,7 @@ mod tests { #[actix_web::test] #[traced_test] async fn calling_get_requests_resolves_context_values_correctly_with_enabled_filter() { - let (token_cache, features_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, features_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle(), vec![ "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" @@ -1256,7 +1256,7 @@ mod tests { #[tokio::test] async fn when_running_in_offline_mode_with_proxy_key_should_not_filter_features() { let client_features = client_features_with_constraint_requiring_user_id_of_seven(); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["secret-123".to_string()], vec![], @@ -1292,7 +1292,7 @@ mod tests { #[tokio::test] async fn frontend_api_filters_evaluated_toggles_to_tokens_access() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], vec![], @@ -1382,7 +1382,7 @@ mod tests { #[tokio::test] async fn using_a_string_for_properties_gives_400() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features, vec!["dx:development.secret123".to_string()], vec![], @@ -1413,7 +1413,7 @@ mod tests { #[tokio::test] async fn can_get_single_feature() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], vec![], @@ -1443,7 +1443,7 @@ mod tests { #[tokio::test] async fn can_get_single_feature_with_top_level_properties() { - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_constraint_requiring_test_property_to_be_42(), vec!["*:development.secret123".to_string()], vec![], @@ -1473,7 +1473,7 @@ mod tests { #[tokio::test] async fn trying_to_evaluate_feature_you_do_not_have_access_to_will_give_not_found() { let client_features = crate::tests::features_from_disk("../examples/hostedexample.json"); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features.clone(), vec!["dx:development.secret123".to_string()], vec![], @@ -1506,7 +1506,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1548,7 +1548,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1589,7 +1589,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/ip_address_feature.json"); let auth_key = "gard:development.secret123".to_string(); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1629,7 +1629,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/ip_address_feature.json"); let auth_key = "gard:development.secret123".to_string(); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], @@ -1671,7 +1671,7 @@ mod tests { let client_features_with_custom_context_field = crate::tests::features_from_disk("../examples/with_custom_constraint.json"); let auth_key = "default:development.secret123".to_string(); - let (token_cache, feature_cache, delta_cache, engine_cache) = build_offline_mode( + let (token_cache, feature_cache, _delta_cache, engine_cache) = build_offline_mode( client_features_with_custom_context_field.clone(), vec![auth_key.clone()], vec![], diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 8f212d0c..a56635ac 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -4,17 +4,18 @@ use futures::TryStreamExt; use reqwest::StatusCode; use std::time::Duration; use tracing::{debug, info, warn}; -use unleash_types::client_features::ClientFeaturesDelta; +use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta}; use unleash_yggdrasil::EngineState; use crate::error::{EdgeError, FeatureError}; +use crate::filters::FeatureFilterSet; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::ClientMetaInformation; use crate::tokens::cache_key; -use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh}; +use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeResult, EdgeToken, TokenRefresh}; impl FeatureRefresher { async fn handle_client_features_delta_updated( @@ -118,6 +119,34 @@ impl FeatureRefresher { } } + pub(crate) async fn features_for_filter( + &self, + token: EdgeToken, + filters: &FeatureFilterSet, + ) -> EdgeResult { + match self.get_features_by_filter(&token, filters) { + Some(features) if self.token_is_subsumed(&token) => Ok(features), + _ => { + if self.strict { + debug!("Strict behavior: Token is not subsumed by any registered tokens. Returning error"); + Err(EdgeError::InvalidTokenWithStrictBehavior) + } else { + debug!( + "Dynamic behavior: Had never seen this environment. Configuring fetcher" + ); + self.register_and_hydrate_token(&token).await; + self.get_features_by_filter(&token, filters).ok_or_else(|| { + EdgeError::ClientHydrationFailed( + "Failed to get features by filter after registering and hydrating token (This is very likely an error in Edge. Please report this!)" + .into(), + ) + }) + } + } + } + } + + pub async fn start_streaming_delta_background_task( &self, client_meta_information: ClientMetaInformation, diff --git a/server/src/http/refresher/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs index 71c1e0ce..379608de 100644 --- a/server/src/http/refresher/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -46,6 +46,7 @@ pub struct FeatureRefresher { pub unleash_client: Arc, pub tokens_to_refresh: Arc>, pub features_cache: Arc, + pub delta_cache: Arc>, pub engine_cache: Arc>, pub refresh_interval: chrono::Duration, pub persistence: Option>, From 4593179a5268adbe2cc09d9a6877f9d82ff28a3d Mon Sep 17 00:00:00 2001 From: sjaanus Date: Mon, 17 Feb 2025 13:32:58 +0200 Subject: [PATCH 03/14] feat: start storing actual deltas in edge --- server/src/delta_cache.rs | 6 ++--- server/src/http/refresher/delta_refresher.rs | 27 ++++++++++++++++--- .../src/http/refresher/feature_refresher.rs | 3 ++- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/server/src/delta_cache.rs b/server/src/delta_cache.rs index 8264e3e8..976c0998 100644 --- a/server/src/delta_cache.rs +++ b/server/src/delta_cache.rs @@ -4,9 +4,9 @@ use unleash_types::client_features::{DeltaEvent, ClientFeature, Segment}; #[derive(Debug, Clone)] pub struct DeltaHydrationEvent { - event_id: u32, - features: Vec, - segments: Vec, + pub event_id: u32, + pub features: Vec, + pub segments: Vec, } diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 8f212d0c..87674be8 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -3,10 +3,10 @@ use eventsource_client::Client; use futures::TryStreamExt; use reqwest::StatusCode; use std::time::Duration; -use tracing::{debug, info, warn}; -use unleash_types::client_features::ClientFeaturesDelta; +use tracing::{debug, error, info, warn}; +use unleash_types::client_features::{ClientFeaturesDelta, DeltaEvent}; use unleash_yggdrasil::EngineState; - +use crate::delta_cache::{DeltaCache, DeltaHydrationEvent}; use crate::error::{EdgeError, FeatureError}; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, @@ -31,6 +31,27 @@ impl FeatureRefresher { let key = cache_key(refresh_token); self.features_cache.apply_delta(key.clone(), &delta); + + + self.delta_cache + .entry(key.clone()) + .and_modify(|cache| { + cache.add_events(delta.events.clone()); + }) + .or_insert_with(|| { + if let Some(DeltaEvent::Hydration { + event_id, + features, + segments, + }) = delta.events.into_iter().next() + { + DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100) + } else { + warn!("Expected exactly one Hydration event, but none found. Skipping cache initialization."); + return; + } + }); + self.update_last_refresh( refresh_token, etag, diff --git a/server/src/http/refresher/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs index 71c1e0ce..99fd3a8f 100644 --- a/server/src/http/refresher/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -27,7 +27,7 @@ use crate::{ tokens::{cache_key, simplify}, types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh}, }; - +use crate::delta_cache::DeltaCache; use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; fn frontend_token_is_covered_by_tokens( @@ -46,6 +46,7 @@ pub struct FeatureRefresher { pub unleash_client: Arc, pub tokens_to_refresh: Arc>, pub features_cache: Arc, + pub delta_cache: Arc>, pub engine_cache: Arc>, pub refresh_interval: chrono::Duration, pub persistence: Option>, From d639b3f6832a6398d6d57781f967f0f2041eabb8 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Mon, 17 Feb 2025 15:09:57 +0200 Subject: [PATCH 04/14] Update --- server/src/delta_cache.rs | 24 ++++++------ server/src/http/refresher/delta_refresher.rs | 37 ++++++++++--------- .../src/http/refresher/feature_refresher.rs | 6 ++- 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/server/src/delta_cache.rs b/server/src/delta_cache.rs index 976c0998..040907f5 100644 --- a/server/src/delta_cache.rs +++ b/server/src/delta_cache.rs @@ -35,16 +35,16 @@ impl DeltaCache { .cloned() .expect("Hydration event must have at least one feature"); - self.add_events(vec![DeltaEvent::FeatureUpdated { + self.add_events(&vec![DeltaEvent::FeatureUpdated { event_id: hydration_event.event_id, feature: last_feature, }]); } - pub fn add_events(&mut self, events: Vec) { + pub fn add_events(&mut self, events: &Vec) { for event in events.into_iter() { self.events.push(event.clone()); - self.update_hydration_event(event); + self.update_hydration_event(&event); if self.events.len() > self.max_length { self.events.remove(0); // O(n) operation @@ -65,29 +65,29 @@ impl DeltaCache { &self.hydration_event } - fn update_hydration_event(&mut self, event: DeltaEvent) { + fn update_hydration_event(&mut self, event: &DeltaEvent) { let event_id = event.get_event_id(); self.hydration_event.event_id = event_id; match event { DeltaEvent::FeatureUpdated { feature, .. } => { if let Some(existing) = self.hydration_event.features.iter_mut().find(|f| f.name == feature.name) { - *existing = feature; + *existing = feature.clone(); } else { - self.hydration_event.features.push(feature); + self.hydration_event.features.push(feature.clone()); } } DeltaEvent::FeatureRemoved { feature_name, .. } => { - self.hydration_event.features.retain(|f| f.name != feature_name); + self.hydration_event.features.retain(|f| f.name != feature_name.clone()); } DeltaEvent::SegmentUpdated { segment, .. } => { if let Some(existing) = self.hydration_event.segments.iter_mut().find(|s| s.id == segment.id) { - *existing = segment; + *existing = segment.clone(); } else { - self.hydration_event.segments.push(segment); + self.hydration_event.segments.push(segment.clone()); } } DeltaEvent::SegmentRemoved { segment_id, .. } => { - self.hydration_event.segments.retain(|s| s.id != segment_id); + self.hydration_event.segments.retain(|s| s.id != segment_id.clone()); } DeltaEvent::Hydration { .. } => { @@ -124,7 +124,7 @@ mod tests { let max_length = 2; let mut delta_cache = DeltaCache::new(base_event.clone(), max_length); - let initial_events = vec![ + let initial_events = &vec![ DeltaEvent::FeatureUpdated { event_id: 2, feature: ClientFeature { @@ -160,7 +160,7 @@ mod tests { segment: Segment { id: 3, constraints: vec![] }, }, ]; - delta_cache.add_events(added_events.clone()); + delta_cache.add_events(&added_events); let events: Vec<_> = delta_cache.get_events().to_vec(); assert_eq!(events.len(), max_length); diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 87674be8..3235cb7c 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -16,6 +16,8 @@ use crate::http::unleash_client::ClientMetaInformation; use crate::tokens::cache_key; use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh}; +pub type Environment = String; + impl FeatureRefresher { async fn handle_client_features_delta_updated( &self, @@ -33,24 +35,20 @@ impl FeatureRefresher { self.features_cache.apply_delta(key.clone(), &delta); - self.delta_cache - .entry(key.clone()) - .and_modify(|cache| { - cache.add_events(delta.events.clone()); - }) - .or_insert_with(|| { - if let Some(DeltaEvent::Hydration { - event_id, - features, - segments, - }) = delta.events.into_iter().next() - { - DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100) - } else { - warn!("Expected exactly one Hydration event, but none found. Skipping cache initialization."); - return; - } - }); + if let Some(mut entry) = self.delta_cache.get_mut(&key) { + entry.add_events(&delta.events); + } else { + if let Some(DeltaEvent::Hydration { + event_id, + features, + segments, + }) = delta.events.clone().into_iter().next() + { + self.delta_cache.insert(key.clone(), DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100)); + } else { + warn!("Warning: No hydrationEvent found in delta.events, but cache empty for environment"); + } + } self.update_last_refresh( refresh_token, @@ -272,6 +270,8 @@ mod tests { Segment, }; use unleash_yggdrasil::EngineState; + use crate::delta_cache::DeltaCache; + use crate::http::refresher::delta_refresher::Environment; #[actix_web::test] #[tracing_test::traced_test] @@ -279,6 +279,7 @@ mod tests { let srv = test_features_server().await; let unleash_client = Arc::new(UnleashClient::new(srv.url("/").as_str(), None).unwrap()); let features_cache: Arc = Arc::new(FeatureCache::default()); + let delta_cache: Arc> = Arc::new(DashMap::default()); let engine_cache: Arc> = Arc::new(DashMap::default()); let feature_refresher = Arc::new(FeatureRefresher { diff --git a/server/src/http/refresher/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs index 99fd3a8f..e010e66d 100644 --- a/server/src/http/refresher/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -28,6 +28,7 @@ use crate::{ types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh}, }; use crate::delta_cache::DeltaCache; +use crate::http::refresher::delta_refresher::Environment; use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; fn frontend_token_is_covered_by_tokens( @@ -46,7 +47,7 @@ pub struct FeatureRefresher { pub unleash_client: Arc, pub tokens_to_refresh: Arc>, pub features_cache: Arc, - pub delta_cache: Arc>, + pub delta_cache: Arc>, pub engine_cache: Arc>, pub refresh_interval: chrono::Duration, pub persistence: Option>, @@ -64,6 +65,7 @@ impl Default for FeatureRefresher { unleash_client: Default::default(), tokens_to_refresh: Arc::new(DashMap::default()), features_cache: Arc::new(Default::default()), + delta_cache: Default::default(), engine_cache: Default::default(), persistence: None, strict: true, @@ -134,6 +136,7 @@ impl FeatureRefresher { pub fn new( unleash_client: Arc, features_cache: Arc, + delta_cache: Arc>, engines: Arc>, persistence: Option>, config: FeatureRefreshConfig, @@ -142,6 +145,7 @@ impl FeatureRefresher { unleash_client, tokens_to_refresh: Arc::new(DashMap::default()), features_cache, + delta_cache, engine_cache: engines, refresh_interval: config.features_refresh_interval, persistence, From ca66a542a9a37f7c1df8cc1558fa371fc48c6d32 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Mon, 17 Feb 2025 15:49:55 +0200 Subject: [PATCH 05/14] A lot of fixes --- server/src/builder.rs | 9 ++-- server/src/client_api.rs | 29 ++++++------- server/src/delta_cache.rs | 4 +- server/src/filters.rs | 22 +++++++++- server/src/http/refresher/delta_refresher.rs | 28 ------------- .../src/http/refresher/feature_refresher.rs | 42 ++++++++++++++++++- 6 files changed, 80 insertions(+), 54 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 7a5ea524..6ba0f8f5 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -55,7 +55,7 @@ fn build_caches() -> CacheContainer { } async fn hydrate_from_persistent_storage(cache: CacheContainer, storage: Arc) { - let (token_cache, features_cache, delta_cache, engine_cache, ) = cache; + let (token_cache, features_cache, _delta_cache, engine_cache, ) = cache; // TODO: do we need to hydrate from persistant storage for delta? let tokens = storage.load_tokens().await.unwrap_or_else(|error| { warn!("Failed to load tokens from cache {error:?}"); @@ -89,7 +89,7 @@ pub(crate) fn build_offline_mode( client_tokens: Vec, frontend_tokens: Vec, ) -> EdgeResult { - let (token_cache, features_cache, delta_cache, engine_cache) = build_caches(); + let (token_cache, features_cache, _delta_cache, engine_cache) = build_caches(); let edge_tokens: Vec = tokens .iter() @@ -141,7 +141,7 @@ pub(crate) fn build_offline_mode( ) } // TODO: possibly need to resolve delta cache for offline mode? - Ok((token_cache, features_cache, delta_cache, engine_cache)) + Ok((token_cache, features_cache, _delta_cache, engine_cache)) } fn build_offline(offline_args: OfflineArgs) -> EdgeResult { @@ -282,6 +282,7 @@ async fn build_edge( let feature_refresher = Arc::new(FeatureRefresher::new( unleash_client, feature_cache.clone(), + delta_cache.clone(), engine_cache.clone(), persistence.clone(), feature_config, @@ -314,7 +315,7 @@ async fn build_edge( .await; } Ok(( - (token_cache, feature_cache, delta_cache, engine_cache), + (token_cache, feature_cache, _delta_cache, engine_cache), Some(token_validator), Some(feature_refresher), persistence, diff --git a/server/src/client_api.rs b/server/src/client_api.rs index f547a69e..4e88c7e4 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -1,9 +1,7 @@ use crate::cli::{EdgeArgs, EdgeMode}; use crate::error::EdgeError; use crate::feature_cache::FeatureCache; -use crate::filters::{ - filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet, -}; +use crate::filters::{filter_client_features, filter_delta_events, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet}; use crate::http::broadcaster::Broadcaster; use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::metrics::client_metrics::MetricsCache; @@ -15,9 +13,10 @@ use actix_web::web::{self, Data, Json, Query}; use actix_web::Responder; use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; -use unleash_types::client_features::{ClientFeature, ClientFeatures}; +use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia}; use crate::delta_cache::DeltaCache; +use crate::http::refresher::delta_refresher::Environment; #[utoipa::path( context_path = "/api/client", @@ -49,7 +48,7 @@ pub async fn get_delta( token_cache: Data>, filter_query: Query, req: HttpRequest, -) -> EdgeJsonResult { +) -> EdgeJsonResult { resolve_delta(edge_token, delta_cache, token_cache, filter_query, req).await } @@ -162,30 +161,27 @@ async fn resolve_features( async fn resolve_delta( edge_token: EdgeToken, - delta_cache: Data>, + delta_cache: Data>, token_cache: Data>, filter_query: Query, req: HttpRequest, -) -> EdgeJsonResult { - let (validated_token, filter_set, query) = +) -> EdgeJsonResult { + let (validated_token, filter_set, ..) = get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; - let client_features = match req.app_data::>() { + let delta = match req.app_data::>() { Some(refresher) => { - refresher. - .features_for_filter(validated_token.clone(), &filter_set) + refresher + .delta_events_for_filter(validated_token.clone(), &filter_set) .await } None => delta_cache .get(&cache_key(&validated_token)) - .map(|client_features| filter_client_features(&client_features, &filter_set)) + .map(|cache| filter_delta_events(cache.value(), &filter_set)) .ok_or(EdgeError::ClientCacheError), }?; - Ok(Json(ClientFeatures { - query: Some(query), - ..client_features - })) + Ok(Json(delta)) } #[utoipa::path( context_path = "/api/client", @@ -1057,6 +1053,7 @@ mod tests { unleash_client: unleash_client.clone(), tokens_to_refresh: Arc::new(Default::default()), features_cache: features_cache.clone(), + delta_cache: Arc::new(Default::default()), engine_cache: engine_cache.clone(), refresh_interval: Duration::seconds(6000), persistence: None, diff --git a/server/src/delta_cache.rs b/server/src/delta_cache.rs index 040907f5..06d09c58 100644 --- a/server/src/delta_cache.rs +++ b/server/src/delta_cache.rs @@ -194,7 +194,7 @@ mod tests { ..ClientFeature::default() }, }; - delta_cache.add_events(vec![initial_feature_event.clone()]); + delta_cache.add_events(&vec![initial_feature_event.clone()]); let updated_feature_event = DeltaEvent::FeatureUpdated { event_id: 130, @@ -212,7 +212,7 @@ mod tests { ..ClientFeature::default() }, }; - delta_cache.add_events(vec![updated_feature_event.clone()]); + delta_cache.add_events(&vec![updated_feature_event.clone()]); assert_eq!(delta_cache.get_events()[1], initial_feature_event); assert_eq!(delta_cache.get_events()[2], updated_feature_event); diff --git a/server/src/filters.rs b/server/src/filters.rs index af33502a..1522dd5c 100644 --- a/server/src/filters.rs +++ b/server/src/filters.rs @@ -1,6 +1,7 @@ use dashmap::mapref::one::Ref; -use unleash_types::client_features::{ClientFeature, ClientFeatures}; - +use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta, DeltaEvent}; +use crate::delta_cache::DeltaCache; +use crate::http::refresher::delta_refresher::Environment; use crate::types::EdgeToken; pub type FeatureFilter = Box bool>; @@ -39,6 +40,14 @@ fn filter_features( .collect::>() } +fn filter_deltas( + delta_cache: &DeltaCache, + _filters: &FeatureFilterSet, +) -> Vec { + // TODO: add filtering to here + delta_cache.get_events().clone() +} + pub(crate) fn filter_client_features( feature_cache: &Ref<'_, String, ClientFeatures>, filters: &FeatureFilterSet, @@ -52,6 +61,15 @@ pub(crate) fn filter_client_features( } } +pub(crate) fn filter_delta_events( + delta_cache: &DeltaCache, + filters: &FeatureFilterSet, +) -> ClientFeaturesDelta { + ClientFeaturesDelta { + events: filter_deltas(delta_cache, filters), + } +} + pub(crate) fn name_prefix_filter(name_prefix: String) -> FeatureFilter { Box::new(move |f| f.name.starts_with(&name_prefix)) } diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 30e2e7ee..fe35d22c 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -139,34 +139,6 @@ impl FeatureRefresher { } } - pub(crate) async fn features_for_filter( - &self, - token: EdgeToken, - filters: &FeatureFilterSet, - ) -> EdgeResult { - match self.get_features_by_filter(&token, filters) { - Some(features) if self.token_is_subsumed(&token) => Ok(features), - _ => { - if self.strict { - debug!("Strict behavior: Token is not subsumed by any registered tokens. Returning error"); - Err(EdgeError::InvalidTokenWithStrictBehavior) - } else { - debug!( - "Dynamic behavior: Had never seen this environment. Configuring fetcher" - ); - self.register_and_hydrate_token(&token).await; - self.get_features_by_filter(&token, filters).ok_or_else(|| { - EdgeError::ClientHydrationFailed( - "Failed to get features by filter after registering and hydrating token (This is very likely an error in Edge. Please report this!)" - .into(), - ) - }) - } - } - } - } - - pub async fn start_streaming_delta_background_task( &self, client_meta_information: ClientMetaInformation, diff --git a/server/src/http/refresher/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs index e010e66d..f3eac4e5 100644 --- a/server/src/http/refresher/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -9,13 +9,13 @@ use futures::TryStreamExt; use json_structural_diff::JsonDiff; use reqwest::StatusCode; use tracing::{debug, info, warn}; -use unleash_types::client_features::{ClientFeatures, DeltaEvent}; +use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta, DeltaEvent}; use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; use unleash_yggdrasil::{EngineState, UpdateMessage}; use crate::error::{EdgeError, FeatureError}; use crate::feature_cache::FeatureCache; -use crate::filters::{filter_client_features, FeatureFilterSet}; +use crate::filters::{filter_client_features, filter_delta_events, FeatureFilterSet}; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; @@ -254,6 +254,33 @@ impl FeatureRefresher { } } + pub(crate) async fn delta_events_for_filter( + &self, + token: EdgeToken, + filters: &FeatureFilterSet, + ) -> EdgeResult { + match self.get_delta_events_by_filter(&token, filters) { + Some(features) if self.token_is_subsumed(&token) => Ok(features), + _ => { + if self.strict { + debug!("Strict behavior: Token is not subsumed by any registered tokens. Returning error"); + Err(EdgeError::InvalidTokenWithStrictBehavior) + } else { + debug!( + "Dynamic behavior: Had never seen this environment. Configuring fetcher" + ); + self.register_and_hydrate_token(&token).await; + self.get_delta_events_by_filter(&token, filters).ok_or_else(|| { + EdgeError::ClientHydrationFailed( + "Failed to get delta events by filter after registering and hydrating token (This is very likely an error in Edge. Please report this!)" + .into(), + ) + }) + } + } + } + } + fn get_features_by_filter( &self, token: &EdgeToken, @@ -264,6 +291,17 @@ impl FeatureRefresher { .map(|client_features| filter_client_features(&client_features, filters)) } + fn get_delta_events_by_filter( + &self, + token: &EdgeToken, + filters: &FeatureFilterSet, + ) -> Option { + self.delta_cache + .get(&cache_key(token)) + .map(|delta_events| filter_delta_events(&delta_events, filters)) + } + + /// /// Registers a token for refresh, the token will be discarded if it can be subsumed by another previously registered token pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option) { From d1d94e8b309cd46b6dcfab7e0a9ecc474b1ea8d6 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 09:33:03 +0200 Subject: [PATCH 06/14] A lot of fixes --- server/src/builder.rs | 2 +- server/src/http/refresher/delta_refresher.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 6ba0f8f5..6e3449e8 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -315,7 +315,7 @@ async fn build_edge( .await; } Ok(( - (token_cache, feature_cache, _delta_cache, engine_cache), + (token_cache, feature_cache, delta_cache, engine_cache), Some(token_validator), Some(feature_refresher), persistence, diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index fe35d22c..6fb3d953 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -287,6 +287,7 @@ mod tests { let feature_refresher = Arc::new(FeatureRefresher { unleash_client: unleash_client.clone(), tokens_to_refresh: Arc::new(Default::default()), + delta_cache: Arc::new(Default::default()), features_cache: features_cache.clone(), engine_cache: engine_cache.clone(), refresh_interval: Duration::seconds(6000), From 71ed382b318f249fa1832f2e39c15cb5cf810c4e Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 09:34:19 +0200 Subject: [PATCH 07/14] Fix --- server/src/delta_cache.rs | 6 ++--- server/src/filters.rs | 1 - server/src/http/refresher/delta_refresher.rs | 23 +++++++++----------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/server/src/delta_cache.rs b/server/src/delta_cache.rs index 06d09c58..f4f8c01f 100644 --- a/server/src/delta_cache.rs +++ b/server/src/delta_cache.rs @@ -42,9 +42,9 @@ impl DeltaCache { } pub fn add_events(&mut self, events: &Vec) { - for event in events.into_iter() { + for event in events.iter() { self.events.push(event.clone()); - self.update_hydration_event(&event); + self.update_hydration_event(event); if self.events.len() > self.max_length { self.events.remove(0); // O(n) operation @@ -87,7 +87,7 @@ impl DeltaCache { } } DeltaEvent::SegmentRemoved { segment_id, .. } => { - self.hydration_event.segments.retain(|s| s.id != segment_id.clone()); + self.hydration_event.segments.retain(|s| s.id != *segment_id); } DeltaEvent::Hydration { .. } => { diff --git a/server/src/filters.rs b/server/src/filters.rs index 1522dd5c..d53d319f 100644 --- a/server/src/filters.rs +++ b/server/src/filters.rs @@ -1,7 +1,6 @@ use dashmap::mapref::one::Ref; use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta, DeltaEvent}; use crate::delta_cache::DeltaCache; -use crate::http::refresher::delta_refresher::Environment; use crate::types::EdgeToken; pub type FeatureFilter = Box bool>; diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 6fb3d953..7f4b2c1d 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -3,20 +3,19 @@ use eventsource_client::Client; use futures::TryStreamExt; use reqwest::StatusCode; use std::time::Duration; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use unleash_types::client_features::{ClientFeaturesDelta, DeltaEvent}; use unleash_yggdrasil::EngineState; use crate::delta_cache::{DeltaCache, DeltaHydrationEvent}; use crate::error::{EdgeError, FeatureError}; -use crate::filters::FeatureFilterSet; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::ClientMetaInformation; use crate::tokens::cache_key; -use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeResult, EdgeToken, TokenRefresh}; +use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh}; pub type Environment = String; @@ -39,17 +38,15 @@ impl FeatureRefresher { if let Some(mut entry) = self.delta_cache.get_mut(&key) { entry.add_events(&delta.events); + } else if let Some(DeltaEvent::Hydration { + event_id, + features, + segments, + }) = delta.events.clone().into_iter().next() + { + self.delta_cache.insert(key.clone(), DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100)); } else { - if let Some(DeltaEvent::Hydration { - event_id, - features, - segments, - }) = delta.events.clone().into_iter().next() - { - self.delta_cache.insert(key.clone(), DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100)); - } else { - warn!("Warning: No hydrationEvent found in delta.events, but cache empty for environment"); - } + warn!("Warning: No hydrationEvent found in delta.events, but cache empty for environment"); } self.update_last_refresh( From c87911d4c858cc499e612f706b83455c7ce4051a Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 10:25:55 +0200 Subject: [PATCH 08/14] Basic poc working --- server/src/client_api.rs | 13 ++++++++++++- server/src/filters.rs | 2 ++ server/src/http/refresher/delta_refresher.rs | 2 ++ server/src/lib.rs | 3 +++ server/src/main.rs | 2 ++ 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 4e88c7e4..ff3d4b5f 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -13,6 +13,7 @@ use actix_web::web::{self, Data, Json, Query}; use actix_web::Responder; use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; +use tracing::info; use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia}; use crate::delta_cache::DeltaCache; @@ -168,7 +169,6 @@ async fn resolve_delta( ) -> EdgeJsonResult { let (validated_token, filter_set, ..) = get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; - let delta = match req.app_data::>() { Some(refresher) => { refresher @@ -314,6 +314,7 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) { crate::middleware::validate_token::validate_token, )) .service(get_features) + .service(get_delta) .service(get_feature) .service(register) .service(metrics) @@ -724,11 +725,13 @@ mod tests { token.token_type = Some(TokenType::Client); let upstream_token_cache = Arc::new(DashMap::default()); let upstream_features_cache = Arc::new(FeatureCache::default()); + let upstream_delta_cache = Arc::new(DashMap::default()); let upstream_engine_cache = Arc::new(DashMap::default()); upstream_token_cache.insert(token.token.clone(), token.clone()); let srv = upstream_server( upstream_token_cache, upstream_features_cache, + upstream_delta_cache, upstream_engine_cache, ) .await; @@ -763,11 +766,13 @@ mod tests { frontend_token.token_type = Some(TokenType::Frontend); let upstream_token_cache = Arc::new(DashMap::default()); let upstream_features_cache = Arc::new(FeatureCache::default()); + let upstream_delta_cache = Arc::new(DashMap::default()); let upstream_engine_cache = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); let srv = upstream_server( upstream_token_cache, upstream_features_cache, + upstream_delta_cache, upstream_engine_cache, ) .await; @@ -1029,10 +1034,12 @@ mod tests { async fn calling_client_features_endpoint_with_new_token_hydrates_from_upstream_when_dynamic() { let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_delta_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), + upstream_delta_cache.clone(), upstream_engine_cache.clone(), ) .await; @@ -1094,10 +1101,12 @@ mod tests { async fn calling_client_features_endpoint_with_new_token_does_not_hydrate_when_strict() { let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_delta_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), + upstream_delta_cache.clone(), upstream_engine_cache.clone(), ) .await; @@ -1216,10 +1225,12 @@ mod tests { { let upstream_features_cache: Arc = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_delta_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), + upstream_delta_cache.clone(), upstream_engine_cache.clone(), ) .await; diff --git a/server/src/filters.rs b/server/src/filters.rs index d53d319f..cdb5051f 100644 --- a/server/src/filters.rs +++ b/server/src/filters.rs @@ -1,4 +1,5 @@ use dashmap::mapref::one::Ref; +use tracing::info; use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta, DeltaEvent}; use crate::delta_cache::DeltaCache; use crate::types::EdgeToken; @@ -64,6 +65,7 @@ pub(crate) fn filter_delta_events( delta_cache: &DeltaCache, filters: &FeatureFilterSet, ) -> ClientFeaturesDelta { + info!("filtering delta events for api"); ClientFeaturesDelta { events: filter_deltas(delta_cache, filters), } diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 7f4b2c1d..fbba0d3a 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -38,12 +38,14 @@ impl FeatureRefresher { if let Some(mut entry) = self.delta_cache.get_mut(&key) { entry.add_events(&delta.events); + info!("adding events to delta"); } else if let Some(DeltaEvent::Hydration { event_id, features, segments, }) = delta.events.clone().into_iter().next() { + info!("creating entry in delta"); self.delta_cache.insert(key.clone(), DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100)); } else { warn!("Warning: No hydrationEvent found in delta.events, but cache empty for environment"); diff --git a/server/src/lib.rs b/server/src/lib.rs index 14bc9deb..ea87df9e 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -46,6 +46,7 @@ mod tests { use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::delta_cache::DeltaCache; use crate::feature_cache::FeatureCache; use crate::metrics::client_metrics::MetricsCache; use crate::types::EdgeToken; @@ -60,6 +61,7 @@ mod tests { pub async fn upstream_server( upstream_token_cache: Arc>, upstream_features_cache: Arc, + upstream_delta_cache: Arc>, upstream_engine_cache: Arc>, ) -> TestServer { let token_validator = Arc::new(TokenValidator { @@ -81,6 +83,7 @@ mod tests { .app_data(config) .app_data(web::Data::from(token_validator.clone())) .app_data(web::Data::from(upstream_features_cache.clone())) + .app_data(web::Data::from(upstream_delta_cache.clone())) .app_data(web::Data::from(upstream_engine_cache.clone())) .app_data(web::Data::from(upstream_token_cache.clone())) .app_data(web::Data::new(metrics_cache)) diff --git a/server/src/main.rs b/server/src/main.rs index 3979b477..b44db9be 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -80,6 +80,7 @@ async fn main() -> Result<(), anyhow::Error> { let token_validator_schedule = token_validator.clone(); let lazy_feature_cache = features_cache.clone(); let lazy_token_cache = token_cache.clone(); + // TODO : do we need lazy delta cache? let lazy_engine_cache = engine_cache.clone(); let lazy_feature_refresher = feature_refresher.clone(); @@ -105,6 +106,7 @@ async fn main() -> Result<(), anyhow::Error> { .app_data(web::Data::new(connect_via.clone())) .app_data(web::Data::from(metrics_cache.clone())) .app_data(web::Data::from(token_cache.clone())) + .app_data(web::Data::from(delta_cache.clone())) .app_data(web::Data::from(features_cache.clone())) .app_data(web::Data::from(engine_cache.clone())) .app_data(web::Data::from(broadcaster.clone())); From db6ce6c3f72ce6d9cb1cb79868ae1b00b7fd2c40 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 10:29:56 +0200 Subject: [PATCH 09/14] Remove logs --- server/src/http/refresher/delta_refresher.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index fbba0d3a..7f4b2c1d 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -38,14 +38,12 @@ impl FeatureRefresher { if let Some(mut entry) = self.delta_cache.get_mut(&key) { entry.add_events(&delta.events); - info!("adding events to delta"); } else if let Some(DeltaEvent::Hydration { event_id, features, segments, }) = delta.events.clone().into_iter().next() { - info!("creating entry in delta"); self.delta_cache.insert(key.clone(), DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100)); } else { warn!("Warning: No hydrationEvent found in delta.events, but cache empty for environment"); From 179ceea5fa346483fc4084fe6b9c22664e878e00 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 10:34:17 +0200 Subject: [PATCH 10/14] Fix --- server/src/client_api.rs | 1 - server/src/delta_cache.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index ff3d4b5f..ae952a78 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -13,7 +13,6 @@ use actix_web::web::{self, Data, Json, Query}; use actix_web::Responder; use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; -use tracing::info; use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia}; use crate::delta_cache::DeltaCache; diff --git a/server/src/delta_cache.rs b/server/src/delta_cache.rs index f4f8c01f..a441d739 100644 --- a/server/src/delta_cache.rs +++ b/server/src/delta_cache.rs @@ -41,13 +41,13 @@ impl DeltaCache { }]); } - pub fn add_events(&mut self, events: &Vec) { + pub fn add_events(&mut self, events: &[DeltaEvent]) { for event in events.iter() { self.events.push(event.clone()); self.update_hydration_event(event); if self.events.len() > self.max_length { - self.events.remove(0); // O(n) operation + self.events.remove(0); } } } From 128586ef0e575d8988879bb7a5596f09a12eff13 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 10:35:11 +0200 Subject: [PATCH 11/14] Fix --- server/src/builder.rs | 6 +- server/src/cli.rs | 8 +- server/src/client_api.rs | 9 +- server/src/delta_cache.rs | 91 ++++++++++++------- server/src/feature_cache.rs | 1 - server/src/filters.rs | 13 ++- server/src/http/refresher/delta_refresher.rs | 27 ++++-- .../src/http/refresher/feature_refresher.rs | 7 +- server/src/lib.rs | 2 +- 9 files changed, 104 insertions(+), 60 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 6e3449e8..7504a8b9 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -11,6 +11,7 @@ use unleash_types::client_features::ClientFeatures; use unleash_yggdrasil::{EngineState, UpdateMessage}; use crate::cli::RedisMode; +use crate::delta_cache::DeltaCache; use crate::feature_cache::FeatureCache; use crate::http::refresher::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode}; use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation}; @@ -26,7 +27,6 @@ use crate::{ http::{refresher::feature_refresher::FeatureRefresher, unleash_client::UnleashClient}, types::{EdgeResult, EdgeToken, TokenType}, }; -use crate::delta_cache::DeltaCache; type CacheContainer = ( Arc>, @@ -55,7 +55,7 @@ fn build_caches() -> CacheContainer { } async fn hydrate_from_persistent_storage(cache: CacheContainer, storage: Arc) { - let (token_cache, features_cache, _delta_cache, engine_cache, ) = cache; + let (token_cache, features_cache, _delta_cache, engine_cache) = cache; // TODO: do we need to hydrate from persistant storage for delta? let tokens = storage.load_tokens().await.unwrap_or_else(|error| { warn!("Failed to load tokens from cache {error:?}"); @@ -241,7 +241,7 @@ async fn build_edge( )); } - let (token_cache, feature_cache, delta_cache, engine_cache ) = build_caches(); + let (token_cache, feature_cache, delta_cache, engine_cache) = build_caches(); let persistence = get_data_source(args).await; diff --git a/server/src/cli.rs b/server/src/cli.rs index 85b803b9..a40ea1ec 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -224,7 +224,13 @@ pub struct EdgeArgs { pub delta: bool, /// If set to true, it compares features payload with delta payload and logs diff. This flag is for internal testing only. Do not turn this on for production configurations - #[clap(long, env, default_value_t = false, conflicts_with = "delta", hide = true)] + #[clap( + long, + env, + default_value_t = false, + conflicts_with = "delta", + hide = true + )] pub delta_diff: bool, /// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream diff --git a/server/src/client_api.rs b/server/src/client_api.rs index ae952a78..e6987971 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -1,8 +1,13 @@ use crate::cli::{EdgeArgs, EdgeMode}; +use crate::delta_cache::DeltaCache; use crate::error::EdgeError; use crate::feature_cache::FeatureCache; -use crate::filters::{filter_client_features, filter_delta_events, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet}; +use crate::filters::{ + filter_client_features, filter_delta_events, name_match_filter, name_prefix_filter, + project_filter, FeatureFilterSet, +}; use crate::http::broadcaster::Broadcaster; +use crate::http::refresher::delta_refresher::Environment; use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::metrics::client_metrics::MetricsCache; use crate::tokens::cache_key; @@ -15,8 +20,6 @@ use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta}; use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia}; -use crate::delta_cache::DeltaCache; -use crate::http::refresher::delta_refresher::Environment; #[utoipa::path( context_path = "/api/client", diff --git a/server/src/delta_cache.rs b/server/src/delta_cache.rs index a441d739..c44bc9a5 100644 --- a/server/src/delta_cache.rs +++ b/server/src/delta_cache.rs @@ -1,6 +1,4 @@ -use unleash_types::client_features::{DeltaEvent, ClientFeature, Segment}; - - +use unleash_types::client_features::{ClientFeature, DeltaEvent, Segment}; #[derive(Debug, Clone)] pub struct DeltaHydrationEvent { @@ -9,7 +7,6 @@ pub struct DeltaHydrationEvent { pub segments: Vec, } - #[derive(Debug, Clone)] pub struct DeltaCache { max_length: usize, @@ -52,13 +49,15 @@ impl DeltaCache { } } - pub fn get_events(&self) -> &Vec { &self.events } pub fn is_missing_revision(&self, revision_id: u32) -> bool { - !self.events.iter().any(|event| event.get_event_id() == revision_id) + !self + .events + .iter() + .any(|event| event.get_event_id() == revision_id) } pub fn get_hydration_event(&self) -> &DeltaHydrationEvent { @@ -70,28 +69,41 @@ impl DeltaCache { self.hydration_event.event_id = event_id; match event { DeltaEvent::FeatureUpdated { feature, .. } => { - if let Some(existing) = self.hydration_event.features.iter_mut().find(|f| f.name == feature.name) { + if let Some(existing) = self + .hydration_event + .features + .iter_mut() + .find(|f| f.name == feature.name) + { *existing = feature.clone(); } else { self.hydration_event.features.push(feature.clone()); } } - DeltaEvent::FeatureRemoved { feature_name, .. } => { - self.hydration_event.features.retain(|f| f.name != feature_name.clone()); + DeltaEvent::FeatureRemoved { feature_name, .. } => { + self.hydration_event + .features + .retain(|f| f.name != feature_name.clone()); } - DeltaEvent::SegmentUpdated { segment, .. } => { - if let Some(existing) = self.hydration_event.segments.iter_mut().find(|s| s.id == segment.id) { + DeltaEvent::SegmentUpdated { segment, .. } => { + if let Some(existing) = self + .hydration_event + .segments + .iter_mut() + .find(|s| s.id == segment.id) + { *existing = segment.clone(); } else { self.hydration_event.segments.push(segment.clone()); } } - DeltaEvent::SegmentRemoved { segment_id, .. } => { - self.hydration_event.segments.retain(|s| s.id != *segment_id); - + DeltaEvent::SegmentRemoved { segment_id, .. } => { + self.hydration_event + .segments + .retain(|s| s.id != *segment_id); } DeltaEvent::Hydration { .. } => { - // do nothing, as hydration will never end up in update events + // do nothing, as hydration will never end up in update events } } } @@ -99,8 +111,8 @@ impl DeltaCache { #[cfg(test)] mod tests { - use unleash_types::client_features::{DeltaEvent, ClientFeature, Segment}; use crate::delta_cache::{DeltaCache, DeltaHydrationEvent}; + use unleash_types::client_features::{ClientFeature, DeltaEvent, Segment}; #[test] fn test_update_hydration_event_and_remove_event_when_over_limit() { @@ -117,22 +129,26 @@ mod tests { }, ], segments: vec![ - Segment { id: 1, constraints: vec![] }, - Segment { id: 2,constraints: vec![] }, + Segment { + id: 1, + constraints: vec![], + }, + Segment { + id: 2, + constraints: vec![], + }, ], }; let max_length = 2; let mut delta_cache = DeltaCache::new(base_event.clone(), max_length); - let initial_events = &vec![ - DeltaEvent::FeatureUpdated { - event_id: 2, - feature: ClientFeature { - name: "my-feature-flag".to_string(), - ..ClientFeature::default() - }, + let initial_events = &vec![DeltaEvent::FeatureUpdated { + event_id: 2, + feature: ClientFeature { + name: "my-feature-flag".to_string(), + ..ClientFeature::default() }, - ]; + }]; delta_cache.add_events(initial_events); let added_events = vec![ @@ -149,7 +165,10 @@ mod tests { }, DeltaEvent::SegmentUpdated { event_id: 5, - segment: Segment { id: 1, constraints: vec![] }, + segment: Segment { + id: 1, + constraints: vec![], + }, }, DeltaEvent::SegmentRemoved { event_id: 6, @@ -157,7 +176,10 @@ mod tests { }, DeltaEvent::SegmentUpdated { event_id: 7, - segment: Segment { id: 3, constraints: vec![] }, + segment: Segment { + id: 3, + constraints: vec![], + }, }, ]; delta_cache.add_events(&added_events); @@ -169,8 +191,14 @@ mod tests { let hydration_event = delta_cache.get_hydration_event(); assert_eq!(hydration_event.features.len(), 2); assert_eq!(hydration_event.event_id, 7); - assert!(hydration_event.features.iter().any(|f| f.name == "my-feature-flag")); - assert!(hydration_event.features.iter().any(|f| f.name == "another-feature-flag")); + assert!(hydration_event + .features + .iter() + .any(|f| f.name == "my-feature-flag")); + assert!(hydration_event + .features + .iter() + .any(|f| f.name == "another-feature-flag")); assert!(hydration_event.segments.iter().any(|s| s.id == 1)); } @@ -206,7 +234,7 @@ mod tests { sort_order: None, segments: None, variants: None, - constraints: None , + constraints: None, parameters: None, }]), ..ClientFeature::default() @@ -217,5 +245,4 @@ mod tests { assert_eq!(delta_cache.get_events()[1], initial_feature_event); assert_eq!(delta_cache.get_events()[2], updated_feature_event); } - } diff --git a/server/src/feature_cache.rs b/server/src/feature_cache.rs index d589791f..16f28cf7 100644 --- a/server/src/feature_cache.rs +++ b/server/src/feature_cache.rs @@ -20,7 +20,6 @@ pub struct FeatureCache { update_sender: broadcast::Sender, } - impl FeatureCache { pub fn new(features: DashMap) -> Self { let (tx, _rx) = tokio::sync::broadcast::channel::(16); diff --git a/server/src/filters.rs b/server/src/filters.rs index cdb5051f..60c30b66 100644 --- a/server/src/filters.rs +++ b/server/src/filters.rs @@ -1,8 +1,10 @@ -use dashmap::mapref::one::Ref; -use tracing::info; -use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta, DeltaEvent}; use crate::delta_cache::DeltaCache; use crate::types::EdgeToken; +use dashmap::mapref::one::Ref; +use tracing::info; +use unleash_types::client_features::{ + ClientFeature, ClientFeatures, ClientFeaturesDelta, DeltaEvent, +}; pub type FeatureFilter = Box bool>; @@ -40,10 +42,7 @@ fn filter_features( .collect::>() } -fn filter_deltas( - delta_cache: &DeltaCache, - _filters: &FeatureFilterSet, -) -> Vec { +fn filter_deltas(delta_cache: &DeltaCache, _filters: &FeatureFilterSet) -> Vec { // TODO: add filtering to here delta_cache.get_events().clone() } diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 7f4b2c1d..46b76185 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -6,7 +6,6 @@ use std::time::Duration; use tracing::{debug, info, warn}; use unleash_types::client_features::{ClientFeaturesDelta, DeltaEvent}; -use unleash_yggdrasil::EngineState; use crate::delta_cache::{DeltaCache, DeltaHydrationEvent}; use crate::error::{EdgeError, FeatureError}; use crate::http::headers::{ @@ -16,6 +15,7 @@ use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::ClientMetaInformation; use crate::tokens::cache_key; use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh}; +use unleash_yggdrasil::EngineState; pub type Environment = String; @@ -35,18 +35,29 @@ impl FeatureRefresher { let key = cache_key(refresh_token); self.features_cache.apply_delta(key.clone(), &delta); - if let Some(mut entry) = self.delta_cache.get_mut(&key) { entry.add_events(&delta.events); } else if let Some(DeltaEvent::Hydration { + event_id, + features, + segments, + }) = delta.events.clone().into_iter().next() + { + self.delta_cache.insert( + key.clone(), + DeltaCache::new( + DeltaHydrationEvent { event_id, features, segments, - }) = delta.events.clone().into_iter().next() - { - self.delta_cache.insert(key.clone(), DeltaCache::new(DeltaHydrationEvent{event_id, features, segments}, 100)); + }, + 100, + ), + ); } else { - warn!("Warning: No hydrationEvent found in delta.events, but cache empty for environment"); + warn!( + "Warning: No hydrationEvent found in delta.events, but cache empty for environment" + ); } self.update_last_refresh( @@ -250,7 +261,9 @@ impl FeatureRefresher { #[cfg(test)] mod tests { + use crate::delta_cache::DeltaCache; use crate::feature_cache::FeatureCache; + use crate::http::refresher::delta_refresher::Environment; use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; use crate::types::EdgeToken; @@ -269,8 +282,6 @@ mod tests { Segment, }; use unleash_yggdrasil::EngineState; - use crate::delta_cache::DeltaCache; - use crate::http::refresher::delta_refresher::Environment; #[actix_web::test] #[tracing_test::traced_test] diff --git a/server/src/http/refresher/feature_refresher.rs b/server/src/http/refresher/feature_refresher.rs index f3eac4e5..7b595fb0 100644 --- a/server/src/http/refresher/feature_refresher.rs +++ b/server/src/http/refresher/feature_refresher.rs @@ -13,12 +13,15 @@ use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta, DeltaE use unleash_types::client_metrics::{ClientApplication, MetricsMetadata}; use unleash_yggdrasil::{EngineState, UpdateMessage}; +use crate::delta_cache::DeltaCache; use crate::error::{EdgeError, FeatureError}; use crate::feature_cache::FeatureCache; use crate::filters::{filter_client_features, filter_delta_events, FeatureFilterSet}; use crate::http::headers::{ UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER, }; +use crate::http::refresher::delta_refresher::Environment; +use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; use crate::types::{ build, ClientFeaturesDeltaResponse, EdgeResult, TokenType, TokenValidationStatus, }; @@ -27,9 +30,6 @@ use crate::{ tokens::{cache_key, simplify}, types::{ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenRefresh}, }; -use crate::delta_cache::DeltaCache; -use crate::http::refresher::delta_refresher::Environment; -use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; fn frontend_token_is_covered_by_tokens( frontend_token: &EdgeToken, @@ -301,7 +301,6 @@ impl FeatureRefresher { .map(|delta_events| filter_delta_events(&delta_events, filters)) } - /// /// Registers a token for refresh, the token will be discarded if it can be subsumed by another previously registered token pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option) { diff --git a/server/src/lib.rs b/server/src/lib.rs index ea87df9e..b5f6c2a5 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -4,11 +4,11 @@ pub mod builder; #[cfg(not(tarpaulin_include))] pub mod cli; pub mod client_api; +pub mod delta_cache; pub mod edge_api; #[cfg(not(tarpaulin_include))] pub mod error; pub mod feature_cache; -pub mod delta_cache; pub mod filters; pub mod frontend_api; pub mod health_checker; From 2ce2d352ef930568f543e68318f9d8c155bb5084 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 10:49:23 +0200 Subject: [PATCH 12/14] Fix --- server/src/http/refresher/delta_refresher.rs | 2 +- server/src/internal_backstage.rs | 6 ++++++ server/src/middleware/client_token_from_frontend_token.rs | 3 +++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 46b76185..08d5faa3 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -295,7 +295,7 @@ mod tests { let feature_refresher = Arc::new(FeatureRefresher { unleash_client: unleash_client.clone(), tokens_to_refresh: Arc::new(Default::default()), - delta_cache: Arc::new(Default::default()), + delta_cache: delta_cache.clone(), features_cache: features_cache.clone(), engine_cache: engine_cache.clone(), refresh_interval: Duration::seconds(6000), diff --git a/server/src/internal_backstage.rs b/server/src/internal_backstage.rs index ed55f703..c055256a 100644 --- a/server/src/internal_backstage.rs +++ b/server/src/internal_backstage.rs @@ -181,6 +181,7 @@ mod tests { use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::delta_cache::DeltaCache; use crate::feature_cache::FeatureCache; use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::UnleashClient; @@ -314,6 +315,7 @@ mod tests { Arc::new(DashMap::default()), Arc::new(FeatureCache::default()), Arc::new(DashMap::default()), + Arc::new(DashMap::default()), ) .await; let unleash_client = @@ -353,10 +355,12 @@ mod tests { async fn returns_validated_tokens_when_dynamic() { let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_delta_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), + upstream_delta_cache.clone(), upstream_engine_cache.clone(), ) .await; @@ -424,10 +428,12 @@ mod tests { async fn returns_validated_tokens_when_strict() { let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); + let upstream_delta_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), + upstream_delta_cache.clone(), upstream_engine_cache.clone(), ) .await; diff --git a/server/src/middleware/client_token_from_frontend_token.rs b/server/src/middleware/client_token_from_frontend_token.rs index ee85fd25..5c17dfae 100644 --- a/server/src/middleware/client_token_from_frontend_token.rs +++ b/server/src/middleware/client_token_from_frontend_token.rs @@ -63,6 +63,7 @@ mod tests { use unleash_yggdrasil::EngineState; use crate::auth::token_validator::TokenValidator; + use crate::delta_cache::DeltaCache; use crate::feature_cache::FeatureCache; use crate::http::refresher::feature_refresher::FeatureRefresher; use crate::http::unleash_client::{new_reqwest_client, UnleashClient}; @@ -123,10 +124,12 @@ mod tests { let upstream_features_cache = Arc::new(FeatureCache::default()); let upstream_token_cache: Arc> = Arc::new(DashMap::default()); upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone()); + let upstream_delta_cache: Arc> = Arc::new(DashMap::default()); let upstream_engine_cache: Arc> = Arc::new(DashMap::default()); let upstream_server = upstream_server( upstream_token_cache.clone(), upstream_features_cache.clone(), + upstream_delta_cache.clone(), upstream_engine_cache.clone(), ) .await; From ae1872de5068a2509711e0e9e923c6f0bef1d096 Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 13:20:55 +0200 Subject: [PATCH 13/14] Fixes --- server/src/builder.rs | 1 - server/src/main.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 7504a8b9..07ef7596 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -140,7 +140,6 @@ pub(crate) fn build_offline_mode( client_features.clone(), ) } - // TODO: possibly need to resolve delta cache for offline mode? Ok((token_cache, features_cache, _delta_cache, engine_cache)) } diff --git a/server/src/main.rs b/server/src/main.rs index b44db9be..2127f1a2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -80,7 +80,6 @@ async fn main() -> Result<(), anyhow::Error> { let token_validator_schedule = token_validator.clone(); let lazy_feature_cache = features_cache.clone(); let lazy_token_cache = token_cache.clone(); - // TODO : do we need lazy delta cache? let lazy_engine_cache = engine_cache.clone(); let lazy_feature_refresher = feature_refresher.clone(); From c31d6ffce65f2329d5ac10ae54f1edf2863d4f6e Mon Sep 17 00:00:00 2001 From: sjaanus Date: Tue, 18 Feb 2025 13:45:06 +0200 Subject: [PATCH 14/14] Fix --- server/src/http/refresher/delta_refresher.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/http/refresher/delta_refresher.rs b/server/src/http/refresher/delta_refresher.rs index 08d5faa3..680fde4e 100644 --- a/server/src/http/refresher/delta_refresher.rs +++ b/server/src/http/refresher/delta_refresher.rs @@ -19,6 +19,8 @@ use unleash_yggdrasil::EngineState; pub type Environment = String; +const DELTA_CACHE_LIMIT: usize = 100; + impl FeatureRefresher { async fn handle_client_features_delta_updated( &self, @@ -51,7 +53,7 @@ impl FeatureRefresher { features, segments, }, - 100, + DELTA_CACHE_LIMIT, ), ); } else {