Skip to content

Commit

Permalink
chore: run relation_link_test concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
Weakky committed Jul 22, 2024
1 parent f66d888 commit aa4eb82
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 82 deletions.
29 changes: 15 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 0 additions & 47 deletions query-engine/connector-test-kit-rs/qe-setup/src/cockroachdb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use once_cell::sync::OnceCell;
use quaint::{connector::PostgresFlavour, prelude::*, single::Quaint};
use schema_core::schema_connector::{ConnectorError, ConnectorResult};
use url::Url;
Expand All @@ -20,7 +19,6 @@ pub(crate) async fn cockroach_setup(url: String, prisma_schema: &str) -> Connect

conn.raw_cmd(&query).await.unwrap();

drop_db_when_thread_exits(parsed_url, db_name);
let mut connector = sql_schema_connector::SqlSchemaConnector::new_cockroach();
crate::diff_and_apply(prisma_schema, url, &mut connector).await
}
Expand All @@ -29,48 +27,3 @@ async fn create_admin_conn(url: &mut Url) -> ConnectorResult<Quaint> {
url.set_path("/postgres");
Ok(Quaint::new(url.as_ref()).await.unwrap())
}

fn drop_db_when_thread_exits(admin_url: Url, db_name: &str) {
use std::{cell::RefCell, sync::mpsc, thread};
use test_setup::runtime::run_with_thread_local_runtime as tok;

// === Dramatis Personæ ===

// DB_DROP_THREAD: A thread that drops databases.
static DB_DROP_THREAD: OnceCell<mpsc::SyncSender<String>> = OnceCell::new();

let sender = DB_DROP_THREAD.get_or_init(|| {
let (sender, receiver) = mpsc::sync_channel::<String>(4096);

thread::spawn(move || {
let mut admin_url = admin_url;
let conn = tok(create_admin_conn(&mut admin_url)).unwrap();

// Receive new databases to drop.
for msg in receiver.iter() {
tok(conn.raw_cmd(&msg)).unwrap();
}
});

sender
});

// NOTIFIER: a thread local that notifies DB_DROP_THREAD when dropped.
struct Notifier(String, mpsc::SyncSender<String>);

impl Drop for Notifier {
fn drop(&mut self) {
let message = std::mem::take(&mut self.0);

self.1.send(message).unwrap();
}
}

thread_local! {
static NOTIFIER: RefCell<Option<Notifier>> = const { RefCell::new(None) };
}

NOTIFIER.with(move |cell| {
*cell.borrow_mut() = Some(Notifier(format!("DROP DATABASE \"{db_name}\""), sender.clone()));
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ query-engine-metrics = { path = "../../metrics" }
quaint.workspace = true
jsonrpc-core = "17"
insta = "1.7.1"
futures-util = "0.3.30"

# Only this version is vetted, upgrade only after going through the code,
# as this is a small crate with little user base.
Expand Down
45 changes: 24 additions & 21 deletions query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ fn run_relation_link_test_impl(

let (dms, capabilities) = schema_with_relation(on_parent, on_child, id_only);

let mut futs = Vec::new();

insta::allow_duplicates! {
for (i, (dm, caps)) in dms.into_iter().zip(capabilities.into_iter()).enumerate() {
if RELATION_TEST_IDX.map(|idx| idx != i).unwrap_or(false) {
Expand All @@ -165,28 +167,29 @@ fn run_relation_link_test_impl(
let metrics_for_subscriber = metrics.clone();
let (log_capture, log_tx) = TestLogCapture::new();

run_with_tokio(
async move {
println!("Used datamodel:\n {}", datamodel.yellow());
let override_local_max_bind_values = None;
let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture)
.await
.unwrap();


test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
))
.await.unwrap();

teardown_project(&datamodel, Default::default(), runner.schema_id())
.await
.unwrap();
}
);
futs.push(async move {
println!("Used datamodel:\n {}", datamodel.yellow());
let override_local_max_bind_values = None;
let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture)
.await
.unwrap();

test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
))
.await.unwrap();

teardown_project(&datamodel, Default::default(), runner.schema_id())
.await
.unwrap();
});
}

run_with_tokio(async move {
futures_util::future::join_all(futs).await
});
}
}

Expand Down

0 comments on commit aa4eb82

Please sign in to comment.