Skip to content

Commit

Permalink
[Enhancement] early check table exist before create
Browse files Browse the repository at this point in the history
Currently, when creating an external table, we won't check
whether it exist or not, always create a new table.
To create a new table, we need to read data from csv/json/parquet
and infer the type, this will bring a lot of overhead.

In this pr, before creating an external table, we can check whether
table exist or not, if it exist, directly return.

Signed-off-by: xyz <[email protected]>
  • Loading branch information
xiaoyong-z committed Jan 21, 2023
1 parent a9ddcd3 commit 054f014
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 14 deletions.
28 changes: 15 additions & 13 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,21 +446,23 @@ impl SessionContext {
&self,
cmd: &CreateExternalTable,
) -> Result<DataFrame> {
let table_provider: Arc<dyn TableProvider> =
self.create_custom_table(cmd).await?;

let table = self.table(&cmd.name).await;
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
self.register_table(&cmd.name, table_provider)?;
self.return_empty_dataframe()
let exist = self.table_exist(&cmd.name)?;
if exist {
match cmd.if_not_exists {
true => return self.return_empty_dataframe(),
false => {
return Err(DataFusionError::Execution(format!(
"Table '{}' already exists",
cmd.name
)));
}
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{}' already exists",
cmd.name
))),
}

let table_provider: Arc<dyn TableProvider> =
self.create_custom_table(cmd).await?;
self.register_table(&cmd.name, table_provider)?;
self.return_empty_dataframe()
}

async fn create_custom_table(
Expand Down
48 changes: 47 additions & 1 deletion datafusion/core/tests/sql/create_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn sql_create_table_if_not_exists() -> Result<()> {
.collect()
.await?;

// Create external table
// Create external table again
let result = ctx.sql("CREATE EXTERNAL TABLE IF NOT EXISTS aggregate_simple STORED AS CSV WITH HEADER ROW LOCATION 'tests/data/aggregate_simple.csv'")
.await?
.collect()
Expand All @@ -58,6 +58,52 @@ async fn sql_create_table_if_not_exists() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn sql_create_table_exists() -> Result<()> {
// the information schema used to introduce cyclic Arcs
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

// Create table
ctx.sql("CREATE TABLE y AS VALUES (1,2,3)")
.await?
.collect()
.await?;

// Create table again without if not exist
let result = ctx.sql("CREATE TABLE y AS VALUES (1,2,3)").await;

match result {
Err(DataFusionError::Execution(err_msg)) => {
assert_eq!(err_msg, "Table 'y' already exists");
}
_ => {
panic!("expect create table failed");
}
}

// Create external table
ctx.sql("CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV WITH HEADER ROW LOCATION 'tests/data/aggregate_simple.csv'")
.await?
.collect()
.await?;

// Create external table again without if not exist
let result = ctx.sql("CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV WITH HEADER ROW LOCATION 'tests/data/aggregate_simple.csv'")
.await;

match result {
Err(DataFusionError::Execution(err_msg)) => {
assert_eq!(err_msg, "Table 'aggregate_simple' already exists");
}
_ => {
panic!("expect create table failed");
}
}

Ok(())
}

#[tokio::test]
async fn create_custom_table() -> Result<()> {
let mut cfg = RuntimeConfig::new();
Expand Down

0 comments on commit 054f014

Please sign in to comment.