diff --git a/python/src/lib.rs b/python/src/lib.rs index a6d7b0b993..dc5d6bcd86 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -5,7 +5,7 @@ mod schema; mod utils; use arrow::pyarrow::PyArrowType; -use chrono::{DateTime, FixedOffset, Utc}; +use chrono::{DateTime, Duration, FixedOffset, Utc}; use deltalake::action::{ self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats, }; @@ -13,12 +13,12 @@ use deltalake::arrow::record_batch::RecordBatch; use deltalake::arrow::{self, datatypes::Schema as ArrowSchema}; use deltalake::builder::DeltaTableBuilder; use deltalake::delta_datafusion::DeltaDataChecker; +use deltalake::operations::vacuum::VacuumBuilder; use deltalake::partitions::PartitionFilter; -use deltalake::DeltaDataTypeLong; -use deltalake::DeltaDataTypeTimestamp; -use deltalake::DeltaTableMetaData; -use deltalake::DeltaTransactionOptions; -use deltalake::{Invariant, Schema}; +use deltalake::{ + DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaTableMetaData, DeltaTransactionOptions, + Invariant, Schema, +}; use pyo3::create_exception; use pyo3::exceptions::PyException; use pyo3::exceptions::PyValueError; @@ -49,10 +49,6 @@ impl PyDeltaTableError { PyDeltaTableError::new_err(err.to_string()) } - fn from_vacuum_error(err: deltalake::vacuum::VacuumError) -> pyo3::PyErr { - PyDeltaTableError::new_err(err.to_string()) - } - fn from_tokio(err: tokio::io::Error) -> pyo3::PyErr { PyDeltaTableError::new_err(err.to_string()) } @@ -288,12 +284,17 @@ impl RawDeltaTable { retention_hours: Option<i64>, enforce_retention_duration: bool, ) -> PyResult<Vec<String>> { - rt()? - .block_on( - self._table - .vacuum(retention_hours, dry_run, enforce_retention_duration), - ) - .map_err(PyDeltaTableError::from_vacuum_error) + let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone()) + .with_enforce_retention_duration(enforce_retention_duration) + .with_dry_run(dry_run); + if let Some(retention_period) = retention_hours { + cmd = cmd.with_retention_period(Duration::hours(retention_period as i64)); + } + let (table, metrics) = rt()? + .block_on(async { cmd.await }) + .map_err(PyDeltaTableError::from_raw)?; + self._table.state = table.state; + Ok(metrics.files_deleted) } // Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. @@ -321,7 +322,7 @@ impl RawDeltaTable { pub fn update_incremental(&mut self) -> PyResult<()> { rt()? - .block_on(self._table.update_incremental()) + .block_on(self._table.update_incremental(None)) .map_err(PyDeltaTableError::from_raw) } diff --git a/python/tests/test_vacuum.py b/python/tests/test_vacuum.py index ef5e84c078..5b8b17d080 100644 --- a/python/tests/test_vacuum.py +++ b/python/tests/test_vacuum.py @@ -30,7 +30,7 @@ def test_vacuum_dry_run_simple_table(): dt.vacuum(retention_periods) assert ( str(exception.value) == "Invalid retention period, minimum retention for vacuum is configured to be greater than 168 hours, got 167 hours" + == "Generic error: Invalid retention period, minimum retention for vacuum is configured to be greater than 168 hours, got 167 hours" ) diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 8bc09ba8ed..10a1528084 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -226,6 +226,12 @@ pub struct Add { pub tags: Option<HashMap<String, Option<String>>>, } +impl Hash for Add { + fn hash<H: Hasher>(&self, state: &mut H) { + self.path.hash(state); + } +} + impl Add { /// Returns the Add action with path decoded.
pub fn path_decoded(self) -> Result { diff --git a/rust/src/builder/mod.rs b/rust/src/builder/mod.rs index e3778fdf30..2bc1093c4f 100644 --- a/rust/src/builder/mod.rs +++ b/rust/src/builder/mod.rs @@ -228,7 +228,7 @@ impl DeltaTableBuilder { Ok(object_store) } - /// Build the delta Table from specified options. + /// Build the [`DeltaTable`] from specified options. /// /// This will not load the log, i.e. the table is not initialized. To get an initialized /// table use the `load` function @@ -258,7 +258,7 @@ impl DeltaTableBuilder { Ok(DeltaTable::new(object_store, config)) } - /// finally load the table + /// Build the [`DeltaTable`] and load its state pub async fn load(self) -> Result<DeltaTable, DeltaTableError> { let version = self.options.version.clone(); let mut table = self.build()?; diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 478656bc3d..e290886479 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -12,13 +12,13 @@ use std::{cmp::max, cmp::Ordering, collections::HashSet}; use super::action; use super::action::{Action, DeltaOperation}; -use super::partitions::{DeltaTablePartition, PartitionFilter}; +use super::partitions::PartitionFilter; use super::schema::*; use super::table_state::DeltaTableState; use crate::action::{Add, Stats}; use crate::delta_config::DeltaConfigError; +use crate::operations::vacuum::VacuumBuilder; use crate::storage::ObjectStoreRef; -use crate::vacuum::{Vacuum, VacuumError}; use chrono::{DateTime, Duration, Utc}; use futures::StreamExt; @@ -426,6 +426,7 @@ pub struct DeltaTable { pub state: DeltaTableState, /// the load options used during load pub config: DeltaTableConfig, + /// object store to access log and data files pub(crate) storage: ObjectStoreRef, /// file metadata for latest checkpoint last_check_point: Option<CheckPoint>, @@ -512,6 +513,21 @@ impl DeltaTable { } } + /// Create a new [`DeltaTable`] from a [`DeltaTableState`] without loading any + /// data from backing storage. + /// + /// NOTE: This is for advanced users. If you don't know why you need to use this method, + /// please call one of the `open_table` helper methods instead.
+ pub(crate) fn new_with_state(storage: ObjectStoreRef, state: DeltaTableState) -> Self { + Self { + state, + storage, + config: Default::default(), + last_check_point: None, + version_timestamp: HashMap::new(), + } + } + /// get a shared reference to the delta object store pub fn object_store(&self) -> ObjectStoreRef { self.storage.clone() } @@ -665,17 +681,6 @@ impl DeltaTable { Ok(cp) } - async fn apply_log(&mut self, version: DeltaDataTypeVersion) -> Result<(), ApplyLogError> { - let new_state = DeltaTableState::from_commit(self, version).await?; - self.state.merge( - new_state, - self.config.require_tombstones, - self.config.require_files, - ); - - Ok(()) - } - #[cfg(any(feature = "parquet", feature = "parquet2"))] async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> { self.state = DeltaTableState::from_checkpoint(self, &check_point).await?; @@ -766,26 +771,6 @@ impl DeltaTable { Ok(PeekCommit::New(next_version, actions)) } - ///Apply any actions associated with the PeekCommit to the DeltaTable - pub fn apply_actions( - &mut self, - new_version: DeltaDataTypeVersion, - actions: Vec<Action>, - ) -> Result<(), DeltaTableError> { - if self.version() + 1 != new_version { - return Err(DeltaTableError::VersionMismatch( - new_version, - self.version(), - )); - } - - let s = DeltaTableState::from_actions(actions, new_version)?; - self.state - .merge(s, self.config.require_tombstones, self.config.require_files); - - Ok(()) - } - /// Updates the DeltaTable to the most recent state committed to the transaction log by /// loading the last checkpoint and incrementally applying each version since. #[cfg(any(feature = "parquet", feature = "parquet2"))] @@ -793,29 +778,39 @@ match self.get_last_checkpoint().await { Ok(last_check_point) => { if Some(last_check_point) == self.last_check_point { - self.update_incremental().await + self.update_incremental(None).await } else { self.last_check_point = Some(last_check_point); self.restore_checkpoint(last_check_point).await?; - self.update_incremental().await + self.update_incremental(None).await } } - Err(LoadCheckpointError::NotFound) => self.update_incremental().await, - Err(e) => Err(DeltaTableError::LoadCheckpoint { source: e }), + Err(LoadCheckpointError::NotFound) => self.update_incremental(None).await, + Err(source) => Err(DeltaTableError::LoadCheckpoint { source }), } } /// Updates the DeltaTable to the most recent state committed to the transaction log. #[cfg(not(any(feature = "parquet", feature = "parquet2")))] pub async fn update(&mut self) -> Result<(), DeltaTableError> { - self.update_incremental().await + self.update_incremental(None).await } /// Updates the DeltaTable to the latest version by incrementally applying newer versions. /// It assumes that the table is already updated to the current version `self.version`. - pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> { - while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version()).await? { - self.apply_actions(version, actions)?; + pub async fn update_incremental( + &mut self, + max_version: Option<DeltaDataTypeVersion>, + ) -> Result<(), DeltaTableError> { + while let PeekCommit::New(new_version, actions) = + self.peek_next_commit(self.version()).await?
+ { + let s = DeltaTableState::from_actions(actions, new_version)?; + self.state + .merge(s, self.config.require_tombstones, self.config.require_files); + if Some(self.version()) == max_version { + return Ok(()); + } } if self.version() == -1 { @@ -846,25 +841,20 @@ } } - let mut next_version = 0; // 1. find latest checkpoint below version #[cfg(any(feature = "parquet", feature = "parquet2"))] match self.find_latest_check_point_for_version(version).await? { Some(check_point) => { self.restore_checkpoint(check_point).await?; - next_version = check_point.version + 1; } None => { // no checkpoint found, clear table state and start from the beginning - self.state = DeltaTableState::with_version(0); + self.state = DeltaTableState::with_version(-1); } } // 2. apply all logs starting from checkpoint - while next_version <= version { - self.apply_log(next_version).await?; - next_version += 1; - } + self.update_incremental(Some(version)).await?; Ok(()) } @@ -948,35 +938,7 @@ &'a self, filters: &'a [PartitionFilter<'a, &'a str>], ) -> Result<impl Iterator<Item = &Add> + '_, DeltaTableError> { - let current_metadata = self - .state - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)?; - if !filters - .iter() - .all(|f| current_metadata.partition_columns.contains(&f.key.into())) - { - return Err(DeltaTableError::InvalidPartitionFilter { - partition_filter: format!("{:?}", filters), - }); - } - - let partition_col_data_types: HashMap<&str, &SchemaDataType> = current_metadata - .get_partition_col_data_types() - .into_iter() - .collect(); - - let actions = self.state.files().iter().filter(move |add| { - let partitions = add - .partition_values - .iter() - .map(|p| DeltaTablePartition::from_partition_value(p, "")) - .collect::<Vec<DeltaTablePartition>>(); - filters - .iter() - .all(|filter| filter.match_partitions(&partitions, &partition_col_data_types)) - }); - Ok(actions) + self.state.get_active_add_actions_by_partitions(filters) } /// Returns the file list tracked in current table state filtered by provided @@ -1006,33 +968,25 @@ /// Returns an iterator of file names present in the loaded state #[inline] pub fn get_files_iter(&self) -> impl Iterator<Item = Path> + '_ { - self.state - .files() - .iter() - .map(|add| Path::from(add.path.as_ref())) + self.state.file_paths_iter() } /// Returns a collection of file names present in the loaded state #[inline] pub fn get_files(&self) -> Vec<Path> { - self.get_files_iter().collect() + self.state.file_paths_iter().collect() } /// Returns file names present in the loaded state in HashSet pub fn get_file_set(&self) -> HashSet<Path> { - self.state - .files() - .iter() - .map(|add| Path::from(add.path.as_ref())) - .collect() + self.state.file_paths_iter().collect() } /// Returns URIs for all active files present in the current table version.
pub fn get_file_uris(&self) -> impl Iterator<Item = String> + '_ { self.state - .files() - .iter() - .map(|add| self.storage.to_uri(&Path::from(add.path.as_ref()))) + .file_paths_iter() + .map(|path| self.storage.to_uri(&path)) } /// Returns statistics for files, in order @@ -1084,24 +1038,6 @@ self.state.min_writer_version() } - /// Vacuum the delta table see [`Vacuum`] for more info - pub async fn vacuum( - &mut self, - retention_hours: Option<i64>, - dry_run: bool, - enforce_retention_duration: bool, - ) -> Result<Vec<String>, VacuumError> { - let mut plan = Vacuum::default() - .dry_run(dry_run) - .enforce_retention_duration(enforce_retention_duration); - if let Some(hours) = retention_hours { - plan = plan.with_retention_period(Duration::hours(hours as i64)); - } - - let res = plan.execute(self).await?; - Ok(res.files_deleted) - } - /// Return table schema parsed from transaction log. Return None if table hasn't been loaded or /// no metadata was found in the log. pub fn schema(&self) -> Option<&Schema> { @@ -1123,6 +1059,25 @@ .get_configuration()) } + /// Vacuum the delta table. See [`VacuumBuilder`] for more information. + pub async fn vacuum( + &mut self, + retention_hours: Option<i64>, + dry_run: bool, + enforce_retention_duration: bool, + ) -> Result<Vec<String>, DeltaTableError> { + let mut plan = VacuumBuilder::new(self.object_store(), self.state.clone()) + .with_dry_run(dry_run) + .with_enforce_retention_duration(enforce_retention_duration); + if let Some(hours) = retention_hours { + plan = plan.with_retention_period(Duration::hours(hours as i64)); + } + + let (table, metrics) = plan.await?; + self.state = table.state; + Ok(metrics.files_deleted) + } + /// Creates a new DeltaTransaction for the DeltaTable. /// The transaction holds a mutable reference to the DeltaTable, preventing other references /// until the transaction is dropped.
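For context on the consumer-facing flow this refactor enables, here is a minimal sketch (not part of the patch) of calling the new builder directly; the builder is awaited via its `IntoFuture` impl and returns the updated table alongside the metrics. The table path and retention period are illustrative, and a tokio runtime is assumed:

```rust
use chrono::Duration;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::DeltaTableError;

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Path is illustrative; any loadable Delta table works.
    let table = deltalake::open_table("./tests/data/delta-0.8.0").await?;

    // Dry run: report what would be deleted without touching storage.
    let (table, metrics) = VacuumBuilder::new(table.object_store(), table.state)
        .with_retention_period(Duration::days(30))
        .with_dry_run(true)
        .await?;
    println!("vacuum would delete {} files", metrics.files_deleted.len());

    // Real run: the builder consumes the snapshot and hands back a fresh table.
    let (_table, metrics) = VacuumBuilder::new(table.object_store(), table.state)
        .with_retention_period(Duration::days(30))
        .await?;
    println!("vacuum deleted {} files", metrics.files_deleted.len());
    Ok(())
}
```

The same flow is reachable through `DeltaOps(table).vacuum()`, added in `rust/src/operations/mod.rs` below.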
diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 8c99bd4ea2..280ce33710 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -84,13 +84,13 @@ pub mod builder; pub mod data_catalog; pub mod delta; pub mod delta_config; +pub mod operations; pub mod partitions; pub mod schema; pub mod storage; pub mod table_properties; pub mod table_state; pub mod time_utils; -pub mod vacuum; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod checkpoints; @@ -99,8 +99,6 @@ pub mod delta_arrow; #[cfg(feature = "datafusion")] pub mod delta_datafusion; #[cfg(all(feature = "arrow", feature = "parquet"))] -pub mod operations; -#[cfg(all(feature = "arrow", feature = "parquet"))] pub mod optimize; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod writer; @@ -111,14 +109,13 @@ pub use self::delta::*; pub use self::partitions::*; pub use self::schema::*; pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore}; +pub use operations::DeltaOps; // convenience exports for consumers to avoid aligning crate versions #[cfg(feature = "arrow")] pub use arrow; #[cfg(feature = "datafusion")] pub use datafusion; -#[cfg(all(feature = "arrow", feature = "parquet"))] -pub use operations::DeltaOps; #[cfg(feature = "parquet")] pub use parquet; #[cfg(feature = "parquet2")] diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 622afed005..2cbfa2b6f8 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -310,7 +310,7 @@ impl std::future::IntoFuture for CreateBuilder { } } -#[cfg(test)] +#[cfg(all(test, feature = "parquet"))] mod tests { use super::*; use crate::operations::DeltaOps; diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index fb0d79198e..b49fd44b6d 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -8,11 +8,13 @@ //! if the operation returns data as well. use self::create::CreateBuilder; +use self::vacuum::VacuumBuilder; use crate::builder::DeltaTableBuilder; use crate::{DeltaResult, DeltaTable, DeltaTableError}; pub mod create; pub mod transaction; +pub mod vacuum; #[cfg(feature = "datafusion")] use self::{load::LoadBuilder, write::WriteBuilder}; @@ -107,6 +109,12 @@ impl DeltaOps { .with_input_batches(batches) .with_object_store(self.0.object_store()) } + + /// Vacuum stale files from the delta table + #[must_use] + pub fn vacuum(self) -> VacuumBuilder { + VacuumBuilder::new(self.0.object_store(), self.0.state) + } } impl From<DeltaTable> for DeltaOps { diff --git a/rust/src/operations/transaction.rs b/rust/src/operations/transaction.rs index 7f513f7ed5..9b06e48088 100644 --- a/rust/src/operations/transaction.rs +++ b/rust/src/operations/transaction.rs @@ -141,7 +141,7 @@ pub(crate) async fn commit( } } -#[cfg(test)] +#[cfg(all(test, feature = "parquet"))] mod tests { use super::*; use crate::action::{DeltaOperation, Protocol, SaveMode}; diff --git a/rust/src/operations/vacuum.rs b/rust/src/operations/vacuum.rs new file mode 100644 index 0000000000..ea0fe43fe8 --- /dev/null +++ b/rust/src/operations/vacuum.rs @@ -0,0 +1,326 @@ +//! Vacuum a Delta table +//! +//! Run the Vacuum command on the Delta Table: delete files that are no longer referenced by a Delta table and are older than the retention threshold. +//! We do not recommend that you set a retention interval shorter than 7 days, because old snapshots +//! and uncommitted files can still be in use by concurrent readers or writers to the table. +//! +//! If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be
corrupted when vacuum deletes files that have not yet been committed. +//! If `retention_period` is not set, the table's `configuration.deletedFileRetentionDuration` is +//! used; if that is also missing, the default of 7 days applies. +//! +//! Once vacuum has run, you can no longer use time travel to a version older than +//! the specified retention period. +//! +//! Warning: Vacuum does not support partitioned tables on Windows. This is due +//! to Windows not using unix style paths. See #682 +//! +//! # Example +//! ```rust ignore +//! let table = open_table("../path/to/table").await?; +//! let (table, metrics) = VacuumBuilder::new(table.object_store(), table.state).await?; +//! ``` + +use crate::storage::DeltaObjectStore; +use crate::table_state::DeltaTableState; +use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; +use chrono::{Duration, Utc}; +use futures::future::BoxFuture; +use futures::StreamExt; +use object_store::{path::Path, ObjectStore}; +use std::collections::HashSet; +use std::fmt::Debug; +use std::sync::Arc; + +/// Errors that can occur during vacuum +#[derive(thiserror::Error, Debug)] +enum VacuumError { + /// Error returned when Vacuum retention period is below the safe threshold + #[error( + "Invalid retention period, minimum retention for vacuum is configured to be greater than {} hours, got {} hours", .min, .provided + )] + InvalidVacuumRetentionPeriod { + /// User provided retention on vacuum call + provided: DeltaDataTypeLong, + /// Minimal retention configured in delta table config + min: DeltaDataTypeLong, + }, + + /// Error returned + #[error(transparent)] + DeltaTable(#[from] DeltaTableError), +} + +impl From<VacuumError> for DeltaTableError { + fn from(err: VacuumError) -> Self { + DeltaTableError::GenericError { + source: Box::new(err), + } + } +} + +/// A source of time +pub trait Clock: Debug + Send + Sync { + /// get the current time in milliseconds since epoch + fn current_timestamp_millis(&self) -> i64; +} + +#[derive(Debug)] +/// Vacuum a Delta table with the given options +/// See this module's documentation for more information +pub struct VacuumBuilder { + /// A snapshot of the to-be-vacuumed table's state + snapshot: DeltaTableState, + /// Delta object store for handling data files + store: Arc<DeltaObjectStore>, + /// Period of stale files allowed. + retention_period: Option<Duration>, + /// Validate the retention period is not below the retention period configured in the table + enforce_retention_duration: bool, + /// Don't delete the files. Just determine which files can be deleted + dry_run: bool, + /// Override the source of time + clock: Option<Arc<dyn Clock>>, +} + +/// Details for the Vacuum operation including which files were deleted +#[derive(Debug)] +pub struct VacuumMetrics { + /// Was this a dry run + pub dry_run: bool, + /// Files deleted successfully + pub files_deleted: Vec<String>, +} + +/// Methods to specify various vacuum options and to execute the operation +impl VacuumBuilder { + /// Create a new [`VacuumBuilder`] + pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self { + VacuumBuilder { + snapshot, + store, + retention_period: None, + enforce_retention_duration: true, + dry_run: false, + clock: None, + } + } + + /// Override the default retention period for which files are deleted.
+ pub fn with_retention_period(mut self, retention_period: Duration) -> Self { + self.retention_period = Some(retention_period); + self + } + + /// Only determine which files should be deleted + pub fn with_dry_run(mut self, dry_run: bool) -> Self { + self.dry_run = dry_run; + self + } + + /// Check if the specified retention period is less than the table's minimum + pub fn with_enforce_retention_duration(mut self, enforce: bool) -> Self { + self.enforce_retention_duration = enforce; + self + } + + /// Add a time source for testing + #[doc(hidden)] + pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self { + self.clock = Some(clock); + self + } + + /// Determine which files can be deleted. Does not actually perform the deletion + async fn create_vacuum_plan(&self) -> Result<VacuumPlan, VacuumError> { + let min_retention = Duration::milliseconds(self.snapshot.tombstone_retention_millis()); + let retention_period = self.retention_period.unwrap_or(min_retention); + let enforce_retention_duration = self.enforce_retention_duration; + + if enforce_retention_duration && retention_period < min_retention { + return Err(VacuumError::InvalidVacuumRetentionPeriod { + provided: retention_period.num_hours(), + min: min_retention.num_hours(), + }); + } + + let now_millis = match &self.clock { + Some(clock) => clock.current_timestamp_millis(), + None => Utc::now().timestamp_millis(), + }; + + let expired_tombstones = get_stale_files(&self.snapshot, retention_period, now_millis); + let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>(); + + let mut files_to_delete = vec![]; + let mut all_files = self.store.list(None).await.map_err(DeltaTableError::from)?; + + while let Some(obj_meta) = all_files.next().await { + // TODO should we allow NotFound here in case we have a temporary commit file in the list + let obj_meta = obj_meta.map_err(DeltaTableError::from)?; + if valid_files.contains(&obj_meta.location) // file is still being tracked in table + || !expired_tombstones.contains(obj_meta.location.as_ref()) // file is not an expired tombstone + || is_hidden_directory(&self.snapshot, &obj_meta.location)? + { + continue; + } + + files_to_delete.push(obj_meta.location); + } + + Ok(VacuumPlan { files_to_delete }) + } +} + +impl std::future::IntoFuture for VacuumBuilder { + type Output = DeltaResult<(DeltaTable, VacuumMetrics)>; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + + Box::pin(async move { + let plan = this.create_vacuum_plan().await?; + if this.dry_run { + return Ok(( + DeltaTable::new_with_state(this.store, this.snapshot), + VacuumMetrics { + files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), + dry_run: true, + }, + )); + } + + let metrics = plan.execute(&this.store).await?; + Ok(( + DeltaTable::new_with_state(this.store, this.snapshot), + metrics, + )) + }) + } +} + +/// Encapsulate which files are to be deleted and the parameters used to make that decision +struct VacuumPlan { + /// What files are to be deleted + pub files_to_delete: Vec<Path>, +} + +impl VacuumPlan { + /// Execute the vacuum plan and delete files from underlying storage + pub async fn execute(self, store: &DeltaObjectStore) -> Result<VacuumMetrics, DeltaTableError> { + if self.files_to_delete.is_empty() { + return Ok(VacuumMetrics { + dry_run: false, + files_deleted: Vec::new(), + }); + } + + // Delete the files + let files_deleted = match store.delete_batch(&self.files_to_delete).await { + Ok(_) => Ok(self.files_to_delete), + Err(err) => Err(err), + }?
+ .into_iter() + .map(|file| file.to_string()) + .collect(); + + Ok(VacuumMetrics { + files_deleted, + dry_run: false, + }) + } +} + +/// Whether a path should be hidden for delta-related file operations, such as Vacuum. +/// Names of the form partitionCol=[value] are partition directories, and should be +/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter) +/// indexes and these must be deleted when the data they are tied to is deleted. +fn is_hidden_directory(snapshot: &DeltaTableState, path: &Path) -> Result<bool, DeltaTableError> { + let path_name = path.to_string(); + Ok((path_name.starts_with('.') || path_name.starts_with('_')) + && !path_name.starts_with("_delta_index") + && !path_name.starts_with("_change_data") + && !snapshot + .current_metadata() + .ok_or(DeltaTableError::NoMetadata)? + .partition_columns + .iter() + .any(|partition_column| path_name.starts_with(partition_column))) +} + +/// List files that are no longer referenced by a Delta table and are older than the retention threshold. +fn get_stale_files( + snapshot: &DeltaTableState, + retention_period: Duration, + now_timestamp_millis: i64, +) -> HashSet<&str> { + let tombstone_retention_timestamp = now_timestamp_millis - retention_period.num_milliseconds(); + snapshot + .all_tombstones() + .iter() + .filter(|tombstone| { + // if the file has a creation time before the `tombstone_retention_timestamp` + // then it's considered as a stale file + tombstone.deletion_timestamp.unwrap_or(0) < tombstone_retention_timestamp + }) + .map(|tombstone| tombstone.path.as_str()) + .collect::<HashSet<_>>() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::open_table; + use std::time::SystemTime; + + #[tokio::test] + async fn vacuum_delta_8_0_table() { + let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); + + let result = VacuumBuilder::new(table.object_store(), table.state.clone()) + .with_retention_period(Duration::hours(1)) + .with_dry_run(true) + .await; + + assert!(result.is_err()); + + let table = open_table("./tests/data/delta-0.8.0").await.unwrap(); + let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + .with_retention_period(Duration::hours(0)) + .with_dry_run(true) + .with_enforce_retention_duration(false) + .await + .unwrap(); + // do not enforce retention duration check with 0 hour will purge all files + assert_eq!( + result.files_deleted, + vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] + ); + + let (table, result) = VacuumBuilder::new(table.object_store(), table.state) + .with_retention_period(Duration::hours(169)) + .with_dry_run(true) + .await + .unwrap(); + + assert_eq!( + result.files_deleted, + vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] + ); + + let retention_hours = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + / 3600; + let empty: Vec<String> = Vec::new(); + let (_table, result) = VacuumBuilder::new(table.object_store(), table.state) + .with_retention_period(Duration::hours(retention_hours as i64)) + .with_dry_run(true) + .await + .unwrap(); + + assert_eq!(result.files_deleted, empty); + } +} diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 7dcc776099..373747fe49 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -1,12 +1,15 @@ //! The module for delta table state.
-use super::{ - ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableMetaData, -}; -use crate::action::{self, Action}; +use crate::action::{self, Action, Add}; use crate::delta_config; +use crate::partitions::{DeltaTablePartition, PartitionFilter}; +use crate::schema::SchemaDataType; +use crate::{ + ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError, + DeltaTableMetaData, +}; use chrono::Utc; -use object_store::ObjectStore; +use object_store::{path::Path, ObjectStore}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::collections::HashMap; @@ -15,23 +18,29 @@ use std::convert::TryFrom; use std::io::{BufRead, BufReader, Cursor}; #[cfg(any(feature = "parquet", feature = "parquet2"))] -use super::{CheckPoint, DeltaTableConfig, DeltaTableError}; +use super::{CheckPoint, DeltaTableConfig}; /// State snapshot currently held by the Delta Table instance. #[derive(Default, Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DeltaTableState { + // current table version represented by this table state version: DeltaDataTypeVersion, // A remove action should remain in the state of the table as a tombstone until it has expired. // A tombstone expires when the creation timestamp of the delta file exceeds the expiration tombstones: HashSet<action::Remove>, + // active files for table state files: Vec<action::Add>, + // Information added to individual commits commit_infos: Vec<Map<String, Value>>, app_transaction_version: HashMap<String, DeltaDataTypeVersion>, min_reader_version: i32, min_writer_version: i32, + // table metadata corresponding to current version current_metadata: Option<DeltaTableMetaData>, + // retention period for tombstones in milliseconds tombstone_retention_millis: DeltaDataTypeLong, + // retention period for log entries in milliseconds log_retention_millis: DeltaDataTypeLong, enable_expired_log_cleanup: bool, } @@ -192,6 +201,12 @@ impl DeltaTableState { self.files.as_ref() } + /// Returns an iterator of file names present in the loaded state + #[inline] + pub fn file_paths_iter(&self) -> impl Iterator<Item = Path> + '_ { + self.files.iter().map(|add| Path::from(add.path.as_ref())) + } + /// HashMap containing the last txn version stored for every app id writing txn /// actions. pub fn app_transaction_version(&self) -> &HashMap<String, DeltaDataTypeVersion> { @@ -330,6 +345,39 @@ impl DeltaTableState { Ok(()) } + + /// Obtain Add actions for files that match the filter + pub fn get_active_add_actions_by_partitions<'a>( + &'a self, + filters: &'a [PartitionFilter<'a, &'a str>], + ) -> Result<impl Iterator<Item = &Add> + '_, DeltaTableError> { + let current_metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?; + if !filters + .iter() + .all(|f| current_metadata.partition_columns.contains(&f.key.into())) + { + return Err(DeltaTableError::InvalidPartitionFilter { + partition_filter: format!("{:?}", filters), + }); + } + + let partition_col_data_types: HashMap<&str, &SchemaDataType> = current_metadata + .get_partition_col_data_types() + .into_iter() + .collect(); + + let actions = self.files().iter().filter(move |add| { + let partitions = add + .partition_values + .iter() + .map(|p| DeltaTablePartition::from_partition_value(p, "")) + .collect::<Vec<DeltaTablePartition>>(); + filters + .iter() + .all(|filter| filter.match_partitions(&partitions, &partition_col_data_types)) + }); + Ok(actions) + } } #[cfg(test)] diff --git a/rust/src/vacuum.rs b/rust/src/vacuum.rs deleted file mode 100644 index 8fa7e81738..0000000000 --- a/rust/src/vacuum.rs +++ /dev/null @@ -1,247 +0,0 @@ -//! Vacuum a Delta table -//! -//!
Run the Vacuum command on the Delta Table: delete files no longer referenced by a Delta table and are older than the retention threshold. -//! We do not recommend that you set a retention interval shorter than 7 days, because old snapshots -//! and uncommitted files can still be in use by concurrent readers or writers to the table. -//! -//! If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be -//! corrupted when vacuum deletes files that have not yet been committed. -//! If `retention_period` is not set then the `configuration.deletedFileRetentionDuration` of -//! delta table is used or if that's missing too, then the default value of 7 days otherwise. -//! -//! When you run vacuum then you cannot use time travel to a version older than -//! the specified retention period. -//! -//! Warning: Vacuum does not support partitioned tables on Windows. This is due -//! to Windows not using unix style paths. See #682 -//! -//! # Example -//! ```rust ignore -//! let table = open_table("../path/to/table")?; -//! let metrics = Vacuum::default().execute(table).await?; -//! ```` - -use crate::{DeltaDataTypeLong, DeltaTable, DeltaTableError}; -use chrono::{Duration, Utc}; -use futures::StreamExt; -use object_store::{path::Path, ObjectStore}; -use std::collections::HashSet; -use std::fmt::Debug; -use std::sync::Arc; - -#[derive(Debug)] -/// Vacuum a Delta table with the given options -/// See this module's documentation for more information -pub struct Vacuum { - /// Period of stale files allowed. - pub retention_period: Option<Duration>, - /// Validate the retention period is not below the retention period configured in the table - pub enforce_retention_duration: bool, - /// Don't delete the files. Just determine which files can be deleted - pub dry_run: bool, - /// Override the source of time - pub clock: Option<Arc<dyn Clock>>, -} - -impl Default for Vacuum { - fn default() -> Self { - Vacuum { - retention_period: None, - enforce_retention_duration: true, - dry_run: false, - clock: None, - } - } -} - -/// Encapsulate which files are to be deleted and the parameters used to make that decision -pub struct VacuumPlan { - /// What files are to be deleted - pub files_to_delete: Vec<Path>, -} - -/// Details for the Vacuum operation including which files were -#[derive(Debug)] -pub struct VacuumMetrics { - /// Was this a dry run - pub dry_run: bool, - /// Files deleted successfully - pub files_deleted: Vec<String>, -} - -/// Errors that can occur during vacuum -#[derive(thiserror::Error, Debug)] -pub enum VacuumError { - /// Error returned when Vacuum retention period is below the safe threshold - #[error( - "Invalid retention period, minimum retention for vacuum is configured to be greater than {} hours, got {} hours", .min, .provided - )] - InvalidVacuumRetentionPeriod { - /// User provided retention on vacuum call - provided: DeltaDataTypeLong, - /// Minimal retention configured in delta table config - min: DeltaDataTypeLong, - }, - - /// Error returned - #[error(transparent)] - DeltaTable(#[from] DeltaTableError), -} - -/// List files no longer referenced by a Delta table and are older than the retention threshold.
-pub(crate) fn get_stale_files( - table: &DeltaTable, - retention_period: Duration, - now_timestamp_millis: i64, -) -> Result<HashSet<&str>, VacuumError> { - let tombstone_retention_timestamp = now_timestamp_millis - retention_period.num_milliseconds(); - - Ok(table - .state - .all_tombstones() - .iter() - .filter(|tombstone| { - // if the file has a creation time before the `tombstone_retention_timestamp` - // then it's considered as a stale file - tombstone.deletion_timestamp.unwrap_or(0) < tombstone_retention_timestamp - }) - .map(|tombstone| tombstone.path.as_str()) - .collect::<HashSet<_>>()) -} - -/// Whether a path should be hidden for delta-related file operations, such as Vacuum. -/// Names of the form partitionCol=[value] are partition directories, and should be -/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter) -/// indexes and these must be deleted when the data they are tied to is deleted. -pub(crate) fn is_hidden_directory( - table: &DeltaTable, - path: &Path, -) -> Result<bool, DeltaTableError> { - let path_name = path.to_string(); - Ok((path_name.starts_with('.') || path_name.starts_with('_')) - && !path_name.starts_with("_delta_index") - && !path_name.starts_with("_change_data") - && !table - .state - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns - .iter() - .any(|partition_column| path_name.starts_with(partition_column))) -} - -impl VacuumPlan { - /// Execute the vacuum plan and delete files from underlying storage - pub async fn execute(self, table: &mut DeltaTable) -> Result<VacuumMetrics, VacuumError> { - if self.files_to_delete.is_empty() { - return Ok(VacuumMetrics { - dry_run: false, - files_deleted: Vec::new(), - }); - } - - // Delete the files - let files_deleted = match table.storage.delete_batch(&self.files_to_delete).await { - Ok(_) => Ok(self.files_to_delete), - Err(err) => Err(VacuumError::from(DeltaTableError::ObjectStore { - source: err, - })), - }? - .into_iter() - .map(|file| file.to_string()) - .collect(); - - Ok(VacuumMetrics { - files_deleted, - dry_run: false, - }) - } -} - -/// Determine which files can be deleted. Does not actually peform the deletion -pub async fn create_vacuum_plan( - table: &DeltaTable, - params: Vacuum, -) -> Result<VacuumPlan, VacuumError> { - let min_retention = Duration::milliseconds(table.state.tombstone_retention_millis()); - let retention_period = params.retention_period.unwrap_or(min_retention); - let enforce_retention_duration = params.enforce_retention_duration; - - if enforce_retention_duration && retention_period < min_retention { - return Err(VacuumError::InvalidVacuumRetentionPeriod { - provided: retention_period.num_hours(), - min: min_retention.num_hours(), - }); - } - - let now_millis = match params.clock { - Some(clock) => clock.current_timestamp_millis(), - None => Utc::now().timestamp_millis(), - }; - - let expired_tombstones = get_stale_files(table, retention_period, now_millis)?; - let valid_files = table.get_file_set(); - - let mut files_to_delete = vec![]; - let mut all_files = table - .storage - .list(None) - .await - .map_err(DeltaTableError::from)?; - - while let Some(obj_meta) = all_files.next().await { - // TODO should we allow NotFound here in case we have a temporary commit file in the list - let obj_meta = obj_meta.map_err(DeltaTableError::from)?; - if valid_files.contains(&obj_meta.location) // file is still being tracked in table - || !expired_tombstones.contains(obj_meta.location.as_ref()) // file is not an expired tombstone - || is_hidden_directory(table, &obj_meta.location)?
- { - continue; - } - - files_to_delete.push(obj_meta.location); - } - - Ok(VacuumPlan { files_to_delete }) -} - -/// Methods to specify various vacuum options and to execute the operation -impl Vacuum { - /// Override the default rention period for which files are deleted. - pub fn with_retention_period(mut self, retention_period: Duration) -> Self { - self.retention_period = Some(retention_period); - self - } - - /// Only determine which files should be deleted - pub fn dry_run(mut self, dry_run: bool) -> Self { - self.dry_run = dry_run; - self - } - - /// Check if the specified retention period is less than the table's minimum - pub fn enforce_retention_duration(mut self, enforce: bool) -> Self { - self.enforce_retention_duration = enforce; - self - } - - /// Perform the vacuum. Returns metrics on which files were deleted - pub async fn execute(self, table: &mut DeltaTable) -> Result<VacuumMetrics, VacuumError> { - let dry_run = self.dry_run; - let plan = create_vacuum_plan(table, self).await?; - if dry_run { - return Ok(VacuumMetrics { - files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), - dry_run: true, - }); - } - - plan.execute(table).await - } -} - -/// A source of time -pub trait Clock: Debug { - /// get the current time in milliseconds since epoch - fn current_timestamp_millis(&self) -> i64; -} diff --git a/rust/tests/command_vacuum.rs b/rust/tests/command_vacuum.rs index c72e8d32b0..ce6c6071c7 100644 --- a/rust/tests/command_vacuum.rs +++ b/rust/tests/command_vacuum.rs @@ -1,82 +1,19 @@ use chrono::Duration; -use deltalake::vacuum::Clock; -use deltalake::vacuum::Vacuum; -use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; -use std::sync::Arc; - use common::clock::TestClock; use common::schemas::{get_vacuum_underscore_schema, get_xy_date_schema}; use common::TestContext; -use std::time::SystemTime; +use deltalake::operations::vacuum::Clock; +use deltalake::operations::DeltaOps; +use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use std::sync::Arc; mod common; -#[tokio::test] -async fn vacuum_delta_8_0_table() { - let mut table = deltalake::open_table("./tests/data/delta-0.8.0") - .await - .unwrap(); - - let result = Vacuum::default() - .with_retention_period(Duration::hours(1)) - .dry_run(true) - .execute(&mut table) - .await; - - assert!(matches!(result.unwrap_err(), - deltalake::vacuum::VacuumError::InvalidVacuumRetentionPeriod { - provided, - min, - } if provided == 1 - && min == 168, - )); - - let result = Vacuum::default() - .with_retention_period(Duration::hours(0)) - .dry_run(true) - .enforce_retention_duration(false) - .execute(&mut table) - .await - .unwrap(); - // do not enforce retention duration check with 0 hour will purge all files - assert_eq!( - result.files_deleted, - vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] - ); - - let result = Vacuum::default() - .with_retention_period(Duration::hours(169)) - .dry_run(true) - .execute(&mut table) - .await - .unwrap(); - - assert_eq!( - result.files_deleted, - vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"] - ); - - let retention_hours = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() - / 3600; - let empty: Vec<String> = Vec::new(); - let result = Vacuum::default() - .with_retention_period(Duration::hours(retention_hours as i64)) - .dry_run(true) - .execute(&mut table) - .await - .unwrap(); - - assert_eq!(result.files_deleted, empty); -} - #[tokio::test] // Validate vacuum works on a
non-partitioned table async fn test_non_partitioned_table() { let mut context = TestContext::from_env().await; - context + let mut table = context .create_table_from_schema(get_xy_date_schema(), &[]) .await; let clock = TestClock::from_systemtime(); @@ -87,31 +24,35 @@ async fn test_non_partitioned_table() { ]; for path in paths { - context - .add_file( - &path, - "random junk".as_bytes().into(), - &[], - clock.current_timestamp_millis(), - true, - ) - .await; + common::add_file( + &mut table, + &path, + "random junk".as_bytes().into(), + &[], + clock.current_timestamp_millis(), + true, + ) + .await; } clock.tick(Duration::seconds(10)); - context - .remove_file("delete_me.parquet", &[], clock.current_timestamp_millis()) - .await; + common::remove_file( + &mut table, + "delete_me.parquet", + &[], + clock.current_timestamp_millis(), + ) + .await; let res = { clock.tick(Duration::days(8)); - let table = context.table.as_mut().unwrap(); - let plan = Vacuum { - clock: Some(Arc::new(clock.clone())), - ..Default::default() - }; - plan.execute(table).await.unwrap() + let (_, metrics) = DeltaOps(table) + .vacuum() + .with_clock(Arc::new(clock.clone())) + .await + .unwrap(); + metrics }; assert_eq!(res.files_deleted.len(), 1); @@ -123,7 +64,7 @@ async fn test_non_partitioned_table() { // Validate vacuum works on a table with multiple partitions async fn test_partitioned_table() { let mut context = TestContext::from_env().await; - context + let mut table = context .create_table_from_schema(get_xy_date_schema(), &["date", "x"]) .await; let clock = TestClock::from_systemtime(); @@ -135,35 +76,35 @@ async fn test_partitioned_table() { let partition_values = [("date", Some("2022-07-03")), ("x", Some("2"))]; for path in paths { - context - .add_file( - &path, - "random junk".as_bytes().into(), - &partition_values, - clock.current_timestamp_millis(), - true, - ) - .await; - } - - clock.tick(Duration::seconds(10)); - - context - .remove_file( - "date=2022-07-03/x=2/delete_me.parquet", + common::add_file( + &mut table, + &path, + "random junk".as_bytes().into(), &partition_values, clock.current_timestamp_millis(), + true, ) .await; + } + + clock.tick(Duration::seconds(10)); + + common::remove_file( + &mut table, + "date=2022-07-03/x=2/delete_me.parquet", + &partition_values, + clock.current_timestamp_millis(), + ) + .await; let res = { clock.tick(Duration::days(8)); - let table = context.table.as_mut().unwrap(); - let plan = Vacuum { - clock: Some(Arc::new(clock.clone())), - ..Default::default() - }; - plan.execute(table).await.unwrap() + let (_, metrics) = DeltaOps(table) + .vacuum() + .with_clock(Arc::new(clock.clone())) + .await + .unwrap(); + metrics }; assert_eq!(res.files_deleted.len(), 1); @@ -187,7 +128,7 @@ async fn test_partitioned_table() { // Partitions that start with _ are not ignored async fn test_partitions_included() { let mut context = TestContext::from_env().await; - context + let mut table = context .create_table_from_schema(get_vacuum_underscore_schema(), &["_date"]) .await; let clock = TestClock::from_systemtime(); @@ -200,35 +141,35 @@ async fn test_partitions_included() { let partition_values = &[("_date", Some("2022-07-03"))]; for path in paths { - context - .add_file( - &path, - "random junk".as_bytes().into(), - partition_values, - clock.current_timestamp_millis(), - true, - ) - .await; - } - - clock.tick(Duration::seconds(10)); - - context - .remove_file( - "_date=2022-07-03/delete_me.parquet", + common::add_file( + &mut table, + &path, + "random junk".as_bytes().into(), 
partition_values, clock.current_timestamp_millis(), + true, ) .await; + } + + clock.tick(Duration::seconds(10)); + + common::remove_file( + &mut table, + "_date=2022-07-03/delete_me.parquet", + partition_values, + clock.current_timestamp_millis(), + ) + .await; let res = { clock.tick(Duration::days(8)); - let table = context.table.as_mut().unwrap(); - let plan = deltalake::vacuum::Vacuum { - clock: Some(Arc::new(clock.clone())), - ..Default::default() - }; - plan.execute(table).await.unwrap() + let (_, metrics) = DeltaOps(table) + .vacuum() + .with_clock(Arc::new(clock.clone())) + .await + .unwrap(); + metrics }; assert_eq!(res.files_deleted.len(), 1); @@ -255,7 +196,7 @@ async fn test_partitions_included() { // that start with _ or . are ignored async fn test_non_managed_files() { let mut context = TestContext::from_env().await; - context + let mut table = context .create_table_from_schema(get_xy_date_schema(), &["date"]) .await; let clock = TestClock::from_systemtime(); @@ -279,27 +220,26 @@ async fn test_non_managed_files() { ]; for path in paths_delete.iter().chain(paths_ignore.iter()) { - context - .add_file( - path, - "random junk".as_bytes().into(), - &[], - clock.current_timestamp_millis(), - false, - ) - .await; + common::add_file( + &mut table, + path, + "random junk".as_bytes().into(), + &[], + clock.current_timestamp_millis(), + false, + ) + .await; } // Validate unmanaged files are not deleted within the retention period - let res = { + let (table, res) = { clock.tick(Duration::hours(1)); - let table = context.table.as_mut().unwrap(); - let plan = Vacuum { - clock: Some(Arc::new(clock.clone())), - ..Default::default() - }; - plan.execute(table).await.unwrap() + DeltaOps(table) + .vacuum() + .with_clock(Arc::new(clock.clone())) + .await + .unwrap() }; assert_eq!(res.files_deleted.len(), 0); @@ -309,13 +249,13 @@ async fn test_non_managed_files() { // Validate unmanaged files are deleted after the retention period let res = { - clock.tick(Duration::days(8)); - let table = context.table.as_mut().unwrap(); - let plan = Vacuum { - clock: Some(Arc::new(clock.clone())), - ..Default::default() - }; - plan.execute(table).await.unwrap() + clock.tick(Duration::hours(1)); + let (_, metrics) = DeltaOps(table) + .vacuum() + .with_clock(Arc::new(clock.clone())) + .await + .unwrap(); + metrics }; assert_eq!(res.files_deleted.len(), paths_delete.len()); diff --git a/rust/tests/common/clock.rs b/rust/tests/common/clock.rs index 30a85e3fda..9143837406 100644 --- a/rust/tests/common/clock.rs +++ b/rust/tests/common/clock.rs @@ -1,5 +1,5 @@ use chrono::{Duration, Utc}; -use deltalake::vacuum::Clock; +use deltalake::operations::vacuum::Clock; use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index 37118d4ad5..cd1758bffe 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -68,64 +68,12 @@ impl TestContext { .unwrap() } - pub async fn add_file( - &mut self, - path: &Path, - data: Bytes, - partition_values: &[(&str, Option<&str>)], - create_time: i64, - commit_to_log: bool, - ) { - let backend = self.get_storage(); - backend.put(path, data.clone()).await.unwrap(); - - if commit_to_log { - let mut part_values = HashMap::new(); - for v in partition_values { - part_values.insert(v.0.to_string(), v.1.map(|v| v.to_string())); - } - - let add = Add { - path: path.as_ref().into(), - size: data.len() as i64, - modification_time: create_time, - partition_values: part_values, - data_change: true, - ..Default::default() - }; 
- let table = self.table.as_mut().unwrap(); - let mut transaction = table.create_transaction(None); - transaction.add_action(action::Action::add(add)); - transaction.commit(None, None).await.unwrap(); - } - } - - pub async fn remove_file( - &mut self, - path: &str, - partition_values: &[(&str, Option<&str>)], - deletion_timestamp: i64, - ) { - let mut part_values = HashMap::new(); - for v in partition_values { - part_values.insert(v.0.to_string(), v.1.map(|v| v.to_string())); - } - - let remove = Remove { - path: path.into(), - deletion_timestamp: Some(deletion_timestamp), - partition_values: Some(part_values), - data_change: true, - ..Default::default() - }; - let table = self.table.as_mut().unwrap(); - let mut transaction = table.create_transaction(None); - transaction.add_action(action::Action::remove(remove)); - transaction.commit(None, None).await.unwrap(); - } - //Create and set a new table from the provided schema - pub async fn create_table_from_schema(&mut self, schema: Schema, partitions: &[&str]) { + pub async fn create_table_from_schema( + &mut self, + schema: Schema, + partitions: &[&str], + ) -> DeltaTable { let p = partitions .iter() .map(|s| s.to_string()) @@ -161,7 +109,7 @@ impl TestContext { .await .unwrap(); - self.table = Some(dt); + dt } } @@ -181,3 +129,57 @@ pub async fn setup_local_context() -> TestContext { ..TestContext::default() } } + +pub async fn add_file( + table: &mut DeltaTable, + path: &Path, + data: Bytes, + partition_values: &[(&str, Option<&str>)], + create_time: i64, + commit_to_log: bool, +) { + let backend = table.object_store(); + backend.put(path, data.clone()).await.unwrap(); + + if commit_to_log { + let mut part_values = HashMap::new(); + for v in partition_values { + part_values.insert(v.0.to_string(), v.1.map(|v| v.to_string())); + } + + let add = Add { + path: path.as_ref().into(), + size: data.len() as i64, + modification_time: create_time, + partition_values: part_values, + data_change: true, + ..Default::default() + }; + let mut transaction = table.create_transaction(None); + transaction.add_action(action::Action::add(add)); + transaction.commit(None, None).await.unwrap(); + } +} + +pub async fn remove_file( + table: &mut DeltaTable, + path: &str, + partition_values: &[(&str, Option<&str>)], + deletion_timestamp: i64, +) { + let mut part_values = HashMap::new(); + for v in partition_values { + part_values.insert(v.0.to_string(), v.1.map(|v| v.to_string())); + } + + let remove = Remove { + path: path.into(), + deletion_timestamp: Some(deletion_timestamp), + partition_values: Some(part_values), + data_change: true, + ..Default::default() + }; + let mut transaction = table.create_transaction(None); + transaction.add_action(action::Action::remove(remove)); + transaction.commit(None, None).await.unwrap(); +} diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index 6958e71b77..ff30294ff7 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -506,8 +506,9 @@ async fn test_poll_table_commits() { let path = "./tests/data/simple_table_with_checkpoint"; let mut table = deltalake::open_table_with_version(path, 9).await.unwrap(); let peek = table.peek_next_commit(table.version()).await.unwrap(); + assert!(matches!(peek, PeekCommit::New(..))); - let is_new = if let PeekCommit::New(version, actions) = peek { + if let PeekCommit::New(version, actions) = peek { assert_eq!(table.version(), 9); assert!(!table.get_files_iter().any(|f| f == 
Path::from("part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"))); @@ -515,17 +516,12 @@ async fn test_poll_table_commits() { assert_eq!(version, 10); assert_eq!(actions.len(), 2); - table.apply_actions(version, actions).unwrap(); + table.update_incremental(None).await.unwrap(); assert_eq!(table.version(), 10); assert!(table.get_files_iter().any(|f| f == Path::from("part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"))); - - true - } else { - false }; - assert!(is_new); let peek = table.peek_next_commit(table.version()).await.unwrap(); assert!(matches!(peek, PeekCommit::UpToDate));