Skip to content

Commit

Permalink
feat: start storing actual deltas in edge (#742)
Browse files Browse the repository at this point in the history
* feat: start storing actual deltas in edge
  • Loading branch information
sjaanus authored Feb 18, 2025
1 parent c67aaa3 commit b2b59da
Show file tree
Hide file tree
Showing 12 changed files with 280 additions and 82 deletions.
17 changes: 12 additions & 5 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use unleash_types::client_features::ClientFeatures;
use unleash_yggdrasil::{EngineState, UpdateMessage};

use crate::cli::RedisMode;
use crate::delta_cache::DeltaCache;
use crate::feature_cache::FeatureCache;
use crate::http::refresher::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode};
use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation};
Expand All @@ -30,6 +31,7 @@ use crate::{
type CacheContainer = (
Arc<DashMap<String, EdgeToken>>,
Arc<FeatureCache>,
Arc<DashMap<String, DeltaCache>>,
Arc<DashMap<String, EngineState>>,
);
type EdgeInfo = (
Expand All @@ -42,16 +44,19 @@ type EdgeInfo = (
fn build_caches() -> CacheContainer {
let token_cache: DashMap<String, EdgeToken> = DashMap::default();
let features_cache: DashMap<String, ClientFeatures> = DashMap::default();
let delta_cache: DashMap<String, DeltaCache> = DashMap::default();
let engine_cache: DashMap<String, EngineState> = DashMap::default();
(
Arc::new(token_cache),
Arc::new(FeatureCache::new(features_cache)),
Arc::new(delta_cache),
Arc::new(engine_cache),
)
}

async fn hydrate_from_persistent_storage(cache: CacheContainer, storage: Arc<dyn EdgePersistence>) {
let (token_cache, features_cache, engine_cache) = cache;
let (token_cache, features_cache, _delta_cache, engine_cache) = cache;
// TODO: do we need to hydrate from persistant storage for delta?
let tokens = storage.load_tokens().await.unwrap_or_else(|error| {
warn!("Failed to load tokens from cache {error:?}");
vec![]
Expand Down Expand Up @@ -84,7 +89,7 @@ pub(crate) fn build_offline_mode(
client_tokens: Vec<String>,
frontend_tokens: Vec<String>,
) -> EdgeResult<CacheContainer> {
let (token_cache, features_cache, engine_cache) = build_caches();
let (token_cache, features_cache, _delta_cache, engine_cache) = build_caches();

let edge_tokens: Vec<EdgeToken> = tokens
.iter()
Expand Down Expand Up @@ -135,7 +140,7 @@ pub(crate) fn build_offline_mode(
client_features.clone(),
)
}
Ok((token_cache, features_cache, engine_cache))
Ok((token_cache, features_cache, _delta_cache, engine_cache))
}

fn build_offline(offline_args: OfflineArgs) -> EdgeResult<CacheContainer> {
Expand Down Expand Up @@ -235,7 +240,7 @@ async fn build_edge(
));
}

let (token_cache, feature_cache, engine_cache) = build_caches();
let (token_cache, feature_cache, delta_cache, engine_cache) = build_caches();

let persistence = get_data_source(args).await;

Expand Down Expand Up @@ -276,6 +281,7 @@ async fn build_edge(
let feature_refresher = Arc::new(FeatureRefresher::new(
unleash_client,
feature_cache.clone(),
delta_cache.clone(),
engine_cache.clone(),
persistence.clone(),
feature_config,
Expand All @@ -287,6 +293,7 @@ async fn build_edge(
(
token_cache.clone(),
feature_cache.clone(),
delta_cache.clone(),
engine_cache.clone(),
),
persistence,
Expand All @@ -307,7 +314,7 @@ async fn build_edge(
.await;
}
Ok((
(token_cache, feature_cache, engine_cache),
(token_cache, feature_cache, delta_cache, engine_cache),
Some(token_validator),
Some(feature_refresher),
persistence,
Expand Down
8 changes: 7 additions & 1 deletion server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,13 @@ pub struct EdgeArgs {
pub delta: bool,

/// If set to true, it compares features payload with delta payload and logs diff. This flag is for internal testing only. Do not turn this on for production configurations
#[clap(long, env, default_value_t = false, conflicts_with = "delta", hide = true)]
#[clap(
long,
env,
default_value_t = false,
conflicts_with = "delta",
hide = true
)]
pub delta_diff: bool,

/// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream
Expand Down
54 changes: 52 additions & 2 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::cli::{EdgeArgs, EdgeMode};
use crate::delta_cache::DeltaCache;
use crate::error::EdgeError;
use crate::feature_cache::FeatureCache;
use crate::filters::{
filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
filter_client_features, filter_delta_events, name_match_filter, name_prefix_filter,
project_filter, FeatureFilterSet,
};
use crate::http::broadcaster::Broadcaster;
use crate::http::refresher::delta_refresher::Environment;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::tokens::cache_key;
Expand All @@ -15,7 +18,7 @@ use actix_web::web::{self, Data, Json, Query};
use actix_web::Responder;
use actix_web::{get, post, HttpRequest, HttpResponse};
use dashmap::DashMap;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_types::client_features::{ClientFeature, ClientFeatures, ClientFeaturesDelta};
use unleash_types::client_metrics::{ClientApplication, ClientMetrics, ConnectVia};

#[utoipa::path(
Expand All @@ -41,6 +44,17 @@ pub async fn get_features(
resolve_features(edge_token, features_cache, token_cache, filter_query, req).await
}

#[get("/delta")]
pub async fn get_delta(
edge_token: EdgeToken,
delta_cache: Data<DashMap<String, DeltaCache>>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeJsonResult<ClientFeaturesDelta> {
resolve_delta(edge_token, delta_cache, token_cache, filter_query, req).await
}

#[get("/streaming")]
pub async fn stream_features(
edge_token: EdgeToken,
Expand Down Expand Up @@ -147,6 +161,30 @@ async fn resolve_features(
..client_features
}))
}

async fn resolve_delta(
edge_token: EdgeToken,
delta_cache: Data<DashMap<Environment, DeltaCache>>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeJsonResult<ClientFeaturesDelta> {
let (validated_token, filter_set, ..) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
let delta = match req.app_data::<Data<FeatureRefresher>>() {
Some(refresher) => {
refresher
.delta_events_for_filter(validated_token.clone(), &filter_set)
.await
}
None => delta_cache
.get(&cache_key(&validated_token))
.map(|cache| filter_delta_events(cache.value(), &filter_set))
.ok_or(EdgeError::ClientCacheError),
}?;

Ok(Json(delta))
}
#[utoipa::path(
context_path = "/api/client",
params(("feature_name" = String, Path,)),
Expand Down Expand Up @@ -278,6 +316,7 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
crate::middleware::validate_token::validate_token,
))
.service(get_features)
.service(get_delta)
.service(get_feature)
.service(register)
.service(metrics)
Expand Down Expand Up @@ -688,11 +727,13 @@ mod tests {
token.token_type = Some(TokenType::Client);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_delta_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(token.token.clone(), token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_delta_cache,
upstream_engine_cache,
)
.await;
Expand Down Expand Up @@ -727,11 +768,13 @@ mod tests {
frontend_token.token_type = Some(TokenType::Frontend);
let upstream_token_cache = Arc::new(DashMap::default());
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_delta_cache = Arc::new(DashMap::default());
let upstream_engine_cache = Arc::new(DashMap::default());
upstream_token_cache.insert(frontend_token.token.clone(), frontend_token.clone());
let srv = upstream_server(
upstream_token_cache,
upstream_features_cache,
upstream_delta_cache,
upstream_engine_cache,
)
.await;
Expand Down Expand Up @@ -993,10 +1036,12 @@ mod tests {
async fn calling_client_features_endpoint_with_new_token_hydrates_from_upstream_when_dynamic() {
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_delta_cache: Arc<DashMap<String, DeltaCache>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_delta_cache.clone(),
upstream_engine_cache.clone(),
)
.await;
Expand All @@ -1017,6 +1062,7 @@ mod tests {
unleash_client: unleash_client.clone(),
tokens_to_refresh: Arc::new(Default::default()),
features_cache: features_cache.clone(),
delta_cache: Arc::new(Default::default()),
engine_cache: engine_cache.clone(),
refresh_interval: Duration::seconds(6000),
persistence: None,
Expand Down Expand Up @@ -1057,10 +1103,12 @@ mod tests {
async fn calling_client_features_endpoint_with_new_token_does_not_hydrate_when_strict() {
let upstream_features_cache = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_delta_cache: Arc<DashMap<String, DeltaCache>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_delta_cache.clone(),
upstream_engine_cache.clone(),
)
.await;
Expand Down Expand Up @@ -1179,10 +1227,12 @@ mod tests {
{
let upstream_features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let upstream_token_cache: Arc<DashMap<String, EdgeToken>> = Arc::new(DashMap::default());
let upstream_delta_cache: Arc<DashMap<String, DeltaCache>> = Arc::new(DashMap::default());
let upstream_engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());
let server = upstream_server(
upstream_token_cache.clone(),
upstream_features_cache.clone(),
upstream_delta_cache.clone(),
upstream_engine_cache.clone(),
)
.await;
Expand Down
Loading

0 comments on commit b2b59da

Please sign in to comment.