Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move vacuum command to operations module #1045

Merged
merged 10 commits into from
Jan 5, 2023
Merged
35 changes: 18 additions & 17 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ mod schema;
mod utils;

use arrow::pyarrow::PyArrowType;
use chrono::{DateTime, FixedOffset, Utc};
use chrono::{DateTime, Duration, FixedOffset, Utc};
use deltalake::action::{
self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
use deltalake::builder::DeltaTableBuilder;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::DeltaDataTypeLong;
use deltalake::DeltaDataTypeTimestamp;
use deltalake::DeltaTableMetaData;
use deltalake::DeltaTransactionOptions;
use deltalake::{Invariant, Schema};
use deltalake::{
DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaTableMetaData, DeltaTransactionOptions,
Invariant, Schema,
};
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyValueError;
Expand Down Expand Up @@ -49,10 +49,6 @@ impl PyDeltaTableError {
PyDeltaTableError::new_err(err.to_string())
}

/// Converts a `deltalake::vacuum::VacuumError` into a `pyo3::PyErr`.
///
/// The error is built with `PyDeltaTableError::new_err`, using the error's
/// `to_string()` output as the message.
fn from_vacuum_error(err: deltalake::vacuum::VacuumError) -> pyo3::PyErr {
PyDeltaTableError::new_err(err.to_string())
}

/// Converts a `tokio::io::Error` into a `pyo3::PyErr`.
///
/// The error is built with `PyDeltaTableError::new_err`, using the error's
/// `to_string()` output as the message.
fn from_tokio(err: tokio::io::Error) -> pyo3::PyErr {
PyDeltaTableError::new_err(err.to_string())
}
Expand Down Expand Up @@ -288,12 +284,17 @@ impl RawDeltaTable {
retention_hours: Option<u64>,
enforce_retention_duration: bool,
) -> PyResult<Vec<String>> {
rt()?
.block_on(
self._table
.vacuum(retention_hours, dry_run, enforce_retention_duration),
)
.map_err(PyDeltaTableError::from_vacuum_error)
let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone())
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run);
if let Some(retention_period) = retention_hours {
cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
}
let (table, metrics) = rt()?
.block_on(async { cmd.await })
.map_err(PyDeltaTableError::from_raw)?;
self._table.state = table.state;
Ok(metrics.files_deleted)
}

// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
Expand Down Expand Up @@ -321,7 +322,7 @@ impl RawDeltaTable {

pub fn update_incremental(&mut self) -> PyResult<()> {
rt()?
.block_on(self._table.update_incremental())
.block_on(self._table.update_incremental(None))
.map_err(PyDeltaTableError::from_raw)
}

Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_vacuum.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_vacuum_dry_run_simple_table():
dt.vacuum(retention_periods)
assert (
str(exception.value)
== "Invalid retention period, minimum retention for vacuum is configured to be greater than 168 hours, got 167 hours"
== "Generic error: Invalid retention period, minimum retention for vacuum is configured to be greater than 168 hours, got 167 hours"
)


Expand Down
6 changes: 6 additions & 0 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ pub struct Add {
pub tags: Option<HashMap<String, Option<String>>>,
}

// Hashing for `Add` feeds only the `path` field to the hasher; no other field
// contributes to the hash value.
// NOTE(review): any `PartialEq`/`Eq` impl for `Add` must stay consistent with
// this (values that compare equal need equal paths) — not visible here, confirm.
impl Hash for Add {
fn hash<H: Hasher>(&self, state: &mut H) {
self.path.hash(state);
}
}

impl Add {
/// Returns the Add action with path decoded.
pub fn path_decoded(self) -> Result<Self, ActionError> {
Expand Down
4 changes: 2 additions & 2 deletions rust/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl DeltaTableBuilder {
Ok(object_store)
}

/// Build the delta Table from specified options.
/// Build the [`DeltaTable`] from specified options.
///
/// This will not load the log, i.e. the table is not initialized. To get an initialized
/// table, use the `load` function.
Expand Down Expand Up @@ -258,7 +258,7 @@ impl DeltaTableBuilder {
Ok(DeltaTable::new(object_store, config))
}

/// finally load the table
/// Build the [`DeltaTable`] and load its state
pub async fn load(self) -> Result<DeltaTable, DeltaTableError> {
let version = self.options.version.clone();
let mut table = self.build()?;
Expand Down
Loading