diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index b512513c8d..e5d03e6492 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -27,6 +27,7 @@ use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan}; use futures::future::BoxFuture; use futures::StreamExt; use parquet::file::properties::WriterProperties; +use serde_json::Map; use super::writer::{DeltaWriter, WriterConfig}; use super::MAX_SUPPORTED_WRITER_VERSION; @@ -101,6 +102,8 @@ pub struct WriteBuilder { safe_cast: bool, /// Parquet writer properties writer_properties: Option, + /// Additional metadata to be added to commit + app_metadata: Option>, } impl WriteBuilder { @@ -119,6 +122,7 @@ impl WriteBuilder { batches: None, safe_cast: false, writer_properties: None, + app_metadata: None, } } @@ -187,6 +191,15 @@ impl WriteBuilder { self } + /// Additional metadata to be added to commit info + pub fn with_metadata( + mut self, + metadata: impl IntoIterator, + ) -> Self { + self.app_metadata = Some(Map::from_iter(metadata)); + self + } + async fn check_preconditions(&self) -> DeltaResult> { match self.store.is_delta_table_location().await? { true => { @@ -455,8 +468,7 @@ impl std::future::IntoFuture for WriteBuilder { predicate: this.predicate, }, &this.snapshot, - // TODO pass through metadata - None, + this.app_metadata, ) .await?; @@ -550,7 +562,7 @@ mod tests { use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray}; use arrow_schema::{DataType, TimeUnit}; use datafusion::assert_batches_sorted_eq; - use serde_json::json; + use serde_json::{json, Value}; #[tokio::test] async fn test_create_write() { @@ -563,33 +575,73 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); + assert_eq!(table.state.commit_infos().len(), 1); // write some data - let table = DeltaOps(table) + let metadata = Map::from_iter(vec![("k1".to_string(), json!("v1.1"))]); + let mut table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Append) + .with_metadata(metadata.clone()) .await .unwrap(); assert_eq!(table.version(), 1); assert_eq!(table.get_file_uris().count(), 1); + table.load().await.unwrap(); + assert_eq!(table.state.commit_infos().len(), 2); + assert_eq!( + table.state.commit_infos()[1] + .info + .clone() + .into_iter() + .filter(|(k, _)| k != "clientVersion") + .collect::>(), + metadata + ); // append some data - let table = DeltaOps(table) + let metadata: Map = Map::from_iter(vec![("k1".to_string(), json!("v1.2"))]); + let mut table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Append) + .with_metadata(metadata.clone()) .await .unwrap(); assert_eq!(table.version(), 2); assert_eq!(table.get_file_uris().count(), 2); + table.load().await.unwrap(); + assert_eq!(table.state.commit_infos().len(), 3); + assert_eq!( + table.state.commit_infos()[2] + .info + .clone() + .into_iter() + .filter(|(k, _)| k != "clientVersion") + .collect::>(), + metadata + ); // overwrite table - let table = DeltaOps(table) + let metadata: Map = Map::from_iter(vec![("k2".to_string(), json!("v2.1"))]); + let mut table = DeltaOps(table) .write(vec![batch]) .with_save_mode(SaveMode::Overwrite) + .with_metadata(metadata.clone()) .await .unwrap(); assert_eq!(table.version(), 3); - assert_eq!(table.get_file_uris().count(), 1) + assert_eq!(table.get_file_uris().count(), 1); + table.load().await.unwrap(); + assert_eq!(table.state.commit_infos().len(), 4); + assert_eq!( + table.state.commit_infos()[3] + .info + .clone() + .into_iter() + .filter(|(k, _)| k != "clientVersion") + .collect::>(), + metadata + ); } #[tokio::test]