Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support case sensitive column for with_column_renamed #7063

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,7 @@ impl Column {
}
}

/// Deserialize a fully qualified name string into a column
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
let flat_name = flat_name.into();
let mut idents = parse_identifiers_normalized(&flat_name);

fn from_idents(idents: &mut Vec<String>) -> Option<Self> {
let (relation, name) = match idents.len() {
1 => (None, idents.remove(0)),
2 => (
Expand All @@ -97,9 +93,33 @@ impl Column {
),
// any expression that failed to parse or has more than 4 period delimited
// identifiers will be treated as an unqualified column name
_ => (None, flat_name),
_ => return None,
};
Self { relation, name }
Some(Self { relation, name })
}

/// Deserialize a fully qualified name string into a column
///
/// Treats the name as a SQL identifier. For example
/// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
/// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
let flat_name: &str = &flat_name.into();
Self::from_idents(&mut parse_identifiers_normalized(flat_name, false))
.unwrap_or_else(|| Self {
relation: None,
name: flat_name.to_owned(),
})
}

/// Deserialize a fully qualified name string into a column preserving column text case
pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
let flat_name: &str = &flat_name.into();
Self::from_idents(&mut parse_identifiers_normalized(flat_name, true))
.unwrap_or_else(|| Self {
relation: None,
name: flat_name.to_owned(),
})
}

/// Serialize column into a flat name string
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl<'a> TableReference<'a> {
/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
/// identifier. See docs on [`TableReference`] for more details.
pub fn parse_str(s: &'a str) -> Self {
let mut parts = parse_identifiers_normalized(s);
let mut parts = parse_identifiers_normalized(s, false);

match parts.len() {
1 => Self::Bare {
Expand Down
5 changes: 3 additions & 2 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,14 @@ pub fn get_arrayref_at_indices(
.collect()
}

pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
parse_identifiers(s)
.unwrap_or_default()
.into_iter()
.map(|id| match id.quote_style {
Some(_) => id.value,
None => id.value.to_ascii_lowercase(),
None if ignore_case => id.value,
_ => id.value.to_ascii_lowercase(),
})
.collect::<Vec<_>>()
}
Expand Down
67 changes: 64 additions & 3 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,12 +1014,21 @@ impl DataFrame {
/// ```
pub fn with_column_renamed(
self,
old_name: impl Into<Column>,
old_name: impl Into<String>,
new_name: &str,
) -> Result<DataFrame> {
let old_name: Column = old_name.into();
let ident_opts = self
.session_state
.config_options()
.sql_parser
.enable_ident_normalization;
let old_column: Column = if ident_opts {
Column::from_qualified_name(old_name)
} else {
Column::from_qualified_name_ignore_case(old_name)
};

let field_to_rename = match self.plan.schema().field_from_column(&old_name) {
let field_to_rename = match self.plan.schema().field_from_column(&old_column) {
Ok(field) => field,
// no-op if field not found
Err(DataFusionError::SchemaError(SchemaError::FieldNotFound { .. })) => {
Expand Down Expand Up @@ -1830,6 +1839,58 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn with_column_renamed_case_sensitive() -> Result<()> {
let config =
SessionConfig::from_string_hash_map(std::collections::HashMap::from([(
"datafusion.sql_parser.enable_ident_normalization".to_owned(),
"false".to_owned(),
)]))?;
let mut ctx = SessionContext::with_config(config);
let name = "aggregate_test_100";
register_aggregate_csv(&mut ctx, name).await?;
let df = ctx.table(name);

let df = df
.await?
.filter(col("c2").eq(lit(3)).and(col("c1").eq(lit("a"))))?
.limit(0, Some(1))?
.sort(vec![
// make the test deterministic
col("c1").sort(true, true),
col("c2").sort(true, true),
col("c3").sort(true, true),
])?
.select_columns(&["c1"])?;

let df_renamed = df.clone().with_column_renamed("c1", "CoLuMn1")?;

let res = &df_renamed.clone().collect().await?;

assert_batches_sorted_eq!(
vec![
"+---------+",
"| CoLuMn1 |",
"+---------+",
"| a |",
"+---------+",
],
res
);

let df_renamed = df_renamed
.with_column_renamed("CoLuMn1", "c1")?
.collect()
.await?;

assert_batches_sorted_eq!(
vec!["+----+", "| c1 |", "+----+", "| a |", "+----+",],
&df_renamed
);

Ok(())
}

#[tokio::test]
async fn filter_pushdown_dataframe() -> Result<()> {
let ctx = SessionContext::new();
Expand Down