diff --git a/Cargo.lock b/Cargo.lock index 3d8a195..e6329a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -501,6 +501,12 @@ dependencies = [ "libc", ] +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32fast" version = "1.4.2" @@ -567,10 +573,13 @@ version = "0.1.0" dependencies = [ "actix-web", "chrono", + "deadpool", + "deadpool-redis", "maxminddb", "once_cell", "phf", - "redis", + "port_check", + "redis 0.26.1", "redis-test", "regex", "serde", @@ -581,6 +590,36 @@ dependencies = [ "validator", ] +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-redis" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ed4f481f6a4770b95e09b91e183ee5ed6e1d4a34c0b09814012b3ee5e585f70" +dependencies = [ + "deadpool", + "redis 0.26.1", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] + [[package]] name = "deranged" version = "0.3.11" @@ -1374,12 +1413,31 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1389,6 +1447,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.36.1" @@ -1556,6 +1624,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "port_check" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2110609fb863cdb367d4e69d6c43c81ba6a8c7d18e80082fe9f3ef16b23afeed" + [[package]] name = "powerfmt" version = "0.2.0" @@ -1697,21 +1771,39 @@ name = "redis" version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +dependencies = [ + "combine", + "itoa", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2", + "url", +] + +[[package]] +name = "redis" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" dependencies = [ "arc-swap", "async-trait", "bytes", "combine", + "crc16", "futures", "futures-util", "itoa", + "log", + "num-bigint", "percent-encoding", "pin-project-lite", + "rand", "ryu", "sha1_smol", "socket2", "tokio", - "tokio-retry", "tokio-util", "url", ] @@ -1722,7 +1814,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a948b3cec9e4b1fedbb0f0788e79029fb1f641b6cfefb7a15d044f803854427" dependencies = [ - "redis", + "redis 0.25.4", ] [[package]] @@ -2324,17 +2416,6 @@ dependencies = [ "syn 2.0.72", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.25.0" diff --git a/Cargo.toml b/Cargo.toml index 3e3433b..25f436f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,23 @@ actix-web = "4" serde_with = "3.9.0" regex = "1.10.5" once_cell = "1.19.0" -redis = { version = "0.25.4", features = ["tokio-comp", "aio", "connection-manager"] } redis-test = "0.4.0" tokio = "1.39.1" chrono = "0.4.38" maxminddb = "0.17" +deadpool-redis = { version = "0.16", features = ["cluster"] } +# Must be the same version that deadpool-redis uses (https://github.com/bikeshedder/deadpool/blob/master/redis/Cargo.toml) +redis = { version = "0.26", features = [ + "tokio-comp", + "aio", + "cluster", + "cluster-async", +] } +# Must be the same version that deadpool-redis uses +deadpool = { version = "0.12.0", default-features = false, features = [ + "managed", +] } [dev-dependencies] testcontainers = "0.20.1" +port_check = "0.2.1" diff --git a/src/chart_updater.rs b/src/chart_updater.rs index 5832852..5d36ba6 100644 --- a/src/chart_updater.rs +++ b/src/chart_updater.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use redis::AsyncCommands; + use crate::{ charts::{ advanced_pie::AdvancedPie, @@ -14,12 +16,14 @@ use crate::{ submit_data_schema::SubmitDataChartSchema, }; -pub fn update_chart( +pub async fn update_chart( chart: &Chart, data: &SubmitDataChartSchema, tms2000: i64, country_iso: Option<&str>, + // Used where possible (currently not possible for line charts) pipeline: &mut redis::Pipeline, + con: &mut C, ) -> Result<(), serde_json::Error> { match chart.r#type { ChartType::SingleLineChart => { @@ -36,7 +40,7 @@ pub fn update_chart( if should_block { return Ok(()); } - update_line_chart_data(chart.id, tms2000, "1", data.value, pipeline); + update_line_chart_data(chart.id, tms2000, "1", data.value, con).await; } ChartType::SimplePie => { let data: SimplePie = serde_json::from_value(data.data.clone())?; @@ -115,7 +119,7 @@ pub fn update_pie_data( value: u16, pipeline: &mut redis::Pipeline, ) { - let key = format!("data:{}.{}.{}", service_id, chart_id, tms2000); + let key = format!("data:{{{}}}.{}.{}", service_id, chart_id, tms2000); pipeline.zincr(&key, value_name, value); pipeline.expire(&key, 60 * 61); } @@ -132,15 +136,22 @@ pub fn update_map_data( update_pie_data(service_id, chart_id, tms2000, value_name, value, pipeline); } -pub fn update_line_chart_data( +pub async fn update_line_chart_data( chart_id: u64, tms2000: i64, line: &str, value: i16, - pipeline: &mut redis::Pipeline, + con: &mut C, ) { - let key = format!("data:{}.{}", chart_id, line); - pipeline.hincr(key, tms2000_to_timestamp(tms2000), value); + let key = format!("data:{{{}}}.{}", chart_id, line); + match con.hincr(key, tms2000_to_timestamp(tms2000), value).await { + Ok(()) => (), + Err(e) => { + // TODO Proper logging framework + eprintln!("Failed to update line chart data: {}", e); + () + } + } } pub fn update_drilldown_pie_data( @@ -155,13 +166,13 @@ pub fn update_drilldown_pie_data( for (value_key, value) in values.iter() { total_value += value; let key = format!( - "data:{}.{}.{}.{}", + "data:{{{}}}.{}.{}.{}", service_id, chart_id, tms2000, value_name ); pipeline.zincr(&key, value_key, value); pipeline.expire(&key, 60 * 61); } - let key = format!("data:{}.{}.{}", service_id, chart_id, tms2000); + let key = format!("data:{{{}}}.{}.{}", service_id, chart_id, tms2000); pipeline.zincr(&key, value_name, total_value); pipeline.expire(&key, 60 * 61); } diff --git a/src/charts.rs b/src/charts.rs index ac64f60..769fe92 100644 --- a/src/charts.rs +++ b/src/charts.rs @@ -24,65 +24,58 @@ pub struct Chart { pub service_id: u32, } -static CHART_FIELDS: (&str, &str, &str, &str, &str, &str, &str) = ( - "id", "type", "position", "title", "default", "data", "pluginId", -); - /// Find all charts with the given IDs. -/// -/// Using this function is more efficient than calling `find_by_id` multiple times. pub async fn find_by_ids( con: &mut C, ids: Vec, ) -> Result>, redis::RedisError> { - let mut pipeline = redis::pipe(); - for id in &ids { - pipeline.hget(format!("charts:{}", id), CHART_FIELDS); - } - let charts: Vec<[Option; 7]> = pipeline.query_async(con).await.unwrap(); - - let mut result: HashMap> = HashMap::new(); - for (i, values) in charts.iter().enumerate() { - let id = ids[i]; - - fn map_strings(id: u64, values: &[Option; 7]) -> Option { - Some(Chart { - id, - id_custom: values[0].as_ref()?.to_string(), - r#type: match serde_json::from_str(&format!("\"{}\"", values[1].as_ref()?)) { - Ok(t) => t, - // TODO Log warning - Err(_) => return None, - }, - position: match values[2].as_ref()?.parse() { - Ok(p) => p, - Err(_) => 0, - }, - title: values[3].as_ref()?.to_string(), - default: values[4].as_ref().unwrap_or(&String::from("0")) == "1", - data: match serde_json::from_str(&values[5].as_ref()?) { - Ok(d) => d, - Err(_) => Value::Null, - }, - service_id: values[6] - .as_ref()? - .parse() - .expect("Chart with non-numeric 'pluginId'"), - }) - } - - let chart = map_strings(id, values); - result.insert(id, chart); + // TODO: Move all charts from a single service in a single hash and use pipelining + let mut response = HashMap::new(); + for id in ids { + response.insert(id, find_by_id(con, id).await?); } - - Ok(result) + Ok(response) } pub async fn find_by_id( con: &mut C, id: u64, ) -> Result, redis::RedisError> { - find_by_ids(con, vec![id]) - .await - .map(|mut m| m.remove(&id).unwrap()) + let map: HashMap = con.hgetall(format!("charts:{}", id)).await?; + + if map.is_empty() { + return Ok(None); + } + + Ok(Some(Chart { + id, + id_custom: map + .get("id") + .expect("Chart without 'id_custom'") + .to_string(), + r#type: match serde_json::from_str(&format!( + "\"{}\"", + map.get("type").expect("Chart without 'type'") + )) { + Ok(t) => t, + // TODO Log warning + Err(_) => return Ok(None), + }, + position: map + .get("position") + .expect("Chart without 'position'") + .parse() + .expect("Chart with non-numeric or to small/large 'position"), + title: map.get("title").expect("Chart without 'title'").to_string(), + default: map.get("default").unwrap_or(&String::from("0")) == "1", + data: match serde_json::from_str(map.get("data").expect("Chart without 'data'")) { + Ok(d) => d, + Err(_) => Value::Null, + }, + service_id: map + .get("pluginId") + .expect("Chart without 'pluginId'") + .parse() + .expect("Chart with non-numeric 'pluginId"), + })) } diff --git a/src/data_submission.rs b/src/data_submission.rs index fe252e6..882e821 100644 --- a/src/data_submission.rs +++ b/src/data_submission.rs @@ -13,13 +13,13 @@ use crate::submit_data_schema::SubmitDataSchema; use crate::submit_data_schema::SubmitDataServiceSchema; use crate::util::geo_ip; use crate::util::ip_parser; +use crate::util::redis::RedisClusterPool; use actix_web::{error, web, HttpRequest, Responder}; use once_cell::sync::Lazy; pub async fn handle_data_submission( - con: &mut redis::aio::ConnectionManager, request: &HttpRequest, - redis: &web::Data, + redis_pool: &web::Data, software_url: &str, data: &SubmitDataSchema, is_global_service: bool, @@ -29,7 +29,12 @@ pub async fn handle_data_submission( return Ok(""); } - let software = match software::find_by_url(con, software_url).await { + let mut con = match redis_pool.get().await { + Ok(con) => con, + Err(e) => return Err(error::ErrorInternalServerError(e)), + }; + + let software = match software::find_by_url(&mut con, software_url).await { Ok(None) => return Err(error::ErrorNotFound("Software not found")), Err(e) => return Err(error::ErrorInternalServerError(e)), Ok(Some(s)) => s, @@ -40,7 +45,7 @@ pub async fn handle_data_submission( let ip = ip_parser::get_ip(&request)?; let ratelimit = is_ratelimited( - con, + &mut con, software_url, software.max_requests_per_ip, &data.server_uuid, @@ -59,7 +64,7 @@ pub async fn handle_data_submission( // this only happens once per server. if !is_global_service && software.global_plugin.is_some() { let global_plugin = software.global_plugin.unwrap(); - let global_plugin = service::find_by_id(con, global_plugin).await; + let global_plugin = service::find_by_id(&mut con, global_plugin).await; let global_plugin = match global_plugin { Ok(o) => o, Err(e) => return Err(error::ErrorInternalServerError(e)), @@ -67,9 +72,8 @@ pub async fn handle_data_submission( if let Some(global_plugin) = global_plugin { let result = Box::pin(handle_data_submission( - con, request, - redis, + redis_pool, software_url, &SubmitDataSchema { server_uuid: data.server_uuid.clone(), @@ -98,7 +102,7 @@ pub async fn handle_data_submission( } } - let service = match service::find_by_id(con, data.service.id).await { + let service = match service::find_by_id(&mut con, data.service.id).await { Ok(None) => return Err(error::ErrorNotFound("Service not found")), Err(e) => return Err(error::ErrorInternalServerError(e)), Ok(Some(s)) => s, @@ -138,7 +142,7 @@ pub async fn handle_data_submission( let chart_data = default_charts.iter().chain(custom_charts.iter()); let resolved_charts: std::collections::HashMap> = - charts::find_by_ids(con, service.charts).await.unwrap(); + charts::find_by_ids(&mut con, service.charts).await.unwrap(); let mut pipeline = redis::pipe(); @@ -163,11 +167,13 @@ pub async fn handle_data_submission( tms2000, country_iso.as_deref(), &mut pipeline, - ); + &mut con, + ) + .await; } pipeline - .query_async(con) + .query_async(&mut con) .await .map_err(error::ErrorInternalServerError)?; diff --git a/src/legacy_data_submission.rs b/src/legacy_data_submission.rs index cda224d..7d4e24b 100644 --- a/src/legacy_data_submission.rs +++ b/src/legacy_data_submission.rs @@ -11,6 +11,7 @@ use crate::{ data_submission::handle_data_submission, service, submit_data_schema::{SubmitDataChartSchema, SubmitDataSchema, SubmitDataServiceSchema}, + util::redis::RedisClusterPool, }; #[skip_serializing_none] @@ -47,12 +48,16 @@ pub struct LegacySubmitDataServiceSchema { } pub async fn handle_legacy_data_submission( - con: &mut redis::aio::ConnectionManager, request: &HttpRequest, - redis: &web::Data, + redis_pool: &web::Data, software_url: &str, data: LegacySubmitDataSchema, ) -> actix_web::Result { + let mut con = match redis_pool.get().await { + Ok(con) => con, + Err(e) => return Err(error::ErrorInternalServerError(e)), + }; + for plugin in data.plugins { let plugin_id = match plugin.id { Some(id) => id, @@ -63,7 +68,8 @@ pub async fn handle_legacy_data_submission( None => continue, }; - match service::find_by_software_url_and_name(con, software_url, &plugin_name).await + match service::find_by_software_url_and_name(&mut con, software_url, &plugin_name) + .await { Ok(None) => continue, Ok(Some(plugin)) => plugin.id, @@ -73,9 +79,8 @@ pub async fn handle_legacy_data_submission( }; let _ = handle_data_submission( - con, &request, - &redis, + &redis_pool, software_url, &SubmitDataSchema { server_uuid: data.server_uuid.clone(), diff --git a/src/lib.rs b/src/lib.rs index aedb542..5d9e63a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,26 +10,21 @@ pub mod software; pub mod submit_data_schema; pub mod util; -use actix_web::{error, post, web, HttpRequest, Responder}; +use actix_web::{post, web, HttpRequest, Responder}; use legacy_data_submission::LegacySubmitDataSchema; use submit_data_schema::SubmitDataSchema; +use util::redis::RedisClusterPool; #[post("/{software_url}")] async fn submit_data( request: HttpRequest, - redis: web::Data, + redis_pool: web::Data, software_url: web::Path, data: web::Json, ) -> actix_web::Result { - let mut con: redis::aio::ConnectionManager = redis - .get_connection_manager() - .await - .map_err(error::ErrorInternalServerError)?; - data_submission::handle_data_submission( - &mut con, &request, - &redis, + &redis_pool, software_url.as_str(), &data.0, false, @@ -40,19 +35,13 @@ async fn submit_data( #[post("/legacy/{software_url}")] async fn legacy_submit_data( request: HttpRequest, - redis: web::Data, + redis_pool: web::Data, software_url: web::Path, data: web::Json, ) -> actix_web::Result { - let mut con: redis::aio::ConnectionManager = redis - .get_connection_manager() - .await - .map_err(error::ErrorInternalServerError)?; - legacy_data_submission::handle_legacy_data_submission( - &mut con, &request, - &redis, + &redis_pool, software_url.as_str(), data.0, ) diff --git a/src/main.rs b/src/main.rs index f929881..64028c4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,26 +1,19 @@ use actix_web::{web, App, HttpServer}; -use data_processor::{legacy_submit_data, submit_data}; +use data_processor::{legacy_submit_data, submit_data, util::redis::get_redis_cluster_pool}; #[actix_web::main] async fn main() -> std::io::Result<()> { - let redis_url = match std::env::var("REDIS_URL") { - Ok(url) => url, - Err(_) => { - eprintln!("Please set the REDIS_URL environment variable"); - std::process::exit(1); - } - }; - let redis = redis::Client::open(redis_url).unwrap(); - let host = std::env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); let port: u16 = std::env::var("PORT") .unwrap_or_else(|_| "8080".to_string()) .parse() .unwrap(); + let pool = get_redis_cluster_pool().await; + HttpServer::new(move || { App::new() - .app_data(web::Data::new(redis.clone())) + .app_data(web::Data::new(pool.clone())) .service(submit_data) .service(legacy_submit_data) }) diff --git a/src/util.rs b/src/util.rs index 9386589..5c5876c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,2 +1,3 @@ pub mod geo_ip; pub mod ip_parser; +pub mod redis; diff --git a/src/util/redis.rs b/src/util/redis.rs new file mode 100644 index 0000000..224674d --- /dev/null +++ b/src/util/redis.rs @@ -0,0 +1,17 @@ +use deadpool::managed::Pool; +use deadpool_redis::cluster::{Config, Connection, Manager, Runtime}; +use std::env; + +pub type RedisClusterPool = Pool; + +pub async fn get_redis_cluster_pool() -> RedisClusterPool { + let redis_urls = env::var("REDIS_CLUSTER__URLS") + .expect("REDIS_CLUSTER__URLS is not set") + .split(',') + .map(String::from) + .collect::>(); + let cfg = Config::from_urls(redis_urls); + let pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); + + return pool; +} diff --git a/tests/integration_tests/helper/redis_testcontainer.rs b/tests/integration_tests/helper/redis_testcontainer.rs index 3e9fade..044f303 100644 --- a/tests/integration_tests/helper/redis_testcontainer.rs +++ b/tests/integration_tests/helper/redis_testcontainer.rs @@ -1,39 +1,79 @@ +use data_processor::util::redis::{get_redis_cluster_pool, RedisClusterPool}; +use port_check::*; use testcontainers::{ core::{IntoContainerPort, WaitFor}, runners::AsyncRunner, - ContainerAsync, GenericImage, + ContainerAsync, GenericImage, ImageExt, }; pub struct RedisTestcontainer { - client: redis::Client, + pool: RedisClusterPool, // Bind the container to the struct to keep it alive _container: ContainerAsync, } impl RedisTestcontainer { pub async fn new() -> Self { - let container = GenericImage::new("redis", "7.2.4") - .with_exposed_port(6379.tcp()) + fn find_3_consecutive_free_ports() -> (u16, u16, u16) { + let mut port = 9000; + while !is_local_ipv4_port_free(port) + || !is_local_ipv4_port_free(port + 1) + || !is_local_ipv4_port_free(port + 2) + { + port += 1; + } + (port, port + 1, port + 2) + } + + // Normally, with devcontainers, you would just call .with_exposed_port() + // and the container would bind to a random free port on the host machine. + // However, we need the ports to be same on the host machine and in the container + // or Redis cluster will announce the wrong ports. This could be fixed with + // Redis' `cluster-announce-port` setting, but this is not supported by the + // grokzen/redis-cluster image. And even then, we would need to know the ports + // in advance to set the setting. + // + // Unfortunately, this also means that tests can't be run in parallel, since + // they would all try to bind to the same ports. + // + // See https://github.com/Grokzen/docker-redis-cluster/issues/88#issuecomment-973172199 + // for a similar issue. + // + // TODO: Find a better solution. Maybe use another Redis image. + let (port1, port2, port3) = find_3_consecutive_free_ports(); + + let container = GenericImage::new("grokzen/redis-cluster", "7.0.7") .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections")) + .with_mapped_port(port1, port1.tcp()) + .with_mapped_port(port2, port2.tcp()) + .with_mapped_port(port3, port3.tcp()) + .with_env_var("MASTERS", "3") + .with_env_var("SLAVES_PER_MASTER", "0") + .with_env_var("INITIAL_PORT", port1.to_string()) + .with_env_var("IP", "0.0.0.0") .start() .await - .unwrap(); + .expect("Failed to start Redis container"); let redis_addr = format!( "redis://{}:{}/", container.get_host().await.unwrap(), - container.get_host_port_ipv4(6379).await.unwrap() + container.get_host_port_ipv4(port1).await.unwrap() ); - let client = redis::Client::open(redis_addr).unwrap(); + std::env::set_var("REDIS_CLUSTER__URLS", &redis_addr); + + println!("Redis container started at {}", &redis_addr); + + let pool = get_redis_cluster_pool().await; Self { - client, + pool, _container: container, } } - pub fn client(&self) -> &redis::Client { - &self.client + pub fn pool(&self) -> &RedisClusterPool { + &self.pool } } diff --git a/tests/integration_tests/helper/test_environment.rs b/tests/integration_tests/helper/test_environment.rs index 387d365..7f64080 100644 --- a/tests/integration_tests/helper/test_environment.rs +++ b/tests/integration_tests/helper/test_environment.rs @@ -5,7 +5,9 @@ use data_processor::{ }, service::Service, software::Software, + util::redis::RedisClusterPool, }; +use deadpool_redis::cluster::Connection; use redis::AsyncCommands; use serde_json::json; @@ -45,7 +47,7 @@ impl TestEnvironment { } pub async fn add_software(&mut self, software: Software) { - let mut con: redis::aio::MultiplexedConnection = self.redis_multiplexed_connection().await; + let mut con = self.redis_connection().await; let _: () = con.sadd("software.ids", software.id).await.unwrap(); let _: () = con @@ -91,7 +93,7 @@ impl TestEnvironment { } pub async fn add_service(&mut self, service: Service) { - let mut con: redis::aio::MultiplexedConnection = self.redis_multiplexed_connection().await; + let mut con = self.redis_connection().await; let software = self .software @@ -138,7 +140,7 @@ impl TestEnvironment { } pub async fn add_chart(&mut self, chart: Chart) { - let mut con: redis::aio::MultiplexedConnection = self.redis_multiplexed_connection().await; + let mut con = self.redis_connection().await; let _: () = con.sadd("charts.uids", chart.id).await.unwrap(); let _: () = con @@ -186,8 +188,8 @@ impl TestEnvironment { self.charts.push(cloned_chart); } - pub fn redis_client(&self) -> &redis::Client { - &self.redis_testcontainer.client() + pub fn redis_pool(&self) -> &RedisClusterPool { + &self.redis_testcontainer.pool() } pub fn software(&self) -> &Vec { @@ -202,11 +204,8 @@ impl TestEnvironment { &self.charts } - pub async fn redis_multiplexed_connection(&self) -> redis::aio::MultiplexedConnection { - self.redis_client() - .get_multiplexed_tokio_connection() - .await - .unwrap() + pub async fn redis_connection(&self) -> Connection { + self.redis_pool().get().await.unwrap() } } diff --git a/tests/integration_tests/test_charts.rs b/tests/integration_tests/test_charts.rs index c4c28f4..eeb4ac9 100644 --- a/tests/integration_tests/test_charts.rs +++ b/tests/integration_tests/test_charts.rs @@ -4,7 +4,7 @@ use data_processor::charts; #[tokio::test] async fn test_find_by_id() { let test_environment = TestEnvironment::with_data().await; - let mut con = test_environment.redis_multiplexed_connection().await; + let mut con = test_environment.redis_connection().await; let chart = charts::find_by_id(&mut con, 1).await; assert_eq!(chart.unwrap().unwrap().id_custom, "servers"); @@ -13,9 +13,10 @@ async fn test_find_by_id() { #[tokio::test] async fn test_find_by_ids() { let test_environment = TestEnvironment::with_data().await; - let mut con = test_environment.redis_multiplexed_connection().await; + let mut con = test_environment.redis_connection().await; - let charts: std::collections::HashMap> = charts::find_by_ids(&mut con, vec![1, 2]).await.unwrap(); + let charts: std::collections::HashMap> = + charts::find_by_ids(&mut con, vec![1, 2]).await.unwrap(); assert_eq!( charts.get(&1).unwrap().as_ref().unwrap().id_custom.clone(), diff --git a/tests/integration_tests/test_ratelimits.rs b/tests/integration_tests/test_ratelimits.rs index 0e00dcf..150a715 100644 --- a/tests/integration_tests/test_ratelimits.rs +++ b/tests/integration_tests/test_ratelimits.rs @@ -6,7 +6,7 @@ use crate::helper::test_environment::TestEnvironment; async fn test_check_ratelimits() { let test_environment = TestEnvironment::empty().await; - let mut con = test_environment.redis_multiplexed_connection().await; + let mut con = test_environment.redis_connection().await; let software_url = "bukkit"; let max_requests_per_ip = 3; diff --git a/tests/integration_tests/test_service.rs b/tests/integration_tests/test_service.rs index e3e4bad..8c07008 100644 --- a/tests/integration_tests/test_service.rs +++ b/tests/integration_tests/test_service.rs @@ -5,7 +5,7 @@ use data_processor::service::{find_all, find_by_software_url_and_name, Service}; async fn test_find_all() { let test_environment = TestEnvironment::with_data().await; - let mut con = test_environment.redis_multiplexed_connection().await; + let mut con = test_environment.redis_connection().await; let services: Vec = find_all(&mut con).await.unwrap(); assert_eq!(services.len(), test_environment.services().len()); @@ -14,7 +14,7 @@ async fn test_find_all() { // In an empty environment, no data should be returned let empty_test_environment: TestEnvironment = TestEnvironment::empty().await; - let mut con = empty_test_environment.redis_multiplexed_connection().await; + let mut con = empty_test_environment.redis_connection().await; let services: Vec = find_all(&mut con).await.unwrap(); assert_eq!(services.len(), 0); @@ -23,8 +23,7 @@ async fn test_find_all() { #[tokio::test] async fn test_find_by_software_url_and_name() { let test_environment = TestEnvironment::with_data().await; - let mut con: redis::aio::MultiplexedConnection = - test_environment.redis_multiplexed_connection().await; + let mut con = test_environment.redis_connection().await; let service = find_by_software_url_and_name(&mut con, "bukkit", "_bukkit_") .await diff --git a/tests/integration_tests/test_software.rs b/tests/integration_tests/test_software.rs index 1760384..e824284 100644 --- a/tests/integration_tests/test_software.rs +++ b/tests/integration_tests/test_software.rs @@ -5,7 +5,7 @@ use data_processor::software::find_all; async fn test_find_all() { let test_environment = TestEnvironment::with_data().await; - let mut con = test_environment.redis_multiplexed_connection().await; + let mut con = test_environment.redis_connection().await; let software: Vec = find_all(&mut con).await.unwrap(); assert_eq!(software.len(), test_environment.software().len()); @@ -18,7 +18,7 @@ async fn test_find_all() { // In an empty environment, no data should be returned let empty_test_environment = TestEnvironment::empty().await; - let mut con = empty_test_environment.redis_multiplexed_connection().await; + let mut con = empty_test_environment.redis_connection().await; let software: Vec = find_all(&mut con).await.unwrap(); assert_eq!(software.len(), 0); @@ -27,7 +27,7 @@ async fn test_find_all() { #[tokio::test] async fn test_find_by_url() { let test_environment = TestEnvironment::with_data().await; - let mut con = test_environment.redis_multiplexed_connection().await; + let mut con = test_environment.redis_connection().await; let software = data_processor::software::find_by_url(&mut con, "bukkit") .await diff --git a/tests/integration_tests/test_submit_data.rs b/tests/integration_tests/test_submit_data.rs index edbe1de..e19ee8d 100644 --- a/tests/integration_tests/test_submit_data.rs +++ b/tests/integration_tests/test_submit_data.rs @@ -10,11 +10,11 @@ use crate::helper::test_environment::TestEnvironment; async fn test_submit_data() { let test_environment = TestEnvironment::with_data().await; - let redis = test_environment.redis_client(); + let redis_pool = test_environment.redis_pool(); let app = test::init_service( App::new() - .app_data(web::Data::new(redis.clone())) + .app_data(web::Data::new(redis_pool.clone())) .service(submit_data), ) .await;