Skip to content

Commit

Permalink
fix(test): replay SET statements after recovery (#8292)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
Co-authored-by: August <[email protected]>
  • Loading branch information
3 people authored Mar 6, 2023
1 parent dad63fa commit 55d1438
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
27 changes: 26 additions & 1 deletion src/tests/simulation/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@ pub struct RisingWave {
task: tokio::task::JoinHandle<()>,
host: String,
dbname: String,
/// The `SET` statements that have been executed on this client.
/// We need to replay them when reconnecting.
set_stmts: Vec<String>,
}

impl RisingWave {
pub async fn connect(
host: String,
dbname: String,
) -> Result<Self, tokio_postgres::error::Error> {
Self::reconnect(host, dbname, vec![]).await
}

pub async fn reconnect(
host: String,
dbname: String,
set_stmts: Vec<String>,
) -> Result<Self, tokio_postgres::error::Error> {
let (client, connection) = tokio_postgres::Config::new()
.host(&host)
Expand All @@ -52,11 +63,16 @@ impl RisingWave {
client
.simple_query("SET VISIBILITY_MODE TO checkpoint;")
.await?;
// replay all SET statements
for stmt in &set_stmts {
client.simple_query(stmt).await?;
}
Ok(RisingWave {
client,
task,
host,
dbname,
set_stmts,
})
}

Expand All @@ -81,7 +97,16 @@ impl sqllogictest::AsyncDB for RisingWave {

if self.client.is_closed() {
// connection error, reset the client
*self = Self::connect(self.host.clone(), self.dbname.clone()).await?;
*self = Self::reconnect(
self.host.clone(),
self.dbname.clone(),
self.set_stmts.clone(),
)
.await?;
}

if sql.trim_start().to_lowercase().starts_with("set") {
self.set_stmts.push(sql.to_string());
}

let mut output = vec![];
Expand Down
17 changes: 7 additions & 10 deletions src/tests/simulation/src/slt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl SqlCmd {
matches!(
self,
SqlCmd::Dml
| SqlCmd::Flush
| SqlCmd::Create {
is_create_table_as: true
}
Expand Down Expand Up @@ -85,13 +86,15 @@ const KILL_IGNORE_FILES: &[&str] = &[

/// Run the sqllogictest files in `glob`.
pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
let risingwave = RisingWave::connect("frontend".into(), "dev".into())
.await
.unwrap();
let kill = opts.kill_compute || opts.kill_meta || opts.kill_frontend || opts.kill_compactor;
let mut tester = sqllogictest::Runner::new(risingwave);
let files = glob::glob(glob).expect("failed to read glob pattern");
for file in files {
// use a session per file
let risingwave = RisingWave::connect("frontend".into(), "dev".into())
.await
.unwrap();
let mut tester = sqllogictest::Runner::new(risingwave);

let file = file.unwrap();
let path = file.as_path();
println!("{}", path.display());
Expand Down Expand Up @@ -122,12 +125,6 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
_ => SqlCmd::Others,
};

// Since we've configured the session to always enable implicit flush, we don't need to
// execute `FLUSH` statements.
if cmd == SqlCmd::Flush {
continue;
}

if cmd.ignore_kill() {
for i in 0usize.. {
let delay = Duration::from_secs(1 << i);
Expand Down
4 changes: 2 additions & 2 deletions src/tests/simulation/tests/it/nexmark_q4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ async fn nexmark_q4_materialize_agg_cache_invalidation() -> Result<()> {

// Let parallel unit 0 handle all groups.
cluster.reschedule(format!("{id}-[1,2,3,4,5]")).await?;
sleep(Duration::from_secs(10)).await;
sleep(Duration::from_secs(7)).await;
let result_1 = cluster.run(SELECT).await?.assert_result_ne(RESULT);

// Scale out.
cluster.reschedule(format!("{id}+[1,2,3,4,5]")).await?;
sleep(Duration::from_secs(3)).await;
sleep(Duration::from_secs(7)).await;
cluster
.run(SELECT)
.await?
Expand Down

0 comments on commit 55d1438

Please sign in to comment.