Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1650 RedisSinkCluster - Avoid recreating Counter #1658

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions shotover-proxy/tests/redis_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::thread::sleep;
use std::time::Duration;
use test_helpers::connection::redis_connection;
use test_helpers::docker_compose::docker_compose;
use test_helpers::metrics::assert_metrics_key_value;
use test_helpers::shotover_process::{Count, EventMatcher, Level};

pub mod assert;
Expand Down Expand Up @@ -44,6 +45,8 @@ async fn passthrough_standard() {
shotover
.shutdown_and_then_consume_events(&[invalid_frame_event()])
.await;

assert_failed_requests_metric_is_incremented_on_error_response().await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -321,3 +324,26 @@ async fn cluster_dr() {

shotover.shutdown_and_then_consume_events(&[]).await;
}

pub async fn assert_failed_requests_metric_is_incremented_on_error_response() {
let shotover = shotover_process("tests/test-configs/redis/passthrough/topology.yaml")
.start()
.await;
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;

redis::cmd("INVALID_COMMAND")
.arg("foo")
.query_async::<_, ()>(&mut connection)
.await
.unwrap_err();

// Redis client driver initialization sends 2 CLIENT SETINFO commands which trigger 2 errors
// because those commands are not available in the currently used redis version.
assert_metrics_key_value(
r#"shotover_failed_requests_count{chain="redis",transform="RedisSinkSingle"}"#,
"3",
)
.await;

shotover.shutdown_and_then_consume_events(&[]).await;
}
64 changes: 40 additions & 24 deletions shotover/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::stream::FuturesOrdered;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use itertools::Itertools;
use metrics::counter;
use metrics::{counter, Counter};
use rand::rngs::SmallRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
Expand Down Expand Up @@ -61,14 +61,14 @@ impl TransformConfig for RedisSinkClusterConfig {
RedisAuthenticator {},
self.tls.clone(),
)?;
Ok(Box::new(RedisSinkClusterBuilder {
first_contact_points: self.first_contact_points.clone(),
direct_destination: self.direct_destination.clone(),
connection_count: self.connection_count.unwrap_or(1),
Ok(Box::new(RedisSinkClusterBuilder::new(
self.first_contact_points.clone(),
self.direct_destination.clone(),
self.connection_count.unwrap_or(1),
connection_pool,
chain_name: transform_context.chain_name,
shared_topology: Arc::new(RwLock::new(Topology::new())),
}))
transform_context.chain_name,
Arc::new(RwLock::new(Topology::new())),
)))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
Expand All @@ -85,8 +85,32 @@ pub struct RedisSinkClusterBuilder {
direct_destination: Option<String>,
connection_count: usize,
connection_pool: ConnectionPool<RedisCodecBuilder, RedisAuthenticator, UsernamePasswordToken>,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
failed_requests: Counter,
}

impl RedisSinkClusterBuilder {
fn new(
first_contact_points: Vec<String>,
direct_destination: Option<String>,
connection_count: usize,
connection_pool: ConnectionPool<
RedisCodecBuilder,
RedisAuthenticator,
UsernamePasswordToken,
>,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
) -> Self {
RedisSinkClusterBuilder {
first_contact_points,
direct_destination,
connection_count,
connection_pool,
shared_topology,
failed_requests: counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => NAME),
}
}
}

impl TransformBuilder for RedisSinkClusterBuilder {
Expand All @@ -95,9 +119,9 @@ impl TransformBuilder for RedisSinkClusterBuilder {
self.first_contact_points.clone(),
self.direct_destination.clone(),
self.connection_count,
self.chain_name.clone(),
self.shared_topology.clone(),
self.connection_pool.clone(),
self.failed_requests.clone(),
))
}

Expand Down Expand Up @@ -126,7 +150,6 @@ impl Topology {
}

pub struct RedisSinkCluster {
chain_name: String,
has_run_init: bool,
topology: Topology,
shared_topology: Arc<RwLock<Topology>>,
Expand All @@ -140,23 +163,23 @@ pub struct RedisSinkCluster {
first_contact_points: Vec<String>,
direct_destination: Option<String>,
token: Option<UsernamePasswordToken>,
failed_requests: Counter,
}

impl RedisSinkCluster {
fn new(
first_contact_points: Vec<String>,
direct_destination: Option<String>,
connection_count: usize,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
connection_pool: ConnectionPool<
RedisCodecBuilder,
RedisAuthenticator,
UsernamePasswordToken,
>,
failed_requests: Counter,
) -> Self {
let sink_cluster = RedisSinkCluster {
chain_name: chain_name.clone(),
RedisSinkCluster {
has_run_init: false,
first_contact_points,
direct_destination,
Expand All @@ -170,11 +193,8 @@ impl RedisSinkCluster {
reason_for_no_nodes: None,
rebuild_connections: true,
token: None,
};

counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => sink_cluster.get_name());

sink_cluster
failed_requests,
}
}

async fn direct_connection(&mut self) -> Result<&UnboundedSender<Request>> {
Expand All @@ -196,10 +216,6 @@ impl RedisSinkCluster {
Ok(self.direct_connection.as_ref().unwrap())
}

fn get_name(&self) -> &'static str {
NAME
}

#[inline]
async fn dispatch_message(&mut self, mut message: Message) -> Result<ResponseFuture> {
let command = match message.frame() {
Expand Down Expand Up @@ -617,7 +633,7 @@ impl RedisSinkCluster {

#[inline(always)]
fn send_error_response(&self, message: &str) -> Result<ResponseFuture> {
counter!("shotover_failed_requests_count", "chain" => self.chain_name.clone(), "transform" => self.get_name()).increment(1);
self.failed_requests.increment(1);
short_circuit(RedisFrame::Error(message.into()))
}

Expand Down