diff --git a/python/Cargo.toml b/python/Cargo.toml
index 3b86b1d663..f7c1542211 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -18,7 +18,7 @@ doc = false
 name = "deltalake._internal"
 
 [dependencies]
-arrow-schema = { version = "36", features = ["serde"] }
+arrow-schema = { version = "39", features = ["serde"] }
 chrono = "0"
 env_logger = "0"
 futures = "0.3"
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index fbd7f0df6e..a92c0d743e 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -13,10 +13,10 @@ readme = "README.md"
 edition = "2021"
 
 [dependencies]
-arrow = { version = "38", optional = true }
-arrow-array = { version = "38", optional = true }
-arrow-cast = { version = "38", optional = true }
-arrow-schema = { version = "38", optional = true }
+arrow = { version = "39", optional = true }
+arrow-array = { version = "39", optional = true }
+arrow-cast = { version = "39", optional = true }
+arrow-schema = { version = "39", optional = true }
 async-trait = "0.1"
 bytes = "1"
 chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -36,7 +36,7 @@ num-traits = "0.2.15"
 object_store = "0.5.6"
 once_cell = "1.16.0"
 parking_lot = "0.12"
-parquet = { version = "38", features = [
+parquet = { version = "39", features = [
     "async",
     "object_store",
 ], optional = true }
@@ -65,12 +65,12 @@ reqwest-middleware = { version = "0.2.1", optional = true }
 reqwest-retry = { version = "0.2.2", optional = true }
 
 # Datafusion
-datafusion = { version = "24", optional = true }
-datafusion-expr = { version = "24", optional = true }
-datafusion-common = { version = "24", optional = true }
-datafusion-proto = { version = "24", optional = true }
-datafusion-sql = { version = "24", optional = true }
-datafusion-physical-expr = { version = "24", optional = true }
+datafusion = { version = "25", optional = true }
+datafusion-expr = { version = "25", optional = true }
+datafusion-common = { version = "25", optional = true }
+datafusion-proto = { version = "25", optional = true }
+datafusion-sql = { version = "25", optional = true }
+datafusion-physical-expr = { version = "25", optional = true }
 
 sqlparser = { version = "0.33", optional = true }
 
@@ -142,11 +142,7 @@ s3 = [
     "object_store/aws",
     "object_store/aws_profile",
 ]
-unity-experimental = [
-    "reqwest",
-    "reqwest-middleware",
-    "reqwest-retry",
-]
+unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]
 
 [[bench]]
 name = "read_checkpoint"
diff --git a/rust/examples/recordbatch-writer.rs b/rust/examples/recordbatch-writer.rs
index d490a2dad7..4b13b26815 100644
--- a/rust/examples/recordbatch-writer.rs
+++ b/rust/examples/recordbatch-writer.rs
@@ -188,15 +188,13 @@ fn convert_to_batch(table: &DeltaTable, records: &Vec<WeatherRecord>) -> RecordBatch {
  * Table in an existing directory that doesn't currently contain a Delta table
  */
 async fn create_initialized_table(table_path: &Path) -> DeltaTable {
-    let table = DeltaOps::try_from_uri(table_path)
+    DeltaOps::try_from_uri(table_path)
         .await
         .unwrap()
         .create()
         .with_columns(WeatherRecord::columns())
         .await
-        .unwrap();
-
-    table
+        .unwrap()
 }
 
 #[cfg(test)]
diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs
index 83ac45ec45..2a12985ac2 100644
--- a/rust/src/operations/delete.rs
+++ b/rust/src/operations/delete.rs
@@ -50,12 +50,10 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::RecordBatchStream;
 use datafusion::prelude::Expr;
 use datafusion_common::scalar::ScalarValue;
-use datafusion_common::tree_node::TreeNode;
-use datafusion_common::tree_node::TreeNodeVisitor;
-use datafusion_common::tree_node::VisitRecursion;
+use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
 use datafusion_common::DFSchema;
-use datafusion_expr::col;
-use datafusion_expr::Volatility;
+use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
+use datafusion_expr::{col, Volatility};
 use futures::future::BoxFuture;
 use futures::stream::StreamExt;
 use parquet::file::properties::WriterProperties;
@@ -283,7 +281,7 @@ impl TreeNodeVisitor for ExprProperties {
             | Expr::Case(_)
             | Expr::Cast(_)
             | Expr::TryCast(_) => (),
-            Expr::ScalarFunction { fun, .. } => {
+            Expr::ScalarFunction(ScalarFunction { fun, .. }) => {
                 let v = fun.volatility();
                 if v > Volatility::Immutable {
                     self.result = Err(DeltaTableError::Generic(format!(
@@ -293,7 +291,7 @@ impl TreeNodeVisitor for ExprProperties {
                     return Ok(VisitRecursion::Stop);
                 }
             }
-            Expr::ScalarUDF { fun, .. } => {
+            Expr::ScalarUDF(ScalarUDF { fun, .. }) => {
                 let v = fun.signature.volatility;
                 if v > Volatility::Immutable {
                     self.result = Err(DeltaTableError::Generic(format!(
diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs
index 2bedfc76a8..67a37edad2 100644
--- a/rust/src/table_state_arrow.rs
+++ b/rust/src/table_state_arrow.rs
@@ -13,7 +13,7 @@ use arrow::array::{
 };
 use arrow::compute::cast;
 use arrow::compute::kernels::cast_utils::Parser;
-use arrow::datatypes::{DataType, Date32Type, Field, TimeUnit, TimestampMicrosecondType};
+use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimeUnit, TimestampMicrosecondType};
 use itertools::Itertools;
 use std::borrow::Cow;
 use std::collections::{HashMap, HashSet, VecDeque};
@@ -193,14 +193,17 @@ impl DeltaTableState {
         let fields = partition_column_types
             .into_iter()
             .zip(metadata.partition_columns.iter())
-            .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true));
-        let field_arrays = fields
-            .zip(partition_columns.into_iter())
+            .map(|(datatype, name)| arrow::datatypes::Field::new(name, datatype, true))
             .collect::<Vec<_>>();
-        if field_arrays.is_empty() {
+
+        if fields.is_empty() {
             vec![]
         } else {
-            let arr = Arc::new(arrow::array::StructArray::from(field_arrays));
+            let arr = Arc::new(arrow::array::StructArray::try_new(
+                Fields::from(fields),
+                partition_columns,
+                None,
+            )?);
             vec![(Cow::Borrowed("partition_values"), arr)]
         }
     };
@@ -259,16 +262,13 @@ impl DeltaTableState {
                     .map(|(key, array)| (format!("tags.{key}"), array)),
             )?)
         } else {
+            let (fields, arrays): (Vec<_>, Vec<_>) = arrays
+                .into_iter()
+                .map(|(key, array)| (Field::new(key, array.data_type().clone(), true), array))
+                .unzip();
             Ok(arrow::record_batch::RecordBatch::try_from_iter(vec![(
                 "tags",
-                Arc::new(StructArray::from(
-                    arrays
-                        .into_iter()
-                        .map(|(key, array)| {
-                            (Field::new(key, array.data_type().clone(), true), array)
-                        })
-                        .collect_vec(),
-                )) as ArrayRef,
+                Arc::new(StructArray::new(Fields::from(fields), arrays, None)) as ArrayRef,
             )])?)
         }
     }
@@ -418,7 +418,7 @@ impl DeltaTableState {
         let combine_arrays = |sub_fields: &Vec<ColStats>,
                               getter: for<'a> fn(&'a ColStats) -> &'a Option<ArrayRef>|
          -> Option<ArrayRef> {
-            let fields = sub_fields
+            let (fields, arrays): (Vec<_>, Vec<_>) = sub_fields
                 .iter()
                 .flat_map(|sub_field| {
                     if let Some(values) = getter(sub_field) {
@@ -435,11 +435,15 @@ impl DeltaTableState {
                         None
                     }
                 })
-                .collect::<Vec<_>>();
+                .unzip();
             if fields.is_empty() {
                 None
             } else {
-                Some(Arc::new(StructArray::from(fields)))
+                Some(Arc::new(StructArray::new(
+                    Fields::from(fields),
+                    arrays,
+                    None,
+                )))
             }
         };
 
diff --git a/rust/tests/add_actions_test.rs b/rust/tests/add_actions_test.rs
index 3ee5c8dff1..afd60afb6d 100644
--- a/rust/tests/add_actions_test.rs
+++ b/rust/tests/add_actions_test.rs
@@ -3,7 +3,7 @@
 use arrow::array::{self, ArrayRef, StructArray};
 use arrow::compute::kernels::cast_utils::Parser;
 use arrow::compute::sort_to_indices;
-use arrow::datatypes::{DataType, Date32Type, Field, TimestampMicrosecondType};
+use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimestampMicrosecondType};
 use arrow::record_batch::RecordBatch;
 use std::sync::Arc;
 
@@ -54,10 +54,11 @@ async fn test_with_partitions() {
 
     expected_columns[4] = (
         "partition_values",
-        Arc::new(array::StructArray::from(vec![(
-            Field::new("k", DataType::Utf8, true),
-            Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef,
-        )])),
+        Arc::new(array::StructArray::new(
+            Fields::from(vec![Field::new("k", DataType::Utf8, true)]),
+            vec![Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef],
+            None,
+        )),
     );
 
     let expected = RecordBatch::try_from_iter(expected_columns).unwrap();
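
For reviewers, a minimal sketch (not part of the patch) of the two API migrations the diff performs, assuming `arrow` 39 and `datafusion-expr` 25 as dependencies; the `is_volatile` helper and the field names are hypothetical:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
use arrow::datatypes::{DataType, Field, Fields};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{col, Expr, Volatility};

/// DataFusion 25 turns `Expr::ScalarFunction` into a tuple variant wrapping a
/// `ScalarFunction` struct, so match arms now destructure the inner struct
/// (the same change applies to `Expr::ScalarUDF`).
fn is_volatile(expr: &Expr) -> bool {
    match expr {
        Expr::ScalarFunction(ScalarFunction { fun, .. }) => {
            fun.volatility() > Volatility::Immutable
        }
        _ => false,
    }
}

fn main() {
    // arrow 39 builds a StructArray from separate `Fields` and child arrays
    // (plus an optional null buffer) instead of the old
    // `From<Vec<(Field, ArrayRef)>>` conversion used before this upgrade.
    let fields = Fields::from(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![Some(1), None])),
        Arc::new(StringArray::from(vec![Some("a"), Some("b")])),
    ];

    // `try_new` validates that field and array counts, types, and lengths
    // line up; the infallible `StructArray::new` panics on mismatch instead.
    let partition_values = StructArray::try_new(fields, arrays, None).expect("valid struct");
    assert_eq!(partition_values.len(), 2);

    // A plain column reference is not a scalar function call, so it is
    // never flagged as volatile.
    assert!(!is_volatile(&col("id")));
}
```

This mirrors the split visible in the diff: `try_new` with `?` where the caller returns a `Result` (`table_state_arrow.rs`), and `new` where a panic on malformed input is acceptable (`add_actions_test.rs`).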