From e5e696fb59657e8d40ec99340dfaaa5d9f599d36 Mon Sep 17 00:00:00 2001 From: "justin.weng" Date: Thu, 6 Jun 2024 14:05:52 +1000 Subject: [PATCH 1/3] #1650 Store and reuse counter --- shotover/src/transforms/redis/sink_cluster.rs | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/shotover/src/transforms/redis/sink_cluster.rs b/shotover/src/transforms/redis/sink_cluster.rs index f4f2506b8..c1b664bff 100644 --- a/shotover/src/transforms/redis/sink_cluster.rs +++ b/shotover/src/transforms/redis/sink_cluster.rs @@ -19,7 +19,7 @@ use futures::stream::FuturesOrdered; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryFutureExt}; use itertools::Itertools; -use metrics::counter; +use metrics::{counter, Counter}; use rand::rngs::SmallRng; use rand::seq::IteratorRandom; use rand::SeedableRng; @@ -61,14 +61,14 @@ impl TransformConfig for RedisSinkClusterConfig { RedisAuthenticator {}, self.tls.clone(), )?; - Ok(Box::new(RedisSinkClusterBuilder { - first_contact_points: self.first_contact_points.clone(), - direct_destination: self.direct_destination.clone(), - connection_count: self.connection_count.unwrap_or(1), + Ok(Box::new(RedisSinkClusterBuilder::new( + self.first_contact_points.clone(), + self.direct_destination.clone(), + self.connection_count.unwrap_or(1), connection_pool, - chain_name: transform_context.chain_name, - shared_topology: Arc::new(RwLock::new(Topology::new())), - })) + transform_context.chain_name, + Arc::new(RwLock::new(Topology::new())), + ))) } fn up_chain_protocol(&self) -> UpChainProtocol { @@ -85,8 +85,35 @@ pub struct RedisSinkClusterBuilder { direct_destination: Option, connection_count: usize, connection_pool: ConnectionPool, - chain_name: String, shared_topology: Arc>, + failed_requests: Counter, +} + +impl RedisSinkClusterBuilder { + fn new( + first_contact_points: Vec, + direct_destination: Option, + connection_count: usize, + connection_pool: ConnectionPool< + RedisCodecBuilder, + RedisAuthenticator, + UsernamePasswordToken, + >, + chain_name: String, + shared_topology: Arc>, + ) -> Self { + let failed_requests = + counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => NAME); + + RedisSinkClusterBuilder { + first_contact_points, + direct_destination, + connection_count, + connection_pool, + shared_topology, + failed_requests, + } + } } impl TransformBuilder for RedisSinkClusterBuilder { @@ -95,9 +122,9 @@ impl TransformBuilder for RedisSinkClusterBuilder { self.first_contact_points.clone(), self.direct_destination.clone(), self.connection_count, - self.chain_name.clone(), self.shared_topology.clone(), self.connection_pool.clone(), + self.failed_requests.clone(), )) } @@ -126,7 +153,6 @@ impl Topology { } pub struct RedisSinkCluster { - chain_name: String, has_run_init: bool, topology: Topology, shared_topology: Arc>, @@ -140,6 +166,7 @@ pub struct RedisSinkCluster { first_contact_points: Vec, direct_destination: Option, token: Option, + failed_requests: Counter, } impl RedisSinkCluster { @@ -147,16 +174,15 @@ impl RedisSinkCluster { first_contact_points: Vec, direct_destination: Option, connection_count: usize, - chain_name: String, shared_topology: Arc>, connection_pool: ConnectionPool< RedisCodecBuilder, RedisAuthenticator, UsernamePasswordToken, >, + failed_requests: Counter, ) -> Self { - let sink_cluster = RedisSinkCluster { - chain_name: chain_name.clone(), + RedisSinkCluster { has_run_init: false, first_contact_points, direct_destination, @@ -170,11 +196,8 @@ impl RedisSinkCluster { reason_for_no_nodes: None, rebuild_connections: true, token: None, - }; - - counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => sink_cluster.get_name()); - - sink_cluster + failed_requests, + } } async fn direct_connection(&mut self) -> Result<&UnboundedSender> { @@ -196,10 +219,6 @@ impl RedisSinkCluster { Ok(self.direct_connection.as_ref().unwrap()) } - fn get_name(&self) -> &'static str { - NAME - } - #[inline] async fn dispatch_message(&mut self, mut message: Message) -> Result { let command = match message.frame() { @@ -617,7 +636,7 @@ impl RedisSinkCluster { #[inline(always)] fn send_error_response(&self, message: &str) -> Result { - counter!("shotover_failed_requests_count", "chain" => self.chain_name.clone(), "transform" => self.get_name()).increment(1); + self.failed_requests.increment(1); short_circuit(RedisFrame::Error(message.into())) } From 242c4964b7afde49c570c16614c3b6eb766a3139 Mon Sep 17 00:00:00 2001 From: "justin.weng" Date: Tue, 11 Jun 2024 14:58:43 +1000 Subject: [PATCH 2/3] #1650 Add tests --- shotover-proxy/tests/redis_int_tests/mod.rs | 52 +++++++++++++++++++ shotover/src/transforms/redis/sink_cluster.rs | 5 +- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/shotover-proxy/tests/redis_int_tests/mod.rs b/shotover-proxy/tests/redis_int_tests/mod.rs index 991b234f9..f5b0b9caa 100644 --- a/shotover-proxy/tests/redis_int_tests/mod.rs +++ b/shotover-proxy/tests/redis_int_tests/mod.rs @@ -12,6 +12,7 @@ use std::thread::sleep; use std::time::Duration; use test_helpers::connection::redis_connection; use test_helpers::docker_compose::docker_compose; +use test_helpers::metrics::assert_metrics_key_value; use test_helpers::shotover_process::{Count, EventMatcher, Level}; pub mod assert; @@ -321,3 +322,54 @@ async fn cluster_dr() { shotover.shutdown_and_then_consume_events(&[]).await; } + +#[tokio::test(flavor = "multi_thread")] +async fn test_failed_requests_metric_sink_single() { + let _compose = docker_compose("tests/test-configs/redis/passthrough/docker-compose.yaml"); + let shotover = shotover_process("tests/test-configs/redis/passthrough/topology.yaml") + .start() + .await; + let mut connection = redis_connection::new_async("127.0.0.1", 6379).await; + + redis::cmd("INVALID_COMMAND") + .arg("foo") + .query_async::<_, ()>(&mut connection) + .await + .unwrap_err(); + + // Redis client driver initialization sends 2 CLIENT SETINFO commands which trigger 2 errors + // because those commands are not available in the currently used redis version. + assert_metrics_key_value( + r#"shotover_failed_requests_count{chain="redis",transform="RedisSinkSingle"}"#, + "3", + ) + .await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_failed_requests_metric_sink_cluster() { + let _compose = docker_compose("tests/test-configs/redis/cluster-handling/docker-compose.yaml"); + let shotover = shotover_process("tests/test-configs/redis/cluster-handling/topology.yaml") + .start() + .await; + + let mut connection = redis_connection::new_async("127.0.0.1", 6379).await; + + redis::cmd("INVALID_COMMAND") + .arg("foo") + .query_async::<_, ()>(&mut connection) + .await + .unwrap_err(); + + // Redis client driver initialization sends 2 CLIENT SETINFO commands which trigger 2 errors + // because those commands are not available in the currently used redis version. + assert_metrics_key_value( + r#"shotover_failed_requests_count{chain="redis",transform="RedisSinkCluster"}"#, + "3", + ) + .await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} diff --git a/shotover/src/transforms/redis/sink_cluster.rs b/shotover/src/transforms/redis/sink_cluster.rs index c1b664bff..df091150a 100644 --- a/shotover/src/transforms/redis/sink_cluster.rs +++ b/shotover/src/transforms/redis/sink_cluster.rs @@ -102,16 +102,13 @@ impl RedisSinkClusterBuilder { chain_name: String, shared_topology: Arc>, ) -> Self { - let failed_requests = - counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => NAME); - RedisSinkClusterBuilder { first_contact_points, direct_destination, connection_count, connection_pool, shared_topology, - failed_requests, + failed_requests: counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => NAME), } } } From c6688cd0041b8bd0c8f5f0372a10185b620df5e7 Mon Sep 17 00:00:00 2001 From: "justin.weng" Date: Tue, 11 Jun 2024 16:03:39 +1000 Subject: [PATCH 3/3] #1650 Test metric in redis passthrough --- shotover-proxy/tests/redis_int_tests/mod.rs | 32 ++------------------- 1 file changed, 3 insertions(+), 29 deletions(-) diff --git a/shotover-proxy/tests/redis_int_tests/mod.rs b/shotover-proxy/tests/redis_int_tests/mod.rs index f5b0b9caa..ac0e3575b 100644 --- a/shotover-proxy/tests/redis_int_tests/mod.rs +++ b/shotover-proxy/tests/redis_int_tests/mod.rs @@ -45,6 +45,8 @@ async fn passthrough_standard() { shotover .shutdown_and_then_consume_events(&[invalid_frame_event()]) .await; + + assert_failed_requests_metric_is_incremented_on_error_response().await; } #[tokio::test(flavor = "multi_thread")] @@ -323,9 +325,7 @@ async fn cluster_dr() { shotover.shutdown_and_then_consume_events(&[]).await; } -#[tokio::test(flavor = "multi_thread")] -async fn test_failed_requests_metric_sink_single() { - let _compose = docker_compose("tests/test-configs/redis/passthrough/docker-compose.yaml"); +pub async fn assert_failed_requests_metric_is_incremented_on_error_response() { let shotover = shotover_process("tests/test-configs/redis/passthrough/topology.yaml") .start() .await; @@ -347,29 +347,3 @@ async fn test_failed_requests_metric_sink_single() { shotover.shutdown_and_then_consume_events(&[]).await; } - -#[tokio::test(flavor = "multi_thread")] -async fn test_failed_requests_metric_sink_cluster() { - let _compose = docker_compose("tests/test-configs/redis/cluster-handling/docker-compose.yaml"); - let shotover = shotover_process("tests/test-configs/redis/cluster-handling/topology.yaml") - .start() - .await; - - let mut connection = redis_connection::new_async("127.0.0.1", 6379).await; - - redis::cmd("INVALID_COMMAND") - .arg("foo") - .query_async::<_, ()>(&mut connection) - .await - .unwrap_err(); - - // Redis client driver initialization sends 2 CLIENT SETINFO commands which trigger 2 errors - // because those commands are not available in the currently used redis version. - assert_metrics_key_value( - r#"shotover_failed_requests_count{chain="redis",transform="RedisSinkCluster"}"#, - "3", - ) - .await; - - shotover.shutdown_and_then_consume_events(&[]).await; -}