Skip to content

Commit

Permalink
Switch to Redis Cluster with connection pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
Bastian Oppermann committed Aug 11, 2024
1 parent c23e7e0 commit 73b677f
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 150 deletions.
109 changes: 95 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,23 @@ actix-web = "4"
serde_with = "3.9.0"
regex = "1.10.5"
once_cell = "1.19.0"
redis = { version = "0.25.4", features = ["tokio-comp", "aio", "connection-manager"] }
redis-test = "0.4.0"
tokio = "1.39.1"
chrono = "0.4.38"
maxminddb = "0.17"
deadpool-redis = { version = "0.16", features = ["cluster"] }
# Must be the same version that deadpool-redis uses (https://github.com/bikeshedder/deadpool/blob/master/redis/Cargo.toml)
redis = { version = "0.26", features = [
"tokio-comp",
"aio",
"cluster",
"cluster-async",
] }
# Must be the same version that deadpool-redis uses
deadpool = { version = "0.12.0", default-features = false, features = [
"managed",
] }

[dev-dependencies]
testcontainers = "0.20.1"
port_check = "0.2.1"
29 changes: 20 additions & 9 deletions src/chart_updater.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;

use redis::AsyncCommands;

use crate::{
charts::{
advanced_pie::AdvancedPie,
Expand All @@ -14,12 +16,14 @@ use crate::{
submit_data_schema::SubmitDataChartSchema,
};

pub fn update_chart(
pub async fn update_chart<C: AsyncCommands>(
chart: &Chart,
data: &SubmitDataChartSchema,
tms2000: i64,
country_iso: Option<&str>,
// Used where possible (currently not possible for line charts)
pipeline: &mut redis::Pipeline,
con: &mut C,
) -> Result<(), serde_json::Error> {
match chart.r#type {
ChartType::SingleLineChart => {
Expand All @@ -36,7 +40,7 @@ pub fn update_chart(
if should_block {
return Ok(());
}
update_line_chart_data(chart.id, tms2000, "1", data.value, pipeline);
update_line_chart_data(chart.id, tms2000, "1", data.value, con).await;
}
ChartType::SimplePie => {
let data: SimplePie = serde_json::from_value(data.data.clone())?;
Expand Down Expand Up @@ -115,7 +119,7 @@ pub fn update_pie_data(
value: u16,
pipeline: &mut redis::Pipeline,
) {
let key = format!("data:{}.{}.{}", service_id, chart_id, tms2000);
let key = format!("data:{{{}}}.{}.{}", service_id, chart_id, tms2000);
pipeline.zincr(&key, value_name, value);
pipeline.expire(&key, 60 * 61);
}
Expand All @@ -132,15 +136,22 @@ pub fn update_map_data(
update_pie_data(service_id, chart_id, tms2000, value_name, value, pipeline);
}

pub fn update_line_chart_data(
pub async fn update_line_chart_data<C: AsyncCommands>(
chart_id: u64,
tms2000: i64,
line: &str,
value: i16,
pipeline: &mut redis::Pipeline,
con: &mut C,
) {
let key = format!("data:{}.{}", chart_id, line);
pipeline.hincr(key, tms2000_to_timestamp(tms2000), value);
let key = format!("data:{{{}}}.{}", chart_id, line);
match con.hincr(key, tms2000_to_timestamp(tms2000), value).await {
Ok(()) => (),
Err(e) => {
// TODO Proper logging framework
eprintln!("Failed to update line chart data: {}", e);
()
}
}
}

pub fn update_drilldown_pie_data(
Expand All @@ -155,13 +166,13 @@ pub fn update_drilldown_pie_data(
for (value_key, value) in values.iter() {
total_value += value;
let key = format!(
"data:{}.{}.{}.{}",
"data:{{{}}}.{}.{}.{}",
service_id, chart_id, tms2000, value_name
);
pipeline.zincr(&key, value_key, value);
pipeline.expire(&key, 60 * 61);
}
let key = format!("data:{}.{}.{}", service_id, chart_id, tms2000);
let key = format!("data:{{{}}}.{}.{}", service_id, chart_id, tms2000);
pipeline.zincr(&key, value_name, total_value);
pipeline.expire(&key, 60 * 61);
}
Loading

0 comments on commit 73b677f

Please sign in to comment.