chore(test): minor code refactoring in simulation test #7666

Merged: 3 commits, Feb 2, 2023
src/tests/simulation/src/main.rs (2 changes: 1 addition & 1 deletion)
@@ -183,7 +183,7 @@ async fn main() {
.run_on_client(async move {
let glob = &args.files;
if let Some(jobs) = args.jobs {
run_parallel_slt_task(cluster0, glob, jobs).await.unwrap();
run_parallel_slt_task(glob, jobs).await.unwrap();
} else {
run_slt_task(cluster0, glob, &kill_opts).await;
}
src/tests/simulation/src/slt.rs (144 changes: 79 additions & 65 deletions)
@@ -32,6 +32,47 @@ fn is_create_table_as(sql: &str) -> bool {
parts.len() >= 4 && parts[0] == "create" && parts[1] == "table" && parts[3] == "as"
}

#[derive(PartialEq, Eq)]
enum SqlCmd {
Create { is_create_table_as: bool },
Drop,
Dml,
Flush,
Others,
}

impl SqlCmd {
// We won't kill during insert/update/delete since the atomicity is not guaranteed.
// Notice that `create table as` is also not atomic in our system.
fn ignore_kill(&self) -> bool {
matches!(
self,
SqlCmd::Dml
| SqlCmd::Create {
is_create_table_as: true
}
)
}
}

fn extract_sql_command(sql: &str) -> SqlCmd {
let cmd = sql
.trim_start()
.split_once(' ')
.unwrap_or_default()
.0
.to_lowercase();
match cmd.as_str() {
"create" => SqlCmd::Create {
is_create_table_as: is_create_table_as(sql),
},
"drop" => SqlCmd::Drop,
"insert" | "update" | "delete" => SqlCmd::Dml,
"flush" => SqlCmd::Flush,
_ => SqlCmd::Others,
}
}
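
As a quick illustration of the new classifier (this sketch is not part of the PR; the test module name is made up, and it only exercises the `extract_sql_command` and `SqlCmd::ignore_kill` code shown above):

#[cfg(test)]
mod sql_cmd_sketch {
    use super::*;

    #[test]
    fn classifies_statements() {
        // A plain CREATE is tracked separately from CREATE TABLE ... AS.
        assert!(matches!(
            extract_sql_command("CREATE TABLE t (v int)"),
            SqlCmd::Create { is_create_table_as: false }
        ));
        // CREATE TABLE ... AS is not atomic, so kills are skipped for it.
        assert!(extract_sql_command("create table t2 as select * from t").ignore_kill());
        // DML is never interleaved with kills either.
        assert!(extract_sql_command("insert into t values (1)").ignore_kill());
        // Anything unrecognized falls through to `Others`.
        assert!(extract_sql_command("select 1") == SqlCmd::Others);
    }
}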

const KILL_IGNORE_FILES: &[&str] = &[
// TPCH queries are too slow for recovery.
"tpch_snapshot.slt",
@@ -64,75 +105,48 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
if let sqllogictest::Record::Halt { .. } = record {
break;
}
let (is_create_table_as, is_create, is_drop, is_dml, is_flush) =
if let sqllogictest::Record::Statement { sql, .. } = &record {
let is_create_table_as = is_create_table_as(sql);
let sql =
(sql.trim_start().split_once(' ').unwrap_or_default().0).to_lowercase();
(
is_create_table_as,
!is_create_table_as && sql == "create",
sql == "drop",
sql == "insert" || sql == "update" || sql == "delete",
sql == "flush",
)
} else if let sqllogictest::Record::Query { sql, .. } = &record {
let sql =
(sql.trim_start().split_once(' ').unwrap_or_default().0).to_lowercase();
(
false,
false,
false,
sql == "insert" || sql == "update" || sql == "delete",
false,
)
} else {
(false, false, false, false, false)
};

// For normal records.
if !kill {
match tester.run_async(record).await {
Ok(_) => continue,
Err(e) => panic!("{}", e),
}
}

// For kill enabled.
let cmd = match &record {
sqllogictest::Record::Statement { sql, .. }
| sqllogictest::Record::Query { sql, .. } => extract_sql_command(sql),
_ => SqlCmd::Others,
};

// Since we've configured the session to always enable implicit flush, we don't need to
// execute `FLUSH` statements.
if is_flush {
if cmd == SqlCmd::Flush {
continue;
}

// We won't kill during create/insert/update/delete since the atomicity is not
// guaranteed. Notice that `create table as` is also not atomic in our system.
if is_dml || is_create_table_as {
if !kill {
if let Err(e) = tester.run_async(record).await {
panic!("{}", e);
}
} else {
for i in 0usize.. {
let delay = Duration::from_secs(1 << i);
match tester.run_async(record.clone()).await {
Ok(_) => break,
// The cluster could still be recovering if it was killed earlier; retry if we
// hit `no reader for dml in table with id {}`.
Err(e)
if !e.to_string().contains("no reader for dml in table")
|| i >= 5 =>
{
panic!("failed to run test after retry {i} times: {e}")
}
Err(e) => {
tracing::error!("failed to run test: {e}\nretry after {delay:?}")
}
if cmd.ignore_kill() {
for i in 0usize.. {
let delay = Duration::from_secs(1 << i);
if let Err(err) = tester.run_async(record.clone()).await {
// The cluster could still be recovering if it was killed earlier; retry if we
// hit `no reader for dml in table with id {}`.
let should_retry =
err.to_string().contains("no reader for dml in table") && i < 5;
if !should_retry {
panic!("{}", err);
}
tokio::time::sleep(delay).await;
tracing::error!("failed to run test: {err}\nretry after {delay:?}");
} else {
break;
}
tokio::time::sleep(delay).await;
}
continue;
}
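
For context on the retry loop above: the delay doubles on each attempt (`1 << i` seconds) and the `no reader for dml in table` error is only tolerated while `i < 5`, so a cluster that never finishes recovering fails after roughly 1 + 2 + 4 + 8 + 16 = 31 seconds of waiting. A minimal standalone sketch of that schedule (illustrative names, not from the PR):

use std::time::Duration;

// Mirrors the backoff used in the retry loop above: attempt i sleeps 2^i seconds,
// and after MAX_RETRIES failed attempts the test gives up.
const MAX_RETRIES: usize = 5;

fn backoff_schedule() -> Vec<Duration> {
    (0..MAX_RETRIES).map(|i| Duration::from_secs(1 << i)).collect()
}

fn main() {
    // Prints waits of 1s, 2s, 4s, 8s and 16s.
    for (i, delay) in backoff_schedule().iter().enumerate() {
        println!("attempt {i}: wait {delay:?} before retrying");
    }
}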

// For normal records.
if !kill {
match tester.run_async(record).await {
Ok(_) => continue,
Err(e) => panic!("{}", e),
}
}
let should_kill = thread_rng().gen_ratio((opts.kill_rate * 1000.0) as u32, 1000);
// spawn a background task to kill nodes
let handle = if should_kill {
@@ -154,16 +168,20 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
Ok(_) => break,
// allow 'table exists' error when retrying a CREATE statement
Err(e)
if is_create
&& i != 0
if matches!(
cmd,
SqlCmd::Create {
is_create_table_as: false
}
) && i != 0
&& e.to_string().contains("exists")
&& e.to_string().contains("Catalog error") =>
{
break
}
// allow 'not found' error when retrying a DROP statement
Err(e)
if is_drop
if cmd == SqlCmd::Drop
&& i != 0
&& e.to_string().contains("not found")
&& e.to_string().contains("Catalog error") =>
@@ -182,11 +200,7 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
}
}
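
To summarize the tolerance in the hunk above: when a kill is injected, a CREATE or DROP may already have taken effect before the statement is retried, so catalog errors about the object already existing (or already being gone) are ignored on retries. A hedged sketch of that predicate, reusing the `SqlCmd` enum introduced earlier (the helper name is made up and not part of the PR):

// Illustrative helper, not in the PR: decides whether an error from a retried
// statement is benign because the previous (killed) attempt already took effect.
fn retry_error_is_benign(cmd: &SqlCmd, attempt: usize, err: &str) -> bool {
    if attempt == 0 || !err.contains("Catalog error") {
        return false;
    }
    match cmd {
        // A retried plain CREATE may find the object already created.
        SqlCmd::Create { is_create_table_as: false } => err.contains("exists"),
        // A retried DROP may find the object already dropped.
        SqlCmd::Drop => err.contains("not found"),
        _ => false,
    }
}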

pub async fn run_parallel_slt_task(
cluster: Arc<Cluster>,
glob: &str,
jobs: usize,
) -> Result<(), ParallelTestError> {
pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
let db = RisingWave::connect("frontend".into(), "dev".into())
.await
.unwrap();