From c2e83243bbb666bd626b9f9f826ff50f0297a3ef Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 16 May 2023 19:49:00 -0400 Subject: [PATCH 01/26] mvp update operation --- rust/src/action/mod.rs | 10 +- rust/src/operations/delete.rs | 60 +++- rust/src/operations/mod.rs | 11 +- rust/src/operations/update.rs | 636 ++++++++++++++++++++++++++++++++++ 4 files changed, 714 insertions(+), 3 deletions(-) create mode 100644 rust/src/operations/update.rs diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index e246bfee97..294d86756c 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -550,6 +550,11 @@ pub enum DeltaOperation { /// The condition the to be deleted data must match predicate: Option, }, + /// Update data matching predicate from delta table + Update { + /// The update predicate + predicate: Option, + }, /// Represents a Delta `StreamingUpdate` operation. #[serde(rename_all = "camelCase")] @@ -588,6 +593,7 @@ impl DeltaOperation { DeltaOperation::Create { .. } => "CREATE TABLE", DeltaOperation::Write { .. } => "WRITE", DeltaOperation::Delete { .. } => "DELETE", + DeltaOperation::Update { .. } => "UPDATE", DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE", DeltaOperation::Optimize { .. } => "OPTIMIZE", DeltaOperation::FileSystemCheck { .. } => "FSCK", @@ -631,7 +637,8 @@ impl DeltaOperation { | Self::FileSystemCheck {} | Self::StreamingUpdate { .. } | Self::Write { .. } - | Self::Delete { .. } => true, + | Self::Delete { .. } + | Self::Update { .. } => true, } } @@ -651,6 +658,7 @@ impl DeltaOperation { // TODO add more operations Self::Write { predicate, .. } => predicate.clone(), Self::Delete { predicate, .. } => predicate.clone(), + Self::Update { predicate, .. } => predicate.clone(), _ => None, } } diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 2a12985ac2..4e44dc5f78 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -103,7 +103,7 @@ pub struct DeleteMetrics { } /// Determine which files contain a record that statisfies the predicate -async fn find_files<'a>( +pub async fn find_files<'a>( snapshot: &DeltaTableState, store: ObjectStoreRef, schema: Arc, @@ -941,6 +941,64 @@ mod tests { let actual = get_data(table).await; assert_batches_sorted_eq!(&expected, &actual); } + #[tokio::test] + async fn test_delete_on_mixed_predicate() { + // Perform a delete where the predicate only contains partition columns + + let schema = get_arrow_schema(&None); + let table = setup_table(Some(["modified"].to_vec())).await; + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from_slice([ + "2021-02-02", + "2021-02-03", + "2021-02-02", + "2021-02-03", + ])), + ], + ) + .unwrap(); + + // write some data + let table = DeltaOps(table) + .write(vec![batch]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 2); + + let (table, metrics) = DeltaOps(table) + .delete() + .with_predicate(col("modified").eq(lit("2021-02-03")).and(col("id").eq(lit(20)))) + .await + .unwrap(); + assert_eq!(table.version(), 2); + assert_eq!(table.get_file_uris().count(), 1); + + assert_eq!(metrics.num_added_files, 0); + assert_eq!(metrics.num_removed_files, 1); + assert_eq!(metrics.num_deleted_rows, None); + assert_eq!(metrics.num_copied_rows, None); + 
assert!(metrics.scan_time_ms > 0); + assert_eq!(metrics.rewrite_time_ms, 0); + + let expected = vec![ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 0 | 2021-02-02 |", + "| A | 10 | 2021-02-02 |", + "+----+-------+------------+", + ]; + + let actual = get_data(table).await; + assert_batches_sorted_eq!(&expected, &actual); + } #[tokio::test] async fn test_delete_on_partition_column() { diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index a7da3a2d1d..90ab54562c 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -21,7 +21,7 @@ pub mod transaction; pub mod vacuum; #[cfg(feature = "datafusion")] -use self::{delete::DeleteBuilder, load::LoadBuilder, write::WriteBuilder}; +use self::{delete::DeleteBuilder, load::LoadBuilder, update::UpdateBuilder, write::WriteBuilder}; #[cfg(feature = "datafusion")] use arrow::record_batch::RecordBatch; #[cfg(feature = "datafusion")] @@ -34,6 +34,8 @@ pub mod delete; #[cfg(feature = "datafusion")] mod load; #[cfg(feature = "datafusion")] +pub mod update; +#[cfg(feature = "datafusion")] pub mod write; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod writer; @@ -139,6 +141,13 @@ impl DeltaOps { pub fn delete(self) -> DeleteBuilder { DeleteBuilder::new(self.0.object_store(), self.0.state) } + + /// Update data from Delta table + #[cfg(feature = "datafusion")] + #[must_use] + pub fn update(self) -> UpdateBuilder { + UpdateBuilder::new(self.0.object_store(), self.0.state) + } } impl From for DeltaOps { diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs new file mode 100644 index 0000000000..87516eb20c --- /dev/null +++ b/rust/src/operations/update.rs @@ -0,0 +1,636 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::{Instant, SystemTime, UNIX_EPOCH}, +}; + +use arrow::datatypes::Schema as ArrowSchema; +use arrow_schema::Field; +use datafusion::{ + execution::context::SessionState, + physical_optimizer::pruning::PruningPredicate, + physical_plan::{projection::ProjectionExec, ExecutionPlan}, + prelude::SessionContext, +}; +use datafusion_common::{Column, DFSchema, ScalarValue}; +use datafusion_expr::{case, col, lit, Expr}; +use datafusion_physical_expr::{ + create_physical_expr, execution_props::ExecutionProps, expressions, PhysicalExpr, +}; +use futures::future::BoxFuture; +use parquet::file::properties::WriterProperties; +use serde_json::{Map, Value}; + +use crate::{ + action::{Action, Add, DeltaOperation, Remove}, + delta_datafusion::{logical_expr_to_physical_expr, parquet_scan_from_actions, register_store}, + operations::delete::find_files, + storage::{DeltaObjectStore, ObjectStoreRef}, + table_state::DeltaTableState, + DeltaDataTypeVersion, DeltaResult, DeltaTable, DeltaTableError, +}; + +use super::{transaction::commit, write::write_execution_plan}; + +/// Updates records in the Delta Table. 
/// See this module's documentation for more information
pub struct UpdateBuilder {
    /// Which records to update
    predicate: Option<Expr>,
    /// How to update columns in a record that match the predicate
    updates: HashMap<Column, Expr>,
    /// A snapshot of the table's state
    snapshot: DeltaTableState,
    /// Delta object store for handling data files
    object_store: Arc<DeltaObjectStore>,
    /// DataFusion session state relevant for executing the input plan
    state: Option<SessionState>,
    /// Properties passed to underlying parquet writer for when files are rewritten
    writer_properties: Option<WriterProperties>,
    /// Additional metadata to be added to commit
    app_metadata: Option<Map<String, Value>>,
}

#[derive(Default)]
/// Metrics collected during the Update operation
pub struct UpdateMetrics {
    /// Number of files added.
    pub num_added_files: usize,
    /// Number of files removed.
    pub num_removed_files: usize,
    /// Number of rows updated.
    pub num_updated_rows: usize,
    /// Number of rows just copied over in the process of updating files.
    pub num_copied_rows: usize,
    /// Time taken to execute the entire operation.
    pub execution_time_ms: u128,
    /// Time taken to scan the files for matches.
    pub scan_time_ms: u128,
}

impl UpdateBuilder {
    /// Create a new [`UpdateBuilder`]
    pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self {
        Self {
            predicate: None,
            updates: HashMap::new(),
            snapshot,
            object_store,
            state: None,
            writer_properties: None,
            app_metadata: None,
        }
    }

    /// Which records to update
    pub fn with_predicate(mut self, predicate: Expr) -> Self {
        self.predicate = Some(predicate);
        self
    }

    /// Overwrite update expressions with the supplied map
    pub fn with_updates(mut self, updates: HashMap<Column, Expr>) -> Self {
        self.updates = updates;
        self
    }

    /// Perform an additional update expression during the operation
    pub fn with_update<S: Into<Column>>(mut self, column: S, expression: Expr) -> Self {
        self.updates.insert(column.into(), expression);
        self
    }

    /// The DataFusion session state to use
    pub fn with_session_state(mut self, state: SessionState) -> Self {
        self.state = Some(state);
        self
    }

    /// Additional metadata to be added to commit info
    pub fn with_metadata(
        mut self,
        metadata: impl IntoIterator<Item = (String, Value)>,
    ) -> Self {
        self.app_metadata = Some(Map::from_iter(metadata));
        self
    }

    /// Writer properties passed to parquet writer for when files are rewritten
    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
        self.writer_properties = Some(writer_properties);
        self
    }
}

async fn execute(
    predicate: Option<Expr>,
    updates: &HashMap<Column, Expr>,
    object_store: ObjectStoreRef,
    snapshot: &DeltaTableState,
    state: SessionState,
    writer_properties: Option<WriterProperties>,
    app_metadata: Option<Map<String, Value>>,
) -> DeltaResult<((Vec<Action>, DeltaDataTypeVersion), UpdateMetrics)> {
    // Validate the predicate and update expressions
    //
    // If the predicate is not set then all files need to be updated.
    // else if it only contains partition columns then perform an in-memory scan
    // otherwise scan files for records that satisfy the predicate
    //
    // For files that were identified, scan for records that match the predicate
    // and perform update operations, and then commit add and remove actions to
    // the log

    let exec_start = Instant::now();
    let mut metrics = UpdateMetrics::default();
    let mut version = snapshot.version();

    let rewrite = match &predicate {
        Some(predicate) => {
            let schema = snapshot.arrow_schema()?;
            let table_partition_cols = snapshot
                .current_metadata()
                .ok_or(DeltaTableError::NoMetadata)?
                .partition_columns
                .clone();
            let file_schema = Arc::new(ArrowSchema::new(
                schema
                    .fields()
                    .iter()
                    .filter(|f| !table_partition_cols.contains(f.name()))
                    .cloned()
                    .collect::<Vec<Field>>(),
            ));
            let expr = logical_expr_to_physical_expr(predicate, &schema);

            let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
            let files_to_prune = pruning_predicate.prune(snapshot)?;
            let files: Vec<&Add> = snapshot
                .files()
                .iter()
                .zip(files_to_prune.into_iter())
                .filter_map(|(action, keep)| if keep { Some(action) } else { None })
                .collect();

            // Create a new delta scan plan with only files that have a record
            let rewrite = find_files(
                snapshot,
                object_store.clone(),
                &schema,
                file_schema.clone(),
                files,
                &state,
                predicate,
            )
            .await?;
            rewrite.into_iter().map(|s| s.to_owned()).collect()
        }
        None => snapshot.files().to_owned(),
    };

    let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));

    let schema = snapshot.arrow_schema()?;
    let table_partition_cols = snapshot
        .current_metadata()
        .ok_or(DeltaTableError::NoMetadata)?
        .partition_columns
        .clone();
    let file_schema = Arc::new(ArrowSchema::new(
        schema
            .fields()
            .iter()
            .filter(|f| !table_partition_cols.contains(f.name()))
            .cloned()
            .collect::<Vec<Field>>(),
    ));
    let execution_props = ExecutionProps::new();
    // For each rewrite evaluate the predicate and then modify each expression
    // to either compute the new value or obtain the old one then write these batches
    let parquet_scan = parquet_scan_from_actions(
        snapshot,
        object_store.clone(),
        &rewrite,
        &schema,
        None,
        &state,
        None,
        None,
    )
    .await?;

    // Create a new projection
    let physical_schema = file_schema.clone();
    let mut fields = Vec::new();
    for field in physical_schema.fields.iter() {
        fields.push(field.to_owned());
    }
    fields.push(Arc::new(Field::new(
        "__delta_rs_update_predicate",
        arrow_schema::DataType::Boolean,
        true,
    )));

    let physical_schema = Arc::new(ArrowSchema::new(fields));
    let logical_schema: DFSchema = physical_schema.as_ref().clone().try_into()?;

    let mut expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
    let scan_schema = parquet_scan.schema();
    for (i, field) in scan_schema.fields().into_iter().enumerate() {
        expressions.push((
            Arc::new(expressions::Column::new(field.name(), i)),
            field.name().to_owned(),
        ));
    }

    let predicate_expr = create_physical_expr(
        &predicate,
        &logical_schema,
        &physical_schema,
        &execution_props,
    )?;
    expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string()));

    // Create the projection...
+ let projection: Arc = + Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); + let mut expressions: Vec<(Arc, String)> = Vec::new(); + let scan_schema = projection.schema(); + for (i, field) in scan_schema.fields().into_iter().enumerate() { + expressions.push(( + Arc::new(expressions::Column::new(field.name(), i)), + field.name().to_owned(), + )); + } + + let mut map = HashMap::::new(); + let mut control_columns = HashSet::::new(); + control_columns.insert("__delta_rs_update_predicate".to_owned()); + + for (column, expr) in updates { + let expr = case(col("__delta_rs_update_predicate")) + .when(lit(true), expr.to_owned()) + .otherwise(col(column.to_owned()))?; + let predicate_expr = + create_physical_expr(&expr, &logical_schema, &physical_schema, &execution_props)?; + map.insert(column.name.clone(), expressions.len()); + let c = "__delta_rs_".to_string() + &column.name; + expressions.push((predicate_expr, c.clone())); + control_columns.insert(c); + } + + let projection: Arc = + Arc::new(ProjectionExec::try_new(expressions, projection)?); + + // project again to remove __delta_rs columns and rename update columns to their original name + let mut expressions: Vec<(Arc, String)> = Vec::new(); + let scan_schema = projection.schema(); + for (i, field) in scan_schema.fields().into_iter().enumerate() { + if !control_columns.contains(field.name()) { + match map.get(field.name()) { + Some(value) => { + expressions.push(( + Arc::new(expressions::Column::new(field.name(), value.clone())), + field.name().to_owned(), + )); + } + None => { + expressions.push(( + Arc::new(expressions::Column::new(field.name(), i)), + field.name().to_owned(), + )); + } + } + } + } + + let projection: Arc = + Arc::new(ProjectionExec::try_new(expressions, projection)?); + + let add_actions = write_execution_plan( + snapshot, + state.clone(), + projection.clone(), + table_partition_cols.clone(), + object_store.clone(), + Some(snapshot.table_config().target_file_size() as usize), + None, + writer_properties, + ) + .await?; + + let deletion_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + let mut actions: Vec = add_actions.into_iter().map(Action::add).collect(); + + metrics.num_added_files = actions.len(); + metrics.num_removed_files = rewrite.len(); + + for action in rewrite { + actions.push(Action::remove(Remove { + path: action.path, + deletion_timestamp: Some(deletion_timestamp), + data_change: true, + extended_file_metadata: Some(true), + partition_values: Some(action.partition_values), + size: Some(action.size), + tags: None, + })) + } + + metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); + + // Do not make a commit when there are zero updates to the state + if !actions.is_empty() { + let operation = DeltaOperation::Update { + predicate: Some(predicate.canonical_name()), + }; + version = commit( + object_store.as_ref(), + &actions, + operation, + snapshot, + app_metadata, + ) + .await?; + } + + Ok(((actions, version), metrics)) +} + +impl std::future::IntoFuture for UpdateBuilder { + type Output = DeltaResult<(DeltaTable, UpdateMetrics)>; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let mut this = self; + + Box::pin(async move { + let state = this.state.unwrap_or_else(|| { + let session = SessionContext::new(); + + // If a user provides their own their DF state then they must register the store themselves + register_store(this.object_store.clone(), session.runtime_env()); 
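+            // Registering the store on this default session lets DataFusion resolve the
+            // table's object store URL when the scan and write plans are executed.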
+ + session.state() + }); + + let ((actions, version), metrics) = execute( + this.predicate, + &this.updates, + this.object_store.clone(), + &this.snapshot, + state, + this.writer_properties, + this.app_metadata, + ) + .await?; + + this.snapshot + .merge(DeltaTableState::from_actions(actions, version)?, true, true); + let table = DeltaTable::new_with_state(this.object_store, this.snapshot); + + Ok((table, metrics)) + }) + } +} +#[cfg(test)] +mod tests { + + use crate::action::*; + use crate::operations::DeltaOps; + use crate::writer::test_utils::{get_arrow_schema, get_delta_schema}; + use crate::DeltaTable; + use arrow::record_batch::RecordBatch; + use datafusion::assert_batches_sorted_eq; + use datafusion::from_slice::FromSlice; + use datafusion::prelude::*; + use std::sync::Arc; + + async fn setup_table(partitions: Option>) -> DeltaTable { + let table_schema = get_delta_schema(); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.get_fields().clone()) + .with_partition_columns(partitions.unwrap_or_default()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + table + } + + async fn get_data(table: DeltaTable) -> Vec { + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(table)).unwrap(); + ctx.sql("select * from test") + .await + .unwrap() + .collect() + .await + .unwrap() + } + + #[tokio::test] + async fn test_update_no_predicate() { + let schema = get_arrow_schema(&None); + let table = setup_table(None).await; + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from_slice([ + "2021-02-02", + "2021-02-02", + "2021-02-02", + "2021-02-02", + ])), + ], + ) + .unwrap(); + // write some data + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 1); + + let (table, metrics) = DeltaOps(table) + .update() + .with_update("modified", lit("2023-05-14")) + .await + .unwrap(); + + assert_eq!(table.version(), 2); + assert_eq!(table.get_file_uris().count(), 1); + assert_eq!(metrics.num_added_files, 1); + assert_eq!(metrics.num_removed_files, 1); + //assert_eq!(metrics.num_updated_rows, 4); + //assert_eq!(metrics.num_copied_rows, 0); + + let expected = vec![ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 1 | 2023-05-14 |", + "| A | 10 | 2023-05-14 |", + "| A | 100 | 2023-05-14 |", + "| B | 10 | 2023-05-14 |", + "+----+-------+------------+", + ]; + let actual = get_data(table).await; + assert_batches_sorted_eq!(&expected, &actual); + } + + #[tokio::test] + async fn test_update_non_partition() { + let schema = get_arrow_schema(&None); + let table = setup_table(None).await; + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from_slice([ + "2021-02-02", + "2021-02-02", + "2021-02-03", + "2021-02-03", + ])), + ], + ) + .unwrap(); + // write some data + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 1); + + + let ctx = 
SessionContext::new(); + let t = Arc::new(table); + ctx.register_table("test", t).unwrap(); + let r = ctx.sql("select * from test").await.unwrap() + .filter(col("modified").eq(lit("2021-02-03"))).unwrap(); + + r.show().await.unwrap(); + ctx.sql("select * from test where modified = '2021-02-03'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + /* + let (table, metrics) = DeltaOps(table) + .update() + .with_predicate(col("modified").eq(lit("2021-02-03"))) + .with_update("modified", lit("2023-05-14")) + .await + .unwrap(); + + assert_eq!(table.version(), 2); + assert_eq!(table.get_file_uris().count(), 1); + assert_eq!(metrics.num_added_files, 1); + assert_eq!(metrics.num_removed_files, 1); + //assert_eq!(metrics.num_updated_rows, 4); + //assert_eq!(metrics.num_copied_rows, 0); + + let expected = vec![ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 1 | 2021-02-02 |", + "| A | 10 | 2023-05-14 |", + "| A | 100 | 2023-05-14 |", + "| B | 10 | 2021-02-02 |", + "+----+-------+------------+", + ]; + let actual = get_data(table).await; + assert_batches_sorted_eq!(&expected, &actual); + */ + } + + #[tokio::test] + async fn test_update_partitions() { + let schema = get_arrow_schema(&None); + let table = setup_table(Some(vec!["modified"])).await; + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), + Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from_slice([ + "2021-02-02", + "2021-02-02", + "2021-02-03", + "2021-02-03", + ])), + ], + ) + .unwrap(); + // write some data + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 2); + + let (table, metrics) = DeltaOps(table) + .update() + .with_predicate(col("modified").eq(lit("2021-02-03"))) + .with_update("modified", lit("2023-05-14")) + .await + .unwrap(); + + assert_eq!(table.version(), 2); + assert_eq!(table.get_file_uris().count(), 2); + assert_eq!(metrics.num_added_files, 1); + assert_eq!(metrics.num_removed_files, 1); + //assert_eq!(metrics.num_updated_rows, 4); + //assert_eq!(metrics.num_copied_rows, 0); + + let expected = vec![ + "+----+-------+------------+", + "| id | value | modified |", + "+----+-------+------------+", + "| A | 1 | 2023-05-14 |", + "| A | 10 | 2023-05-14 |", + "| A | 100 | 2023-05-14 |", + "| B | 10 | 2023-05-14 |", + "+----+-------+------------+", + ]; + let actual = get_data(table).await; + assert_batches_sorted_eq!(&expected, &actual); + + } + + #[tokio::test] + async fn test_failure_invalid_expressions() { + // The predicate must be deterministic and expression must be valid + + let table = setup_table(None).await; + + let res = DeltaOps(table) + .update() + .with_predicate(col("value").eq(cast( + random() * lit(20.0), + arrow::datatypes::DataType::Int32, + ))) + .await; + assert!(res.is_err()); + } +} From 0bac94be1b83f231985863c58d2a1fd8880e32b3 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 23 May 2023 22:40:58 -0400 Subject: [PATCH 02/26] rebase on main and fix expression schemas --- rust/src/operations/delete.rs | 6 +- rust/src/operations/update.rs | 104 ++++++++++++++++++---------------- 2 files changed, 61 insertions(+), 49 deletions(-) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 4e44dc5f78..679838b980 100644 --- 
a/rust/src/operations/delete.rs
+++ b/rust/src/operations/delete.rs
@@ -974,7 +974,11 @@ mod tests {

     let (table, metrics) = DeltaOps(table)
         .delete()
-        .with_predicate(col("modified").eq(lit("2021-02-03")).and(col("id").eq(lit(20))))
+        .with_predicate(
+            col("modified")
+                .eq(lit("2021-02-03"))
+                .and(col("id").eq(lit(20))),
+        )
         .await
         .unwrap();
     assert_eq!(table.version(), 2);
diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs
index 87516eb20c..8fe250f21b 100644
--- a/rust/src/operations/update.rs
+++ b/rust/src/operations/update.rs
@@ -1,3 +1,23 @@
+//! Update records from a Delta Table for records that satisfy a predicate
+//!
+//! When a predicate is not provided then all records in the Delta Table are
+//! updated. Otherwise a scan of the Delta table is performed to mark any files
+//! that contain records that satisfy the predicate. Once they are determined
+//! then column values are updated with new values provided by the user.
+//!
+//!
+//! Predicates MUST be deterministic, otherwise undefined behaviour may occur during the
+//! scanning and rewriting phase.
+//!
+//! # Example
+//! ```rust ignore
+//! let table = open_table("../path/to/table")?;
+//! let (table, metrics) = UpdateBuilder::new(table.object_store(), table.state)
+//!     .with_predicate(col("col1").eq(lit(1)))
+//!     .with_update("value", col("value") + lit(20))
+//!     .await?;
+//! ```
+
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
     time::{Instant, SystemTime, UNIX_EPOCH},
 };
@@ -23,11 +43,11 @@ use serde_json::{Map, Value};

 use crate::{
     action::{Action, Add, DeltaOperation, Remove},
-    delta_datafusion::{logical_expr_to_physical_expr, parquet_scan_from_actions, register_store},
+    delta_datafusion::{parquet_scan_from_actions, register_store},
     operations::delete::find_files,
     storage::{DeltaObjectStore, ObjectStoreRef},
     table_state::DeltaTableState,
-    DeltaDataTypeVersion, DeltaResult, DeltaTable, DeltaTableError,
+    DeltaResult, DeltaTable, DeltaTableError,
 };

 use super::{transaction::commit, write::write_execution_plan};
@@ -130,7 +150,7 @@ async fn execute(
     state: SessionState,
     writer_properties: Option<WriterProperties>,
     app_metadata: Option<Map<String, Value>>,
-) -> DeltaResult<((Vec<Action>, DeltaDataTypeVersion), UpdateMetrics)> {
+) -> DeltaResult<((Vec<Action>, i64), UpdateMetrics)> {
     // Validate the predicate and update expressions
     //
     // If the predicate is not set then all files need to be updated.
@@ -161,7 +181,15 @@ async fn execute(
             .cloned()
             .collect::<Vec<Field>>(),
         ));
-        let expr = logical_expr_to_physical_expr(predicate, &schema);
+
+        let input_schema = snapshot.input_schema()?;
+        let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
+        let expr = create_physical_expr(
+            predicate,
+            &input_dfschema,
+            &input_schema,
+            state.execution_props(),
+        )?;

         let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
         let files_to_prune = pruning_predicate.prune(snapshot)?;
         let files: Vec<&Add> = snapshot
             .files()
             .iter()
             .zip(files_to_prune.into_iter())
             .filter_map(|(action, keep)| if keep { Some(action) } else { None })
             .collect();

         // Create a new delta scan plan with only files that have a record
         let rewrite = find_files(
             snapshot,
             object_store.clone(),
-            &schema,
+            schema.clone(),
             file_schema.clone(),
             files,
             &state,
             predicate,
         )
         .await?;
         rewrite.into_iter().map(|s| s.to_owned()).collect()
@@ -196,14 +224,6 @@ async fn execute(
     let table_partition_cols = snapshot
         .current_metadata()
         .ok_or(DeltaTableError::NoMetadata)?
.partition_columns .clone(); - let file_schema = Arc::new(ArrowSchema::new( - schema - .fields() - .iter() - .filter(|f| !table_partition_cols.contains(f.name())) - .cloned() - .collect::>(), - )); let execution_props = ExecutionProps::new(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches @@ -220,9 +240,9 @@ async fn execute( .await?; // Create a new projection - let physical_schema = file_schema.clone(); + let input_schema = snapshot.input_schema()?; let mut fields = Vec::new(); - for field in physical_schema.fields.iter() { + for field in input_schema.fields.iter() { fields.push(field.to_owned()); } fields.push(Arc::new(Field::new( @@ -231,8 +251,8 @@ async fn execute( true, ))); - let physical_schema = Arc::new(ArrowSchema::new(fields)); - let logical_schema: DFSchema = physical_schema.as_ref().clone().try_into()?; + let input_schema = Arc::new(ArrowSchema::new(fields)); + let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; let mut expressions: Vec<(Arc, String)> = Vec::new(); let scan_schema = parquet_scan.schema(); @@ -243,12 +263,8 @@ async fn execute( )); } - let predicate_expr = create_physical_expr( - &predicate, - &logical_schema, - &physical_schema, - &execution_props, - )?; + let predicate_expr = + create_physical_expr(&predicate, &input_dfschema, &input_schema, &execution_props)?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); // Create the projection... @@ -272,7 +288,7 @@ async fn execute( .when(lit(true), expr.to_owned()) .otherwise(col(column.to_owned()))?; let predicate_expr = - create_physical_expr(&expr, &logical_schema, &physical_schema, &execution_props)?; + create_physical_expr(&expr, &input_dfschema, &input_schema, &execution_props)?; map.insert(column.name.clone(), expressions.len()); let c = "__delta_rs_".to_string() + &column.name; expressions.push((predicate_expr, c.clone())); @@ -290,7 +306,7 @@ async fn execute( match map.get(field.name()) { Some(value) => { expressions.push(( - Arc::new(expressions::Column::new(field.name(), value.clone())), + Arc::new(expressions::Column::new(field.name(), *value)), field.name().to_owned(), )); } @@ -516,22 +532,6 @@ mod tests { assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); - - let ctx = SessionContext::new(); - let t = Arc::new(table); - ctx.register_table("test", t).unwrap(); - let r = ctx.sql("select * from test").await.unwrap() - .filter(col("modified").eq(lit("2021-02-03"))).unwrap(); - - r.show().await.unwrap(); - ctx.sql("select * from test where modified = '2021-02-03'") - .await - .unwrap() - .collect() - .await - .unwrap(); - - /* let (table, metrics) = DeltaOps(table) .update() .with_predicate(col("modified").eq(lit("2021-02-03"))) @@ -558,7 +558,6 @@ mod tests { ]; let actual = get_data(table).await; assert_batches_sorted_eq!(&expected, &actual); - */ } #[tokio::test] @@ -593,6 +592,7 @@ mod tests { .update() .with_predicate(col("modified").eq(lit("2021-02-03"))) .with_update("modified", lit("2023-05-14")) + .with_update("id", lit("C")) .await .unwrap(); @@ -600,22 +600,22 @@ mod tests { assert_eq!(table.get_file_uris().count(), 2); assert_eq!(metrics.num_added_files, 1); assert_eq!(metrics.num_removed_files, 1); - //assert_eq!(metrics.num_updated_rows, 4); + //assert_eq!(metrics.num_updated_rows, 2); //assert_eq!(metrics.num_copied_rows, 0); let expected = vec![ "+----+-------+------------+", "| id | value | 
modified |", "+----+-------+------------+", - "| A | 1 | 2023-05-14 |", - "| A | 10 | 2023-05-14 |", - "| A | 100 | 2023-05-14 |", - "| B | 10 | 2023-05-14 |", + "| A | 1 | 2021-02-02 |", + "| C | 10 | 2023-05-14 |", + "| C | 100 | 2023-05-14 |", + "| B | 10 | 2021-02-02 |", "+----+-------+------------+", ]; + let actual = get_data(table).await; assert_batches_sorted_eq!(&expected, &actual); - } #[tokio::test] @@ -632,5 +632,13 @@ mod tests { ))) .await; assert!(res.is_err()); + + let table = setup_table(None).await; + let res = DeltaOps(table) + .update() + .with_predicate(col("value").eq(lit(10))) + .with_update("value", col("value") + lit(20)) + .await; + assert!(res.is_err()); } } From 392e050350a1be0c623c4ac2a3346ff43188cefe Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 27 May 2023 23:29:17 -0400 Subject: [PATCH 03/26] error update on non-deterministic predicate --- rust/src/operations/delete.rs | 13 +-- rust/src/operations/update.rs | 146 +++++++++++++++++++--------------- 2 files changed, 90 insertions(+), 69 deletions(-) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 679838b980..78bdaf9752 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -234,11 +234,11 @@ async fn handle_stream( Ok(None) } -struct ExprProperties { - partition_columns: Vec, +pub(crate) struct ExprProperties { + pub partition_columns: Vec, - partition_only: bool, - result: DeltaResult<()>, + pub partition_only: bool, + pub result: DeltaResult<()>, } /// Ensure only expressions that make sense are accepted, check for @@ -568,7 +568,10 @@ async fn execute( Ok(((actions, version), metrics)) } -async fn scan_memory_table(snapshot: &DeltaTableState, predicate: &Expr) -> DeltaResult> { +pub(crate) async fn scan_memory_table( + snapshot: &DeltaTableState, + predicate: &Expr, +) -> DeltaResult> { let actions = snapshot.files().to_owned(); let batch = snapshot.add_actions_table(true)?; diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 8fe250f21b..1189c19dc4 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -32,11 +32,9 @@ use datafusion::{ physical_plan::{projection::ProjectionExec, ExecutionPlan}, prelude::SessionContext, }; -use datafusion_common::{Column, DFSchema, ScalarValue}; +use datafusion_common::{tree_node::TreeNode, Column, DFSchema, ScalarValue}; use datafusion_expr::{case, col, lit, Expr}; -use datafusion_physical_expr::{ - create_physical_expr, execution_props::ExecutionProps, expressions, PhysicalExpr, -}; +use datafusion_physical_expr::{create_physical_expr, expressions, PhysicalExpr}; use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use serde_json::{Map, Value}; @@ -50,7 +48,11 @@ use crate::{ DeltaResult, DeltaTable, DeltaTableError, }; -use super::{transaction::commit, write::write_execution_plan}; +use super::{ + delete::{scan_memory_table, ExprProperties}, + transaction::commit, + write::write_execution_plan, +}; /// Updates records in the Delta Table. /// See this module's documentaiton for more information @@ -78,10 +80,10 @@ pub struct UpdateMetrics { pub num_added_files: usize, /// Number of files removed. pub num_removed_files: usize, - /// Number of rows updated. - pub num_updated_rows: usize, - /// Number of rows just copied over in the process of updating files. - pub num_copied_rows: usize, + // Number of rows updated. + // pub num_updated_rows: usize, + // Number of rows just copied over in the process of updating files. 
+ // pub num_copied_rows: usize, /// Time taken to execute the entire operation. pub execution_time_ms: u128, /// Time taken to scan the files for matches. @@ -165,14 +167,18 @@ async fn execute( let mut metrics = UpdateMetrics::default(); let mut version = snapshot.version(); - let rewrite = match &predicate { + if updates.is_empty() { + return Ok(((Vec::new(), version), metrics)); + } + + let current_metadata = snapshot + .current_metadata() + .ok_or(DeltaTableError::NoMetadata)?; + let table_partition_cols = current_metadata.partition_columns.clone(); + let schema = snapshot.arrow_schema()?; + + let files_to_rewrite = match &predicate { Some(predicate) => { - let schema = snapshot.arrow_schema()?; - let table_partition_cols = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns - .clone(); let file_schema = Arc::new(ArrowSchema::new( schema .fields() @@ -191,46 +197,57 @@ async fn execute( state.execution_props(), )?; - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; - let files_to_prune = pruning_predicate.prune(snapshot)?; - let files: Vec<&Add> = snapshot - .files() - .iter() - .zip(files_to_prune.into_iter()) - .filter_map(|(action, keep)| if keep { Some(action) } else { None }) - .collect(); - - // Create a new delta scan plan with only files that have a record - let rewrite = find_files( - snapshot, - object_store.clone(), - schema.clone(), - file_schema.clone(), - files, - &state, - predicate, - ) - .await?; - rewrite.into_iter().map(|s| s.to_owned()).collect() + // Validate the Predicate and determine if it only contains partition columns + let mut expr_properties = ExprProperties { + partition_only: true, + partition_columns: current_metadata.partition_columns.clone(), + result: Ok(()), + }; + + TreeNode::visit(predicate, &mut expr_properties)?; + expr_properties.result?; + + let scan_start = Instant::now(); + let candidates = if expr_properties.partition_only { + scan_memory_table(snapshot, predicate).await? + } else { + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; + let files_to_prune = pruning_predicate.prune(snapshot)?; + let files: Vec<&Add> = snapshot + .files() + .iter() + .zip(files_to_prune.into_iter()) + .filter_map(|(action, keep)| if keep { Some(action) } else { None }) + .collect(); + + // Create a new delta scan plan with only files that have a record + let candidates = find_files( + snapshot, + object_store.clone(), + schema.clone(), + file_schema.clone(), + files, + &state, + predicate, + ) + .await?; + candidates.into_iter().map(|s| s.to_owned()).collect() + }; + metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); + candidates } None => snapshot.files().to_owned(), }; let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); - let schema = snapshot.arrow_schema()?; - let table_partition_cols = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)? 
- .partition_columns - .clone(); - let execution_props = ExecutionProps::new(); + let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches let parquet_scan = parquet_scan_from_actions( snapshot, object_store.clone(), - &rewrite, + &files_to_rewrite, &schema, None, &state, @@ -239,7 +256,7 @@ async fn execute( ) .await?; - // Create a new projection + // Create a projection for a new column with the predicate evaluated let input_schema = snapshot.input_schema()?; let mut fields = Vec::new(); for field in input_schema.fields.iter() { @@ -251,6 +268,7 @@ async fn execute( true, ))); + // Recreate the schemas with the new column included let input_schema = Arc::new(ArrowSchema::new(fields)); let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; @@ -264,10 +282,14 @@ async fn execute( } let predicate_expr = - create_physical_expr(&predicate, &input_dfschema, &input_schema, &execution_props)?; + create_physical_expr(&predicate, &input_dfschema, &input_schema, execution_props)?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); - // Create the projection... + // Perform another projection but instead calculate updated values based on + // the predicate value. If the predicate is true then evalute the user + // provided expression otherwise return the original column value + // + // For each update column a new column with a name of __delta_rs_ + `original name` is created let projection: Arc = Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); let mut expressions: Vec<(Arc, String)> = Vec::new(); @@ -279,6 +301,7 @@ async fn execute( )); } + // Maintain a map from the original column name to its temporary column index let mut map = HashMap::::new(); let mut control_columns = HashSet::::new(); control_columns.insert("__delta_rs_update_predicate".to_owned()); @@ -288,19 +311,19 @@ async fn execute( .when(lit(true), expr.to_owned()) .otherwise(col(column.to_owned()))?; let predicate_expr = - create_physical_expr(&expr, &input_dfschema, &input_schema, &execution_props)?; + create_physical_expr(&expr, &input_dfschema, &input_schema, execution_props)?; map.insert(column.name.clone(), expressions.len()); let c = "__delta_rs_".to_string() + &column.name; expressions.push((predicate_expr, c.clone())); control_columns.insert(c); } - let projection: Arc = - Arc::new(ProjectionExec::try_new(expressions, projection)?); + let projection_predicate: Arc = + Arc::new(ProjectionExec::try_new(expressions, projection.clone())?); - // project again to remove __delta_rs columns and rename update columns to their original name + // Project again to remove __delta_rs columns and rename update columns to their original name let mut expressions: Vec<(Arc, String)> = Vec::new(); - let scan_schema = projection.schema(); + let scan_schema = projection_predicate.schema(); for (i, field) in scan_schema.fields().into_iter().enumerate() { if !control_columns.contains(field.name()) { match map.get(field.name()) { @@ -320,8 +343,10 @@ async fn execute( } } - let projection: Arc = - Arc::new(ProjectionExec::try_new(expressions, projection)?); + let projection: Arc = Arc::new(ProjectionExec::try_new( + expressions, + projection_predicate.clone(), + )?); let add_actions = write_execution_plan( snapshot, @@ -342,9 +367,9 @@ async fn execute( let mut actions: Vec = add_actions.into_iter().map(Action::add).collect(); 
metrics.num_added_files = actions.len(); - metrics.num_removed_files = rewrite.len(); + metrics.num_removed_files = files_to_rewrite.len(); - for action in rewrite { + for action in files_to_rewrite { actions.push(Action::remove(Remove { path: action.path, deletion_timestamp: Some(deletion_timestamp), @@ -630,13 +655,6 @@ mod tests { random() * lit(20.0), arrow::datatypes::DataType::Int32, ))) - .await; - assert!(res.is_err()); - - let table = setup_table(None).await; - let res = DeltaOps(table) - .update() - .with_predicate(col("value").eq(lit(10))) .with_update("value", col("value") + lit(20)) .await; assert!(res.is_err()); From 1c48c32badc55fc04db25daedc30eca4ca08cb8a Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 28 May 2023 00:42:58 -0400 Subject: [PATCH 04/26] Factor out find files --- rust/src/delta_datafusion.rs | 396 +++++++++++++++++++++++++++++- rust/src/operations/delete.rs | 436 ++++------------------------------ rust/src/operations/update.rs | 92 ++----- 3 files changed, 450 insertions(+), 474 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 1835eebc57..ee1f9f00de 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -24,13 +24,17 @@ use std::any::Any; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::Debug; +use std::pin::Pin; use std::sync::Arc; use arrow::array::ArrayRef; use arrow::compute::{cast_with_options, CastOptions}; +use arrow::datatypes::DataType; use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; +use arrow_array::StringArray; +use arrow_schema::Field; use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; use datafusion::datasource::datasource::TableProviderFactory; @@ -43,17 +47,25 @@ use datafusion::optimizer::utils::conjunction; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion::physical_plan::file_format::FileScanConfig; +use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::limit::LocalLimitExec; use datafusion::physical_plan::{ - ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, + ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; use datafusion_common::scalar::ScalarValue; -use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema}; +use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}; +use datafusion_common::{ + Column, DFSchema, DataFusionError, Result as DataFusionResult, ToDFSchema, +}; +use datafusion_expr::expr::{ScalarFunction, ScalarUDF}; use datafusion_expr::logical_plan::CreateExternalTable; -use datafusion_expr::{Expr, Extension, LogicalPlan, TableProviderFilterPushDown}; +use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility}; use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use futures::StreamExt; use object_store::ObjectMeta; use url::Url; @@ -65,6 +77,8 @@ use crate::{action, open_table, open_table_with_storage_options, SchemaDataType} use crate::{DeltaResult, Invariant}; use crate::{DeltaTable, 
DeltaTableError}; +const PATH_COLUMN: &str = "__delta_rs_path"; + impl From for DataFusionError { fn from(err: DeltaTableError) -> Self { match err { @@ -980,6 +994,382 @@ impl TableProviderFactory for DeltaTableFactory { } } +pub(crate) struct FindFilesExprProperties { + pub partition_columns: Vec, + + pub partition_only: bool, + pub result: DeltaResult<()>, +} + +/// Ensure only expressions that make sense are accepted, check for +/// non-deterministic functions, and determine if the expression only contains +/// partition columns +impl TreeNodeVisitor for FindFilesExprProperties { + type N = Expr; + + fn pre_visit(&mut self, expr: &Self::N) -> datafusion_common::Result { + // TODO: We can likely relax the volatility to STABLE. Would require further + // research to confirm the same value is generated during the scan and + // rewrite phases. + + match expr { + Expr::Column(c) => { + if !self.partition_columns.contains(&c.name) { + self.partition_only = false; + } + } + Expr::ScalarVariable(_, _) + | Expr::Literal(_) + | Expr::Alias(_, _) + | Expr::BinaryExpr(_) + | Expr::Like(_) + | Expr::ILike(_) + | Expr::SimilarTo(_) + | Expr::Not(_) + | Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsUnknown(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::IsNotUnknown(_) + | Expr::Negative(_) + | Expr::InList { .. } + | Expr::GetIndexedField(_) + | Expr::Between(_) + | Expr::Case(_) + | Expr::Cast(_) + | Expr::TryCast(_) => (), + Expr::ScalarFunction(ScalarFunction { fun, .. }) => { + let v = fun.volatility(); + if v > Volatility::Immutable { + self.result = Err(DeltaTableError::Generic(format!( + "Find files predicate contains nondeterministic function {}", + fun + ))); + return Ok(VisitRecursion::Stop); + } + } + Expr::ScalarUDF(ScalarUDF { fun, .. 
}) => { + let v = fun.signature.volatility; + if v > Volatility::Immutable { + self.result = Err(DeltaTableError::Generic(format!( + "Find files predicate contains nondeterministic function {}", + fun.name + ))); + return Ok(VisitRecursion::Stop); + } + } + _ => { + self.result = Err(DeltaTableError::Generic(format!( + "Find files predicate contains unsupported expression {}", + expr + ))); + return Ok(VisitRecursion::Stop); + } + } + + Ok(VisitRecursion::Continue) + } +} + +pub(crate) struct FindFiles { + pub candidates: Vec, + /// Was a physical read to the datastore required to determine the candidates + pub parition_scan: bool, +} + +async fn handle_stream( + mut stream: Pin>, +) -> Result, DeltaTableError> { + if let Some(maybe_batch) = stream.next().await { + let batch: RecordBatch = maybe_batch?; + if batch.num_rows() > 1 { + return Err(DeltaTableError::Generic( + "Find files returned multiple records for batch".to_owned(), + )); + } + let array = batch + .column_by_name(PATH_COLUMN) + .unwrap() + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + PATH_COLUMN + )))?; + + let path = array + .into_iter() + .next() + .unwrap() + .ok_or(DeltaTableError::Generic(format!( + "{} cannot be null", + PATH_COLUMN + )))?; + return Ok(Some(path.to_string())); + } + + Ok(None) +} + +/// Determine which files contain a record that statisfies the predicate +pub(crate) async fn find_files_scan<'a>( + snapshot: &DeltaTableState, + store: ObjectStoreRef, + schema: Arc, + file_schema: Arc, + candidates: Vec<&'a Add>, + state: &SessionState, + expression: &Expr, +) -> DeltaResult> { + let mut files = Vec::new(); + let mut candidate_map: HashMap = HashMap::new(); + + let table_partition_cols = snapshot + .current_metadata() + .ok_or(DeltaTableError::NoMetadata)? 
+ .partition_columns + .clone(); + + let mut file_groups: HashMap, Vec> = HashMap::new(); + for action in candidates { + let mut part = partitioned_file_from_action(action, &schema); + part.partition_values + .push(ScalarValue::Utf8(Some(action.path.clone()))); + + file_groups + .entry(part.partition_values.clone()) + .or_default() + .push(part); + + candidate_map.insert(action.path.to_owned(), action); + } + + let mut table_partition_cols = table_partition_cols + .iter() + .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) + .collect::, ArrowError>>()?; + // Append a column called __delta_rs_path to track the file path + table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8)); + + let input_schema = snapshot.input_schema()?; + let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; + + let predicate_expr = create_physical_expr( + &Expr::IsTrue(Box::new(expression.clone())), + &input_dfschema, + &input_schema, + state.execution_props(), + )?; + + let parquet_scan = ParquetFormat::new() + .create_physical_plan( + state, + FileScanConfig { + object_store_url: store.object_store_url(), + file_schema, + file_groups: file_groups.into_values().collect(), + statistics: snapshot.datafusion_table_statistics(), + projection: None, + limit: None, + table_partition_cols, + infinite_source: false, + output_ordering: None, + }, + None, + ) + .await?; + + let filter: Arc = + Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?); + let limit: Arc = Arc::new(LocalLimitExec::new(filter, 1)); + + let task_ctx = Arc::new(TaskContext::from(state)); + let partitions = limit.output_partitioning().partition_count(); + let mut tasks = Vec::with_capacity(partitions); + + for i in 0..partitions { + let stream = limit.execute(i, task_ctx.clone())?; + tasks.push(handle_stream(stream)); + } + + for res in futures::future::join_all(tasks).await.into_iter() { + let path = res?; + if let Some(path) = path { + match candidate_map.remove(&path) { + Some(action) => files.push(action), + None => { + return Err(DeltaTableError::Generic( + "Unable to map __delta_rs_path to action.".to_owned(), + )) + } + } + } + } + + Ok(files) +} + +pub(crate) async fn scan_memory_table( + snapshot: &DeltaTableState, + predicate: &Expr, +) -> DeltaResult> { + let actions = snapshot.files().to_owned(); + + let batch = snapshot.add_actions_table(true)?; + let mut arrays = Vec::new(); + let mut fields = Vec::new(); + + let schema = batch.schema(); + + arrays.push( + batch + .column_by_name("path") + .ok_or(DeltaTableError::Generic( + "Column with name `path` does not exist".to_owned(), + ))? + .to_owned(), + ); + fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false)); + + for field in schema.fields() { + if field.name().starts_with("partition.") { + let name = field.name().strip_prefix("partition.").unwrap(); + + arrays.push(batch.column_by_name(field.name()).unwrap().to_owned()); + fields.push(Field::new( + name, + field.data_type().to_owned(), + field.is_nullable(), + )); + } + } + + let schema = Arc::new(ArrowSchema::new(fields)); + let batch = RecordBatch::try_new(schema, arrays)?; + let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; + + let ctx = SessionContext::new(); + let mut df = ctx.read_table(Arc::new(mem_table))?; + df = df + .filter(predicate.to_owned())? 
+ .select(vec![col(PATH_COLUMN)])?; + let batches = df.collect().await?; + + let mut map = HashMap::new(); + for action in actions { + map.insert(action.path.clone(), action); + } + let mut files = Vec::new(); + + for batch in batches { + let array = batch + .column_by_name(PATH_COLUMN) + .unwrap() + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + PATH_COLUMN + )))?; + for path in array { + let path = path.ok_or(DeltaTableError::Generic(format!( + "{} cannot be null", + PATH_COLUMN + )))?; + let value = map.remove(path).unwrap(); + files.push(value); + } + } + + Ok(files) +} + +pub(crate) async fn find_files<'a>( + snapshot: &DeltaTableState, + object_store: ObjectStoreRef, + schema: Arc, + state: &SessionState, + predicate: Option, +) -> DeltaResult { + let current_metadata = snapshot + .current_metadata() + .ok_or(DeltaTableError::NoMetadata)?; + let table_partition_cols = current_metadata.partition_columns.clone(); + + match &predicate { + Some(predicate) => { + let file_schema = Arc::new(ArrowSchema::new( + schema + .fields() + .iter() + .filter(|f| !table_partition_cols.contains(f.name())) + .cloned() + .collect::>(), + )); + + let input_schema = snapshot.input_schema()?; + let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; + let expr = create_physical_expr( + predicate, + &input_dfschema, + &input_schema, + state.execution_props(), + )?; + + // Validate the Predicate and determine if it only contains partition columns + let mut expr_properties = FindFilesExprProperties { + partition_only: true, + partition_columns: current_metadata.partition_columns.clone(), + result: Ok(()), + }; + + TreeNode::visit(predicate, &mut expr_properties)?; + expr_properties.result?; + + if expr_properties.partition_only { + let candidates = scan_memory_table(snapshot, predicate).await?; + Ok(FindFiles { + candidates, + parition_scan: true, + }) + } else { + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; + let files_to_prune = pruning_predicate.prune(snapshot)?; + let files: Vec<&Add> = snapshot + .files() + .iter() + .zip(files_to_prune.into_iter()) + .filter_map(|(action, keep)| if keep { Some(action) } else { None }) + .collect(); + + // Create a new delta scan plan with only files that have a record + let candidates = find_files_scan( + snapshot, + object_store.clone(), + schema.clone(), + file_schema.clone(), + files, + state, + predicate, + ) + .await?; + + Ok(FindFiles { + candidates: candidates.into_iter().map(|s| s.to_owned()).collect(), + parition_scan: false, + }) + } + } + None => Ok(FindFiles { + candidates: snapshot.files().to_owned(), + parition_scan: true, + }), + } +} + #[cfg(test)] mod tests { use arrow::array::StructArray; diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 78bdaf9752..5bbf254d65 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -19,8 +19,8 @@ use crate::action::DeltaOperation; use crate::delta::DeltaResult; +use crate::delta_datafusion::find_files; use crate::delta_datafusion::parquet_scan_from_actions; -use crate::delta_datafusion::partitioned_file_from_action; use crate::delta_datafusion::register_store; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; @@ -31,41 +31,20 @@ use crate::DeltaTable; use crate::DeltaTableError; use crate::action::{Action, Add, Remove}; -use arrow::array::StringArray; -use arrow::datatypes::DataType; -use 
arrow::datatypes::Field; -use arrow::datatypes::Schema as ArrowSchema; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; -use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat}; -use datafusion::datasource::listing::PartitionedFile; -use datafusion::datasource::MemTable; -use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; +use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; -use datafusion::physical_optimizer::pruning::PruningPredicate; -use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::limit::LocalLimitExec; use datafusion::physical_plan::ExecutionPlan; -use datafusion::physical_plan::RecordBatchStream; use datafusion::prelude::Expr; use datafusion_common::scalar::ScalarValue; -use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}; use datafusion_common::DFSchema; -use datafusion_expr::expr::{ScalarFunction, ScalarUDF}; -use datafusion_expr::{col, Volatility}; use futures::future::BoxFuture; -use futures::stream::StreamExt; use parquet::file::properties::WriterProperties; use serde_json::Map; use serde_json::Value; -use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; -const PATH_COLUMN: &str = "__delta_rs_path"; - /// Delete Records from the Delta Table. /// See this module's documentaiton for more information pub struct DeleteBuilder { @@ -102,218 +81,6 @@ pub struct DeleteMetrics { pub rewrite_time_ms: u128, } -/// Determine which files contain a record that statisfies the predicate -pub async fn find_files<'a>( - snapshot: &DeltaTableState, - store: ObjectStoreRef, - schema: Arc, - file_schema: Arc, - candidates: Vec<&'a Add>, - state: &SessionState, - expression: &Expr, -) -> DeltaResult> { - let mut files = Vec::new(); - let mut candidate_map: HashMap = HashMap::new(); - - let table_partition_cols = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)? 
- .partition_columns - .clone(); - - let mut file_groups: HashMap, Vec> = HashMap::new(); - for action in candidates { - let mut part = partitioned_file_from_action(action, &schema); - part.partition_values - .push(ScalarValue::Utf8(Some(action.path.clone()))); - - file_groups - .entry(part.partition_values.clone()) - .or_default() - .push(part); - - candidate_map.insert(action.path.to_owned(), action); - } - - let mut table_partition_cols = table_partition_cols - .iter() - .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) - .collect::, ArrowError>>()?; - // Append a column called __delta_rs_path to track the file path - table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8)); - - let input_schema = snapshot.input_schema()?; - let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; - - let predicate_expr = create_physical_expr( - &Expr::IsTrue(Box::new(expression.clone())), - &input_dfschema, - &input_schema, - state.execution_props(), - )?; - - let parquet_scan = ParquetFormat::new() - .create_physical_plan( - state, - FileScanConfig { - object_store_url: store.object_store_url(), - file_schema, - file_groups: file_groups.into_values().collect(), - statistics: snapshot.datafusion_table_statistics(), - projection: None, - limit: None, - table_partition_cols, - infinite_source: false, - output_ordering: None, - }, - None, - ) - .await?; - - let filter: Arc = - Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?); - let limit: Arc = Arc::new(LocalLimitExec::new(filter, 1)); - - let task_ctx = Arc::new(TaskContext::from(state)); - let partitions = limit.output_partitioning().partition_count(); - let mut tasks = Vec::with_capacity(partitions); - - for i in 0..partitions { - let stream = limit.execute(i, task_ctx.clone())?; - tasks.push(handle_stream(stream)); - } - - for res in futures::future::join_all(tasks).await.into_iter() { - let path = res?; - if let Some(path) = path { - match candidate_map.remove(&path) { - Some(action) => files.push(action), - None => { - return Err(DeltaTableError::Generic( - "Unable to map __delta_rs_path to action.".to_owned(), - )) - } - } - } - } - - Ok(files) -} - -async fn handle_stream( - mut stream: Pin>, -) -> Result, DeltaTableError> { - if let Some(maybe_batch) = stream.next().await { - let batch: RecordBatch = maybe_batch?; - if batch.num_rows() > 1 { - return Err(DeltaTableError::Generic( - "Find files returned multiple records for batch".to_owned(), - )); - } - let array = batch - .column_by_name(PATH_COLUMN) - .unwrap() - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - PATH_COLUMN - )))?; - - let path = array - .into_iter() - .next() - .unwrap() - .ok_or(DeltaTableError::Generic(format!( - "{} cannot be null", - PATH_COLUMN - )))?; - return Ok(Some(path.to_string())); - } - - Ok(None) -} - -pub(crate) struct ExprProperties { - pub partition_columns: Vec, - - pub partition_only: bool, - pub result: DeltaResult<()>, -} - -/// Ensure only expressions that make sense are accepted, check for -/// non-deterministic functions, and determine if the expression only contains -/// partition columns -impl TreeNodeVisitor for ExprProperties { - type N = Expr; - - fn pre_visit(&mut self, expr: &Self::N) -> datafusion_common::Result { - // TODO: We can likely relax the volatility to STABLE. Would require further - // research to confirm the same value is generated during the scan and - // rewrite phases. 
- - match expr { - Expr::Column(c) => { - if !self.partition_columns.contains(&c.name) { - self.partition_only = false; - } - } - Expr::ScalarVariable(_, _) - | Expr::Literal(_) - | Expr::Alias(_, _) - | Expr::BinaryExpr(_) - | Expr::Like(_) - | Expr::ILike(_) - | Expr::SimilarTo(_) - | Expr::Not(_) - | Expr::IsNotNull(_) - | Expr::IsNull(_) - | Expr::IsTrue(_) - | Expr::IsFalse(_) - | Expr::IsUnknown(_) - | Expr::IsNotTrue(_) - | Expr::IsNotFalse(_) - | Expr::IsNotUnknown(_) - | Expr::Negative(_) - | Expr::InList { .. } - | Expr::GetIndexedField(_) - | Expr::Between(_) - | Expr::Case(_) - | Expr::Cast(_) - | Expr::TryCast(_) => (), - Expr::ScalarFunction(ScalarFunction { fun, .. }) => { - let v = fun.volatility(); - if v > Volatility::Immutable { - self.result = Err(DeltaTableError::Generic(format!( - "Delete predicate contains nondeterministic function {}", - fun - ))); - return Ok(VisitRecursion::Stop); - } - } - Expr::ScalarUDF(ScalarUDF { fun, .. }) => { - let v = fun.signature.volatility; - if v > Volatility::Immutable { - self.result = Err(DeltaTableError::Generic(format!( - "Delete predicate contains nondeterministic function {}", - fun.name - ))); - return Ok(VisitRecursion::Stop); - } - } - _ => { - self.result = Err(DeltaTableError::Generic(format!( - "Delete predicate contains unsupported expression {}", - expr - ))); - return Ok(VisitRecursion::Stop); - } - } - - Ok(VisitRecursion::Continue) - } -} - impl DeleteBuilder { /// Create a new [`DeleteBuilder`] pub fn new(object_store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { @@ -372,13 +139,12 @@ async fn excute_non_empty_expr( state: &SessionState, expression: &Expr, metrics: &mut DeleteMetrics, + rewrite: &[Add], writer_properties: Option, -) -> DeltaResult<(Vec, Vec)> { +) -> DeltaResult> { // For each identified file perform a parquet scan + filter + limit (1) + count. // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file. - let scan_start = Instant::now(); - let schema = snapshot.arrow_schema()?; let input_schema = snapshot.input_schema()?; let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; @@ -388,50 +154,11 @@ async fn excute_non_empty_expr( .ok_or(DeltaTableError::NoMetadata)? 
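Note: the visitor removed above (and reintroduced as a shared check in delta_datafusion.rs) answers one question: does the predicate touch only partition columns? A condensed sketch of that idea over a toy expression tree; the real implementation walks DataFusion's Expr with a TreeNodeVisitor and additionally rejects non-deterministic functions:

enum Expr {
    Column(String),
    Literal(String),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

fn partition_only(expr: &Expr, partition_cols: &[&str]) -> bool {
    match expr {
        Expr::Column(name) => partition_cols.contains(&name.as_str()),
        Expr::Literal(_) => true,
        Expr::Eq(l, r) | Expr::And(l, r) => {
            partition_only(l, partition_cols) && partition_only(r, partition_cols)
        }
    }
}

fn main() {
    // modified = '2021-02-03' AND id = 20, where only `modified` is a partition column.
    let pred = Expr::And(
        Box::new(Expr::Eq(
            Box::new(Expr::Column("modified".into())),
            Box::new(Expr::Literal("2021-02-03".into())),
        )),
        Box::new(Expr::Eq(
            Box::new(Expr::Column("id".into())),
            Box::new(Expr::Literal("20".into())),
        )),
    );
    // A mixed predicate still requires a physical file scan.
    assert!(!partition_only(&pred, &["modified"]));
}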
.partition_columns .clone(); - let file_schema = Arc::new(ArrowSchema::new( - schema - .fields() - .iter() - .filter(|f| !table_partition_cols.contains(f.name())) - .cloned() - .collect::>(), - )); - let expr = create_physical_expr( - expression, - &input_dfschema, - &input_schema, - state.execution_props(), - )?; - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; - let files_to_prune = pruning_predicate.prune(snapshot)?; - let files: Vec<&Add> = snapshot - .files() - .iter() - .zip(files_to_prune.into_iter()) - .filter_map(|(action, keep)| if keep { Some(action) } else { None }) - .collect(); - - // Create a new delta scan plan with only files that have a record - let rewrite = find_files( - snapshot, - object_store.clone(), - schema.clone(), - file_schema.clone(), - files, - state, - expression, - ) - .await?; - - metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); - let write_start = Instant::now(); - - let rewrite: Vec = rewrite.into_iter().map(|s| s.to_owned()).collect(); let parquet_scan = parquet_scan_from_actions( snapshot, object_store.clone(), - &rewrite, + rewrite, &schema, None, state, @@ -463,7 +190,6 @@ async fn excute_non_empty_expr( writer_properties, ) .await?; - metrics.rewrite_time_ms = Instant::now().duration_since(write_start).as_millis(); let read_records = parquet_scan.metrics().and_then(|m| m.output_rows()); let filter_records = filter.metrics().and_then(|m| m.output_rows()); @@ -472,7 +198,7 @@ async fn excute_non_empty_expr( .zip(filter_records) .map(|(read, filter)| read - filter); - Ok((add_actions, rewrite)) + Ok(add_actions) } async fn execute( @@ -483,60 +209,53 @@ async fn execute( writer_properties: Option, app_metadata: Option>, ) -> DeltaResult<((Vec, i64), DeleteMetrics)> { - let mut metrics = DeleteMetrics::default(); let exec_start = Instant::now(); + let mut metrics = DeleteMetrics::default(); + let schema = snapshot.arrow_schema()?; - let (add_actions, to_delete) = match &predicate { - Some(expr) => { - let current_metadata = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)?; - - let mut expr_properties = ExprProperties { - partition_only: true, - partition_columns: current_metadata.partition_columns.clone(), - result: Ok(()), - }; - - TreeNode::visit(expr, &mut expr_properties)?; - expr_properties.result?; - - if expr_properties.partition_only { - // If the expression only refers to partition columns, we can perform - // the deletion just by removing entire files, so there is no need to - // do an scan. - let scan_start = Instant::now(); - let remove = scan_memory_table(snapshot, expr).await?; - metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); - (Vec::new(), remove) - } else { - excute_non_empty_expr( - snapshot, - object_store.clone(), - &state, - expr, - &mut metrics, - writer_properties, - ) - .await? 
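Note: the pruning pass deleted above is not lost; it moves into the shared find_files helper in delta_datafusion.rs. Its core is a zip of the file list with the keep/skip flags produced by the pruning predicate, sketched here with a stand-in Add type:

#[derive(Debug)]
struct Add {
    path: String,
}

fn prune(files: Vec<Add>, keep: Vec<bool>) -> Vec<Add> {
    files
        .into_iter()
        .zip(keep)
        .filter_map(|(file, keep)| keep.then_some(file))
        .collect()
}

fn main() {
    let files = vec![
        Add { path: "part-0.parquet".into() },
        Add { path: "part-1.parquet".into() },
    ];
    // Only the second file survives pruning and is scanned for matching rows.
    println!("{:?}", prune(files, vec![false, true]));
}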
- } - } - None => (Vec::::new(), snapshot.files().to_owned()), - }; + let scan_start = Instant::now(); + let candidates = find_files( + snapshot, + object_store.clone(), + schema.clone(), + &state, + predicate.clone(), + ) + .await?; + metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); + let add = if candidates.parition_scan { + Vec::new() + } else { + let write_start = Instant::now(); + let add = excute_non_empty_expr( + snapshot, + object_store.clone(), + &state, + &predicate, + &mut metrics, + &candidates.candidates, + writer_properties, + ) + .await?; + metrics.rewrite_time_ms = Instant::now().duration_since(write_start).as_millis(); + add + }; + let remove = candidates.candidates; + let deletion_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as i64; - let mut actions: Vec = add_actions.into_iter().map(Action::add).collect(); + let mut actions: Vec = add.into_iter().map(Action::add).collect(); let mut version = snapshot.version(); - metrics.num_removed_files = to_delete.len(); + metrics.num_removed_files = remove.len(); metrics.num_added_files = actions.len(); - for action in to_delete { + for action in remove { actions.push(Action::remove(Remove { path: action.path, deletion_timestamp: Some(deletion_timestamp), @@ -568,81 +287,6 @@ async fn execute( Ok(((actions, version), metrics)) } -pub(crate) async fn scan_memory_table( - snapshot: &DeltaTableState, - predicate: &Expr, -) -> DeltaResult> { - let actions = snapshot.files().to_owned(); - - let batch = snapshot.add_actions_table(true)?; - let mut arrays = Vec::new(); - let mut fields = Vec::new(); - - let schema = batch.schema(); - - arrays.push( - batch - .column_by_name("path") - .ok_or(DeltaTableError::Generic( - "Column with name `path` does not exist".to_owned(), - ))? - .to_owned(), - ); - fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false)); - - for field in schema.fields() { - if field.name().starts_with("partition.") { - let name = field.name().strip_prefix("partition.").unwrap(); - - arrays.push(batch.column_by_name(field.name()).unwrap().to_owned()); - fields.push(Field::new( - name, - field.data_type().to_owned(), - field.is_nullable(), - )); - } - } - - let schema = Arc::new(ArrowSchema::new(fields)); - let batch = RecordBatch::try_new(schema, arrays)?; - let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; - - let ctx = SessionContext::new(); - let mut df = ctx.read_table(Arc::new(mem_table))?; - df = df - .filter(predicate.to_owned())? 
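Note: after this change the delete path hinges on the partition-scan flag returned by find_files (spelled parition_scan at this point and renamed later in the series). A stand-in sketch of the resulting control flow: partition-only predicates drop whole files, everything else rewrites the candidates first, and the candidates always become remove actions:

struct Add {
    path: String,
}

struct FindFiles {
    candidates: Vec<Add>,
    partition_scan: bool,
}

fn plan_delete(found: FindFiles) -> (Vec<String>, Vec<Add>) {
    let rewrites = if found.partition_scan {
        Vec::new() // nothing to rewrite, the files are removed wholesale
    } else {
        found
            .candidates
            .iter()
            .map(|a| format!("rewrite {}", a.path))
            .collect()
    };
    (rewrites, found.candidates)
}

fn main() {
    let found = FindFiles {
        candidates: vec![Add { path: "part-1.parquet".into() }],
        partition_scan: true,
    };
    let (rewrites, removes) = plan_delete(found);
    assert!(rewrites.is_empty());
    assert_eq!(removes.len(), 1);
}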
- .select(vec![col(PATH_COLUMN)])?; - let batches = df.collect().await?; - - let mut map = HashMap::new(); - for action in actions { - map.insert(action.path.clone(), action); - } - let mut files = Vec::new(); - - for batch in batches { - let array = batch - .column_by_name(PATH_COLUMN) - .unwrap() - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - PATH_COLUMN - )))?; - for path in array { - let path = path.ok_or(DeltaTableError::Generic(format!( - "{} cannot be null", - PATH_COLUMN - )))?; - let value = map.remove(path).unwrap(); - files.push(value); - } - } - - Ok(files) -} - impl std::future::IntoFuture for DeleteBuilder { type Output = DeltaResult<(DeltaTable, DeleteMetrics)>; type IntoFuture = BoxFuture<'static, Self::Output>; diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 1189c19dc4..d99eba527f 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -28,11 +28,10 @@ use arrow::datatypes::Schema as ArrowSchema; use arrow_schema::Field; use datafusion::{ execution::context::SessionState, - physical_optimizer::pruning::PruningPredicate, physical_plan::{projection::ProjectionExec, ExecutionPlan}, prelude::SessionContext, }; -use datafusion_common::{tree_node::TreeNode, Column, DFSchema, ScalarValue}; +use datafusion_common::{Column, DFSchema, ScalarValue}; use datafusion_expr::{case, col, lit, Expr}; use datafusion_physical_expr::{create_physical_expr, expressions, PhysicalExpr}; use futures::future::BoxFuture; @@ -40,19 +39,14 @@ use parquet::file::properties::WriterProperties; use serde_json::{Map, Value}; use crate::{ - action::{Action, Add, DeltaOperation, Remove}, - delta_datafusion::{parquet_scan_from_actions, register_store}, - operations::delete::find_files, + action::{Action, DeltaOperation, Remove}, + delta_datafusion::{find_files, parquet_scan_from_actions, register_store}, storage::{DeltaObjectStore, ObjectStoreRef}, table_state::DeltaTableState, DeltaResult, DeltaTable, DeltaTableError, }; -use super::{ - delete::{scan_memory_table, ExprProperties}, - transaction::commit, - write::write_execution_plan, -}; +use super::{transaction::commit, write::write_execution_plan}; /// Updates records in the Delta Table. /// See this module's documentaiton for more information @@ -177,68 +171,16 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let schema = snapshot.arrow_schema()?; - let files_to_rewrite = match &predicate { - Some(predicate) => { - let file_schema = Arc::new(ArrowSchema::new( - schema - .fields() - .iter() - .filter(|f| !table_partition_cols.contains(f.name())) - .cloned() - .collect::>(), - )); - - let input_schema = snapshot.input_schema()?; - let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; - let expr = create_physical_expr( - predicate, - &input_dfschema, - &input_schema, - state.execution_props(), - )?; - - // Validate the Predicate and determine if it only contains partition columns - let mut expr_properties = ExprProperties { - partition_only: true, - partition_columns: current_metadata.partition_columns.clone(), - result: Ok(()), - }; - - TreeNode::visit(predicate, &mut expr_properties)?; - expr_properties.result?; - - let scan_start = Instant::now(); - let candidates = if expr_properties.partition_only { - scan_memory_table(snapshot, predicate).await? 
- } else { - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; - let files_to_prune = pruning_predicate.prune(snapshot)?; - let files: Vec<&Add> = snapshot - .files() - .iter() - .zip(files_to_prune.into_iter()) - .filter_map(|(action, keep)| if keep { Some(action) } else { None }) - .collect(); - - // Create a new delta scan plan with only files that have a record - let candidates = find_files( - snapshot, - object_store.clone(), - schema.clone(), - file_schema.clone(), - files, - &state, - predicate, - ) - .await?; - candidates.into_iter().map(|s| s.to_owned()).collect() - }; - metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); - candidates - } - None => snapshot.files().to_owned(), - }; - + let scan_start = Instant::now(); + let candidates = find_files( + snapshot, + object_store.clone(), + schema.clone(), + &state, + predicate.clone(), + ) + .await?; + metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); let execution_props = state.execution_props(); @@ -247,7 +189,7 @@ async fn execute( let parquet_scan = parquet_scan_from_actions( snapshot, object_store.clone(), - &files_to_rewrite, + &candidates.candidates, &schema, None, &state, @@ -367,9 +309,9 @@ async fn execute( let mut actions: Vec = add_actions.into_iter().map(Action::add).collect(); metrics.num_added_files = actions.len(); - metrics.num_removed_files = files_to_rewrite.len(); + metrics.num_removed_files = candidates.candidates.len(); - for action in files_to_rewrite { + for action in candidates.candidates { actions.push(Action::remove(Remove { path: action.path, deletion_timestamp: Some(deletion_timestamp), From be4cbb926057a19531475f5ded2ddd82798326ba Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 28 May 2023 00:48:35 -0400 Subject: [PATCH 05/26] remove duplicated test --- rust/src/operations/delete.rs | 62 ----------------------------------- 1 file changed, 62 deletions(-) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 5bbf254d65..03e7fd777d 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -588,68 +588,6 @@ mod tests { let actual = get_data(table).await; assert_batches_sorted_eq!(&expected, &actual); } - #[tokio::test] - async fn test_delete_on_mixed_predicate() { - // Perform a delete where the predicate only contains partition columns - - let schema = get_arrow_schema(&None); - let table = setup_table(Some(["modified"].to_vec())).await; - - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![ - Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])), - Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])), - Arc::new(arrow::array::StringArray::from_slice([ - "2021-02-02", - "2021-02-03", - "2021-02-02", - "2021-02-03", - ])), - ], - ) - .unwrap(); - - // write some data - let table = DeltaOps(table) - .write(vec![batch]) - .with_save_mode(SaveMode::Append) - .await - .unwrap(); - assert_eq!(table.version(), 1); - assert_eq!(table.get_file_uris().count(), 2); - - let (table, metrics) = DeltaOps(table) - .delete() - .with_predicate( - col("modified") - .eq(lit("2021-02-03")) - .and(col("id").eq(lit(20))), - ) - .await - .unwrap(); - assert_eq!(table.version(), 2); - assert_eq!(table.get_file_uris().count(), 1); - - assert_eq!(metrics.num_added_files, 0); - assert_eq!(metrics.num_removed_files, 1); - assert_eq!(metrics.num_deleted_rows, None); - 
assert_eq!(metrics.num_copied_rows, None); - assert!(metrics.scan_time_ms > 0); - assert_eq!(metrics.rewrite_time_ms, 0); - - let expected = vec![ - "+----+-------+------------+", - "| id | value | modified |", - "+----+-------+------------+", - "| A | 0 | 2021-02-02 |", - "| A | 10 | 2021-02-02 |", - "+----+-------+------------+", - ]; - - let actual = get_data(table).await; - assert_batches_sorted_eq!(&expected, &actual); - } #[tokio::test] async fn test_delete_on_partition_column() { From cd00c6ec29ca41e8beff93e35363ba5f11efe7e6 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 28 May 2023 22:36:14 -0400 Subject: [PATCH 06/26] Obtain update and copy metrics for operation --- rust/src/operations/update.rs | 188 +++++++++++++++++++++++++++++----- 1 file changed, 164 insertions(+), 24 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index d99eba527f..bd75fb697f 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -25,16 +25,26 @@ use std::{ }; use arrow::datatypes::Schema as ArrowSchema; -use arrow_schema::Field; +use arrow_array::RecordBatch; +use arrow_schema::{Field, SchemaRef}; use datafusion::{ execution::context::SessionState, - physical_plan::{projection::ProjectionExec, ExecutionPlan}, + physical_plan::{ + metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, + projection::ProjectionExec, + ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, + }, prelude::SessionContext, }; +use datafusion_common::Result as DataFusionResult; use datafusion_common::{Column, DFSchema, ScalarValue}; -use datafusion_expr::{case, col, lit, Expr}; -use datafusion_physical_expr::{create_physical_expr, expressions, PhysicalExpr}; -use futures::future::BoxFuture; +use datafusion_expr::{case, col, lit, when, Expr}; +use datafusion_physical_expr::{ + create_physical_expr, + expressions::{self}, + PhysicalExpr, +}; +use futures::{future::BoxFuture, Stream, StreamExt}; use parquet::file::properties::WriterProperties; use serde_json::{Map, Value}; @@ -74,10 +84,10 @@ pub struct UpdateMetrics { pub num_added_files: usize, /// Number of files removed. pub num_removed_files: usize, - // Number of rows updated. - // pub num_updated_rows: usize, - // Number of rows just copied over in the process of updating files. - // pub num_copied_rows: usize, + /// Number of rows updated. + pub num_updated_rows: usize, + /// Number of rows just copied over in the process of updating files. + pub num_copied_rows: usize, /// Time taken to execute the entire operation. pub execution_time_ms: u128, /// Time taken to scan the files for matches. @@ -223,19 +233,32 @@ async fn execute( )); } - let predicate_expr = - create_physical_expr(&predicate, &input_dfschema, &input_schema, execution_props)?; + // Take advantage of how null counts are tracked in arrow arrays use the + // null count to track how many records do NOT statisfy the predicate. 
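Note: the trick referred to in this comment is to project when(predicate, lit(true)).otherwise(NULL) into a __delta_rs_update_predicate column, so arrow's per-array null bookkeeping does the counting for free. A minimal sketch with a plain BooleanArray, assuming only the arrow crate:

use arrow::array::{Array, BooleanArray};

fn main() {
    // Rows matching the predicate are Some(true); everything else is NULL.
    let predicate_col = BooleanArray::from(vec![Some(true), None, Some(true), None, None]);
    let copied = predicate_col.null_count(); // rows carried over unchanged
    let updated = predicate_col.len() - copied; // rows that receive new values
    assert_eq!((updated, copied), (2, 3));
}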
The + // count is then exposed through the metrics through the `UpdateCountScan` + // execution plan + let predicate_null = + when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; + let predicate_expr = create_physical_expr( + &predicate_null, + &input_dfschema, + &input_schema, + execution_props, + )?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); + let projection_predicate: Arc = + Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); + + let count_plan = Arc::new(UpdateCountScan::new(projection_predicate.clone())); + // Perform another projection but instead calculate updated values based on // the predicate value. If the predicate is true then evalute the user // provided expression otherwise return the original column value // // For each update column a new column with a name of __delta_rs_ + `original name` is created - let projection: Arc = - Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); let mut expressions: Vec<(Arc, String)> = Vec::new(); - let scan_schema = projection.schema(); + let scan_schema = count_plan.schema(); for (i, field) in scan_schema.fields().into_iter().enumerate() { expressions.push(( Arc::new(expressions::Column::new(field.name(), i)), @@ -260,12 +283,12 @@ async fn execute( control_columns.insert(c); } - let projection_predicate: Arc = - Arc::new(ProjectionExec::try_new(expressions, projection.clone())?); + let projection_update: Arc = + Arc::new(ProjectionExec::try_new(expressions, count_plan.clone())?); // Project again to remove __delta_rs columns and rename update columns to their original name let mut expressions: Vec<(Arc, String)> = Vec::new(); - let scan_schema = projection_predicate.schema(); + let scan_schema = projection_update.schema(); for (i, field) in scan_schema.fields().into_iter().enumerate() { if !control_columns.contains(field.name()) { match map.get(field.name()) { @@ -287,7 +310,7 @@ async fn execute( let projection: Arc = Arc::new(ProjectionExec::try_new( expressions, - projection_predicate.clone(), + projection_update.clone(), )?); let add_actions = write_execution_plan( @@ -302,6 +325,17 @@ async fn execute( ) .await?; + let count_metrics = count_plan.metrics().unwrap(); + + metrics.num_updated_rows = count_metrics + .sum_by_name("num_updated_rows") + .unwrap() + .as_usize(); + metrics.num_copied_rows = count_metrics + .sum_by_name("num_copied_rows") + .unwrap() + .as_usize(); + let deletion_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -379,6 +413,112 @@ impl std::future::IntoFuture for UpdateBuilder { }) } } + +#[derive(Debug)] +struct UpdateCountScan { + parent: Arc, + metrics: ExecutionPlanMetricsSet, +} + +impl UpdateCountScan { + pub fn new(parent: Arc) -> Self { + UpdateCountScan { + parent, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +impl ExecutionPlan for UpdateCountScan { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow_schema::SchemaRef { + self.parent.schema() + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + self.parent.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { + self.parent.output_ordering() + } + + fn children(&self) -> Vec> { + vec![self.parent.clone()] + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion_common::Result { + let res = self.parent.execute(partition, context)?; + Ok(Box::pin(UpdateCountStream { + schema: 
self.schema(), + input: res, + metrics: self.metrics.clone(), + })) + } + + fn statistics(&self) -> datafusion_common::Statistics { + self.parent.statistics() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion_common::Result> { + ExecutionPlan::with_new_children(self.parent.clone(), children) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +struct UpdateCountStream { + schema: SchemaRef, + input: SendableRecordBatchStream, + metrics: ExecutionPlanMetricsSet, +} + +impl Stream for UpdateCountStream { + type Item = DataFusionResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => { + let array = batch.column_by_name("__delta_rs_update_predicate").unwrap(); + let copied_rows = array.null_count(); + let num_updated = array.len() - copied_rows; + let c1 = MetricBuilder::new(&self.metrics).global_counter("num_updated_rows"); + c1.add(num_updated); + + let c2 = MetricBuilder::new(&self.metrics).global_counter("num_copied_rows"); + c2.add(copied_rows); + Some(Ok(batch)) + } + other => other, + }) + } + + fn size_hint(&self) -> (usize, Option) { + self.input.size_hint() + } +} + +impl RecordBatchStream for UpdateCountStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + #[cfg(test)] mod tests { @@ -454,8 +594,8 @@ mod tests { assert_eq!(table.get_file_uris().count(), 1); assert_eq!(metrics.num_added_files, 1); assert_eq!(metrics.num_removed_files, 1); - //assert_eq!(metrics.num_updated_rows, 4); - //assert_eq!(metrics.num_copied_rows, 0); + assert_eq!(metrics.num_updated_rows, 4); + assert_eq!(metrics.num_copied_rows, 0); let expected = vec![ "+----+-------+------------+", @@ -510,8 +650,8 @@ mod tests { assert_eq!(table.get_file_uris().count(), 1); assert_eq!(metrics.num_added_files, 1); assert_eq!(metrics.num_removed_files, 1); - //assert_eq!(metrics.num_updated_rows, 4); - //assert_eq!(metrics.num_copied_rows, 0); + assert_eq!(metrics.num_updated_rows, 2); + assert_eq!(metrics.num_copied_rows, 2); let expected = vec![ "+----+-------+------------+", @@ -567,8 +707,8 @@ mod tests { assert_eq!(table.get_file_uris().count(), 2); assert_eq!(metrics.num_added_files, 1); assert_eq!(metrics.num_removed_files, 1); - //assert_eq!(metrics.num_updated_rows, 2); - //assert_eq!(metrics.num_copied_rows, 0); + assert_eq!(metrics.num_updated_rows, 2); + assert_eq!(metrics.num_copied_rows, 0); let expected = vec![ "+----+-------+------------+", From ac6bf57a212bd553b585b1b14a5d9b92a4af9f5b Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 1 Jun 2023 22:12:07 -0400 Subject: [PATCH 07/26] Add additional tests --- rust/src/operations/update.rs | 195 ++++++++++++++++++++++++++++------ 1 file changed, 165 insertions(+), 30 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index bd75fb697f..1ba1f50a47 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -235,7 +235,7 @@ async fn execute( // Take advantage of how null counts are tracked in arrow arrays use the // null count to track how many records do NOT statisfy the predicate. 
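Note: UpdateCountStream above wraps the upstream stream, forwards every batch untouched, and feeds the null counts into global metric counters. The same shape reduced to a plain iterator with atomic counters, as a simplified analogy rather than the DataFusion API:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct CountingIter<I> {
    inner: I,
    updated: Arc<AtomicUsize>,
    copied: Arc<AtomicUsize>,
}

impl<I: Iterator<Item = Vec<Option<bool>>>> Iterator for CountingIter<I> {
    type Item = Vec<Option<bool>>;

    fn next(&mut self) -> Option<Self::Item> {
        let batch = self.inner.next()?;
        let copied = batch.iter().filter(|v| v.is_none()).count();
        self.copied.fetch_add(copied, Ordering::Relaxed);
        self.updated.fetch_add(batch.len() - copied, Ordering::Relaxed);
        Some(batch) // the batch itself is forwarded unchanged
    }
}

fn main() {
    let updated = Arc::new(AtomicUsize::new(0));
    let copied = Arc::new(AtomicUsize::new(0));
    let batches = vec![vec![Some(true), None, Some(true)], vec![None, None]];
    let wrapped = CountingIter {
        inner: batches.into_iter(),
        updated: updated.clone(),
        copied: copied.clone(),
    };
    let _forwarded: Vec<_> = wrapped.collect();
    assert_eq!(updated.load(Ordering::Relaxed), 2);
    assert_eq!(copied.load(Ordering::Relaxed), 3);
}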
The - // count is then exposed through the metrics through the `UpdateCountScan` + // count is then exposed through the metrics through the `UpdateCountExec` // execution plan let predicate_null = when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; @@ -250,7 +250,7 @@ async fn execute( let projection_predicate: Arc = Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); - let count_plan = Arc::new(UpdateCountScan::new(projection_predicate.clone())); + let count_plan = Arc::new(UpdateCountExec::new(projection_predicate.clone())); // Perform another projection but instead calculate updated values based on // the predicate value. If the predicate is true then evalute the user @@ -415,21 +415,21 @@ impl std::future::IntoFuture for UpdateBuilder { } #[derive(Debug)] -struct UpdateCountScan { +struct UpdateCountExec { parent: Arc, metrics: ExecutionPlanMetricsSet, } -impl UpdateCountScan { +impl UpdateCountExec { pub fn new(parent: Arc) -> Self { - UpdateCountScan { + UpdateCountExec { parent, metrics: ExecutionPlanMetricsSet::new(), } } } -impl ExecutionPlan for UpdateCountScan { +impl ExecutionPlan for UpdateCountExec { fn as_any(&self) -> &dyn std::any::Any { self } @@ -522,11 +522,13 @@ impl RecordBatchStream for UpdateCountStream { #[cfg(test)] mod tests { - use crate::action::*; use crate::operations::DeltaOps; use crate::writer::test_utils::{get_arrow_schema, get_delta_schema}; use crate::DeltaTable; + use crate::{action::*, DeltaResult}; + use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; + use arrow_array::Int32Array; use datafusion::assert_batches_sorted_eq; use datafusion::from_slice::FromSlice; use datafusion::prelude::*; @@ -545,7 +547,8 @@ mod tests { table } - async fn get_data(table: DeltaTable) -> Vec { + async fn get_data(table: &DeltaTable) -> Vec { + let table = DeltaTable::new_with_state(table.object_store(), table.state.clone()); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(table)).unwrap(); ctx.sql("select * from test") @@ -556,6 +559,35 @@ mod tests { .unwrap() } + async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaResult { + DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + } + + async fn prepare_values_table() -> DeltaTable { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + arrow::datatypes::DataType::Int32, + true, + )])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![ + Some(0), + None, + Some(2), + None, + Some(4), + ]))], + ) + .unwrap(); + + DeltaOps::new_in_memory().write(vec![batch]).await.unwrap() + } + #[tokio::test] async fn test_update_no_predicate() { let schema = get_arrow_schema(&None); @@ -575,12 +607,8 @@ mod tests { ], ) .unwrap(); - // write some data - let table = DeltaOps(table) - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::Append) - .await - .unwrap(); + + let table = write_batch(table, batch).await.unwrap(); assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); @@ -607,7 +635,7 @@ mod tests { "| B | 10 | 2023-05-14 |", "+----+-------+------------+", ]; - let actual = get_data(table).await; + let actual = get_data(&table).await; assert_batches_sorted_eq!(&expected, &actual); } @@ -630,12 +658,8 @@ mod tests { ], ) .unwrap(); - // write some data - let table = DeltaOps(table) - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::Append) - .await - .unwrap(); + + let table = write_batch(table, 
batch).await.unwrap(); assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); @@ -663,7 +687,7 @@ mod tests { "| B | 10 | 2021-02-02 |", "+----+-------+------------+", ]; - let actual = get_data(table).await; + let actual = get_data(&table).await; assert_batches_sorted_eq!(&expected, &actual); } @@ -686,12 +710,8 @@ mod tests { ], ) .unwrap(); - // write some data - let table = DeltaOps(table) - .write(vec![batch.clone()]) - .with_save_mode(SaveMode::Append) - .await - .unwrap(); + + let table = write_batch(table, batch).await.unwrap(); assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 2); @@ -721,12 +741,118 @@ mod tests { "+----+-------+------------+", ]; - let actual = get_data(table).await; + let actual = get_data(&table).await; assert_batches_sorted_eq!(&expected, &actual); } #[tokio::test] - async fn test_failure_invalid_expressions() { + async fn test_update_null() { + let table = prepare_values_table().await; + assert_eq!(table.version(), 0); + assert_eq!(table.get_file_uris().count(), 1); + + let (table, metrics) = DeltaOps(table) + .update() + .with_update("value", col("value") + lit(1)) + .await + .unwrap(); + + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 1); + assert_eq!(metrics.num_added_files, 1); + assert_eq!(metrics.num_removed_files, 1); + assert_eq!(metrics.num_updated_rows, 5); + assert_eq!(metrics.num_copied_rows, 0); + + let expected = [ + "+-------+", + "| value |", + "+-------+", + "| |", + "| |", + "| 1 |", + "| 3 |", + "| 5 |", + "+-------+", + ]; + + let actual = get_data(&table).await; + assert_batches_sorted_eq!(&expected, &actual); + + // Validate order operators do not include nulls + let table = prepare_values_table().await; + let (table, metrics) = DeltaOps(table) + .update() + .with_predicate(col("value").gt(lit(2)).or(col("value").lt(lit(2)))) + .with_update("value", lit(10)) + .await + .unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 1); + assert_eq!(metrics.num_added_files, 1); + assert_eq!(metrics.num_removed_files, 1); + assert_eq!(metrics.num_updated_rows, 2); + assert_eq!(metrics.num_copied_rows, 3); + + let expected = [ + "+-------+", + "| value |", + "+-------+", + "| |", + "| |", + "| 2 |", + "| 10 |", + "| 10 |", + "+-------+", + ]; + let actual = get_data(&table).await; + assert_batches_sorted_eq!(&expected, &actual); + + let table = prepare_values_table().await; + let (table, metrics) = DeltaOps(table) + .update() + .with_predicate(col("value").is_null()) + .with_update("value", lit("test")) + .await + .unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 1); + assert_eq!(metrics.num_added_files, 1); + assert_eq!(metrics.num_removed_files, 1); + assert_eq!(metrics.num_updated_rows, 2); + assert_eq!(metrics.num_copied_rows, 3); + + let expected = [ + "+-------+", + "| value |", + "+-------+", + "| 10 |", + "| 10 |", + "| 0 |", + "| 2 |", + "| 4 |", + "+-------+", + ]; + let actual = get_data(&table).await; + assert_batches_sorted_eq!(&expected, &actual); + } + + #[tokio::test] + async fn test_no_updates() { + let table = prepare_values_table().await; + let (table, metrics) = DeltaOps(table).update().await.unwrap(); + + assert_eq!(table.version(), 0); + assert_eq!(metrics.num_added_files, 0); + assert_eq!(metrics.num_removed_files, 0); + assert_eq!(metrics.num_copied_rows, 0); + assert_eq!(metrics.num_removed_files, 0); + assert_eq!(metrics.scan_time_ms, 0); + 
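Note: the new null-handling tests lean on a single-column fixture with interleaved NULLs (prepare_values_table). The batch it builds can be reproduced standalone with just the arrow crate:

use std::sync::Arc;
use arrow::array::{Array, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![Some(0), None, Some(2), None, Some(4)]))],
    )
    .unwrap();
    // Two of the five rows are NULL, which is what the null-handling tests exercise.
    assert_eq!(batch.column(0).null_count(), 2);
}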
assert_eq!(metrics.execution_time_ms, 0); + } + + #[tokio::test] + async fn test_expected_failures() { // The predicate must be deterministic and expression must be valid let table = setup_table(None).await; @@ -740,5 +866,14 @@ mod tests { .with_update("value", col("value") + lit(20)) .await; assert!(res.is_err()); + + // Expression result types must match the table's schema + // I.E Schema evolution is not supported for now + let table = prepare_values_table().await; + let res = DeltaOps(table) + .update() + .with_update("value", lit("a string")) + .await; + assert!(res.is_err()); } } From c52aa096a2718db8af40fba87c6f111dab5fc94d Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 5 Jun 2023 20:39:00 -0400 Subject: [PATCH 08/26] Allow datafusion expressions or string expressions --- rust/src/operations/delete.rs | 17 +----- rust/src/operations/update.rs | 98 +++++++++++++++++++++++++---------- 2 files changed, 72 insertions(+), 43 deletions(-) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 33f1f0848c..256b2b8953 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -17,23 +17,12 @@ //! .await?; //! ```` -use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; use crate::action::{Action, Add, Remove}; -use arrow::array::StringArray; -use arrow::datatypes::DataType; -use arrow::datatypes::Field; -use arrow::datatypes::Schema as ArrowSchema; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; use arrow_cast::CastOptions; -use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat}; -use datafusion::datasource::listing::PartitionedFile; -use datafusion::datasource::MemTable; -use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; +use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::ExecutionPlan; @@ -46,9 +35,7 @@ use serde_json::Map; use serde_json::Value; use crate::action::DeltaOperation; -use crate::delta_datafusion::{ - parquet_scan_from_actions, partitioned_file_from_action, register_store, -}; +use crate::delta_datafusion::{find_files, parquet_scan_from_actions, register_store}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 1ba1f50a47..279773d3b1 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -26,6 +26,7 @@ use std::{ use arrow::datatypes::Schema as ArrowSchema; use arrow_array::RecordBatch; +use arrow_cast::CastOptions; use arrow_schema::{Field, SchemaRef}; use datafusion::{ execution::context::SessionState, @@ -58,13 +59,33 @@ use crate::{ use super::{transaction::commit, write::write_execution_plan}; +/// Used to represent user input of either a Datafusion expression or string expression +pub enum Expression { + /// Datafusion Expression + E(Expr), + /// String Expression + S(String), +} + +impl From for Expression { + fn from(val: Expr) -> Self { + Expression::E(val) + } +} + +impl From<&str> for Expression { + fn from(val: &str) -> Self { + Expression::S(val.to_string()) + } +} + /// Updates records in the Delta Table. 
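Note: the Expression enum introduced above lets every builder method accept either a typed DataFusion Expr or a raw SQL string that is parsed against the table schema later. A minimal sketch of the Into-based pattern, with a stand-in Expr type and the variant names the series eventually settles on:

#[derive(Debug)]
struct Expr(String);

#[derive(Debug)]
enum Expression {
    DataFusion(Expr),
    String(String),
}

impl From<Expr> for Expression {
    fn from(val: Expr) -> Self {
        Expression::DataFusion(val)
    }
}

impl From<&str> for Expression {
    fn from(val: &str) -> Self {
        Expression::String(val.to_owned())
    }
}

#[derive(Default, Debug)]
struct UpdateBuilder {
    predicate: Option<Expression>,
}

impl UpdateBuilder {
    fn with_predicate<E: Into<Expression>>(mut self, predicate: E) -> Self {
        self.predicate = Some(predicate.into());
        self
    }
}

fn main() {
    // Both call sites compile against the same builder method.
    let a = UpdateBuilder::default().with_predicate("value is null");
    let b = UpdateBuilder::default().with_predicate(Expr("value IS NULL".into()));
    println!("{a:?} {b:?}");
}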
/// See this module's documentaiton for more information pub struct UpdateBuilder { /// Which records to update - predicate: Option, + predicate: Option, /// How to update columns in a record that match the predicate - updates: HashMap, + updates: HashMap, /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files @@ -75,6 +96,9 @@ pub struct UpdateBuilder { writer_properties: Option, /// Additional metadata to be added to commit app_metadata: Option>, + /// CastOptions determines how data types that do not match the underlying table are handled + /// By default an error is returned + cast_options: CastOptions, } #[derive(Default)] @@ -105,24 +129,23 @@ impl UpdateBuilder { state: None, writer_properties: None, app_metadata: None, + cast_options: CastOptions { safe: false }, } } /// Which records to update - pub fn with_predicate(mut self, predicate: Expr) -> Self { - self.predicate = Some(predicate); - self - } - - /// Overwrite update expressions with the supplied map - pub fn with_updates(mut self, updates: HashMap) -> Self { - self.updates = updates; + pub fn with_predicate>(mut self, predicate: E) -> Self { + self.predicate = Some(predicate.into()); self } /// Perform an additonal update expression during the operaton - pub fn with_update>(mut self, column: S, expression: Expr) -> Self { - self.updates.insert(column.into(), expression); + pub fn with_update, E: Into>( + mut self, + column: S, + expression: E, + ) -> Self { + self.updates.insert(column.into(), expression.into()); self } @@ -146,16 +169,24 @@ impl UpdateBuilder { self.writer_properties = Some(writer_properties); self } + + /// Specify the cast options to use when casting columns that do not match the table's schema. + pub fn with_cast_options(mut self, cast_options: CastOptions) -> Self { + self.cast_options = cast_options; + self + } } +#[allow(clippy::too_many_arguments)] async fn execute( - predicate: Option, - updates: &HashMap, + predicate: Option, + updates: HashMap, object_store: ObjectStoreRef, snapshot: &DeltaTableState, state: SessionState, writer_properties: Option, app_metadata: Option>, + cast_options: CastOptions, ) -> DeltaResult<((Vec, i64), UpdateMetrics)> { // Validate the predicate and update expressions // @@ -175,6 +206,24 @@ async fn execute( return Ok(((Vec::new(), version), metrics)); } + let predicate = match predicate { + Some(predicate) => match predicate { + Expression::E(expr) => Some(expr), + Expression::S(s) => Some(snapshot.parse_predicate_expression(s)?), + }, + None => None, + }; + + let mut _updates = HashMap::new(); + for (key, expr) in updates { + let expr = match expr { + Expression::E(e) => e, + Expression::S(s) => snapshot.parse_predicate_expression(s)?, + }; + _updates.insert(key, expr); + } + let updates = _updates; + let current_metadata = snapshot .current_metadata() .ok_or(DeltaTableError::NoMetadata)?; @@ -322,6 +371,7 @@ async fn execute( Some(snapshot.table_config().target_file_size() as usize), None, writer_properties, + &cast_options, ) .await?; @@ -396,12 +446,13 @@ impl std::future::IntoFuture for UpdateBuilder { let ((actions, version), metrics) = execute( this.predicate, - &this.updates, + this.updates, this.object_store.clone(), &this.snapshot, state, this.writer_properties, this.app_metadata, + this.cast_options, ) .await?; @@ -523,6 +574,7 @@ impl RecordBatchStream for UpdateCountStream { mod tests { use crate::operations::DeltaOps; + use crate::writer::test_utils::datafusion::get_data; use 
crate::writer::test_utils::{get_arrow_schema, get_delta_schema}; use crate::DeltaTable; use crate::{action::*, DeltaResult}; @@ -547,18 +599,6 @@ mod tests { table } - async fn get_data(table: &DeltaTable) -> Vec { - let table = DeltaTable::new_with_state(table.object_store(), table.state.clone()); - let ctx = SessionContext::new(); - ctx.register_table("test", Arc::new(table)).unwrap(); - ctx.sql("select * from test") - .await - .unwrap() - .collect() - .await - .unwrap() - } - async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaResult { DeltaOps(table) .write(vec![batch.clone()]) @@ -837,6 +877,9 @@ mod tests { assert_batches_sorted_eq!(&expected, &actual); } + #[tokio::test] + async fn test_str_expressions() {} + #[tokio::test] async fn test_no_updates() { let table = prepare_values_table().await; @@ -868,7 +911,6 @@ mod tests { assert!(res.is_err()); // Expression result types must match the table's schema - // I.E Schema evolution is not supported for now let table = prepare_values_table().await; let res = DeltaOps(table) .update() From 1188985effae4a425f2d400d6882d596a0848ad5 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 5 Jun 2023 20:53:18 -0400 Subject: [PATCH 09/26] Update tests --- rust/src/operations/delete.rs | 3 +-- rust/src/operations/update.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 256b2b8953..e6473ed393 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -388,8 +388,7 @@ mod tests { assert_eq!(metrics.num_deleted_rows, None); assert_eq!(metrics.num_copied_rows, None); - // Scan and rewrite is not required - assert_eq!(metrics.scan_time_ms, 0); + // rewrite is not required assert_eq!(metrics.rewrite_time_ms, 0); // Deletes with no changes to state must not commit diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 279773d3b1..d40b7ca4ea 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -852,7 +852,7 @@ mod tests { let (table, metrics) = DeltaOps(table) .update() .with_predicate(col("value").is_null()) - .with_update("value", lit("test")) + .with_update("value", lit(10)) .await .unwrap(); assert_eq!(table.version(), 1); From a036705129638670a758deeaf07d53c1d2f96b1d Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 6 Jun 2023 19:07:38 -0400 Subject: [PATCH 10/26] Update rust/src/operations/update.rs Co-authored-by: Will Jones --- rust/src/operations/update.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index d40b7ca4ea..dbd741bc89 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -62,9 +62,9 @@ use super::{transaction::commit, write::write_execution_plan}; /// Used to represent user input of either a Datafusion expression or string expression pub enum Expression { /// Datafusion Expression - E(Expr), + DataFusion(Expr), /// String Expression - S(String), + String(String), } impl From for Expression { From 5dc5bdf6048b4c756766763ab67174e038bc405b Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 6 Jun 2023 19:07:46 -0400 Subject: [PATCH 11/26] Update rust/src/delta_datafusion.rs Co-authored-by: Will Jones --- rust/src/delta_datafusion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index afad594e50..5b621b513b 100644 --- a/rust/src/delta_datafusion.rs +++ 
b/rust/src/delta_datafusion.rs @@ -1085,7 +1085,7 @@ impl TreeNodeVisitor for FindFilesExprProperties { pub(crate) struct FindFiles { pub candidates: Vec, /// Was a physical read to the datastore required to determine the candidates - pub parition_scan: bool, + pub partition_scan: bool, } async fn handle_stream( From 8be224a3fdc242a96737b553616b2b0dd7cde49c Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 6 Jun 2023 19:08:20 -0400 Subject: [PATCH 12/26] Update rust/src/operations/update.rs Co-authored-by: Will Jones --- rust/src/operations/update.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index dbd741bc89..ab8b1acd87 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -80,7 +80,7 @@ impl From<&str> for Expression { } /// Updates records in the Delta Table. -/// See this module's documentaiton for more information +/// See this module's documentation for more information pub struct UpdateBuilder { /// Which records to update predicate: Option, From 1e54a7127afb47d4ac9fcbe790ca8312a9a9f2bc Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 6 Jun 2023 19:13:08 -0400 Subject: [PATCH 13/26] Update rust/src/delta_datafusion.rs Co-authored-by: Will Jones --- rust/src/delta_datafusion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 5b621b513b..73e07f68e2 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1088,7 +1088,7 @@ pub(crate) struct FindFiles { pub partition_scan: bool, } -async fn handle_stream( +async fn get_filename_if_nonempty( mut stream: Pin>, ) -> Result, DeltaTableError> { if let Some(maybe_batch) = stream.next().await { From 3b685e68abac1a6c8f39140b587d3a86726e6ea6 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 6 Jun 2023 21:00:14 -0400 Subject: [PATCH 14/26] return early if no files need to be updated. 
clean up find files impl --- rust/src/delta_datafusion.rs | 92 +++++++++++++---------------------- rust/src/operations/delete.rs | 2 +- rust/src/operations/update.rs | 73 ++++++++++++++++----------- 3 files changed, 80 insertions(+), 87 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 73e07f68e2..221909fda6 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -24,7 +24,6 @@ use std::any::Any; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::Debug; -use std::pin::Pin; use std::sync::Arc; use arrow::array::ArrayRef; @@ -50,8 +49,7 @@ use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::LocalLimitExec; use datafusion::physical_plan::{ - ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, - Statistics, + ColumnStatistics, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}; @@ -65,7 +63,6 @@ use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; -use futures::StreamExt; use object_store::ObjectMeta; use url::Url; @@ -1088,40 +1085,6 @@ pub(crate) struct FindFiles { pub partition_scan: bool, } -async fn get_filename_if_nonempty( - mut stream: Pin>, -) -> Result, DeltaTableError> { - if let Some(maybe_batch) = stream.next().await { - let batch: RecordBatch = maybe_batch?; - if batch.num_rows() > 1 { - return Err(DeltaTableError::Generic( - "Find files returned multiple records for batch".to_owned(), - )); - } - let array = batch - .column_by_name(PATH_COLUMN) - .unwrap() - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - PATH_COLUMN - )))?; - - let path = array - .into_iter() - .next() - .unwrap() - .ok_or(DeltaTableError::Generic(format!( - "{} cannot be null", - PATH_COLUMN - )))?; - return Ok(Some(path.to_string())); - } - - Ok(None) -} - /// Determine which files contain a record that statisfies the predicate pub(crate) async fn find_files_scan<'a>( snapshot: &DeltaTableState, @@ -1195,24 +1158,39 @@ pub(crate) async fn find_files_scan<'a>( let limit: Arc = Arc::new(LocalLimitExec::new(filter, 1)); let task_ctx = Arc::new(TaskContext::from(state)); - let partitions = limit.output_partitioning().partition_count(); - let mut tasks = Vec::with_capacity(partitions); + let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?; - for i in 0..partitions { - let stream = limit.execute(i, task_ctx.clone())?; - tasks.push(handle_stream(stream)); - } + for batch in path_batches { + if batch.num_rows() > 1 { + return Err(DeltaTableError::Generic( + "Find files returned multiple records for batch".to_owned(), + )); + } + let array = batch + .column_by_name(PATH_COLUMN) + .unwrap() + .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + PATH_COLUMN + )))?; - for res in futures::future::join_all(tasks).await.into_iter() { - let path = res?; - if let Some(path) = path { - match candidate_map.remove(&path) { - Some(action) => files.push(action), - None => { - return Err(DeltaTableError::Generic( - "Unable to map 
__delta_rs_path to action.".to_owned(), - )) - } + let path = array + .into_iter() + .next() + .unwrap() + .ok_or(DeltaTableError::Generic(format!( + "{} cannot be null", + PATH_COLUMN + )))?; + + match candidate_map.remove(path) { + Some(action) => files.push(action), + None => { + return Err(DeltaTableError::Generic( + "Unable to map __delta_rs_path to action.".to_owned(), + )) } } } @@ -1341,7 +1319,7 @@ pub(crate) async fn find_files<'a>( let candidates = scan_memory_table(snapshot, predicate).await?; Ok(FindFiles { candidates, - parition_scan: true, + partition_scan: true, }) } else { let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; @@ -1367,13 +1345,13 @@ pub(crate) async fn find_files<'a>( Ok(FindFiles { candidates: candidates.into_iter().map(|s| s.to_owned()).collect(), - parition_scan: false, + partition_scan: false, }) } } None => Ok(FindFiles { candidates: snapshot.files().to_owned(), - parition_scan: true, + partition_scan: true, }), } } diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index e6473ed393..6ac5766b2c 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -225,7 +225,7 @@ async fn execute( let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); - let add = if candidates.parition_scan { + let add = if candidates.partition_scan { Vec::new() } else { let write_start = Instant::now(); diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index ab8b1acd87..7c67e66ac8 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -69,13 +69,13 @@ pub enum Expression { impl From for Expression { fn from(val: Expr) -> Self { - Expression::E(val) + Expression::DataFusion(val) } } impl From<&str> for Expression { fn from(val: &str) -> Self { - Expression::S(val.to_string()) + Expression::String(val.to_string()) } } @@ -208,8 +208,8 @@ async fn execute( let predicate = match predicate { Some(predicate) => match predicate { - Expression::E(expr) => Some(expr), - Expression::S(s) => Some(snapshot.parse_predicate_expression(s)?), + Expression::DataFusion(expr) => Some(expr), + Expression::String(s) => Some(snapshot.parse_predicate_expression(s)?), }, None => None, }; @@ -217,8 +217,8 @@ async fn execute( let mut _updates = HashMap::new(); for (key, expr) in updates { let expr = match expr { - Expression::E(e) => e, - Expression::S(s) => snapshot.parse_predicate_expression(s)?, + Expression::DataFusion(e) => e, + Expression::String(s) => snapshot.parse_predicate_expression(s)?, }; _updates.insert(key, expr); } @@ -240,6 +240,11 @@ async fn execute( ) .await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); + + if candidates.candidates.is_empty() { + return Ok(((Vec::new(), version), metrics)); + } + let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); let execution_props = state.execution_props(); @@ -379,12 +384,13 @@ async fn execute( metrics.num_updated_rows = count_metrics .sum_by_name("num_updated_rows") - .unwrap() - .as_usize(); + .map(|m| m.as_usize()) + .unwrap_or(0); + metrics.num_copied_rows = count_metrics .sum_by_name("num_copied_rows") - .unwrap() - .as_usize(); + .map(|m| m.as_usize()) + .unwrap_or(0); let deletion_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -409,20 +415,17 @@ async fn execute( metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); - // Do not make a commit when there are zero updates to the state 
- if !actions.is_empty() { - let operation = DeltaOperation::Update { - predicate: Some(predicate.canonical_name()), - }; - version = commit( - object_store.as_ref(), - &actions, - operation, - snapshot, - app_metadata, - ) - .await?; - } + let operation = DeltaOperation::Update { + predicate: Some(predicate.canonical_name()), + }; + version = commit( + object_store.as_ref(), + &actions, + operation, + snapshot, + app_metadata, + ) + .await?; Ok(((actions, version), metrics)) } @@ -851,8 +854,8 @@ mod tests { let table = prepare_values_table().await; let (table, metrics) = DeltaOps(table) .update() - .with_predicate(col("value").is_null()) - .with_update("value", lit(10)) + .with_predicate("value is null") + .with_update("value", "10") .await .unwrap(); assert_eq!(table.version(), 1); @@ -877,11 +880,9 @@ mod tests { assert_batches_sorted_eq!(&expected, &actual); } - #[tokio::test] - async fn test_str_expressions() {} - #[tokio::test] async fn test_no_updates() { + // No Update operations are provided let table = prepare_values_table().await; let (table, metrics) = DeltaOps(table).update().await.unwrap(); @@ -892,6 +893,20 @@ mod tests { assert_eq!(metrics.num_removed_files, 0); assert_eq!(metrics.scan_time_ms, 0); assert_eq!(metrics.execution_time_ms, 0); + + // The predicate does not match any records + let (table, metrics) = DeltaOps(table) + .update() + .with_predicate(col("value").eq(lit(3))) + .with_update("value", lit(10)) + .await + .unwrap(); + + assert_eq!(table.version(), 0); + assert_eq!(metrics.num_added_files, 0); + assert_eq!(metrics.num_removed_files, 0); + assert_eq!(metrics.num_copied_rows, 0); + assert_eq!(metrics.num_removed_files, 0); } #[tokio::test] From 65a26cd00f2ccc9880f6ee504c9c1ad78743a920 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 7 Jun 2023 19:15:46 -0400 Subject: [PATCH 15/26] Update rust/src/operations/update.rs Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com> --- rust/src/operations/update.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 7c67e66ac8..bbc09a2c96 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -188,15 +188,15 @@ async fn execute( app_metadata: Option>, cast_options: CastOptions, ) -> DeltaResult<((Vec, i64), UpdateMetrics)> { - // Validate the predicate and update expressions + // Validate the predicate and update expressions. // - // If the predicate is not set then all files needs to be updated. - // else if only contains partitions columns then perform in memory-scan - // otherwise scan files for records that statisfy the predicate + // If the predicate is not set, then all files need to be updated. + // If it only contains partition columns then perform in memory-scan. + // Otherwise, scan files for records that satisfy the predicate. // - // For files that were identified, scan for record that match the predicate - // and perform update operations, and then commit add and remove actions to - // the log + // For files that were identified, scan for records that match the predicate, + // perform update operations, and then commit add and remove actions to + // the log. 
let exec_start = Instant::now(); let mut metrics = UpdateMetrics::default(); From ffb3631c0f1020fc08a67ebebf527feda433bf16 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 7 Jun 2023 19:16:04 -0400 Subject: [PATCH 16/26] Update rust/src/operations/update.rs Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com> --- rust/src/operations/update.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index bbc09a2c96..fd3ba52816 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -214,15 +214,16 @@ async fn execute( None => None, }; - let mut _updates = HashMap::new(); - for (key, expr) in updates { - let expr = match expr { - Expression::DataFusion(e) => e, - Expression::String(s) => snapshot.parse_predicate_expression(s)?, - }; - _updates.insert(key, expr); - } - let updates = _updates; + let updates = updates + .into_iter() + .map(|(key, expr)| { + let expr = match expr { + Expression::DataFusion(e) => e, + Expression::String(s) => snapshot.parse_predicate_expression(s)?, + }; + Ok((key, expr)) + }) + .collect::>, _>()?; let current_metadata = snapshot .current_metadata() From 540c43bbcd07f0c014ffafb98f940b39d83c6e9a Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 7 Jun 2023 19:50:54 -0400 Subject: [PATCH 17/26] cleanup unwraps --- rust/src/delta_datafusion.rs | 22 +++++++++----------- rust/src/operations/update.rs | 38 +++++++++++++++++++---------------- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 221909fda6..01c17a2fc9 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1168,22 +1168,20 @@ pub(crate) async fn find_files_scan<'a>( } let array = batch .column_by_name(PATH_COLUMN) + .ok_or_else(|| { + DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN)) + }) .unwrap() .as_any() .downcast_ref::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - PATH_COLUMN - )))?; + .ok_or_else(|| { + DeltaTableError::Generic(format!("Unable to downcast column {}", PATH_COLUMN)) + })?; - let path = array - .into_iter() - .next() - .unwrap() - .ok_or(DeltaTableError::Generic(format!( - "{} cannot be null", - PATH_COLUMN - )))?; + let path = + array.into_iter().next().flatten().ok_or_else(|| { + DeltaTableError::Generic(format!("{} cannot be null", PATH_COLUMN)) + })?; match candidate_map.remove(path) { Some(action) => files.push(action), diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index fd3ba52816..387cb37296 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -78,6 +78,11 @@ impl From<&str> for Expression { Expression::String(val.to_string()) } } +impl From for Expression { + fn from(val: String) -> Self { + Expression::String(val) + } +} /// Updates records in the Delta Table. /// See this module's documentation for more information @@ -188,15 +193,15 @@ async fn execute( app_metadata: Option>, cast_options: CastOptions, ) -> DeltaResult<((Vec, i64), UpdateMetrics)> { - // Validate the predicate and update expressions. + // Validate the predicate and update expressions // - // If the predicate is not set, then all files need to be updated. - // If it only contains partition columns then perform in memory-scan. - // Otherwise, scan files for records that satisfy the predicate. 
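Note: the map-then-collect shape used for the update expressions relies on collect's Result aggregation: the iterator stops at the first failed parse and the whole map becomes an Err. A standalone illustration with a toy parser in place of parse_predicate_expression:

use std::collections::HashMap;

fn parse(raw: &str) -> Result<i64, String> {
    raw.parse().map_err(|_| format!("cannot parse expression `{raw}`"))
}

fn main() {
    let raw_updates = vec![("value", "10"), ("count", "7")];
    let updates: Result<HashMap<&str, i64>, String> = raw_updates
        .into_iter()
        .map(|(column, expr)| parse(expr).map(|e| (column, e)))
        .collect();
    println!("{updates:?}");
}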
+ // If the predicate is not set then all files needs to be updated. + // else if only contains partitions columns then perform in memory-scan + // otherwise scan files for records that statisfy the predicate // - // For files that were identified, scan for records that match the predicate, - // perform update operations, and then commit add and remove actions to - // the log. + // For files that were identified, scan for record that match the predicate + // and perform update operations, and then commit add and remove actions to + // the log let exec_start = Instant::now(); let mut metrics = UpdateMetrics::default(); @@ -214,16 +219,15 @@ async fn execute( None => None, }; - let updates = updates - .into_iter() - .map(|(key, expr)| { - let expr = match expr { - Expression::DataFusion(e) => e, - Expression::String(s) => snapshot.parse_predicate_expression(s)?, - }; - Ok((key, expr)) - }) - .collect::>, _>()?; + let mut _updates = HashMap::new(); + for (key, expr) in updates { + let expr = match expr { + Expression::DataFusion(e) => e, + Expression::String(s) => snapshot.parse_predicate_expression(s)?, + }; + _updates.insert(key, expr); + } + let updates = _updates; let current_metadata = snapshot .current_metadata() From a001be0f8a9f1da403579a1e4dd3509c1f7e0e58 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 7 Jun 2023 19:55:39 -0400 Subject: [PATCH 18/26] update comment --- rust/src/operations/update.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 387cb37296..a0654c43bc 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -193,15 +193,15 @@ async fn execute( app_metadata: Option>, cast_options: CastOptions, ) -> DeltaResult<((Vec, i64), UpdateMetrics)> { - // Validate the predicate and update expressions + // Validate the predicate and update expressions. // - // If the predicate is not set then all files needs to be updated. - // else if only contains partitions columns then perform in memory-scan - // otherwise scan files for records that statisfy the predicate + // If the predicate is not set, then all files need to be updated. + // If it only contains partition columns then perform in memory-scan. + // Otherwise, scan files for records that satisfy the predicate. // - // For files that were identified, scan for record that match the predicate - // and perform update operations, and then commit add and remove actions to - // the log + // For files that were identified, scan for records that match the predicate, + // perform update operations, and then commit add and remove actions to + // the log. 
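One step in the flow above that is easy to miss: the "in memory-scan" case never opens a parquet file, because a predicate that references only partition columns can be answered from the partition values carried on the `Add` actions in the log. A minimal sketch of that check (the helper name `is_partition_only` is illustrative; the real routing lives in `find_files` in delta_datafusion.rs):

    use datafusion_common::Result;
    use datafusion_expr::Expr;

    // True when every column referenced by the predicate is a partition
    // column, i.e. candidate files can be chosen without reading any data.
    fn is_partition_only(predicate: &Expr, partition_columns: &[String]) -> Result<bool> {
        Ok(predicate
            .to_columns()?
            .iter()
            .all(|c| partition_columns.contains(&c.name)))
    }

When this holds, `find_files` takes the `scan_memory_table` path and flags the result via `partition_scan`; otherwise `find_files_scan` builds a parquet scan plus a `FilterExec` and caps it with `LocalLimitExec::new(filter, 1)` so scanning can stop early once a matching row is found.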
let exec_start = Instant::now(); let mut metrics = UpdateMetrics::default(); From eb0d86735199d68e7b04945da9f9baa6b224568b Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 7 Jun 2023 20:21:20 -0400 Subject: [PATCH 19/26] update iter --- rust/src/operations/update.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index a0654c43bc..8a46e1e989 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -219,15 +219,13 @@ async fn execute( None => None, }; - let mut _updates = HashMap::new(); - for (key, expr) in updates { - let expr = match expr { - Expression::DataFusion(e) => e, - Expression::String(s) => snapshot.parse_predicate_expression(s)?, - }; - _updates.insert(key, expr); - } - let updates = _updates; + let updates: HashMap = updates + .into_iter() + .map(|(key, expr)| match expr { + Expression::DataFusion(e) => Ok((key, e)), + Expression::String(s) => snapshot.parse_predicate_expression(s).map(|e| (key, e)), + }) + .collect::, _>>()?; let current_metadata = snapshot .current_metadata() From fc30ae41c22505384fc098d4ed95af90e798a376 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 7 Jun 2023 20:35:38 -0400 Subject: [PATCH 20/26] update cast_options docs --- rust/src/operations/update.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 8a46e1e989..29ba685994 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -175,7 +175,15 @@ impl UpdateBuilder { self } - /// Specify the cast options to use when casting columns that do not match the table's schema. + /// Specify the cast options to use when casting columns that do not match + /// the table's schema. When `cast_options.safe` is set to safe then any + /// failures to cast a datatype will use null instead of returning an error + /// to the user. + /// + /// Example (column's type is int): + /// Input Output + /// 123 -> 123 + /// Test123 -> null pub fn with_cast_options(mut self, cast_options: CastOptions) -> Self { self.cast_options = cast_options; self From 0f74bc731c51935788cfb73195ee44408093ba3a Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 8 Jun 2023 19:31:12 -0400 Subject: [PATCH 21/26] cleanup unwraps --- rust/src/delta_datafusion.rs | 114 +++++++++++++++-------------------- 1 file changed, 49 insertions(+), 65 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 01c17a2fc9..8a6ca27e4d 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1085,6 +1085,45 @@ pub(crate) struct FindFiles { pub partition_scan: bool, } +fn join_batches_with_add_actions( + batches: Vec, + mut actions: HashMap, +) -> DeltaResult> { + // Given RecordBatches that contains `__delta_rs_path` perform a hash join + // with actions to obtain original add actions + + let mut files = Vec::new(); + for batch in batches { + let array = batch + .column_by_name(PATH_COLUMN) + .ok_or_else(|| { + DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN)) + })? 
+ .as_any() + .downcast_ref::() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + PATH_COLUMN + )))?; + for path in array { + let path = path.ok_or(DeltaTableError::Generic(format!( + "{} cannot be null", + PATH_COLUMN + )))?; + + match actions.remove(path) { + Some(action) => files.push(action), + None => { + return Err(DeltaTableError::Generic( + "Unable to map __delta_rs_path to action.".to_owned(), + )) + } + } + } + } + Ok(files) +} + /// Determine which files contain a record that statisfies the predicate pub(crate) async fn find_files_scan<'a>( snapshot: &DeltaTableState, @@ -1094,9 +1133,8 @@ pub(crate) async fn find_files_scan<'a>( candidates: Vec<&'a Add>, state: &SessionState, expression: &Expr, -) -> DeltaResult> { - let mut files = Vec::new(); - let mut candidate_map: HashMap = HashMap::new(); +) -> DeltaResult> { + let mut candidate_map: HashMap = HashMap::new(); let table_partition_cols = snapshot .current_metadata() @@ -1115,7 +1153,7 @@ pub(crate) async fn find_files_scan<'a>( .or_default() .push(part); - candidate_map.insert(action.path.to_owned(), action); + candidate_map.insert(action.path.to_owned(), action.to_owned()); } let mut table_partition_cols = table_partition_cols @@ -1160,40 +1198,7 @@ pub(crate) async fn find_files_scan<'a>( let task_ctx = Arc::new(TaskContext::from(state)); let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?; - for batch in path_batches { - if batch.num_rows() > 1 { - return Err(DeltaTableError::Generic( - "Find files returned multiple records for batch".to_owned(), - )); - } - let array = batch - .column_by_name(PATH_COLUMN) - .ok_or_else(|| { - DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN)) - }) - .unwrap() - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DeltaTableError::Generic(format!("Unable to downcast column {}", PATH_COLUMN)) - })?; - - let path = - array.into_iter().next().flatten().ok_or_else(|| { - DeltaTableError::Generic(format!("{} cannot be null", PATH_COLUMN)) - })?; - - match candidate_map.remove(path) { - Some(action) => files.push(action), - None => { - return Err(DeltaTableError::Generic( - "Unable to map __delta_rs_path to action.".to_owned(), - )) - } - } - } - - Ok(files) + join_batches_with_add_actions(path_batches, candidate_map) } pub(crate) async fn scan_memory_table( @@ -1242,33 +1247,12 @@ pub(crate) async fn scan_memory_table( .select(vec![col(PATH_COLUMN)])?; let batches = df.collect().await?; - let mut map = HashMap::new(); - for action in actions { - map.insert(action.path.clone(), action); - } - let mut files = Vec::new(); + let map = actions + .into_iter() + .map(|action| (action.path.clone(), action)) + .collect::>(); - for batch in batches { - let array = batch - .column_by_name(PATH_COLUMN) - .unwrap() - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - PATH_COLUMN - )))?; - for path in array { - let path = path.ok_or(DeltaTableError::Generic(format!( - "{} cannot be null", - PATH_COLUMN - )))?; - let value = map.remove(path).unwrap(); - files.push(value); - } - } - - Ok(files) + join_batches_with_add_actions(batches, map) } pub(crate) async fn find_files<'a>( @@ -1342,7 +1326,7 @@ pub(crate) async fn find_files<'a>( .await?; Ok(FindFiles { - candidates: candidates.into_iter().map(|s| s.to_owned()).collect(), + candidates, partition_scan: false, }) } From 2f45a06d4f2e6f719a244646756a61e234f92ee6 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 11 Jun 
2023 21:30:39 -0400 Subject: [PATCH 22/26] Update rust/src/delta_datafusion.rs Co-authored-by: Will Jones --- rust/src/delta_datafusion.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 8a6ca27e4d..0f4b1e90f0 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1164,14 +1164,16 @@ pub(crate) async fn find_files_scan<'a>( table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8)); let input_schema = snapshot.input_schema()?; - let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; - - let predicate_expr = create_physical_expr( - &Expr::IsTrue(Box::new(expression.clone())), - &input_dfschema, - &input_schema, - state.execution_props(), - )?; + // Identify which columns we need to project + let mut used_columns = expression + .to_columns() + ? + .into_iter() + .map(|column| input_schema.index_of(&column.name)) + .collect::, ArrowError>>() + .unwrap(); + // Add path column + used_columns.push(input_schema.fields().len()); let parquet_scan = ParquetFormat::new() .create_physical_plan( @@ -1181,7 +1183,7 @@ pub(crate) async fn find_files_scan<'a>( file_schema, file_groups: file_groups.into_values().collect(), statistics: snapshot.datafusion_table_statistics(), - projection: None, + projection: Some(used_columns), limit: None, table_partition_cols, infinite_source: false, @@ -1191,6 +1193,15 @@ pub(crate) async fn find_files_scan<'a>( ) .await?; + let input_schema = parquet_scan.schema(); + let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; + let predicate_expr = create_physical_expr( + &Expr::IsTrue(Box::new(expression.clone())), + &input_dfschema, + &input_schema, + state.execution_props(), + )?; + let filter: Arc = Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?); let limit: Arc = Arc::new(LocalLimitExec::new(filter, 1)); From bc4377a7629f889effd4d60767aedf755bbb628d Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 11 Jun 2023 23:39:08 -0400 Subject: [PATCH 23/26] :WIP: resolve schema issues without projection --- rust/src/delta_datafusion.rs | 27 ++++++++++-- rust/src/operations/update.rs | 80 ++++++++++++++++++++++++++++++++--- 2 files changed, 98 insertions(+), 9 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 0f4b1e90f0..0ce2f8463f 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1164,16 +1164,29 @@ pub(crate) async fn find_files_scan<'a>( table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8)); let input_schema = snapshot.input_schema()?; + + let mut fields = Vec::new(); + for field in input_schema.fields.iter() { + fields.push(field.to_owned()); + } + fields.push(Arc::new(Field::new( + PATH_COLUMN, + arrow_schema::DataType::Boolean, + true, + ))); + let input_schema = Arc::new(ArrowSchema::new(fields)); + // Identify which columns we need to project + /* let mut used_columns = expression - .to_columns() - ? + .to_columns()? 
.into_iter() .map(|column| input_schema.index_of(&column.name)) .collect::, ArrowError>>() .unwrap(); // Add path column used_columns.push(input_schema.fields().len()); + */ let parquet_scan = ParquetFormat::new() .create_physical_plan( @@ -1183,7 +1196,7 @@ pub(crate) async fn find_files_scan<'a>( file_schema, file_groups: file_groups.into_values().collect(), statistics: snapshot.datafusion_table_statistics(), - projection: Some(used_columns), + projection: None, limit: None, table_partition_cols, infinite_source: false, @@ -1193,14 +1206,20 @@ pub(crate) async fn find_files_scan<'a>( ) .await?; - let input_schema = parquet_scan.schema(); + // Issue here... let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; + println!("scan 001"); + println!("{:?}", &expression); + println!("{:?}", &input_schema); + println!("{:?}", &input_dfschema); + let predicate_expr = create_physical_expr( &Expr::IsTrue(Box::new(expression.clone())), &input_dfschema, &input_schema, state.execution_props(), )?; + println!("scan 002"); let filter: Arc = Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?); diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 29ba685994..dc46a5610c 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -201,6 +201,7 @@ async fn execute( app_metadata: Option>, cast_options: CastOptions, ) -> DeltaResult<((Vec, i64), UpdateMetrics)> { + println!("here 200"); // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. @@ -241,6 +242,7 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let schema = snapshot.arrow_schema()?; + println!("here 201"); let scan_start = Instant::now(); let candidates = find_files( snapshot, @@ -250,7 +252,8 @@ async fn execute( predicate.clone(), ) .await?; - metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); + println!("here 202"); + metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); if candidates.candidates.is_empty() { return Ok(((Vec::new(), version), metrics)); @@ -261,6 +264,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches + println!("here100"); let parquet_scan = parquet_scan_from_actions( snapshot, object_store.clone(), @@ -274,7 +278,9 @@ async fn execute( .await?; // Create a projection for a new column with the predicate evaluated + //let df_schema = snapshot.input_schema()?; let input_schema = snapshot.input_schema()?; + let mut fields = Vec::new(); for field in input_schema.fields.iter() { fields.push(field.to_owned()); @@ -284,10 +290,21 @@ async fn execute( arrow_schema::DataType::Boolean, true, ))); - // Recreate the schemas with the new column included let input_schema = Arc::new(ArrowSchema::new(fields)); - let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; + let input_dfschema: DFSchema = input_schema.as_ref().clone().try_into()?; + + /* + let mut fields = Vec::new(); + for field in df_schema.fields.iter() { + fields.push(field.to_owned()); + } + fields.push(Arc::new(Field::new( + "__delta_rs_update_predicate", + arrow_schema::DataType::Boolean, + true, + ))); + */ let mut expressions: Vec<(Arc, String)> = Vec::new(); let scan_schema = parquet_scan.schema(); @@ -302,6 +319,10 @@ async fn 
execute( // null count to track how many records do NOT statisfy the predicate. The // count is then exposed through the metrics through the `UpdateCountExec` // execution plan + println!("{:?}", &input_dfschema); + println!("{:?}", &input_schema); + println!("here1"); + let predicate_null = when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; let predicate_expr = create_physical_expr( @@ -311,9 +332,11 @@ async fn execute( execution_props, )?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); + println!("here2"); let projection_predicate: Arc = Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); + println!("here3"); let count_plan = Arc::new(UpdateCountExec::new(projection_predicate.clone())); @@ -335,6 +358,7 @@ async fn execute( let mut map = HashMap::::new(); let mut control_columns = HashSet::::new(); control_columns.insert("__delta_rs_update_predicate".to_owned()); + println!("here4"); for (column, expr) in updates { let expr = case(col("__delta_rs_update_predicate")) @@ -350,6 +374,7 @@ async fn execute( let projection_update: Arc = Arc::new(ProjectionExec::try_new(expressions, count_plan.clone())?); + println!("here5"); // Project again to remove __delta_rs columns and rename update columns to their original name let mut expressions: Vec<(Arc, String)> = Vec::new(); @@ -377,6 +402,7 @@ async fn execute( expressions, projection_update.clone(), )?); + println!("here6"); let add_actions = write_execution_plan( snapshot, @@ -390,6 +416,7 @@ async fn execute( &cast_options, ) .await?; + println!("here7"); let count_metrics = count_plan.metrics().unwrap(); @@ -423,8 +450,9 @@ async fn execute( tags: None, })) } + println!("here9"); - metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); + metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis(); let operation = DeltaOperation::Update { predicate: Some(predicate.canonical_name()), @@ -713,6 +741,9 @@ mod tests { ) .unwrap(); + // Update a partitioned table where the predicate contains only partition column + // The expectation is that a physical scan of data is not required + let table = write_batch(table, batch).await.unwrap(); assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); @@ -765,7 +796,7 @@ mod tests { ) .unwrap(); - let table = write_batch(table, batch).await.unwrap(); + let table = write_batch(table, batch.clone()).await.unwrap(); assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 2); @@ -797,6 +828,45 @@ mod tests { let actual = get_data(&table).await; assert_batches_sorted_eq!(&expected, &actual); + + // Update a partitioned table where the predicate contains a partition column and non-partition column + let table = setup_table(Some(vec!["modified"])).await; + let table = write_batch(table, batch).await.unwrap(); + assert_eq!(table.version(), 1); + assert_eq!(table.get_file_uris().count(), 2); + + let (table, metrics) = DeltaOps(table) + .update() + .with_predicate( + col("modified") + .eq(lit("2021-02-03")) + .and(col("value").eq(lit(100))), + ) + .with_update("modified", lit("2023-05-14")) + .with_update("id", lit("C")) + .await + .unwrap(); + + assert_eq!(table.version(), 2); + assert_eq!(table.get_file_uris().count(), 3); + assert_eq!(metrics.num_added_files, 2); + assert_eq!(metrics.num_removed_files, 1); + assert_eq!(metrics.num_updated_rows, 1); + assert_eq!(metrics.num_copied_rows, 1); + + let expected = vec![ + "+----+-------+------------+", + "| id 
| value | modified |", + "+----+-------+------------+", + "| A | 1 | 2021-02-02 |", + "| A | 10 | 2021-02-03 |", + "| B | 10 | 2021-02-02 |", + "| C | 100 | 2023-05-14 |", + "+----+-------+------------+", + ]; + + let actual = get_data(&table).await; + assert_batches_sorted_eq!(&expected, &actual); } #[tokio::test] From 80b27cf587d0b7c33b6353ca9b52197344191d18 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 12 Jun 2023 21:26:53 -0400 Subject: [PATCH 24/26] resolve schema issues with projection --- rust/src/delta_datafusion.rs | 22 ++++++++++------------ rust/src/operations/update.rs | 14 -------------- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 0ce2f8463f..87a1a9b25d 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1177,7 +1177,6 @@ pub(crate) async fn find_files_scan<'a>( let input_schema = Arc::new(ArrowSchema::new(fields)); // Identify which columns we need to project - /* let mut used_columns = expression .to_columns()? .into_iter() @@ -1185,8 +1184,15 @@ pub(crate) async fn find_files_scan<'a>( .collect::, ArrowError>>() .unwrap(); // Add path column - used_columns.push(input_schema.fields().len()); - */ + used_columns.push(input_schema.index_of(PATH_COLUMN)?); + + // Project the logical schema so column indicies align between the parquet scan and the expression + let mut fields = vec![]; + for idx in &used_columns { + fields.push(input_schema.field(*idx).to_owned()); + } + let input_schema = Arc::new(ArrowSchema::new(fields)); + let input_dfschema = input_schema.as_ref().clone().try_into()?; let parquet_scan = ParquetFormat::new() .create_physical_plan( @@ -1196,7 +1202,7 @@ pub(crate) async fn find_files_scan<'a>( file_schema, file_groups: file_groups.into_values().collect(), statistics: snapshot.datafusion_table_statistics(), - projection: None, + projection: Some(used_columns), limit: None, table_partition_cols, infinite_source: false, @@ -1206,20 +1212,12 @@ pub(crate) async fn find_files_scan<'a>( ) .await?; - // Issue here... - let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; - println!("scan 001"); - println!("{:?}", &expression); - println!("{:?}", &input_schema); - println!("{:?}", &input_dfschema); - let predicate_expr = create_physical_expr( &Expr::IsTrue(Box::new(expression.clone())), &input_dfschema, &input_schema, state.execution_props(), )?; - println!("scan 002"); let filter: Arc = Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?); diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index dc46a5610c..4957a90f42 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -201,7 +201,6 @@ async fn execute( app_metadata: Option>, cast_options: CastOptions, ) -> DeltaResult<((Vec, i64), UpdateMetrics)> { - println!("here 200"); // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. 
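For readers following the plan construction that these hunks clean up: each column named in the update map is projected through a CASE on the injected `__delta_rs_update_predicate` column, so matching rows evaluate the new expression and every other row keeps its current value. A sketch of that per-column expression (the helper name `updated_column_expr` is illustrative, not part of the patch):

    use datafusion_common::Result;
    use datafusion_expr::{case, col, lit, Expr};

    // Matching rows (helper column = true) take `new_value`; all other rows
    // fall through to the existing column value.
    fn updated_column_expr(column: &str, new_value: Expr) -> Result<Expr> {
        case(col("__delta_rs_update_predicate"))
            .when(lit(true), new_value)
            .otherwise(col(column))
    }

The earlier projection fills the helper column with true or NULL (`when(predicate, lit(true)).otherwise(lit(ScalarValue::Boolean(None)))`), `UpdateCountExec` uses the null count to split `num_updated_rows` from `num_copied_rows`, and a final projection drops the `__delta_rs` columns before `write_execution_plan` writes the rewritten files.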
@@ -242,7 +241,6 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let schema = snapshot.arrow_schema()?; - println!("here 201"); let scan_start = Instant::now(); let candidates = find_files( snapshot, @@ -252,7 +250,6 @@ async fn execute( predicate.clone(), ) .await?; - println!("here 202"); metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); if candidates.candidates.is_empty() { @@ -264,7 +261,6 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - println!("here100"); let parquet_scan = parquet_scan_from_actions( snapshot, object_store.clone(), @@ -319,9 +315,6 @@ async fn execute( // null count to track how many records do NOT statisfy the predicate. The // count is then exposed through the metrics through the `UpdateCountExec` // execution plan - println!("{:?}", &input_dfschema); - println!("{:?}", &input_schema); - println!("here1"); let predicate_null = when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; @@ -332,11 +325,9 @@ async fn execute( execution_props, )?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); - println!("here2"); let projection_predicate: Arc = Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); - println!("here3"); let count_plan = Arc::new(UpdateCountExec::new(projection_predicate.clone())); @@ -358,7 +349,6 @@ async fn execute( let mut map = HashMap::::new(); let mut control_columns = HashSet::::new(); control_columns.insert("__delta_rs_update_predicate".to_owned()); - println!("here4"); for (column, expr) in updates { let expr = case(col("__delta_rs_update_predicate")) @@ -374,7 +364,6 @@ async fn execute( let projection_update: Arc = Arc::new(ProjectionExec::try_new(expressions, count_plan.clone())?); - println!("here5"); // Project again to remove __delta_rs columns and rename update columns to their original name let mut expressions: Vec<(Arc, String)> = Vec::new(); @@ -402,7 +391,6 @@ async fn execute( expressions, projection_update.clone(), )?); - println!("here6"); let add_actions = write_execution_plan( snapshot, @@ -416,7 +404,6 @@ async fn execute( &cast_options, ) .await?; - println!("here7"); let count_metrics = count_plan.metrics().unwrap(); @@ -450,7 +437,6 @@ async fn execute( tags: None, })) } - println!("here9"); metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis(); From 73137cc3d303bb021772fc980206f8d855065caa Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 12 Jun 2023 21:53:20 -0400 Subject: [PATCH 25/26] implement comments --- rust/src/delta_datafusion.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 492fce5fee..479269c2cc 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -1098,7 +1098,7 @@ fn join_batches_with_add_actions( // Given RecordBatches that contains `__delta_rs_path` perform a hash join // with actions to obtain original add actions - let mut files = Vec::new(); + let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum()); for batch in batches { let array = batch .column_by_name(PATH_COLUMN) @@ -1212,7 +1212,7 @@ pub(crate) async fn find_files_scan<'a>( limit: None, table_partition_cols, infinite_source: false, - output_ordering: None, + 
output_ordering: vec![], }, None, ) From 111c4637fe6820187e386ef0d3800d24e6f1f2d8 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 13 Jun 2023 20:26:35 -0400 Subject: [PATCH 26/26] store time metrics in u64 --- rust/src/operations/update.rs | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 82d8fef622..34dbf27113 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -117,9 +117,9 @@ pub struct UpdateMetrics { /// Number of rows just copied over in the process of updating files. pub num_copied_rows: usize, /// Time taken to execute the entire operation. - pub execution_time_ms: u128, + pub execution_time_ms: u64, /// Time taken to scan the files for matches. - pub scan_time_ms: u128, + pub scan_time_ms: u64, } impl UpdateBuilder { @@ -249,7 +249,7 @@ async fn execute( predicate.clone(), ) .await?; - metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); + metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { return Ok(((Vec::new(), version), metrics)); @@ -273,7 +273,6 @@ async fn execute( .await?; // Create a projection for a new column with the predicate evaluated - //let df_schema = snapshot.input_schema()?; let input_schema = snapshot.input_schema()?; let mut fields = Vec::new(); @@ -289,18 +288,6 @@ async fn execute( let input_schema = Arc::new(ArrowSchema::new(fields)); let input_dfschema: DFSchema = input_schema.as_ref().clone().try_into()?; - /* - let mut fields = Vec::new(); - for field in df_schema.fields.iter() { - fields.push(field.to_owned()); - } - fields.push(Arc::new(Field::new( - "__delta_rs_update_predicate", - arrow_schema::DataType::Boolean, - true, - ))); - */ - let mut expressions: Vec<(Arc, String)> = Vec::new(); let scan_schema = parquet_scan.schema(); for (i, field) in scan_schema.fields().into_iter().enumerate() { @@ -437,7 +424,7 @@ async fn execute( })) } - metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis(); + metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_millis() as u64; let operation = DeltaOperation::Update { predicate: Some(predicate.canonical_name()),
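One note on this last change: `std::time::Duration::as_millis()` returns a `u128`, which is why narrowing the metrics fields to `u64` needs the explicit `as u64` casts. A minimal illustration of the pattern now used for `scan_time_ms` and `execution_time_ms`:

    use std::time::Instant;

    let start = Instant::now();
    // ... the work being timed ...
    // `as_millis()` yields u128; the cast truncates, which is harmless here
    // since u64 milliseconds covers roughly 584 million years.
    let elapsed_ms: u64 = Instant::now().duration_since(start).as_millis() as u64;

`start.elapsed()` is an equivalent, slightly shorter way to obtain the same `Duration`.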