Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix querying and defining table / view names with period #4530

Merged
merged 7 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl DFSchema {
(Some(qq), None) => {
// the original field may now be aliased with a name that matches the
// original qualified name
let table_ref: TableReference = field.name().as_str().into();
let table_ref = TableReference::parse_str(field.name().as_str());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this is correct as it potentially splits on "." -- but I chose to keep the existing behavior

match table_ref {
TableReference::Partial { schema, table } => {
schema == qq && table == name
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use error::{field_not_found, DataFusionError, Result, SchemaError};
pub use parsers::parse_interval;
pub use scalar::{ScalarType, ScalarValue};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
Expand Down
90 changes: 87 additions & 3 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,73 @@ pub enum TableReference<'a> {
},
}

/// Represents a path to a table that may require further resolution
/// that owns the underlying names
#[derive(Debug, Clone)]
pub enum OwnedTableReference {
/// An unqualified table reference, e.g. "table"
Bare {
/// The table name
table: String,
},
/// A partially resolved table reference, e.g. "schema.table"
Partial {
/// The schema containing the table
schema: String,
/// The table name
table: String,
},
/// A fully resolved table reference, e.g. "catalog.schema.table"
Full {
/// The catalog (aka database) containing the table
catalog: String,
/// The schema containing the table
schema: String,
/// The table name
table: String,
},
}

impl OwnedTableReference {
/// Return a `TableReference` view of this `OwnedTableReference`
pub fn as_table_reference(&self) -> TableReference<'_> {
match self {
Self::Bare { table } => TableReference::Bare { table },
Self::Partial { schema, table } => TableReference::Partial { schema, table },
Self::Full {
catalog,
schema,
table,
} => TableReference::Full {
catalog,
schema,
table,
},
}
}

/// Return a string suitable for display
pub fn display_string(&self) -> String {
match self {
OwnedTableReference::Bare { table } => table.clone(),
OwnedTableReference::Partial { schema, table } => format!("{schema}.{table}"),
OwnedTableReference::Full {
catalog,
schema,
table,
} => format!("{catalog}.{schema}.{table}"),
}
}
}

/// Convert `OwnedTableReference` into a `TableReference`. Somewhat
/// akward to use but 'idiomatic': `(&table_ref).into()`
impl<'a> From<&'a OwnedTableReference> for TableReference<'a> {
fn from(r: &'a OwnedTableReference) -> Self {
r.as_table_reference()
}
}

impl<'a> TableReference<'a> {
/// Retrieve the actual table name, regardless of qualification
pub fn table(&self) -> &str {
Expand Down Expand Up @@ -90,10 +157,18 @@ impl<'a> TableReference<'a> {
},
}
}
}

impl<'a> From<&'a str> for TableReference<'a> {
fn from(s: &'a str) -> Self {
/// Forms a [`TableReferece`] by splitting `s` on periods `.`.
///
/// Note that this function does NOT handle periods or name
/// normalization correctly (e.g. `"foo.bar"` will be parsed as
/// `"foo`.`bar"`. and `Foo` will be parsed as `Foo` (not `foo`).
///
/// If you need to handle such identifiers correctly, you should
/// use a SQL oarser or form the [`OwnedTableReference`] directly.
///
/// See more detail in <https://github.com/apache/arrow-datafusion/issues/4532>
pub fn parse_str(s: &'a str) -> Self {
let parts: Vec<&str> = s.split('.').collect();

match parts.len() {
Expand All @@ -112,6 +187,15 @@ impl<'a> From<&'a str> for TableReference<'a> {
}
}

/// Parse a string into a TableReference, by splittig on `.`
///
/// See caveats on [`TableReference::parse_str`]
impl<'a> From<&'a str> for TableReference<'a> {
fn from(s: &'a str) -> Self {
Self::parse_str(s)
}
}

impl<'a> From<ResolvedTableReference<'a>> for TableReference<'a> {
fn from(resolved: ResolvedTableReference<'a>) -> Self {
Self::Full {
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::catalog::schema::SchemaProvider;
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use datafusion_common::{DFSchema, DataFusionError};
use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
use datafusion_expr::CreateExternalTable;
use futures::TryStreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -115,16 +115,20 @@ impl ListingSchemaProvider {
let table_path = table.to_str().ok_or_else(|| {
DataFusionError::Internal("Cannot parse file name!".to_string())
})?;

if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);

let name = OwnedTableReference::Bare {
table: table_name.to_string(),
};
let provider = self
.factory
.create(
state,
&CreateExternalTable {
schema: Arc::new(DFSchema::empty()),
name: table_name.to_string(),
name,
location: table_url,
file_type: self.format.clone(),
has_header: self.has_header,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ mod tests {
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
\n CreateView: \"xyz\"\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2, abc.column3\
\n TableScan: abc projection=[column1, column2, column3]";
assert_eq!(expected, actual);
Expand All @@ -516,7 +516,7 @@ mod tests {
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
\n CreateView: \"xyz\"\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2, abc.column3\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2, column3]";
Expand All @@ -531,7 +531,7 @@ mod tests {
let actual = format!("{}", plan.display_indent());
let expected = "\
Explain\
\n CreateView: \"xyz\"\
\n CreateView: Bare { table: \"xyz\" }\
\n Projection: abc.column1, abc.column2\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2]";
Expand Down
28 changes: 14 additions & 14 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl SessionContext {
batch: RecordBatch,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
self.register_table(table_name, Arc::new(table))
self.register_table(TableReference::Bare { table: table_name }, Arc::new(table))
}

/// Return the [RuntimeEnv] used to run queries with this [SessionContext]
Expand Down Expand Up @@ -265,12 +265,12 @@ impl SessionContext {
if_not_exists,
or_replace,
}) => {
let table = self.table(name.as_str());
let table = self.table(&name);

match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
(false, true, Ok(_)) => {
self.deregister_table(name.as_str())?;
self.deregister_table(&name)?;
let physical =
Arc::new(DataFrame::new(self.state.clone(), &input));

Expand All @@ -280,7 +280,7 @@ impl SessionContext {
batches,
)?);

self.register_table(name.as_str(), table)?;
self.register_table(&name, table)?;
self.return_empty_dataframe()
}
(true, true, Ok(_)) => Err(DataFusionError::Internal(
Expand All @@ -296,7 +296,7 @@ impl SessionContext {
batches,
)?);

self.register_table(name.as_str(), table)?;
self.register_table(&name, table)?;
self.return_empty_dataframe()
}
(false, false, Ok(_)) => Err(DataFusionError::Execution(format!(
Expand All @@ -312,22 +312,22 @@ impl SessionContext {
or_replace,
definition,
}) => {
let view = self.table(name.as_str());
let view = self.table(&name);

match (or_replace, view) {
(true, Ok(_)) => {
self.deregister_table(name.as_str())?;
self.deregister_table(&name)?;
let table =
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name.as_str(), table)?;
self.register_table(&name, table)?;
self.return_empty_dataframe()
}
(_, Err(_)) => {
let table =
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name.as_str(), table)?;
self.register_table(&name, table)?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
Expand All @@ -340,7 +340,7 @@ impl SessionContext {
LogicalPlan::DropTable(DropTable {
name, if_exists, ..
}) => {
let result = self.find_and_deregister(name.as_str(), TableType::Base);
let result = self.find_and_deregister(&name, TableType::Base);
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
(_, true) => self.return_empty_dataframe(),
Expand All @@ -354,7 +354,7 @@ impl SessionContext {
LogicalPlan::DropView(DropView {
name, if_exists, ..
}) => {
let result = self.find_and_deregister(name.as_str(), TableType::View);
let result = self.find_and_deregister(&name, TableType::View);
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
(_, true) => self.return_empty_dataframe(),
Expand Down Expand Up @@ -497,11 +497,11 @@ impl SessionContext {
let table_provider: Arc<dyn TableProvider> =
self.create_custom_table(cmd).await?;

let table = self.table(cmd.name.as_str());
let table = self.table(&cmd.name);
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
self.register_table(cmd.name.as_str(), table_provider)?;
self.register_table(&cmd.name, table_provider)?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
Expand Down Expand Up @@ -765,7 +765,7 @@ impl SessionContext {
.with_listing_options(options)
.with_schema(resolved_schema);
let table = ListingTable::try_new(config)?.with_definition(sql_definition);
self.register_table(name, Arc::new(table))?;
self.register_table(TableReference::Bare { table: name }, Arc::new(table))?;
Ok(())
}

Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/tests/sql/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ async fn invalid_qualified_table_references() -> Result<()> {
"way.too.many.namespaces.as.ident.prefixes.aggregate_test_100",
] {
let sql = format!("SELECT COUNT(*) FROM {}", table_ref);
assert!(matches!(ctx.sql(&sql).await, Err(DataFusionError::Plan(_))));
let result = ctx.sql(&sql).await;
assert!(
matches!(result, Err(DataFusionError::Plan(_))),
"result was: {:?}",
result
);
}
Ok(())
}
Expand Down
21 changes: 10 additions & 11 deletions datafusion/core/tests/sqllogictests/src/insert/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,21 @@ use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchema, DataFusionError};
use datafusion_expr::Expr as DFExpr;
use datafusion_sql::planner::{PlannerContext, SqlToRel};
use datafusion_sql::planner::{object_name_to_table_reference, PlannerContext, SqlToRel};
use sqlparser::ast::{Expr, SetExpr, Statement as SQLStatement};
use std::sync::Arc;

pub async fn insert(ctx: &SessionContext, insert_stmt: &SQLStatement) -> Result<String> {
pub async fn insert(ctx: &SessionContext, insert_stmt: SQLStatement) -> Result<String> {
// First, use sqlparser to get table name and insert values
let table_name;
let table_reference;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the test harness to use the new name handling as well

let insert_values: Vec<Vec<Expr>>;
match insert_stmt {
SQLStatement::Insert {
table_name: name,
source,
..
table_name, source, ..
} => {
table_reference = object_name_to_table_reference(table_name)?;

// Todo: check columns match table schema
table_name = name.to_string();
match &*source.body {
SetExpr::Values(values) => {
insert_values = values.0.clone();
Expand All @@ -54,9 +53,9 @@ pub async fn insert(ctx: &SessionContext, insert_stmt: &SQLStatement) -> Result<
}

// Second, get batches in table and destroy the old table
let mut origin_batches = ctx.table(table_name.as_str())?.collect().await?;
let schema = ctx.table_provider(table_name.as_str())?.schema();
ctx.deregister_table(table_name.as_str())?;
let mut origin_batches = ctx.table(&table_reference)?.collect().await?;
let schema = ctx.table_provider(&table_reference)?.schema();
ctx.deregister_table(&table_reference)?;

// Third, transfer insert values to `RecordBatch`
// Attention: schema info can be ignored. (insert values don't contain schema info)
Expand Down Expand Up @@ -84,7 +83,7 @@ pub async fn insert(ctx: &SessionContext, insert_stmt: &SQLStatement) -> Result<

// Final, create new memtable with same schema.
let new_provider = MemTable::try_new(schema, vec![origin_batches])?;
ctx.register_table(table_name.as_str(), Arc::new(new_provider))?;
ctx.register_table(&table_reference, Arc::new(new_provider))?;

Ok("".to_string())
}
12 changes: 7 additions & 5 deletions datafusion/core/tests/sqllogictests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,17 @@ fn format_batches(batches: Vec<RecordBatch>) -> Result<String> {
async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<String> {
let sql = sql.into();
// Check if the sql is `insert`
if let Ok(statements) = DFParser::parse_sql(&sql) {
if let Statement::Statement(statement) = &statements[0] {
if let SQLStatement::Insert { .. } = &**statement {
if let Ok(mut statements) = DFParser::parse_sql(&sql) {
let statement0 = statements.pop_front().expect("at least one SQL statement");
if let Statement::Statement(statement) = statement0 {
let statement = *statement;
if matches!(&statement, SQLStatement::Insert { .. }) {
return insert(ctx, statement).await;
}
}
}
let df = ctx.sql(sql.as_str()).await.unwrap();
let results: Vec<RecordBatch> = df.collect().await.unwrap();
let df = ctx.sql(sql.as_str()).await?;
let results: Vec<RecordBatch> = df.collect().await?;
let formatted_batches = format_batches(results)?;
Ok(formatted_batches)
}
Loading