From f7c6aa5953b75b06f093c219b412d9e28c4bcef4 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Tue, 17 Dec 2024 15:53:33 +0100 Subject: [PATCH 1/4] feat: streaming now enabled/disabled with runtime flag --- Cargo.lock | 92 +++++++++++++++------------- server/Cargo.toml | 21 +++---- server/src/builder.rs | 2 + server/src/cli.rs | 4 ++ server/src/client_api.rs | 12 ++-- server/src/error.rs | 3 - server/src/http/feature_refresher.rs | 9 +-- server/src/http/mod.rs | 1 - server/src/main.rs | 18 ++---- server/src/urls.rs | 4 -- server/tests/streaming_test.rs | 2 +- 11 files changed, 81 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18563a80..5c255252 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -436,9 +436,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.91" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" +checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" [[package]] name = "arbitrary" @@ -512,9 +512,9 @@ dependencies = [ [[package]] name = "aws-config" -version = "1.5.8" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7198e6f03240fdceba36656d8be440297b6b82270325908c7381f37d826a74f6" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" dependencies = [ "aws-credential-types", "aws-runtime", @@ -523,7 +523,7 @@ dependencies = [ "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.60.7", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -554,9 +554,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.3" +version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +checksum = "b5ac934720fbb46206292d2c75b57e67acfc56fe7dfd34fb9a02334af08409ea" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -580,11 +580,10 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.57.0" +version = "1.65.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8888c238bf93c77c5df8274b3999fd7fc1bb3fb658616f40dfde9e4fcd9efd94" +checksum = "d3ba2c5c0f2618937ce3d4a5ad574b86775576fa24006bcb3128c6e2cbf3c34e" dependencies = [ - "ahash", "aws-credential-types", "aws-runtime", "aws-sigv4", @@ -592,7 +591,7 @@ dependencies = [ "aws-smithy-checksums", "aws-smithy-eventstream", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -615,15 +614,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.46.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dc2faec3205d496c7e57eff685dd944203df7ce16a4116d0281c44021788a7b" +checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -637,15 +636,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.47.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c93c241f52bc5e0476e259c953234dab7e2a35ee207ee202e86c0095ec4951dc" +checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -659,15 +658,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.46.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b259429be94a3459fa1b00c5684faee118d74f9577cc50aebadc36e507c63b5f" +checksum = "b68fde0d69c8bfdc1060ea7da21df3e39f6014da316783336deff0a9ec28f4bf" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -682,9 +681,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.4" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc8db6904450bafe7473c6ca9123f88cc11089e41a025408f992db4e22d3be68" +checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -722,9 +721,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.60.12" +version = "0.60.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "598b1689d001c4d4dc3cb386adb07d37786783aee3ac4b324bcadac116bf3d23" +checksum = "ba1a71073fca26775c8b5189175ea8863afb1c9ea2cceb02a5de5ad9dfbaa795" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -782,6 +781,15 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-json" +version = "0.61.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095" +dependencies = [ + "aws-smithy-types", +] + [[package]] name = "aws-smithy-query" version = "0.60.7" @@ -794,9 +802,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.2" +version = "1.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" +checksum = "9f20685047ca9d6f17b994a07f629c813f08b5bce65523e47124879e60103d45" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -821,9 +829,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -838,9 +846,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.7" +version = "1.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "147100a7bea70fa20ef224a6bad700358305f5dc0f84649c53769761395b355b" +checksum = "8ecbf4d5dfb169812e2b240a4350f15ad3c6b03a54074e5712818801615f2dc5" dependencies = [ "base64-simd", "bytes", @@ -1091,9 +1099,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1112,9 +1120,9 @@ checksum = "bfc95a0c21d5409adc146dbbb152b5c65aaea32bc2d2f57cf12f850bffdd7ab8" [[package]] name = "clap" -version = "4.5.21" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" dependencies = [ "clap_builder", "clap_derive", @@ -1131,9 +1139,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.21" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" dependencies = [ "anstream", "anstyle", @@ -1155,9 +1163,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.2" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "clone_dyn_types" @@ -4098,9 +4106,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ "futures-core", "pin-project-lite", @@ -4419,9 +4427,9 @@ dependencies = [ [[package]] name = "unleash-yggdrasil" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c3e96d03aa2efa8deac6205bb7ca7f1912e0dfdc6a8edb63aa4df1c9eaa8012" +checksum = "c654b3a246b3a77e537499674e791ba9d94250957c1c192302e063589e9ded9f" dependencies = [ "chrono", "convert_case 0.6.0", diff --git a/server/Cargo.toml b/server/Cargo.toml index 446b2dc3..629a2b21 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -15,9 +15,6 @@ repository = "https://github.com/Unleash/unleash-edge" rust-version = "1.81.0" version = "19.6.3" -[features] -streaming = ["actix-web-lab", "eventsource-client", "tokio-stream"] - [package.metadata.wix] upgrade-guid = "11E5D83A-3034-48BB-9A84-9F589EBD648C" path-guid = "6F606A3B-C7E9-43EC-8B6E-91D7B74F80FC" @@ -33,19 +30,19 @@ actix-http = "3.9.0" actix-middleware-etag = "0.4.2" actix-service = "2.0.2" actix-web = { version = "4.9.0", features = ["rustls-0_23", "compress-zstd"] } -actix-web-lab = { version = "0.23.0", optional = true } +actix-web-lab = { version = "0.23.0" } ahash = "0.8.11" -anyhow = "1.0.91" +anyhow = "1.0.94" async-trait = "0.1.83" -aws-config = { version = "1.5.7", features = ["behavior-version-latest"] } -aws-sdk-s3 = { version = "1.57.0", features = ["behavior-version-latest"] } +aws-config = { version = "1.5.10", features = ["behavior-version-latest"] } +aws-sdk-s3 = { version = "1.65.0", features = ["behavior-version-latest"] } base64 = "0.22.1" -chrono = { version = "0.4.38", features = ["serde"] } +chrono = { version = "0.4.39", features = ["serde"] } cidr = "0.3.0" -clap = { version = "4.5.19", features = ["derive", "env"] } +clap = { version = "4.5.23", features = ["derive", "env"] } clap-markdown = "0.1.4" dashmap = "6.0.1" -eventsource-client = { version = "0.13.0", optional = true } +eventsource-client = { version = "0.13.0" } futures = "0.3.30" futures-core = "0.3.30" iter_tools = "0.24.0" @@ -93,12 +90,12 @@ tokio = { version = "1.42.0", features = [ "tracing", "fs", ] } -tokio-stream = { version = "0.1.16", optional = true } +tokio-stream = { version = "0.1.17" } tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] } ulid = "1.1.2" unleash-types = { version = "0.14", features = ["openapi", "hashes"] } -unleash-yggdrasil = { version = "0.14.0" } +unleash-yggdrasil = { version = "0.14.1" } utoipa = { version = "5", features = ["actix_extras", "chrono"] } utoipa-swagger-ui = { version = "8", features = ["actix-web"] } [dev-dependencies] diff --git a/server/src/builder.rs b/server/src/builder.rs index 38655eed..bd457ee7 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -266,6 +266,7 @@ async fn build_edge(args: &EdgeArgs, app_name: &str) -> EdgeResult { Duration::seconds(args.features_refresh_interval_seconds.try_into().unwrap()), persistence.clone(), args.strict, + args.streaming, app_name, )); let _ = token_validator.register_tokens(args.tokens.clone()).await; @@ -364,6 +365,7 @@ mod tests { prometheus_user_id: None, prometheus_password: None, prometheus_username: None, + streaming: false, }; let result = build_edge(&args, "test-app").await; diff --git a/server/src/cli.rs b/server/src/cli.rs index 60ca4d65..41c80a35 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -213,6 +213,10 @@ pub struct EdgeArgs { #[clap(long, env, default_value_t = false, conflicts_with = "strict")] pub dynamic: bool, + /// If set to true. Edge connects to upstream using streaming instead of polling. Requires strict mode + #[clap(long, env, default_value_t = false, requires = "strict")] + pub streaming: bool, + /// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream #[clap(long, env)] pub prometheus_remote_write_url: Option, diff --git a/server/src/client_api.rs b/server/src/client_api.rs index b4252a44..3009c1e7 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -3,6 +3,7 @@ use crate::feature_cache::FeatureCache; use crate::filters::{ filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet, }; +use crate::http::broadcaster::Broadcaster; use crate::http::feature_refresher::FeatureRefresher; use crate::metrics::client_metrics::MetricsCache; use crate::tokens::cache_key; @@ -10,7 +11,6 @@ use crate::types::{ self, BatchMetricsRequestBody, EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters, }; use actix_web::web::{self, Data, Json, Query}; -#[cfg(feature = "streaming")] use actix_web::Responder; use actix_web::{get, post, HttpRequest, HttpResponse}; use dashmap::DashMap; @@ -40,7 +40,6 @@ pub async fn get_features( resolve_features(edge_token, features_cache, token_cache, filter_query, req).await } -#[cfg(feature = "streaming")] #[get("/streaming")] pub async fn stream_features( edge_token: EdgeToken, @@ -48,8 +47,6 @@ pub async fn stream_features( filter_query: Query, req: HttpRequest, ) -> EdgeResult { - use crate::http::broadcaster::Broadcaster; - let (validated_token, _filter_set, query) = get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; match req.app_data::>() { @@ -279,10 +276,8 @@ pub fn configure_client_api(cfg: &mut web::ServiceConfig) { .service(get_feature) .service(register) .service(metrics) - .service(post_bulk_metrics); - - #[cfg(feature = "streaming")] - let client_scope = client_scope.service(stream_features); + .service(post_bulk_metrics) + .service(stream_features); cfg.service(client_scope); } @@ -1020,6 +1015,7 @@ mod tests { refresh_interval: Duration::seconds(6000), persistence: None, strict: false, + streaming: false, app_name: "test-app".into(), }); let token_validator = Arc::new(TokenValidator { diff --git a/server/src/error.rs b/server/src/error.rs index 2bbefc9b..9be36019 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -2,11 +2,9 @@ use std::error::Error; use std::fmt::{Display, Formatter}; use actix_web::{http::StatusCode, HttpResponseBuilder, ResponseError}; -#[cfg(feature = "streaming")] use actix_web_lab::sse::Event; use serde::Serialize; use serde_json::json; -#[cfg(feature = "streaming")] use tokio::sync::mpsc::error::SendError; use tracing::debug; @@ -295,7 +293,6 @@ impl From for EdgeError { } } -#[cfg(feature = "streaming")] impl From> for EdgeError { // todo: create better enum representation. use this is placeholder fn from(_value: SendError) -> Self { diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index b79c5b18..9d7abe56 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -4,9 +4,7 @@ use std::{sync::Arc, time::Duration}; use actix_web::http::header::EntityTag; use chrono::Utc; use dashmap::DashMap; -#[cfg(feature = "streaming")] use eventsource_client::Client; -#[cfg(feature = "streaming")] use futures::TryStreamExt; use reqwest::StatusCode; use tracing::{debug, info, warn}; @@ -46,6 +44,7 @@ pub struct FeatureRefresher { pub refresh_interval: chrono::Duration, pub persistence: Option>, pub strict: bool, + pub streaming: bool, pub app_name: String, } @@ -59,6 +58,7 @@ impl Default for FeatureRefresher { engine_cache: Default::default(), persistence: None, strict: true, + streaming: false, app_name: "unleash_edge".into(), } } @@ -94,6 +94,7 @@ impl FeatureRefresher { features_refresh_interval: chrono::Duration, persistence: Option>, strict: bool, + streaming: bool, app_name: &str, ) -> Self { FeatureRefresher { @@ -104,6 +105,7 @@ impl FeatureRefresher { refresh_interval: features_refresh_interval, persistence, strict, + streaming, app_name: app_name.into(), } } @@ -245,7 +247,6 @@ impl FeatureRefresher { } /// This is where we set up a listener per token. - #[cfg(feature = "streaming")] pub async fn start_streaming_features_background_task(&self) -> anyhow::Result<()> { use anyhow::Context; @@ -331,7 +332,7 @@ impl FeatureRefresher { } pub async fn start_refresh_features_background_task(&self) { - if cfg!(feature = "streaming") { + if self.streaming { loop { tokio::time::sleep(Duration::from_secs(3600)).await; } diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index a7849f6a..d8629d39 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -1,6 +1,5 @@ #[cfg(not(tarpaulin_include))] pub mod background_send_metrics; -#[cfg(feature = "streaming")] pub mod broadcaster; pub mod feature_refresher; pub mod unleash_client; diff --git a/server/src/main.rs b/server/src/main.rs index 6ee5a733..9e761e55 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -29,9 +29,7 @@ use unleash_edge::{internal_backstage, tls}; #[cfg(not(tarpaulin_include))] #[actix_web::main] async fn main() -> Result<(), anyhow::Error> { - #[cfg(feature = "streaming")] - use unleash_edge::http::broadcaster::Broadcaster; - use unleash_edge::metrics::metrics_pusher; + use unleash_edge::{http::broadcaster::Broadcaster, metrics::metrics_pusher}; let args = CliArgs::parse(); let disable_all_endpoint = args.disable_all_endpoint; @@ -81,7 +79,6 @@ async fn main() -> Result<(), anyhow::Error> { let refresher_for_app_data = feature_refresher.clone(); let prom_registry_for_write = metrics_handler.registry.clone(); - #[cfg(feature = "streaming")] let broadcaster = Broadcaster::new(features_cache.clone()); let server = HttpServer::new(move || { @@ -102,12 +99,9 @@ async fn main() -> Result<(), anyhow::Error> { .app_data(web::Data::from(metrics_cache.clone())) .app_data(web::Data::from(token_cache.clone())) .app_data(web::Data::from(features_cache.clone())) - .app_data(web::Data::from(engine_cache.clone())); + .app_data(web::Data::from(engine_cache.clone())) + .app_data(web::Data::from(broadcaster.clone())); - #[cfg(feature = "streaming")] - { - app = app.app_data(web::Data::from(broadcaster.clone())); - } app = match token_validator.clone() { Some(v) => app.app_data(web::Data::from(v)), None => app, @@ -161,15 +155,15 @@ async fn main() -> Result<(), anyhow::Error> { match schedule_args.mode { cli::EdgeMode::Edge(edge) => { - #[cfg(feature = "streaming")] - { - let refresher_for_background = feature_refresher.clone().unwrap(); + let refresher_for_background = feature_refresher.clone().unwrap(); + if edge.streaming { tokio::spawn(async move { let _ = refresher_for_background .start_streaming_features_background_task() .await; }); } + let refresher = feature_refresher.clone().unwrap(); let validator = token_validator_schedule.clone().unwrap(); diff --git a/server/src/urls.rs b/server/src/urls.rs index e2bf8623..7315a051 100644 --- a/server/src/urls.rs +++ b/server/src/urls.rs @@ -17,7 +17,6 @@ pub struct UnleashUrls { pub edge_validate_url: Url, pub edge_metrics_url: Url, pub new_api_token_url: Url, - #[cfg(feature = "streaming")] pub client_features_stream_url: Url, } @@ -51,9 +50,7 @@ impl UnleashUrls { .path_segments_mut() .unwrap() .push("features"); - #[cfg(feature = "streaming")] let mut client_features_stream_url = client_api_url.clone(); - #[cfg(feature = "streaming")] client_features_stream_url .path_segments_mut() .unwrap() @@ -109,7 +106,6 @@ impl UnleashUrls { edge_validate_url, edge_metrics_url, new_api_token_url, - #[cfg(feature = "streaming")] client_features_stream_url, } } diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index 4593b0ab..670e4db7 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -1,4 +1,3 @@ -#[cfg(feature = "streaming")] mod streaming_test { use dashmap::DashMap; use eventsource_client::Client; @@ -59,6 +58,7 @@ mod streaming_test { .arg("--upstream-url") .arg(&unleash_server.url("/")) .arg("--strict") + .arg("--streaming") .arg("-t") .arg(&upstream_known_token.token) .stdout(Stdio::null()) // Suppress stdout From a40c0a758373b029597e00a352f8e458ab6febc9 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Tue, 17 Dec 2024 16:13:07 +0100 Subject: [PATCH 2/4] fix: make config wrapper --- server/src/builder.rs | 13 ++++++---- server/src/http/feature_refresher.rs | 36 +++++++++++++++++++++------- server/tests/streaming_test.rs | 2 +- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index bd457ee7..61c72354 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -12,6 +12,7 @@ use unleash_yggdrasil::EngineState; use crate::cli::RedisMode; use crate::feature_cache::FeatureCache; +use crate::http::feature_refresher::FeatureRefreshConfig; use crate::http::unleash_client::new_reqwest_client; use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache}; use crate::persistence::file::FilePersister; @@ -258,16 +259,18 @@ async fn build_edge(args: &EdgeArgs, app_name: &str) -> EdgeResult { unleash_client: unleash_client.clone(), persistence: persistence.clone(), }); - + let feature_config = FeatureRefreshConfig::new( + Duration::seconds(args.features_refresh_interval_seconds as i64), + args.streaming, + args.strict, + app_name.to_string(), + ); let feature_refresher = Arc::new(FeatureRefresher::new( unleash_client, feature_cache.clone(), engine_cache.clone(), - Duration::seconds(args.features_refresh_interval_seconds.try_into().unwrap()), persistence.clone(), - args.strict, - args.streaming, - app_name, + feature_config, )); let _ = token_validator.register_tokens(args.tokens.clone()).await; diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 9d7abe56..79e966af 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -86,27 +86,47 @@ fn client_application_from_token_and_name( } } +pub struct FeatureRefreshConfig { + features_refresh_interval: chrono::Duration, + strict: bool, + streaming: bool, + app_name: String, +} + +impl FeatureRefreshConfig { + pub fn new( + features_refresh_interval: chrono::Duration, + strict: bool, + streaming: bool, + app_name: String, + ) -> Self { + Self { + features_refresh_interval, + strict, + streaming, + app_name, + } + } +} + impl FeatureRefresher { pub fn new( unleash_client: Arc, features_cache: Arc, engines: Arc>, - features_refresh_interval: chrono::Duration, persistence: Option>, - strict: bool, - streaming: bool, - app_name: &str, + config: FeatureRefreshConfig, ) -> Self { FeatureRefresher { unleash_client, tokens_to_refresh: Arc::new(DashMap::default()), features_cache, engine_cache: engines, - refresh_interval: features_refresh_interval, + refresh_interval: config.features_refresh_interval, persistence, - strict, - streaming, - app_name: app_name.into(), + strict: config.strict, + streaming: config.streaming, + app_name: config.app_name.into(), } } diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index 670e4db7..655ad54a 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -56,7 +56,7 @@ mod streaming_test { let mut edge = Command::new("./../target/debug/unleash-edge") .arg("edge") .arg("--upstream-url") - .arg(&unleash_server.url("/")) + .arg(unleash_server.url("/")) .arg("--strict") .arg("--streaming") .arg("-t") From 9d66bfde373ef647dafffcb8a2cda640af1e2cb5 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Tue, 17 Dec 2024 21:38:50 +0100 Subject: [PATCH 3/4] Fix evaluation order --- server/src/http/feature_refresher.rs | 2 +- server/tests/streaming_test.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index 79e966af..fbb4a3d0 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -126,7 +126,7 @@ impl FeatureRefresher { persistence, strict: config.strict, streaming: config.streaming, - app_name: config.app_name.into(), + app_name: config.app_name, } } diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index 655ad54a..52845503 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -93,7 +93,7 @@ mod streaming_test { let mut stream = es_client.stream(); - if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), async { + if tokio::time::timeout(std::time::Duration::from_secs(2), async { loop { if let Some(Ok(event)) = stream.next().await { match event { @@ -115,6 +115,7 @@ mod streaming_test { } }) .await + .is_err() { // If the test times out, kill the app process and fail the test edge.kill().expect("Failed to kill the app process"); @@ -127,7 +128,7 @@ mod streaming_test { features_from_disk("../examples/hostedexample.json"), ); - if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(2), async { + if tokio::time::timeout(std::time::Duration::from_secs(2), async { loop { if let Some(Ok(event)) = stream.next().await { match event { @@ -150,6 +151,7 @@ mod streaming_test { } }) .await + .is_err() { // If the test times out, kill the app process and fail the test edge.kill().expect("Failed to kill the app process"); From f96b70e1fc6da78a8e16f5d7e73e85146cd7e409 Mon Sep 17 00:00:00 2001 From: Thomas Heartman Date: Wed, 18 Dec 2024 10:25:51 +0100 Subject: [PATCH 4/4] Chore(1-3208): add feature refresher mode to avoid strict dynamic issues --- server/src/builder.rs | 10 +++++++--- server/src/http/feature_refresher.rs | 20 ++++++++++++-------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/server/src/builder.rs b/server/src/builder.rs index 61c72354..6209e93f 100644 --- a/server/src/builder.rs +++ b/server/src/builder.rs @@ -12,7 +12,7 @@ use unleash_yggdrasil::EngineState; use crate::cli::RedisMode; use crate::feature_cache::FeatureCache; -use crate::http::feature_refresher::FeatureRefreshConfig; +use crate::http::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode}; use crate::http::unleash_client::new_reqwest_client; use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache}; use crate::persistence::file::FilePersister; @@ -259,10 +259,14 @@ async fn build_edge(args: &EdgeArgs, app_name: &str) -> EdgeResult { unleash_client: unleash_client.clone(), persistence: persistence.clone(), }); + let refresher_mode = match (args.strict, args.streaming) { + (_, true) => FeatureRefresherMode::Streaming, + (true, _) => FeatureRefresherMode::Strict, + _ => FeatureRefresherMode::Dynamic, + }; let feature_config = FeatureRefreshConfig::new( Duration::seconds(args.features_refresh_interval_seconds as i64), - args.streaming, - args.strict, + refresher_mode, app_name.to_string(), ); let feature_refresher = Arc::new(FeatureRefresher::new( diff --git a/server/src/http/feature_refresher.rs b/server/src/http/feature_refresher.rs index fbb4a3d0..f169a2ed 100644 --- a/server/src/http/feature_refresher.rs +++ b/server/src/http/feature_refresher.rs @@ -86,24 +86,28 @@ fn client_application_from_token_and_name( } } +#[derive(Eq, PartialEq)] +pub enum FeatureRefresherMode { + Dynamic, + Streaming, + Strict, +} + pub struct FeatureRefreshConfig { features_refresh_interval: chrono::Duration, - strict: bool, - streaming: bool, + mode: FeatureRefresherMode, app_name: String, } impl FeatureRefreshConfig { pub fn new( features_refresh_interval: chrono::Duration, - strict: bool, - streaming: bool, + mode: FeatureRefresherMode, app_name: String, ) -> Self { Self { features_refresh_interval, - strict, - streaming, + mode, app_name, } } @@ -124,8 +128,8 @@ impl FeatureRefresher { engine_cache: engines, refresh_interval: config.features_refresh_interval, persistence, - strict: config.strict, - streaming: config.streaming, + strict: config.mode != FeatureRefresherMode::Dynamic, + streaming: config.mode == FeatureRefresherMode::Streaming, app_name: config.app_name, } }