def restore(
    self,
    target: Union[int, datetime, str],
    *,
    ignore_missing_files: bool = False,
    protocol_downgrade_allowed: bool = False,
) -> Dict[str, Any]:
    """
    Run the Restore command on the Delta Table: restore the table to a given version or datetime.

    :param target: the state to restore to, given as an int version number, an RFC 3339
        datetime string (e.g. ``"2020-05-01T00:47:31-07:00"``) or a timezone-aware datetime.
    :param ignore_missing_files: whether the operation should carry on when some data files are missing.
    :param protocol_downgrade_allowed: whether the operation is allowed to downgrade the protocol version.
    :return: the metrics from restore.
    :raises ValueError: if ``target`` is a datetime without timezone information.
    """
    if isinstance(target, datetime):
        # The native binding parses the string as RFC 3339, which requires a UTC
        # offset. A naive datetime serialises without one, so reject it here with
        # an actionable message instead of an opaque parse failure.
        if target.utcoffset() is None:
            raise ValueError(
                "restore target datetime must be timezone-aware "
                "(e.g. datetime.now(timezone.utc)); got a naive datetime"
            )
        target = target.isoformat()

    # Versions (int) and datetime strings are forwarded unchanged.
    metrics = self._table.restore(
        target,
        ignore_missing_files=ignore_missing_files,
        protocol_downgrade_allowed=protocol_downgrade_allowed,
    )
    # The binding returns the metrics as a JSON document.
    return json.loads(metrics)
diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index fdecfdc1a1..5dc5a0959e 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -508,3 +508,24 @@ the method will raise an error. This method could also be used to insert a new partition if one doesn't already exist, making this operation idempotent. + + +Restoring tables +~~~~~~~~~~~~~~~~ + +.. py:currentmodule:: deltalake.table + +Restoring a table will restore delta table to a specified version or datetime. This +operation compares the current state of the delta table with the state to be restored. +And add those missing files into the AddFile actions and add redundant files into +RemoveFile actions. Then commit into a new version. + + +Use :meth:`DeltaTable.restore` to perform the restore operation. Note that if any other +concurrent operation was performed on the table, restore will fail. + +.. code-block:: python + + >>> dt = DeltaTable("../rust/tests/data/simple_table") + >>> dt.restore(1) + {'numRemovedFile': 5, 'numRestoredFile': 22} diff --git a/python/src/lib.rs b/python/src/lib.rs index 38647f2279..5ed69ecd01 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -25,6 +25,7 @@ use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; +use deltalake::operations::restore::RestoreBuilder; use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::partitions::PartitionFilter; @@ -316,6 +317,37 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } + // Run the restore command on the Delta Table: restore table to a given version or datetime + #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false))] + pub fn restore( + &mut self, + target: Option<&PyAny>, + ignore_missing_files: bool, + 
protocol_downgrade_allowed: bool, + ) -> PyResult { + let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); + if let Some(val) = target { + if let Ok(version) = val.extract::() { + cmd = cmd.with_version_to_restore(version) + } + if let Ok(ds) = val.extract::<&str>() { + let datetime = DateTime::::from( + DateTime::::parse_from_rfc3339(ds).map_err(|err| { + PyValueError::new_err(format!("Failed to parse datetime string: {err}")) + })?, + ); + cmd = cmd.with_datetime_to_restore(datetime) + } + } + cmd = cmd.with_ignore_missing_files(ignore_missing_files); + cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed); + let (table, metrics) = rt()? + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(serde_json::to_string(&metrics).unwrap()) + } + /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. pub fn history(&mut self, limit: Option) -> PyResult> { let history = rt()? 
import datetime
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable, write_deltalake


def _restore_and_check(
    tmp_path: pathlib.Path,
    sample_data: pa.Table,
    monkeypatch,
    use_relative: bool,
    target,
) -> None:
    """Write three versions of a table, restore it to ``target`` and verify the commit.

    Shared by every restore test so that the table setup and the assertions
    live in one place. ``target`` is passed to ``DeltaTable.restore`` as-is.
    """
    if use_relative:
        monkeypatch.chdir(tmp_path)  # Make tmp_path the working directory
        (tmp_path / "path/to/table").mkdir(parents=True)
        table_path = "./path/to/table"
    else:
        table_path = str(tmp_path)

    # Three appends give the table several versions to restore between.
    for _ in range(3):
        write_deltalake(table_path, sample_data, mode="append")

    dt = DeltaTable(table_path)
    old_version = dt.version()
    dt.restore(target)

    # A restore is recorded as a new commit on top of the existing history.
    last_action = dt.history(1)[0]
    assert last_action["operation"] == "RESTORE"
    assert dt.version() == old_version + 1


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_version(
    tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
    """Restore using an integer version number."""
    _restore_and_check(tmp_path, sample_data, monkeypatch, use_relative, 1)


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_datetime_str(
    tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
    """Restore using a datetime string that carries a UTC offset."""
    _restore_and_check(
        tmp_path, sample_data, monkeypatch, use_relative, "2020-05-01T00:47:31-07:00"
    )


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_datetime(
    tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
    """Restore using a timezone-aware ``datetime`` object."""
    date = datetime.datetime.strptime(
        "2023-04-26T21:23:32+08:00", "%Y-%m-%dT%H:%M:%S%z"
    )
    _restore_and_check(tmp_path, sample_data, monkeypatch, use_relative, date)
- pub fn ignore_missing_files(mut self, ignore_missing_files: bool) -> Self { + pub fn with_ignore_missing_files(mut self, ignore_missing_files: bool) -> Self { self.ignore_missing_files = ignore_missing_files; self } /// Set whether allow to downgrade protocol - pub fn protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self { + pub fn with_protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self { self.protocol_downgrade_allowed = protocol_downgrade_allowed; self } diff --git a/rust/tests/command_restore.rs b/rust/tests/command_restore.rs index 4902d85a28..7a54f7e3a3 100644 --- a/rust/tests/command_restore.rs +++ b/rust/tests/command_restore.rs @@ -194,7 +194,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box> { let result = DeltaOps(context.table) .restore() - .ignore_missing_files(true) + .with_ignore_missing_files(true) .with_version_to_restore(1) .await; assert!(result.is_ok());