diff --git a/Cargo.toml b/Cargo.toml index 1ab431ea316a9..3e7be926d7327 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,3 +50,6 @@ opt-level = 3 overflow-checks = false panic = 'unwind' rpath = false + +[patch.crates-io] +sqlparser = { git = "https://github.com/tustvold/sqlparser-rs.git", rev = "a3cd766d11845fcaa9ece9c1b048cc8463d92252" } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 012fbc3023873..321a8517adb90 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -39,3 +39,6 @@ object_store = { version = "0.5.0", features = ["aws", "gcp"] } rustyline = "10.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } url = "2.2" + +[patch.crates-io] +sqlparser = { git = "https://github.com/tustvold/sqlparser-rs.git", rev = "a3cd766d11845fcaa9ece9c1b048cc8463d92252" } diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index 17f1e231d5140..40c01ee8c0efb 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -54,7 +54,7 @@ pub enum TableReference<'a> { /// Represents a path to a table that may require further resolution /// that owns the underlying names -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, PartialEq)] pub enum OwnedTableReference { /// An unqualified table reference, e.g. "table" Bare { diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index f7ef6b93dea94..10e4f98f3f03b 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -29,6 +29,7 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; +use async_trait::async_trait; use parking_lot::RwLock; use datafusion_common::Result; @@ -45,12 +46,16 @@ use super::{ schema::SchemaProvider, }; -const INFORMATION_SCHEMA: &str = "information_schema"; +/// The name of the information schema +pub const INFORMATION_SCHEMA: &str = "information_schema"; const TABLES: &str = "tables"; const VIEWS: &str = "views"; const COLUMNS: &str = "columns"; const DF_SETTINGS: &str = "df_settings"; +/// All information schema tables +pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS]; + /// Wraps another [`CatalogProvider`] and adds a "information_schema" /// schema that can introspect on tables in the catalog_list pub(crate) struct CatalogWithInformationSchema { @@ -132,7 +137,7 @@ struct InformationSchemaConfig { impl InformationSchemaConfig { /// Construct the `information_schema.tables` virtual table - fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) { + async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) { // create a mem table with the names of tables for catalog_name in self.catalog_list.catalog_names() { @@ -142,7 +147,7 @@ impl InformationSchemaConfig { if schema_name != INFORMATION_SCHEMA { let schema = catalog.schema(&schema_name).unwrap(); for table_name in schema.table_names() { - let table = schema.table(&table_name).unwrap(); + let table = schema.table(&table_name).await.unwrap(); builder.add_table( &catalog_name, &schema_name, @@ -171,7 +176,7 @@ impl InformationSchemaConfig { } } - fn make_views(&self, builder: &mut InformationSchemaViewBuilder) { + async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) { for catalog_name in self.catalog_list.catalog_names() { let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); @@ -179,7 +184,7 @@ impl InformationSchemaConfig { if schema_name != INFORMATION_SCHEMA { let schema = catalog.schema(&schema_name).unwrap(); for table_name in schema.table_names() { - let table = schema.table(&table_name).unwrap(); + let table = schema.table(&table_name).await.unwrap(); builder.add_view( &catalog_name, &schema_name, @@ -193,7 +198,7 @@ impl InformationSchemaConfig { } /// Construct the `information_schema.columns` virtual table - fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) { + async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) { for catalog_name in self.catalog_list.catalog_names() { let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); @@ -201,7 +206,7 @@ impl InformationSchemaConfig { if schema_name != INFORMATION_SCHEMA { let schema = catalog.schema(&schema_name).unwrap(); for table_name in schema.table_names() { - let table = schema.table(&table_name).unwrap(); + let table = schema.table(&table_name).await.unwrap(); for (i, field) in table.schema().fields().iter().enumerate() { builder.add_column( &catalog_name, @@ -227,6 +232,7 @@ impl InformationSchemaConfig { } } +#[async_trait] impl SchemaProvider for InformationSchemaProvider { fn as_any(&self) -> &(dyn Any + 'static) { self @@ -241,7 +247,7 @@ impl SchemaProvider for InformationSchemaProvider { ] } - fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Option> { let config = self.config.clone(); let table: Arc = if name.eq_ignore_ascii_case("tables") { Arc::new(InformationSchemaTables::new(config)) @@ -305,7 +311,7 @@ impl PartitionStream for InformationSchemaTables { self.schema.clone(), // TODO: Stream this futures::stream::once(async move { - config.make_tables(&mut builder); + config.make_tables(&mut builder).await; Ok(builder.finish()) }), )) @@ -396,7 +402,7 @@ impl PartitionStream for InformationSchemaViews { self.schema.clone(), // TODO: Stream this futures::stream::once(async move { - config.make_views(&mut builder); + config.make_views(&mut builder).await; Ok(builder.finish()) }), )) @@ -510,7 +516,7 @@ impl PartitionStream for InformationSchemaColumns { self.schema.clone(), // TODO: Stream this futures::stream::once(async move { - config.make_columns(&mut builder); + config.make_columns(&mut builder).await; Ok(builder.finish()) }), )) diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs index 5fc2f48adda0e..b457b860a8fdf 100644 --- a/datafusion/core/src/catalog/listing_schema.rs +++ b/datafusion/core/src/catalog/listing_schema.rs @@ -29,6 +29,7 @@ use std::any::Any; use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, Mutex}; +use async_trait::async_trait; /// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables /// @@ -148,6 +149,7 @@ impl ListingSchemaProvider { } } +#[async_trait] impl SchemaProvider for ListingSchemaProvider { fn as_any(&self) -> &dyn Any { self @@ -162,7 +164,7 @@ impl SchemaProvider for ListingSchemaProvider { .collect() } - fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Option> { self.tables .lock() .expect("Can't lock tables") diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index df0ef78807b19..c8b631f858878 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -21,11 +21,13 @@ use dashmap::DashMap; use std::any::Any; use std::sync::Arc; +use async_trait::async_trait; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; /// Represents a schema, comprising a number of named tables. +#[async_trait] pub trait SchemaProvider: Sync + Send { /// Returns the schema provider as [`Any`](std::any::Any) /// so that it can be downcast to a specific implementation. @@ -35,7 +37,7 @@ pub trait SchemaProvider: Sync + Send { fn table_names(&self) -> Vec; /// Retrieves a specific table from the schema by name, provided it exists. - fn table(&self, name: &str) -> Option>; + async fn table(&self, name: &str) -> Option>; /// If supported by the implementation, adds a new table to this schema. /// If a table of the same name existed before, it returns "Table already exists" error. @@ -85,6 +87,7 @@ impl Default for MemorySchemaProvider { } } +#[async_trait] impl SchemaProvider for MemorySchemaProvider { fn as_any(&self) -> &dyn Any { self @@ -97,7 +100,7 @@ impl SchemaProvider for MemorySchemaProvider { .collect() } - fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Option> { self.tables.get(name).map(|table| table.value().clone()) } diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index d7dd04f886ce2..7fbd87815f592 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -921,7 +921,7 @@ mod tests { let ctx = SessionContext::new(); ctx.register_batch("t", batch)?; - let df = ctx.table("t")?.select_columns(&["f.c1"])?; + let df = ctx.table("t").await?.select_columns(&["f.c1"])?; let df_results = df.collect().await?; @@ -1040,7 +1040,7 @@ mod tests { )); // build query with a UDF using DataFrame API - let df = ctx.table("aggregate_test_100")?; + let df = ctx.table("aggregate_test_100").await?; let f = df.registry(); @@ -1048,8 +1048,9 @@ mod tests { let plan = df.plan.clone(); // build query using SQL - let sql_plan = - ctx.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")?; + let sql_plan = ctx + .create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100") + .await?; // the two plans should be identical assert_same_plan(&plan, &sql_plan); @@ -1106,7 +1107,7 @@ mod tests { ctx.register_table("test_table", df_impl.clone())?; // pull the table out - let table = ctx.table("test_table")?; + let table = ctx.table("test_table").await?; let group_expr = vec![col("c1")]; let aggr_expr = vec![sum(col("c12"))]; @@ -1160,13 +1161,13 @@ mod tests { async fn create_plan(sql: &str) -> Result { let mut ctx = SessionContext::new(); register_aggregate_csv(&mut ctx, "aggregate_test_100").await?; - ctx.create_logical_plan(sql) + ctx.create_logical_plan(sql).await } async fn test_table_with_name(name: &str) -> Result> { let mut ctx = SessionContext::new(); register_aggregate_csv(&mut ctx, name).await?; - ctx.table(name) + ctx.table(name).await } async fn test_table() -> Result> { @@ -1300,8 +1301,15 @@ mod tests { ctx.register_table("t1", df.clone())?; ctx.register_table("t2", df)?; let df = ctx - .table("t1")? - .join(ctx.table("t2")?, JoinType::Inner, &["c1"], &["c1"], None)? + .table("t1") + .await? + .join( + ctx.table("t2").await?, + JoinType::Inner, + &["c1"], + &["c1"], + None, + )? .sort(vec![ // make the test deterministic col("t1.c1").sort(true, true), @@ -1378,10 +1386,11 @@ mod tests { ) .await?; - ctx.register_table("t1", ctx.table("test")?)?; + ctx.register_table("t1", ctx.table("test").await?)?; let df = ctx - .table("t1")? + .table("t1") + .await? .filter(col("id").eq(lit(1)))? .select_columns(&["bool_col", "int_col"])?; @@ -1462,7 +1471,8 @@ mod tests { ctx.register_batch("t", batch)?; let df = ctx - .table("t")? + .table("t") + .await? // try and create a column with a '.' in it .with_column("f.c2", lit("hello"))?; diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index ce262f55ddfc4..543ab25b593d5 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -431,12 +431,13 @@ mod tests { ) .await?; - ctx.register_table("t1", ctx.table("test")?)?; + ctx.register_table("t1", ctx.table("test").await?)?; ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?; let df = ctx - .table("t2")? + .table("t2") + .await? .filter(col("id").eq(lit(1)))? .select_columns(&["bool_col", "int_col"])?; @@ -460,12 +461,13 @@ mod tests { ) .await?; - ctx.register_table("t1", ctx.table("test")?)?; + ctx.register_table("t1", ctx.table("test").await?)?; ctx.sql("CREATE VIEW t2 as SELECT * FROM t1").await?; let df = ctx - .table("t2")? + .table("t2") + .await? .limit(0, Some(10))? .select_columns(&["bool_col", "int_col"])?; diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 759cc79a8bf81..1e795bd2dee60 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -38,6 +38,7 @@ use crate::{ pub use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::var_provider::is_system_variables; use parking_lot::RwLock; +use std::ops::ControlFlow; use std::str::FromStr; use std::sync::Arc; use std::{ @@ -95,10 +96,14 @@ use datafusion_sql::{ planner::{ContextProvider, SqlToRel}, }; use parquet::file::properties::WriterProperties; +use sqlparser::ast::{visit_tables, Ident, ObjectName}; use url::Url; +use crate::catalog::information_schema::{INFORMATION_SCHEMA, INFORMATION_SCHEMA_TABLES}; use crate::catalog::listing_schema::ListingSchemaProvider; use crate::datasource::object_store::ObjectStoreUrl; +use datafusion_sql::parser::Statement; +use datafusion_sql::planner::object_name_to_table_reference; use uuid::Uuid; use super::options::{ @@ -174,6 +179,7 @@ impl SessionContext { } /// Finds any ListSchemaProviders and instructs them to reload tables from "disk" + #[deprecated(note = "SchemaProvider should reload metadata in async table function")] pub async fn refresh_catalogs(&self) -> Result<()> { let cat_names = self.catalog_names().clone(); for cat_name in cat_names.iter() { @@ -253,7 +259,7 @@ impl SessionContext { /// This method is `async` because queries of type `CREATE EXTERNAL TABLE` /// might require the schema to be inferred. pub async fn sql(&self, sql: &str) -> Result> { - let plan = self.create_logical_plan(sql)?; + let plan = self.create_logical_plan(sql).await?; match plan { LogicalPlan::CreateExternalTable(cmd) => { self.create_external_table(&cmd).await @@ -265,7 +271,7 @@ impl SessionContext { if_not_exists, or_replace, }) => { - let table = self.table(&name); + let table = self.table(&name).await; match (if_not_exists, or_replace, table) { (true, false, Ok(_)) => self.return_empty_dataframe(), @@ -312,7 +318,7 @@ impl SessionContext { or_replace, definition, }) => { - let view = self.table(&name); + let view = self.table(&name).await; match (or_replace, view) { (true, Ok(_)) => { @@ -340,7 +346,7 @@ impl SessionContext { LogicalPlan::DropTable(DropTable { name, if_exists, .. }) => { - let result = self.find_and_deregister(&name, TableType::Base); + let result = self.find_and_deregister(&name, TableType::Base).await; match (result, if_exists) { (Ok(true), _) => self.return_empty_dataframe(), (_, true) => self.return_empty_dataframe(), @@ -354,7 +360,7 @@ impl SessionContext { LogicalPlan::DropView(DropView { name, if_exists, .. }) => { - let result = self.find_and_deregister(&name, TableType::View); + let result = self.find_and_deregister(&name, TableType::View).await; match (result, if_exists) { (Ok(true), _) => self.return_empty_dataframe(), (_, true) => self.return_empty_dataframe(), @@ -497,7 +503,7 @@ impl SessionContext { let table_provider: Arc = self.create_custom_table(cmd).await?; - let table = self.table(&cmd.name); + let table = self.table(&cmd.name).await; match (cmd.if_not_exists, table) { (true, Ok(_)) => self.return_empty_dataframe(), (_, Err(_)) => { @@ -531,17 +537,15 @@ impl SessionContext { Ok(table) } - fn find_and_deregister<'a>( + async fn find_and_deregister<'a>( &self, table_ref: impl Into>, table_type: TableType, ) -> Result { let table_ref = table_ref.into(); - let table_provider = self - .state - .read() - .schema_for_ref(table_ref)? - .table(table_ref.table()); + let schema = self.state.read().schema_for_ref(table_ref)?.clone(); + + let table_provider = schema.table(table_ref.table()).await; if let Some(table_provider) = table_provider { if table_provider.table_type() == table_type { @@ -554,7 +558,7 @@ impl SessionContext { /// Creates a logical plan. /// /// This function is intended for internal use and should not be called directly. - pub fn create_logical_plan(&self, sql: &str) -> Result { + pub async fn create_logical_plan(&self, sql: &str) -> Result { let mut statements = DFParser::parse_sql(sql)?; if statements.len() != 1 { @@ -564,8 +568,58 @@ impl SessionContext { } // create a query planner - let state = self.state.read().clone(); - let query_planner = SqlToRel::new(&state); + let mut provider = { + let state = self.state.read(); + QueryContextProvider { + config: state.config.clone(), + scalar_functions: state.scalar_functions.clone(), + aggregate_functions: state.aggregate_functions.clone(), + execution_props: state.execution_props.clone(), + tables: Default::default(), + } + }; + + let mut tables = hashbrown::HashSet::new(); + // Fetch all tables needed by query + for statement in &statements { + match statement { + Statement::Statement(s) => { + visit_tables(s, |table_name| { + tables.get_or_insert_with(table_name, |_| table_name.clone()); + ControlFlow::<(), ()>::Continue(()) + }); + } + Statement::CreateExternalTable(table) => { + tables.insert(ObjectName(vec![Ident::from(table.name.as_str())])); + } + Statement::DescribeTable(table) => { + tables.get_or_insert_with(&table.table_name, |_| { + table.table_name.clone() + }); + } + } + } + + // Always include information_schema if available + if provider.config.information_schema() { + for s in INFORMATION_SCHEMA_TABLES { + tables.insert(ObjectName(vec![ + Ident::new(INFORMATION_SCHEMA), + Ident::new(*s), + ])); + } + } + + for table in tables { + let owned = object_name_to_table_reference(table)?; + let reference = owned.as_table_reference(); + if let Ok(table) = self.table_provider(reference).await { + let key = provider.resolve(reference); + provider.tables.insert(key, table); + } + } + + let query_planner = SqlToRel::new(&provider); query_planner.statement_to_plan(statements.pop_front().unwrap()) } @@ -939,12 +993,12 @@ impl SessionContext { /// provided reference. /// /// [`register_table`]: SessionContext::register_table - pub fn table<'a>( + pub async fn table<'a>( &self, table_ref: impl Into>, ) -> Result> { let table_ref = table_ref.into(); - let provider = self.table_provider(table_ref)?; + let provider = self.table_provider(table_ref).await?; let plan = LogicalPlanBuilder::scan( table_ref.table(), provider_as_source(Arc::clone(&provider)), @@ -954,14 +1008,14 @@ impl SessionContext { Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } - /// Return a [`TabelProvider`] for the specified table. - pub fn table_provider<'a>( + /// Return a [`TableProvider`] for the specified table. + pub async fn table_provider<'a>( &self, table_ref: impl Into>, ) -> Result> { let table_ref = table_ref.into(); let schema = self.state.read().schema_for_ref(table_ref)?; - match schema.table(table_ref.table()) { + match schema.table(table_ref.table()).await { Some(ref provider) => Ok(Arc::clone(provider)), _ => Err(DataFusionError::Plan(format!( "No table named '{}'", @@ -1807,22 +1861,52 @@ impl SessionState { } } -impl ContextProvider for SessionState { - fn get_table_provider(&self, name: TableReference) -> Result> { - let resolved_ref = self.resolve_table_ref(name); - match self.schema_for_ref(resolved_ref) { - Ok(schema) => { - let provider = schema.table(resolved_ref.table).ok_or_else(|| { - DataFusionError::Plan(format!( - "table '{}.{}.{}' not found", - resolved_ref.catalog, resolved_ref.schema, resolved_ref.table - )) - })?; - Ok(provider_as_source(provider)) - } - Err(e) => Err(e), +#[derive(Debug, Hash, PartialEq, Eq)] +struct TableKey { + catalog: String, + schema: String, + table: String, +} + +struct QueryContextProvider { + /// Session configuration + config: SessionConfig, + /// Scalar functions that are registered with the context + scalar_functions: HashMap>, + /// Aggregate functions registered in the context + aggregate_functions: HashMap>, + /// Aggregate functions registered in the context keyed by (catalog, schema, table) + tables: HashMap>, + /// Execution properties + execution_props: ExecutionProps, +} + +impl QueryContextProvider { + fn resolve(&self, reference: TableReference) -> TableKey { + let resolved = + reference.resolve(&self.config.default_catalog, &self.config.default_schema); + + TableKey { + catalog: resolved.catalog.to_string(), + schema: resolved.schema.to_string(), + table: resolved.table.to_string(), } } +} + +impl ContextProvider for QueryContextProvider { + fn get_table_provider(&self, name: TableReference) -> Result> { + let resolved = self.resolve(name); + + // TODO: Use Cow in TableReference + let provider = self.tables.get(&resolved).ok_or_else(|| { + DataFusionError::Plan(format!( + "table '{}.{}.{}' not found", + resolved.catalog, resolved.schema, resolved.table + )) + })?; + Ok(provider_as_source(Arc::clone(provider))) + } fn get_function_meta(&self, name: &str) -> Option> { self.scalar_functions.get(name).cloned() @@ -2045,8 +2129,7 @@ mod tests { use std::fs::File; use std::path::PathBuf; use std::sync::Weak; - use std::thread::{self, JoinHandle}; - use std::{env, io::prelude::*, sync::Mutex}; + use std::{env, io::prelude::*}; use tempfile::TempDir; #[tokio::test] @@ -2263,28 +2346,29 @@ mod tests { // environment. Usecase is for concurrent planing. let tmp_dir = TempDir::new()?; let partition_count = 4; - let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count).await?)); + let ctx = Arc::new(create_ctx(&tmp_dir, partition_count).await?); - let threads: Vec>> = (0..2) + let handles: Vec<_> = (0..2) .map(|_| ctx.clone()) - .map(|ctx_clone| { - thread::spawn(move || { - let ctx = ctx_clone.lock().expect("Locked context"); + .map(|ctx| { + tokio::spawn(async move { // Ensure we can create logical plan code on a separate thread. ctx.create_logical_plan( "SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3", ) + .await }) }) .collect(); - for thread in threads { - thread.join().expect("Failed to join thread")?; + for handle in handles { + handle.await.expect("Failed to join thread")?; } Ok(()) } #[tokio::test] + #[allow(deprecated)] async fn with_listing_schema_provider() -> Result<()> { let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let path = path.join("tests/tpch-csv"); diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 762bd9092173c..b4c3157d8a007 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -347,7 +347,9 @@ mod tests { let partition = create_vec_batches(&schema, 10); let table = MemTable::try_new(schema, vec![partition])?; ctx.register_table("a", Arc::new(table))?; - let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?; + let plan = ctx + .create_logical_plan("SELECT * FROM a WHERE c0 < 1") + .await?; ctx.create_physical_plan(&plan).await } diff --git a/datafusion/core/tests/dataframe.rs b/datafusion/core/tests/dataframe.rs index 57df7e32afdbf..06a43acfd581a 100644 --- a/datafusion/core/tests/dataframe.rs +++ b/datafusion/core/tests/dataframe.rs @@ -65,11 +65,11 @@ async fn join() -> Result<()> { ctx.register_batch("aa", batch1)?; - let df1 = ctx.table("aa")?; + let df1 = ctx.table("aa").await?; ctx.register_batch("aaa", batch2)?; - let df2 = ctx.table("aaa")?; + let df2 = ctx.table("aaa").await?; let a = df1.join(df2, JoinType::Inner, &["a"], &["a"], None)?; @@ -101,6 +101,7 @@ async fn sort_on_unprojected_columns() -> Result<()> { let df = ctx .table("t") + .await .unwrap() .select(vec![col("a")]) .unwrap() @@ -143,6 +144,7 @@ async fn filter_with_alias_overwrite() -> Result<()> { let df = ctx .table("t") + .await .unwrap() .select(vec![(col("a").eq(lit(10))).alias("a")]) .unwrap() @@ -179,6 +181,7 @@ async fn select_with_alias_overwrite() -> Result<()> { let df = ctx .table("t") + .await .unwrap() .select(vec![col("a").alias("a")]) .unwrap() @@ -213,7 +216,7 @@ async fn test_grouping_sets() -> Result<()> { vec![col("a"), col("b")], ])); - let df = create_test_table()? + let df = create_test_table().await? .aggregate(vec![grouping_set_expr], vec![count(col("a"))])? .sort(vec![ Expr::Sort { @@ -380,7 +383,7 @@ async fn test_grouping_set_array_agg_with_overflow() -> Result<()> { Ok(()) } -fn create_test_table() -> Result> { +async fn create_test_table() -> Result> { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), Field::new("b", DataType::Int32, false), @@ -404,7 +407,7 @@ fn create_test_table() -> Result> { ctx.register_batch("test", batch)?; - ctx.table("test") + ctx.table("test").await } async fn aggregates_table(ctx: &SessionContext) -> Result> { diff --git a/datafusion/core/tests/dataframe_functions.rs b/datafusion/core/tests/dataframe_functions.rs index 5b643e12892b2..e6a585066476f 100644 --- a/datafusion/core/tests/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe_functions.rs @@ -34,7 +34,7 @@ use datafusion::execution::context::SessionContext; use datafusion::assert_batches_eq; use datafusion_expr::{approx_median, cast}; -fn create_test_table() -> Result> { +async fn create_test_table() -> Result> { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), Field::new("b", DataType::Int32, false), @@ -58,7 +58,7 @@ fn create_test_table() -> Result> { ctx.register_batch("test", batch)?; - ctx.table("test") + ctx.table("test").await } /// Excutes an expression on the test dataframe as a select. @@ -69,7 +69,7 @@ macro_rules! assert_fn_batches { assert_fn_batches!($EXPR, $EXPECTED, 10) }; ($EXPR:expr, $EXPECTED: expr, $LIMIT: expr) => { - let df = create_test_table()?; + let df = create_test_table().await?; let df = df.select(vec![$EXPR])?.limit(0, Some($LIMIT))?; let batches = df.collect().await?; @@ -162,7 +162,7 @@ async fn test_fn_approx_median() -> Result<()> { "+----------------------+", ]; - let df = create_test_table()?; + let df = create_test_table().await?; let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?; assert_batches_eq!(expected, &batches); @@ -182,7 +182,7 @@ async fn test_fn_approx_percentile_cont() -> Result<()> { "+-------------------------------------------+", ]; - let df = create_test_table()?; + let df = create_test_table().await?; let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?; assert_batches_eq!(expected, &batches); diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 2dd3a8dec8d37..dbb112e71af7a 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -25,7 +25,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> { let ctx = SessionContext::new(); register_aggregate_csv(&ctx).await?; let sql = "SELECT avg(c12) FROM aggregate_test_100"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); let task_ctx = ctx.task_ctx(); diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs index 1d32d5e0d93cc..1e3331db6b31f 100644 --- a/datafusion/core/tests/sql/create_drop.rs +++ b/datafusion/core/tests/sql/create_drop.rs @@ -96,7 +96,7 @@ async fn create_external_table_with_ddl() -> Result<()> { let exists = schema.table_exist("dt"); assert!(exists, "Table should have been created!"); - let table_schema = schema.table("dt").unwrap().schema(); + let table_schema = schema.table("dt").await.unwrap().schema(); assert_eq!(3, table_schema.fields().len()); diff --git a/datafusion/core/tests/sql/errors.rs b/datafusion/core/tests/sql/errors.rs index f3761320bf60e..5370e8878cdb5 100644 --- a/datafusion/core/tests/sql/errors.rs +++ b/datafusion/core/tests/sql/errors.rs @@ -23,7 +23,7 @@ async fn csv_query_error() -> Result<()> { let ctx = create_ctx(); register_aggregate_csv(&ctx).await?; let sql = "SELECT sin(c1) FROM aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); Ok(()) } @@ -34,7 +34,7 @@ async fn test_cast_expressions_error() -> Result<()> { let ctx = create_ctx(); register_aggregate_csv(&ctx).await?; let sql = "SELECT CAST(c1 AS INT) FROM aggregate_test_100"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); let task_ctx = ctx.task_ctx(); @@ -58,7 +58,7 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> { let ctx = SessionContext::new(); register_aggregate_csv(&ctx).await?; let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100"; - let logical_plan = ctx.create_logical_plan(sql); + let logical_plan = ctx.create_logical_plan(sql).await; let err = logical_plan.unwrap_err(); assert_eq!( err.to_string(), @@ -76,7 +76,7 @@ async fn query_cte_incorrect() -> Result<()> { // self reference let sql = "WITH t AS (SELECT * FROM t) SELECT * from u"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); assert_eq!( format!("{}", plan.unwrap_err()), @@ -85,7 +85,7 @@ async fn query_cte_incorrect() -> Result<()> { // forward referencing let sql = "WITH t AS (SELECT * FROM u), u AS (SELECT 1) SELECT * from u"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); assert_eq!( format!("{}", plan.unwrap_err()), @@ -94,7 +94,7 @@ async fn query_cte_incorrect() -> Result<()> { // wrapping should hide u let sql = "WITH t AS (WITH u as (SELECT 1) SELECT 1) SELECT * from u"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); assert_eq!( format!("{}", plan.unwrap_err()), @@ -148,7 +148,7 @@ async fn unsupported_sql_returns_error() -> Result<()> { register_aggregate_csv(&ctx).await?; // create view let sql = "create view test_view as select * from aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; assert!(physical_plan.is_err()); assert_eq!( @@ -158,7 +158,7 @@ async fn unsupported_sql_returns_error() -> Result<()> { ); // // drop view let sql = "drop view test_view"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; assert!(physical_plan.is_err()); assert_eq!( @@ -168,7 +168,7 @@ async fn unsupported_sql_returns_error() -> Result<()> { ); // // drop table let sql = "drop table aggregate_test_100"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await; assert!(physical_plan.is_err()); assert_eq!( diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 89112adae74a6..8576e9d4fc8a3 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -42,7 +42,7 @@ async fn explain_analyze_baseline_metrics() { SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) AS b \ LIMIT 3"; println!("running query: {}", sql); - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); let task_ctx = ctx.task_ctx(); @@ -187,7 +187,7 @@ async fn csv_explain_plans() { // Logical plan // Create plan let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let logical_schema = plan.schema(); // println!("SQL: {}", sql); @@ -410,7 +410,7 @@ async fn csv_explain_verbose_plans() { // Logical plan // Create plan let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let logical_schema = plan.schema(); // println!("SQL: {}", sql); @@ -647,7 +647,7 @@ group by order by revenue desc;"; - let mut plan = ctx.create_logical_plan(sql); + let mut plan = ctx.create_logical_plan(sql).await; plan = ctx.optimize(&plan.unwrap()); let expected = "\ @@ -680,7 +680,7 @@ async fn test_physical_plan_display_indent() { GROUP BY c1 \ ORDER BY the_min DESC \ LIMIT 10"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); @@ -727,7 +727,7 @@ async fn test_physical_plan_display_indent_multi_children() { ON c1=c2\ "; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); @@ -914,7 +914,7 @@ async fn explain_nested() { .set_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY, explain_phy_plan_flag); let ctx = SessionContext::with_config(config); let sql = "EXPLAIN explain select 1"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.create_physical_plan(&plan).await; diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs index 24652f67bf97b..ff19bd70070db 100644 --- a/datafusion/core/tests/sql/information_schema.rs +++ b/datafusion/core/tests/sql/information_schema.rs @@ -40,7 +40,7 @@ async fn information_schema_tables_not_exist_by_default() { assert_eq!( err.to_string(), // Error propagates from SessionState::schema_for_ref - "Error during planning: failed to resolve schema: information_schema" + "Error during planning: table 'datafusion.information_schema.tables' not found" ); } @@ -497,7 +497,7 @@ async fn information_schema_columns_not_exist_by_default() { assert_eq!( err.to_string(), // Error propagates from SessionState::schema_for_ref - "Error during planning: failed to resolve schema: information_schema" + "Error during planning: table 'datafusion.information_schema.columns' not found" ); } diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 1094a818f8210..8a77a40fe415a 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -119,7 +119,7 @@ async fn equijoin_left_and_condition_from_right() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id"; - let res = ctx.create_logical_plan(sql); + let res = ctx.create_logical_plan(sql).await; assert!(res.is_ok()); let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ @@ -145,7 +145,7 @@ async fn equijoin_left_and_not_null_condition_from_right() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name is not null ORDER BY t1_id"; - let res = ctx.create_logical_plan(sql); + let res = ctx.create_logical_plan(sql).await; assert!(res.is_ok()); let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ @@ -172,7 +172,7 @@ async fn full_join_sub_query() -> Result<()> { let sql = " SELECT t1_id, t1_name, t2_name FROM (SELECT * from (t1) AS t1) FULL JOIN (SELECT * from (t2) AS t2) ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id, t2_name"; - let res = ctx.create_logical_plan(sql); + let res = ctx.create_logical_plan(sql).await; assert!(res.is_ok()); let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ @@ -200,7 +200,7 @@ async fn equijoin_right_and_condition_from_left() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t1_id >= 22 ORDER BY t2_name"; - let res = ctx.create_logical_plan(sql); + let res = ctx.create_logical_plan(sql).await; assert!(res.is_ok()); let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ @@ -272,7 +272,7 @@ async fn equijoin_right_and_condition_from_right() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_id >= 22 ORDER BY t2_name"; - let res = ctx.create_logical_plan(sql); + let res = ctx.create_logical_plan(sql).await; assert!(res.is_ok()); let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ @@ -297,7 +297,7 @@ async fn equijoin_right_and_condition_from_both() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?; let sql = "SELECT t1_int, t2_int, t2_id FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t2_int <= t1_int ORDER BY t2_id"; - let res = ctx.create_logical_plan(sql); + let res = ctx.create_logical_plan(sql).await; assert!(res.is_ok()); let actual = execute_to_batches(&ctx, sql).await; let expected = vec![ @@ -1358,6 +1358,7 @@ async fn hash_join_with_date32() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1400,6 +1401,7 @@ async fn hash_join_with_date64() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1444,6 +1446,7 @@ async fn hash_join_with_decimal() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1488,6 +1491,7 @@ async fn hash_join_with_dictionary() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1532,6 +1536,7 @@ async fn reduce_left_join_1() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1579,6 +1584,7 @@ async fn reduce_left_join_2() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1631,6 +1637,7 @@ async fn reduce_left_join_3() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1680,6 +1687,7 @@ async fn reduce_right_join_1() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1726,6 +1734,7 @@ async fn reduce_right_join_2() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1772,6 +1781,7 @@ async fn reduce_full_join_to_right_join() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1820,6 +1830,7 @@ async fn reduce_full_join_to_left_join() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1865,6 +1876,7 @@ async fn reduce_full_join_to_inner_join() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -1930,7 +1942,7 @@ async fn sort_merge_join_on_date32() -> Result<()> { // inner sort merge join on data type (Date32) let sql = "select * from t1 join t2 on t1.c1 = t2.c1"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let state = ctx.state(); let logical_plan = state.optimize(&plan)?; let physical_plan = state.create_physical_plan(&logical_plan).await?; @@ -1978,7 +1990,7 @@ async fn sort_merge_join_on_decimal() -> Result<()> { // right join on data type (Decimal) let sql = "select * from t1 right join t2 on t1.c3 = t2.c3"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let state = ctx.state(); let logical_plan = state.optimize(&plan)?; let physical_plan = state.create_physical_plan(&logical_plan).await?; @@ -2034,7 +2046,7 @@ async fn left_semi_join() -> Result<()> { let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id IN (SELECT t2_id FROM t2) ORDER BY t1_id"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let state = ctx.state(); let logical_plan = state.optimize(&plan)?; let physical_plan = state.create_physical_plan(&logical_plan).await?; @@ -2225,7 +2237,7 @@ async fn right_semi_join() -> Result<()> { let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS (SELECT * FROM t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let state = ctx.state(); let logical_plan = state.optimize(&plan)?; let physical_plan = state.create_physical_plan(&logical_plan).await?; @@ -2316,6 +2328,7 @@ async fn reduce_cross_join_with_expr_join_key_all() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -2363,6 +2376,7 @@ async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -2410,6 +2424,7 @@ async fn reduce_cross_join_with_wildcard_and_expr() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; diff --git a/datafusion/core/tests/sql/json.rs b/datafusion/core/tests/sql/json.rs index e66fcb65561a5..878fa441b5bbd 100644 --- a/datafusion/core/tests/sql/json.rs +++ b/datafusion/core/tests/sql/json.rs @@ -60,7 +60,7 @@ async fn json_single_nan_schema() { .await .unwrap(); let sql = "SELECT mycol FROM single_nan"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); let task_ctx = ctx.task_ctx(); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 1e1307672394a..ff2a6fb93696b 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -1018,7 +1018,7 @@ async fn try_execute_to_batches( ctx: &SessionContext, sql: &str, ) -> Result> { - let plan = ctx.create_logical_plan(sql)?; + let plan = ctx.create_logical_plan(sql).await?; let logical_schema = plan.schema(); let plan = ctx.optimize(&plan)?; @@ -1038,6 +1038,7 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(sql) + .await .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); let logical_schema = plan.schema(); @@ -1567,7 +1568,7 @@ async fn nyc() -> Result<()> { let logical_plan = ctx.create_logical_plan( "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) \ FROM tripdata GROUP BY passenger_count", - )?; + ).await?; let optimized_plan = ctx.optimize(&logical_plan)?; diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index e2a33c7c29a1c..a5dd0034d99ac 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -157,7 +157,7 @@ async fn fixed_size_binary_columns() { .await .unwrap(); let sql = "SELECT ids FROM t0 ORDER BY ids"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); let task_ctx = ctx.task_ctx(); @@ -180,7 +180,7 @@ async fn parquet_single_nan_schema() { .await .unwrap(); let sql = "SELECT mycol FROM single_nan"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); let task_ctx = ctx.task_ctx(); @@ -218,7 +218,7 @@ async fn parquet_list_columns() { ])); let sql = "SELECT int64_list, utf8_list FROM list_columns"; - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); let task_ctx = ctx.task_ctx(); diff --git a/datafusion/core/tests/sql/predicates.rs b/datafusion/core/tests/sql/predicates.rs index 0510b12637b2a..c11f975a6a43a 100644 --- a/datafusion/core/tests/sql/predicates.rs +++ b/datafusion/core/tests/sql/predicates.rs @@ -581,7 +581,7 @@ async fn multiple_or_predicates() -> Result<()> { and p_size between 1 and 15 )"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; // Note that we expect `part.p_partkey = lineitem.l_partkey` to have been @@ -645,7 +645,7 @@ where ;"#; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; let formatted = plan.display_indent_schema().to_string(); diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 3163482e8b4e0..2708afa9854ad 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -167,7 +167,7 @@ async fn projection_on_table_scan() -> Result<()> { let partition_count = 4; let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?; - let table = ctx.table("test")?; + let table = ctx.table("test").await?; let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()?) .project(vec![col("c2")])? .build()?; @@ -208,7 +208,7 @@ async fn preserve_nullability_on_projection() -> Result<()> { let tmp_dir = TempDir::new()?; let ctx = partitioned_csv::create_ctx(&tmp_dir, 1).await?; - let schema: Schema = ctx.table("test").unwrap().schema().clone().into(); + let schema: Schema = ctx.table("test").await.unwrap().schema().clone().into(); assert!(!schema.field_with_name("c1")?.is_nullable()); let plan = scan_empty(None, &schema, None)? diff --git a/datafusion/core/tests/sql/references.rs b/datafusion/core/tests/sql/references.rs index 52a82f071ff24..079b4e8b9052f 100644 --- a/datafusion/core/tests/sql/references.rs +++ b/datafusion/core/tests/sql/references.rs @@ -64,7 +64,7 @@ async fn qualified_table_references_and_fields() -> Result<()> { // referring to the unquoted column is an error let sql = r#"SELECT f1.c1 from test"#; - let error = ctx.create_logical_plan(sql).unwrap_err(); + let error = ctx.create_logical_plan(sql).await.unwrap_err(); assert_contains!( error.to_string(), "No field named 'f1'.'c1'. Valid fields are 'test'.'f.c1', 'test'.'test.c2'" diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index c82eee7d0f7cc..e239aea566199 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -77,12 +77,12 @@ async fn select_values_list() -> Result<()> { } { let sql = "VALUES"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } { let sql = "VALUES ()"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } { @@ -100,7 +100,7 @@ async fn select_values_list() -> Result<()> { } { let sql = "VALUES (1),()"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } { @@ -118,22 +118,22 @@ async fn select_values_list() -> Result<()> { } { let sql = "VALUES (1),(1,2)"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } { let sql = "VALUES (1),('2')"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } { let sql = "VALUES (1),(2.0)"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } { let sql = "VALUES (1,2), (1,'2')"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } { @@ -1269,7 +1269,7 @@ async fn test_prepare_statement() -> Result<()> { // sql to statement then to prepare logical plan with parameters // c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and Float64 let logical_plan = - ctx.create_logical_plan("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE c1 > $2 AND c1 < $1")?; + ctx.create_logical_plan("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE c1 > $2 AND c1 < $1").await?; // prepare logical plan to logical plan without parameters let param_values = vec![ScalarValue::Int32(Some(3)), ScalarValue::Float64(Some(0.0))]; @@ -1331,8 +1331,9 @@ async fn parallel_query_with_filter() -> Result<()> { let partition_count = 4; let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?; - let logical_plan = - ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?; + let logical_plan = ctx + .create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3") + .await?; let logical_plan = ctx.optimize(&logical_plan)?; let physical_plan = ctx.create_physical_plan(&logical_plan).await?; @@ -1492,7 +1493,7 @@ async fn case_sensitive_in_default_dialect() { { let sql = "select \"int32\" from t"; - let plan = ctx.create_logical_plan(sql); + let plan = ctx.create_logical_plan(sql).await; assert!(plan.is_err()); } diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index a1b22389faaa9..999579c890679 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -45,7 +45,7 @@ where c_acctbal < ( ) order by c_custkey;"#; // assert plan - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); debug!("input:\n{}", plan.display_indent()); let plan = ctx.optimize(&plan).unwrap(); @@ -91,7 +91,7 @@ where o_orderstatus in ( );"#; // assert plan - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let actual = format!("{}", plan.display_indent()); let expected = "Projection: orders.o_orderkey\ @@ -137,7 +137,7 @@ where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 15 and p_ty order by s_acctbal desc, n_name, s_name, p_partkey;"#; // assert plan - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let actual = format!("{}", plan.display_indent()); let expected = "Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST\ @@ -201,7 +201,7 @@ async fn tpch_q4_correlated() -> Result<()> { "#; // assert plan - let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.create_logical_plan(sql).await.unwrap(); let plan = ctx.optimize(&plan).unwrap(); let actual = format!("{}", plan.display_indent()); let expected = r#"Sort: orders.o_orderpriority ASC NULLS LAST @@ -254,6 +254,7 @@ async fn tpch_q17_correlated() -> Result<()> { // assert plan let plan = ctx .create_logical_plan(sql) + .await .map_err(|e| format!("{:?} at {}", e, "error")) .unwrap(); println!("before:\n{}", plan.display_indent()); @@ -315,6 +316,7 @@ order by s_name; // assert plan let plan = ctx .create_logical_plan(sql) + .await .map_err(|e| format!("{:?} at {}", e, "error")) .unwrap(); let plan = ctx @@ -376,6 +378,7 @@ order by cntrycode;"#; // assert plan let plan = ctx .create_logical_plan(sql) + .await .map_err(|e| format!("{:?} at {}", e, "error")) .unwrap(); let plan = ctx @@ -438,6 +441,7 @@ order by value desc; // assert plan let plan = ctx .create_logical_plan(sql) + .await .map_err(|e| format!("{:?} at {}", e, "error")) .unwrap(); let plan = ctx diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs index f554b9424f980..a60fe72b0debc 100644 --- a/datafusion/core/tests/sql/udf.rs +++ b/datafusion/core/tests/sql/udf.rs @@ -75,7 +75,7 @@ async fn scalar_udf() -> Result<()> { // from here on, we may be in a different scope. We would still like to be able // to call UDFs. - let t = ctx.table("t")?; + let t = ctx.table("t").await?; let plan = LogicalPlanBuilder::from(t.to_logical_plan()?) .project(vec![ diff --git a/datafusion/core/tests/sql/wildcard.rs b/datafusion/core/tests/sql/wildcard.rs index ddf50dfa813fb..f5dbdcc721538 100644 --- a/datafusion/core/tests/sql/wildcard.rs +++ b/datafusion/core/tests/sql/wildcard.rs @@ -136,7 +136,7 @@ async fn select_wrong_qualified_wildcard() -> Result<()> { register_aggregate_simple_csv(&ctx).await?; let sql = "SELECT agg.* FROM aggregate_simple order by c1"; - let result = ctx.create_logical_plan(sql); + let result = ctx.create_logical_plan(sql).await; match result { Ok(_) => panic!("unexpected OK"), Err(err) => assert_eq!( diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index b550e7f5dd60b..cb37f8c2292c6 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -327,6 +327,7 @@ async fn window_expr_eliminate() -> Result<()> { let msg = format!("Creating logical plan for '{}'", sql); let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let state = ctx.state(); let plan = state.optimize(&plan)?; @@ -393,6 +394,7 @@ async fn window_expr_eliminate() -> Result<()> { let plan = ctx .create_logical_plan(&("explain ".to_owned() + sql)) + .await .expect(&msg); let plan = state.optimize(&plan)?; let expected = vec![ @@ -1621,7 +1623,7 @@ async fn test_window_agg_sort() -> Result<()> { FROM aggregate_test_100"; let msg = format!("Creating logical plan for '{}'", sql); - let plan = ctx.create_logical_plan(sql).expect(&msg); + let plan = ctx.create_logical_plan(sql).await.expect(&msg); let state = ctx.state(); let logical_plan = state.optimize(&plan)?; let physical_plan = state.create_physical_plan(&logical_plan).await?; diff --git a/datafusion/core/tests/sqllogictests/src/insert/mod.rs b/datafusion/core/tests/sqllogictests/src/insert/mod.rs index 3412e4ad8db4f..17d673302cbb0 100644 --- a/datafusion/core/tests/sqllogictests/src/insert/mod.rs +++ b/datafusion/core/tests/sqllogictests/src/insert/mod.rs @@ -54,8 +54,8 @@ pub async fn insert(ctx: &SessionContext, insert_stmt: SQLStatement) -> Result DFParser<'a> { break; } if expecting_statement_delimiter { - return parser.expected("end of statement", parser.parser.peek_token()); + return parser + .expected("end of statement", parser.parser.peek_token().token); } let statement = parser.parse_statement()?; @@ -169,7 +170,7 @@ impl<'a> DFParser<'a> { /// Parse a new expression pub fn parse_statement(&mut self) -> Result { - match self.parser.peek_token() { + match self.parser.peek_token().token { Token::Word(w) => { match w.keyword { Keyword::CREATE => { @@ -224,11 +225,11 @@ impl<'a> DFParser<'a> { } loop { - if let Token::Word(_) = self.parser.peek_token() { + if let Token::Word(_) = self.parser.peek_token().token { let identifier = self.parser.parse_identifier()?; partitions.push(identifier.to_string()); } else { - return self.expected("partition name", self.parser.peek_token()); + return self.expected("partition name", self.parser.peek_token().token); } let comma = self.parser.consume_token(&Token::Comma); if self.parser.consume_token(&Token::RParen) { @@ -237,7 +238,7 @@ impl<'a> DFParser<'a> { } else if !comma { return self.expected( "',' or ')' after partition definition", - self.parser.peek_token(), + self.parser.peek_token().token, ); } } @@ -259,13 +260,13 @@ impl<'a> DFParser<'a> { loop { if let Some(constraint) = self.parser.parse_optional_table_constraint()? { constraints.push(constraint); - } else if let Token::Word(_) = self.parser.peek_token() { + } else if let Token::Word(_) = self.parser.peek_token().token { let column_def = self.parse_column_def()?; columns.push(column_def); } else { return self.expected( "column name or constraint definition", - self.parser.peek_token(), + self.parser.peek_token().token, ); } let comma = self.parser.consume_token(&Token::Comma); @@ -275,7 +276,7 @@ impl<'a> DFParser<'a> { } else if !comma { return self.expected( "',' or ')' after column definition", - self.parser.peek_token(), + self.parser.peek_token().token, ); } } @@ -300,7 +301,7 @@ impl<'a> DFParser<'a> { } else { return self.expected( "constraint details after CONSTRAINT ", - self.parser.peek_token(), + self.parser.peek_token().token, ); } } else if let Some(option) = self.parser.parse_optional_column_option()? { @@ -376,7 +377,7 @@ impl<'a> DFParser<'a> { /// Parses the set of valid formats fn parse_file_format(&mut self) -> Result { - match self.parser.next_token() { + match self.parser.next_token().token { Token::Word(w) => parse_file_type(&w.value), unexpected => self.expected("one of PARQUET, NDJSON, or CSV", unexpected), } @@ -384,7 +385,7 @@ impl<'a> DFParser<'a> { /// Parses the set of fn parse_file_compression_type(&mut self) -> Result { - match self.parser.next_token() { + match self.parser.next_token().token { Token::Word(w) => parse_file_compression_type(&w.value), unexpected => self.expected("one of GZIP, BZIP2, XZ", unexpected), } @@ -410,7 +411,7 @@ impl<'a> DFParser<'a> { } else if !comma { return self.expected( "',' or ')' after option definition", - self.parser.peek_token(), + self.parser.peek_token().token, ); } }