Skip to content

Commit

Permalink
shotover#1650 Store and reuse counter
Browse files Browse the repository at this point in the history
  • Loading branch information
justinweng-instaclustr committed Jun 6, 2024
1 parent d9e441f commit e5e696f
Showing 1 changed file with 43 additions and 24 deletions.
67 changes: 43 additions & 24 deletions shotover/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::stream::FuturesOrdered;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use itertools::Itertools;
use metrics::counter;
use metrics::{counter, Counter};
use rand::rngs::SmallRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
Expand Down Expand Up @@ -61,14 +61,14 @@ impl TransformConfig for RedisSinkClusterConfig {
RedisAuthenticator {},
self.tls.clone(),
)?;
Ok(Box::new(RedisSinkClusterBuilder {
first_contact_points: self.first_contact_points.clone(),
direct_destination: self.direct_destination.clone(),
connection_count: self.connection_count.unwrap_or(1),
Ok(Box::new(RedisSinkClusterBuilder::new(
self.first_contact_points.clone(),
self.direct_destination.clone(),
self.connection_count.unwrap_or(1),
connection_pool,
chain_name: transform_context.chain_name,
shared_topology: Arc::new(RwLock::new(Topology::new())),
}))
transform_context.chain_name,
Arc::new(RwLock::new(Topology::new())),
)))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
Expand All @@ -85,8 +85,35 @@ pub struct RedisSinkClusterBuilder {
direct_destination: Option<String>,
connection_count: usize,
connection_pool: ConnectionPool<RedisCodecBuilder, RedisAuthenticator, UsernamePasswordToken>,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
failed_requests: Counter,
}

impl RedisSinkClusterBuilder {
fn new(
first_contact_points: Vec<String>,
direct_destination: Option<String>,
connection_count: usize,
connection_pool: ConnectionPool<
RedisCodecBuilder,
RedisAuthenticator,
UsernamePasswordToken,
>,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
) -> Self {
let failed_requests =
counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => NAME);

RedisSinkClusterBuilder {
first_contact_points,
direct_destination,
connection_count,
connection_pool,
shared_topology,
failed_requests,
}
}
}

impl TransformBuilder for RedisSinkClusterBuilder {
Expand All @@ -95,9 +122,9 @@ impl TransformBuilder for RedisSinkClusterBuilder {
self.first_contact_points.clone(),
self.direct_destination.clone(),
self.connection_count,
self.chain_name.clone(),
self.shared_topology.clone(),
self.connection_pool.clone(),
self.failed_requests.clone(),
))
}

Expand Down Expand Up @@ -126,7 +153,6 @@ impl Topology {
}

pub struct RedisSinkCluster {
chain_name: String,
has_run_init: bool,
topology: Topology,
shared_topology: Arc<RwLock<Topology>>,
Expand All @@ -140,23 +166,23 @@ pub struct RedisSinkCluster {
first_contact_points: Vec<String>,
direct_destination: Option<String>,
token: Option<UsernamePasswordToken>,
failed_requests: Counter,
}

impl RedisSinkCluster {
fn new(
first_contact_points: Vec<String>,
direct_destination: Option<String>,
connection_count: usize,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
connection_pool: ConnectionPool<
RedisCodecBuilder,
RedisAuthenticator,
UsernamePasswordToken,
>,
failed_requests: Counter,
) -> Self {
let sink_cluster = RedisSinkCluster {
chain_name: chain_name.clone(),
RedisSinkCluster {
has_run_init: false,
first_contact_points,
direct_destination,
Expand All @@ -170,11 +196,8 @@ impl RedisSinkCluster {
reason_for_no_nodes: None,
rebuild_connections: true,
token: None,
};

counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => sink_cluster.get_name());

sink_cluster
failed_requests,
}
}

async fn direct_connection(&mut self) -> Result<&UnboundedSender<Request>> {
Expand All @@ -196,10 +219,6 @@ impl RedisSinkCluster {
Ok(self.direct_connection.as_ref().unwrap())
}

fn get_name(&self) -> &'static str {
NAME
}

#[inline]
async fn dispatch_message(&mut self, mut message: Message) -> Result<ResponseFuture> {
let command = match message.frame() {
Expand Down Expand Up @@ -617,7 +636,7 @@ impl RedisSinkCluster {

#[inline(always)]
fn send_error_response(&self, message: &str) -> Result<ResponseFuture> {
counter!("shotover_failed_requests_count", "chain" => self.chain_name.clone(), "transform" => self.get_name()).increment(1);
self.failed_requests.increment(1);
short_circuit(RedisFrame::Error(message.into()))
}

Expand Down

0 comments on commit e5e696f

Please sign in to comment.