Skip to content

Commit ebc9e93

Browse files
committed
ccm: Add DowngradingConsistencyRetryPolicy test
1 parent c839847 commit ebc9e93

File tree

2 files changed

+103
-0
lines changed

2 files changed

+103
-0
lines changed

scylla/tests/ccm_integration/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
mod common;
33

44
pub(crate) mod ccm;
5+
mod retry_policies;
56
mod test_example;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use std::collections::HashSet;
2+
use std::sync::Arc;
3+
use std::time::Instant;
4+
use tokio::sync::Mutex;
5+
6+
use scylla::client::execution_profile::ExecutionProfile;
7+
use scylla::client::session::Session;
8+
use scylla::policies::retry::DowngradingConsistencyRetryPolicy;
9+
use scylla::statement::Consistency;
10+
11+
use crate::ccm::cluster::{Cluster, ClusterOptions};
12+
use crate::ccm::{run_ccm_test, CLUSTER_VERSION};
13+
use crate::common::utils::setup_tracing;
14+
15+
fn cluster_config() -> ClusterOptions {
16+
ClusterOptions {
17+
name: "ccm_retry_policies".to_string(),
18+
version: CLUSTER_VERSION.clone(),
19+
nodes: vec![3],
20+
..ClusterOptions::default()
21+
}
22+
}
23+
24+
async fn cql_read_cl_all(session: &Session) {
25+
let cql = "SELECT range_end FROM system_distributed_everywhere.cdc_generation_descriptions_v2";
26+
for is_idempotent in vec![false, true] {
27+
let mut prep_stmt = session.prepare(cql).await.unwrap();
28+
prep_stmt.set_is_idempotent(is_idempotent);
29+
prep_stmt.set_retry_policy(Some(Arc::new(DowngradingConsistencyRetryPolicy::new())));
30+
prep_stmt.set_consistency(Consistency::All);
31+
session
32+
.execute_unpaged(&prep_stmt, &[])
33+
.await
34+
.expect("failed to execute CL=ALL read query");
35+
}
36+
}
37+
38+
async fn get_alive_nodes_number(session: &Session) -> usize {
39+
let cluster_state = session.get_cluster_state();
40+
let alive_nodes: HashSet<_> = cluster_state
41+
.get_nodes_info()
42+
.iter()
43+
.filter(|node| node.is_connected())
44+
.collect();
45+
alive_nodes.len()
46+
}
47+
48+
#[tokio::test]
49+
#[ntest::timeout(30000)]
50+
#[cfg_attr(not(ccm_tests), ignore)]
51+
async fn test_downgrading_cl_dbnode_unavailable() {
52+
// NOTE: whole test takes 15-20 seconds
53+
setup_tracing();
54+
async fn test(cluster: Arc<Mutex<Cluster>>) {
55+
let handle = ExecutionProfile::builder()
56+
.retry_policy(Arc::new(DowngradingConsistencyRetryPolicy::new()))
57+
.consistency(Consistency::All)
58+
.build()
59+
.into_handle();
60+
let cluster = cluster.lock().await;
61+
let session = cluster
62+
.make_session_builder()
63+
.await
64+
.default_execution_profile_handle(handle)
65+
.build()
66+
.await
67+
.unwrap();
68+
69+
cql_read_cl_all(&session).await;
70+
let target_node = cluster.nodes().iter().next();
71+
72+
let alive_nodes_num = get_alive_nodes_number(&session).await;
73+
let all_nodes_num: usize = cluster_config().nodes.iter().map(|&n| n as usize).sum();
74+
assert_eq!(all_nodes_num, alive_nodes_num);
75+
76+
println!("Going to stop first node");
77+
target_node.expect("REASON").write().await.stop(None).await.unwrap();
78+
79+
// NOTE: make sure we have "ALL-1" active DB nodes
80+
let alive_nodes_num_after_stop = get_alive_nodes_number(&session).await;
81+
assert_eq!(all_nodes_num, alive_nodes_num_after_stop + 1);
82+
83+
// NOTE: make a CL=ALL query, it should succeed having "ALL-1" alive nodes
84+
cql_read_cl_all(&session).await;
85+
86+
println!("Going to start first node");
87+
target_node.expect("REASON").write().await.start(None).await.unwrap();
88+
89+
// NOTE: wait while driver detects the node availability back again.
90+
// During the test development the waiting loop was taking ~1.2s
91+
let loop_start_time = Instant::now();
92+
loop {
93+
let alive_nodes_num_after_start = get_alive_nodes_number(&session).await;
94+
if alive_nodes_num_after_start == all_nodes_num {
95+
break
96+
}
97+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
98+
}
99+
println!("Waiting for the node availability took {:#?}", Instant::now() - loop_start_time);
100+
}
101+
run_ccm_test(cluster_config, test).await;
102+
}

0 commit comments

Comments
 (0)