Skip to content

Commit bd00e4b

Browse files
committed
tests: move batch statements tests to batch.rs module
1 parent 3c234ac commit bd00e4b

File tree

4 files changed

+365
-391
lines changed

4 files changed

+365
-391
lines changed

scylla/tests/common/utils.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub(crate) fn unique_keyspace_name() -> String {
4242
SystemTime::now()
4343
.duration_since(UNIX_EPOCH)
4444
.unwrap()
45-
.as_secs(),
45+
.as_micros(),
4646
cnt
4747
);
4848
println!("Unique name: {}", name);

scylla/tests/integration/batch.rs

+364-2
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,82 @@
11
use scylla::batch::Batch;
22
use scylla::batch::BatchType;
3-
use scylla::errors::{ExecutionError, RequestAttemptError};
3+
use scylla::client::session::Session;
4+
use scylla::errors::{DbError, ExecutionError, RequestAttemptError};
45
use scylla::frame::frame_errors::BatchSerializationError;
56
use scylla::frame::frame_errors::CqlRequestSerializationError;
7+
use scylla::prepared_statement::PreparedStatement;
68
use scylla::query::Query;
9+
use scylla::value::{Counter, CqlValue, MaybeUnset};
10+
use std::collections::HashMap;
11+
use std::string::String;
712

8-
use crate::utils::create_new_session_builder;
913
use crate::utils::setup_tracing;
1014
use crate::utils::unique_keyspace_name;
1115
use crate::utils::PerformDDL;
16+
use crate::utils::{create_new_session_builder, scylla_supports_tablets};
1217

1318
use assert_matches::assert_matches;
1419

20+
const BATCH_COUNT: usize = 100;
21+
22+
async fn create_test_session(table_name: &str, supports_tablets: bool) -> Session {
23+
let session = create_new_session_builder().build().await.unwrap();
24+
let ks = unique_keyspace_name();
25+
26+
let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks);
27+
28+
if !supports_tablets && scylla_supports_tablets(&session).await {
29+
create_ks += " AND TABLETS = {'enabled': false}"
30+
}
31+
session.ddl(create_ks).await.unwrap();
32+
session.use_keyspace(&ks, false).await.unwrap();
33+
session
34+
.ddl(format!(
35+
"CREATE TABLE IF NOT EXISTS {} (k0 text, k1 int, v int, PRIMARY KEY (k0, k1))",
36+
table_name
37+
))
38+
.await
39+
.unwrap();
40+
session
41+
}
42+
43+
async fn create_counter_tables(session: &Session) {
44+
for &table in ["counter1", "counter2", "counter3"].iter() {
45+
session
46+
.ddl(format!(
47+
"CREATE TABLE {} (k0 text PRIMARY KEY, c counter)",
48+
table
49+
))
50+
.await
51+
.unwrap();
52+
}
53+
}
54+
55+
async fn verify_batch_insert(session: &Session, test_name: &str, count: usize) {
56+
let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", test_name);
57+
let query_result = session
58+
.query_unpaged(select_query, (test_name,))
59+
.await
60+
.unwrap()
61+
.into_rows_result()
62+
.unwrap();
63+
let rows: Vec<(String, i32, i32)> = query_result
64+
.rows::<(String, i32, i32)>()
65+
.unwrap()
66+
.map(|r| r.unwrap())
67+
.collect();
68+
assert_eq!(rows.len(), count);
69+
for (k0, k1, v) in rows {
70+
assert_eq!(k0, test_name);
71+
assert_eq!(v, k1 + 1);
72+
}
73+
}
74+
75+
async fn prepare_insert_statement(session: &Session, table: &str) -> PreparedStatement {
76+
let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", table);
77+
session.prepare(Query::new(query_str)).await.unwrap()
78+
}
79+
1580
#[tokio::test]
1681
#[ntest::timeout(60000)]
1782
async fn batch_statements_and_values_mismatch_detected() {
@@ -76,3 +141,300 @@ async fn batch_statements_and_values_mismatch_detected() {
76141
)
77142
}
78143
}
144+
145+
#[tokio::test]
146+
async fn test_batch_of_simple_statements() {
147+
setup_tracing();
148+
let test_name = String::from("test_batch_simple_statements");
149+
let session = create_test_session(&test_name, true).await;
150+
151+
let mut batch = Batch::new(BatchType::Unlogged);
152+
for i in 0..BATCH_COUNT {
153+
let simple_statement = Query::new(format!(
154+
"INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})",
155+
&test_name,
156+
&test_name,
157+
i,
158+
i + 1
159+
));
160+
batch.append_statement(simple_statement);
161+
}
162+
session.batch(&batch, vec![(); BATCH_COUNT]).await.unwrap();
163+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
164+
}
165+
166+
#[tokio::test]
167+
async fn test_batch_of_bound_statements() {
168+
setup_tracing();
169+
let test_name = String::from("test_batch_bound_statements");
170+
let session = create_test_session(&test_name, true).await;
171+
172+
let prepared = prepare_insert_statement(&session, &test_name).await;
173+
let mut batch = Batch::new(BatchType::Unlogged);
174+
let mut batch_values: Vec<_> = Vec::with_capacity(BATCH_COUNT);
175+
for i in 0..BATCH_COUNT as i32 {
176+
batch.append_statement(prepared.clone());
177+
batch_values.push((test_name.as_str(), i, i + 1));
178+
}
179+
session.batch(&batch, batch_values).await.unwrap();
180+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
181+
}
182+
183+
#[tokio::test]
184+
async fn test_prepared_batch() {
185+
setup_tracing();
186+
let test_name = String::from("test_prepared_batch");
187+
let session = create_test_session(&test_name, true).await;
188+
189+
let mut batch = Batch::new(BatchType::Unlogged);
190+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
191+
let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name);
192+
for i in 0..BATCH_COUNT as i32 {
193+
batch.append_statement(Query::new(query_str.clone()));
194+
batch_values.push((&test_name, i, i + 1));
195+
}
196+
let prepared_batch = session.prepare_batch(&batch).await.unwrap();
197+
session.batch(&prepared_batch, batch_values).await.unwrap();
198+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
199+
}
200+
201+
#[tokio::test]
202+
async fn test_batch_of_bound_statements_with_unset_values() {
203+
setup_tracing();
204+
let test_name = String::from("test_batch_bound_statements_with_unset_values");
205+
let session = create_test_session(&test_name, true).await;
206+
207+
let prepared = prepare_insert_statement(&session, &test_name).await;
208+
let mut batch1 = Batch::new(BatchType::Unlogged);
209+
let mut batch1_values = Vec::with_capacity(BATCH_COUNT);
210+
for i in 0..BATCH_COUNT as i32 {
211+
batch1.append_statement(prepared.clone());
212+
batch1_values.push((test_name.as_str(), i, i + 1));
213+
}
214+
session.batch(&batch1, batch1_values).await.unwrap();
215+
216+
// Update v to (k1 + 2), but for every 20th row leave v unset.
217+
let mut batch2 = Batch::new(BatchType::Unlogged);
218+
let mut batch2_values = Vec::with_capacity(BATCH_COUNT);
219+
for i in 0..BATCH_COUNT as i32 {
220+
batch2.append_statement(prepared.clone());
221+
if i % 20 == 0 {
222+
batch2_values.push((
223+
MaybeUnset::Set(&test_name),
224+
MaybeUnset::Set(i),
225+
MaybeUnset::Unset,
226+
));
227+
} else {
228+
batch2_values.push((
229+
MaybeUnset::Set(&test_name),
230+
MaybeUnset::Set(i),
231+
MaybeUnset::Set(i + 2),
232+
));
233+
}
234+
}
235+
session.batch(&batch2, batch2_values).await.unwrap();
236+
237+
// Verify that rows with k1 % 20 == 0 retain the original value.
238+
let select_query = format!("SELECT k0, k1, v FROM {} WHERE k0 = ?", &test_name);
239+
let query_result = session
240+
.query_unpaged(select_query, (&test_name,))
241+
.await
242+
.unwrap()
243+
.into_rows_result()
244+
.unwrap();
245+
let rows: Vec<(String, i32, i32)> = query_result
246+
.rows::<(String, i32, i32)>()
247+
.unwrap()
248+
.map(|r| r.unwrap())
249+
.collect();
250+
assert_eq!(
251+
rows.len(),
252+
BATCH_COUNT,
253+
"Expected {} rows, got {}",
254+
BATCH_COUNT,
255+
rows.len()
256+
);
257+
for (k0, k1, v) in rows {
258+
assert_eq!(k0, test_name);
259+
assert_eq!(v, if k1 % 20 == 0 { k1 + 1 } else { k1 + 2 });
260+
}
261+
}
262+
263+
#[tokio::test]
264+
async fn test_batch_of_bound_statements_named_variables() {
265+
setup_tracing();
266+
let test_name = String::from("test_batch_bound_statements_named_variables");
267+
let session = create_test_session(&test_name, true).await;
268+
269+
let query_str = format!(
270+
"INSERT INTO {} (k0, k1, v) VALUES (:k0, :k1, :v)",
271+
&test_name
272+
);
273+
let prepared = session.prepare(query_str).await.unwrap();
274+
275+
let mut batch = Batch::new(BatchType::Unlogged);
276+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
277+
for i in 0..BATCH_COUNT as i32 {
278+
batch.append_statement(prepared.clone());
279+
let mut values = HashMap::new();
280+
values.insert("k0", CqlValue::Text(test_name.clone()));
281+
values.insert("k1", CqlValue::Int(i));
282+
values.insert("v", CqlValue::Int(i + 1));
283+
batch_values.push(values);
284+
}
285+
session.batch(&batch, batch_values).await.unwrap();
286+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
287+
}
288+
289+
#[tokio::test]
290+
async fn test_batch_of_mixed_bound_and_simple_statements() {
291+
setup_tracing();
292+
let test_name = String::from("test_batch_mixed_bound_and_simple_statements");
293+
let session = create_test_session(&test_name, true).await;
294+
295+
let query_str = format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name);
296+
let prepared_bound = session
297+
.prepare(Query::new(query_str.clone()))
298+
.await
299+
.unwrap();
300+
301+
let mut batch = Batch::new(BatchType::Unlogged);
302+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
303+
for i in 0..BATCH_COUNT as i32 {
304+
if i % 2 == 1 {
305+
let simple_statement = Query::new(format!(
306+
"INSERT INTO {} (k0, k1, v) VALUES ('{}', {}, {})",
307+
&test_name,
308+
&test_name,
309+
i,
310+
i + 1
311+
));
312+
batch.append_statement(simple_statement);
313+
batch_values.push(vec![]);
314+
} else {
315+
batch.append_statement(prepared_bound.clone());
316+
batch_values.push(vec![
317+
CqlValue::Text(test_name.clone()),
318+
CqlValue::Int(i),
319+
CqlValue::Int(i + 1),
320+
]);
321+
}
322+
}
323+
session.batch(&batch, batch_values).await.unwrap();
324+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
325+
}
326+
327+
#[tokio::test]
328+
async fn test_cas_batch() {
329+
setup_tracing();
330+
let test_name = String::from("test_cas_batch");
331+
let session = create_test_session(&test_name, false).await;
332+
333+
let query_str = format!(
334+
"INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS",
335+
&test_name
336+
);
337+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
338+
let mut batch = Batch::new(BatchType::Unlogged);
339+
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
340+
for i in 0..BATCH_COUNT as i32 {
341+
batch.append_statement(prepared.clone());
342+
batch_values.push((&test_name, i, i + 1));
343+
}
344+
let result = session.batch(&batch, batch_values.clone()).await.unwrap();
345+
let row: (bool, Option<String>, Option<i32>, Option<i32>) =
346+
result.into_rows_result().unwrap().first_row().unwrap();
347+
assert!(row.0, "First CAS batch should be applied");
348+
349+
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
350+
351+
let result2 = session.batch(&batch, batch_values).await.unwrap();
352+
let row: (bool, Option<String>, Option<i32>, Option<i32>) =
353+
result2.into_rows_result().unwrap().first_row().unwrap();
354+
assert!(!row.0, "Second CAS batch should not be applied");
355+
}
356+
357+
#[tokio::test]
358+
async fn test_counter_batch() {
359+
setup_tracing();
360+
let test_name = String::from("test_counter_batch");
361+
let session = create_test_session(&test_name, false).await;
362+
create_counter_tables(&session).await;
363+
364+
let mut batch = Batch::new(BatchType::Counter);
365+
let mut batch_values = Vec::with_capacity(3);
366+
for i in 1..=3 {
367+
let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
368+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
369+
batch.append_statement(prepared);
370+
batch_values.push((Counter(i), &test_name));
371+
}
372+
session.batch(&batch, batch_values).await.unwrap();
373+
374+
for i in 1..=3 {
375+
let query_str = format!("SELECT c FROM counter{} WHERE k0 = ?", i);
376+
let query_result = session
377+
.query_unpaged(query_str, (&test_name,))
378+
.await
379+
.unwrap()
380+
.into_rows_result()
381+
.unwrap();
382+
let row = query_result.single_row::<(Counter,)>().unwrap();
383+
let (c,) = row;
384+
assert_eq!(c, Counter(i));
385+
}
386+
}
387+
388+
#[tokio::test]
389+
async fn test_fail_logged_batch_with_counter_increment() {
390+
setup_tracing();
391+
let test_name = String::from("test_fail_logged_batch");
392+
let session = create_test_session(&test_name, false).await;
393+
create_counter_tables(&session).await;
394+
395+
let mut batch = Batch::new(BatchType::Logged);
396+
let mut batch_values: Vec<_> = Vec::with_capacity(3);
397+
for i in 1..=3 {
398+
let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
399+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
400+
batch.append_statement(prepared);
401+
batch_values.push((Counter(i), &test_name));
402+
}
403+
let err = session.batch(&batch, batch_values).await.unwrap_err();
404+
assert_matches!(
405+
err,
406+
ExecutionError::LastAttemptError(RequestAttemptError::DbError(DbError::Invalid, _)),
407+
"Expected an Invalid DbError when using counter statements in a LOGGED batch"
408+
);
409+
}
410+
411+
#[tokio::test]
412+
async fn test_fail_counter_batch_with_non_counter_increment() {
413+
setup_tracing();
414+
let test_name = String::from("test_fail_counter_batch");
415+
let session = create_test_session(&test_name, false).await;
416+
create_counter_tables(&session).await;
417+
418+
let mut batch = Batch::new(BatchType::Counter);
419+
let mut batch_values: Vec<Vec<CqlValue>> = Vec::new();
420+
for i in 1..=3 {
421+
let query_str = format!("UPDATE counter{} SET c = c + ? WHERE k0 = ?", i);
422+
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
423+
batch.append_statement(prepared);
424+
batch_values.push(vec![CqlValue::Int(i), CqlValue::Text(test_name.clone())]);
425+
}
426+
427+
let prepared = prepare_insert_statement(&session, &test_name).await;
428+
batch.append_statement(prepared);
429+
batch_values.push(vec![
430+
CqlValue::Text(test_name.clone()),
431+
CqlValue::Int(0),
432+
CqlValue::Int(1),
433+
]);
434+
let err = session.batch(&batch, batch_values).await.unwrap_err();
435+
assert_matches!(
436+
err,
437+
ExecutionError::BadQuery(_),
438+
"Expected a BadQuery error when including a non-counter statement in a COUNTER batch"
439+
);
440+
}

0 commit comments

Comments
 (0)