From 5d32c56725184e3b716a837978cca22912f774fd Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 25 Jan 2023 21:26:28 -0500 Subject: [PATCH 01/13] Implement filesystem check --- rust/src/action/mod.rs | 7 +- rust/src/operations/filesystem_check.rs | 167 ++++++++++++++++++++++++ rust/src/operations/mod.rs | 8 ++ rust/src/test_utils.rs | 7 +- rust/tests/command_filesystem_check.rs | 77 +++++++++++ 5 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 rust/src/operations/filesystem_check.rs create mode 100644 rust/tests/command_filesystem_check.rs diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index eab1fdff0b..787d15ae8e 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -490,7 +490,11 @@ pub enum DeltaOperation { predicate: Option, /// Target optimize size target_size: DeltaDataTypeLong, - }, // TODO: Add more operations + }, + #[serde(rename_all = "camelCase")] + /// Represents a `FileSystemCheck` operation + FileSystemCheck {}, + // TODO: Add more operations } impl DeltaOperation { @@ -502,6 +506,7 @@ impl DeltaOperation { DeltaOperation::Write { .. } => "delta-rs.Write", DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate", DeltaOperation::Optimize { .. } => "delta-rs.Optimize", + DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck", }; commit_info.insert( "operation".to_string(), diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs new file mode 100644 index 0000000000..00d675f386 --- /dev/null +++ b/rust/src/operations/filesystem_check.rs @@ -0,0 +1,167 @@ +//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them +//! +//! # Example +//! ```rust ignore +//! let mut table = open_table("../path/to/table")?; +//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(). table.state).await?; +//! ```` +use crate::action::{Action, Add, DeltaOperation, Remove}; +use crate::operations::transaction::commit; +use crate::storage::DeltaObjectStore; +use crate::table_state::DeltaTableState; +use crate::DeltaDataTypeVersion; +use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; +use futures::future::BoxFuture; +pub use object_store::path::Path; +use object_store::Error as ObjectStoreError; +use object_store::ObjectStore; +use std::fmt::Debug; +use std::sync::Arc; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; + +/// Audit the Delta Table's active files with the underlying file system. +/// See this module's documentaiton for more information +#[derive(Debug)] +pub struct FileSystemCheckBuilder { + /// A snapshot of the to-be-checked table's state + state: DeltaTableState, + /// Delta object store for handling data files + store: Arc, + /// Don't remove actions to the table log. Just determine which files can be removed + dry_run: bool, +} + +/// Details of the FSCK operation including which files were removed from the log +#[derive(Debug)] +pub struct FileSystemCheckMetrics { + /// Was this a dry run + pub dry_run: bool, + /// Files that wrere removed successfully + pub files_removed: Vec, +} + +struct FileSystemCheckPlan { + /// Version of the snapshot provided + version: DeltaDataTypeVersion, + /// Delta object store for handling data files + store: Arc, + /// Files that no longer exists in undlying ObjectStore but have active add actions + pub files_to_remove: Vec, +} + +impl FileSystemCheckBuilder { + /// Create a new [`FileSystemCheckBuilder`] + pub fn new(store: Arc, state: DeltaTableState) -> Self { + FileSystemCheckBuilder { + state, + store, + dry_run: false, + } + } + + /// Only determine which add actions should be removed + pub fn with_dry_run(mut self, dry_run: bool) -> Self { + self.dry_run = dry_run; + self + } + + async fn create_fsck_plan(&self) -> DeltaResult { + let mut files_to_remove = Vec::new(); + let version = self.state.version(); + let store = self.store.clone(); + + for active in self.state.files() { + let res = self.store.head(&Path::from(active.path.as_str())).await; + if let Err(ObjectStoreError::NotFound { path: _, source: _ }) = res { + files_to_remove.push(active.to_owned()); + } else { + res.map_err(DeltaTableError::from)?; + } + } + + Ok(FileSystemCheckPlan { + files_to_remove, + version, + store, + }) + } +} + +impl FileSystemCheckPlan { + pub async fn execute(self) -> DeltaResult { + if self.files_to_remove.is_empty() { + return Ok(FileSystemCheckMetrics { + dry_run: false, + files_removed: Vec::new(), + }); + } + + let mut actions = Vec::new(); + let mut removed_file_paths = Vec::new(); + let version = self.version; + let store = &self.store; + + for file in self.files_to_remove { + let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong; + removed_file_paths.push(file.path.clone()); + actions.push(Action::remove(Remove { + path: file.path, + deletion_timestamp: Some(deletion_time), + data_change: true, + extended_file_metadata: None, + partition_values: Some(file.partition_values), + size: Some(file.size), + tags: file.tags, + })); + } + + if !actions.is_empty() { + commit( + store, + version + 1, + actions, + DeltaOperation::FileSystemCheck {}, + None, + ) + .await?; + } + + Ok(FileSystemCheckMetrics { + dry_run: false, + files_removed: removed_file_paths, + }) + } +} + +impl std::future::IntoFuture for FileSystemCheckBuilder { + type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + + Box::pin(async move { + let plan = this.create_fsck_plan().await?; + if this.dry_run { + return Ok(( + DeltaTable::new_with_state(this.store, this.state), + FileSystemCheckMetrics { + files_removed: plan + .files_to_remove + .iter() + .map(|f| f.path.clone()) + .collect(), + dry_run: true, + }, + )); + } + + let metrics = plan.execute().await?; + let mut table = DeltaTable::new_with_state(this.store, this.state); + table.update().await?; + Ok((table, metrics)) + }) + } +} diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index b49fd44b6d..212bf34414 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -8,11 +8,13 @@ //! if the operation returns data as well. use self::create::CreateBuilder; +use self::filesystem_check::FileSystemCheckBuilder; use self::vacuum::VacuumBuilder; use crate::builder::DeltaTableBuilder; use crate::{DeltaResult, DeltaTable, DeltaTableError}; pub mod create; +pub mod filesystem_check; pub mod transaction; pub mod vacuum; @@ -115,6 +117,12 @@ impl DeltaOps { pub fn vacuum(self) -> VacuumBuilder { VacuumBuilder::new(self.0.object_store(), self.0.state) } + + /// Audit active files with files present on the filesystem + #[must_use] + pub fn filesystem_check(self) -> FileSystemCheckBuilder { + FileSystemCheckBuilder::new(self.0.object_store(), self.0.state) + } } impl From for DeltaOps { diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 639ec7dc22..bd41ee1b6e 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -53,7 +53,6 @@ impl IntegrationContext { StorageIntegration::Google => format!("gs://{}", &bucket), StorageIntegration::Local => format!("file://{}", &bucket), }; - // the "storage_backend" will always point to the root ofg the object store. // TODO should we provide the store via object_Store builders? let store = match integration { @@ -89,6 +88,12 @@ impl IntegrationContext { } } + pub fn table_builder(&self, table: TestTables) -> DeltaTableBuilder { + let name = table.as_name(); + let table_uri = format!("{}/{}", self.root_uri(), &name); + DeltaTableBuilder::from_uri(table_uri).with_allow_http(true) + } + pub fn uri_for_table(&self, table: TestTables) -> String { format!("{}/{}", self.root_uri(), table.as_name()) } diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs new file mode 100644 index 0000000000..06a2e17a2a --- /dev/null +++ b/rust/tests/command_filesystem_check.rs @@ -0,0 +1,77 @@ +#![cfg(all(feature = "integration_test"))] + +use arrow::array::Int64Array; +use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; +use deltalake::DeltaOps; +use deltalake::Path; +use serial_test::serial; +use std::sync::Arc; + +mod common; + +#[tokio::test] +#[serial] +async fn test_filesystem_check_local() -> TestResult { + Ok(test_filesystem_check(StorageIntegration::Local).await?) +} + +#[cfg(any(feature = "s3", feature = "s3-rustls"))] +#[tokio::test] +#[serial] +async fn test_filesystem_check_aws() -> TestResult { + Ok(test_filesystem_check(StorageIntegration::Amazon).await?) +} + +#[cfg(feature = "azure")] +#[tokio::test] +#[serial] +async fn test_filesystem_check_azure() -> TestResult { + Ok(test_filesystem_check(StorageIntegration::Microsoft).await?) +} + +#[cfg(feature = "gcs")] +#[tokio::test] +#[serial] +async fn test_filesystem_check_gcp() -> TestResult { + Ok(test_filesystem_check(StorageIntegration::Google).await?) +} + +async fn test_filesystem_check(storage: StorageIntegration) -> TestResult { + let context = IntegrationContext::new(storage)?; + context.load_table(TestTables::Simple).await?; + let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet"; + let path = Path::from_iter([&TestTables::Simple.as_name(), file]); + + // Delete an active file from underlying storage without an update to the log to simulate an external fault + context.object_store().delete(&path).await?; + + let table = context.table_builder(TestTables::Simple).load().await?; + let version = table.state.version(); + let active = table.state.files().len(); + + // Validate a Dry run does not mutate the table log and indentifies orphaned add actions + let op = DeltaOps::from(table); + let (table, metrics) = op.filesystem_check().with_dry_run(true).await?; + assert_eq!(version, table.state.version()); + assert_eq!(active, table.state.files().len()); + assert_eq!(vec![file.to_string()], metrics.files_removed); + + // Validate a run updates the table version with proper remove actions + let op = DeltaOps::from(table); + let (table, metrics) = op.filesystem_check().await?; + assert_eq!(version + 1, table.state.version()); + assert_eq!(active - 1, table.state.files().len()); + assert_eq!(vec![file.to_string()], metrics.files_removed); + + let remove = table.state.all_tombstones().get(file).unwrap(); + assert_eq!(remove.data_change, true); + + // An additonal run should return an empty list of orphaned actions + let op = DeltaOps::from(table); + let (table, metrics) = op.filesystem_check().await?; + assert_eq!(version + 1, table.state.version()); + assert_eq!(active - 1, table.state.files().len()); + assert!(metrics.files_removed.is_empty()); + + Ok(()) +} From 07476026046ce86906facb567325bf686e3ca6b8 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 26 Jan 2023 20:20:30 -0500 Subject: [PATCH 02/13] Allow HTTP for AWS when useing test_utils --- rust/src/test_utils.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index bd41ee1b6e..fd389b5490 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -336,6 +336,7 @@ pub mod s3_cli { set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table"); set_env_if_not_set("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100"); set_env_if_not_set("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100"); + set_env_if_not_set("AWS_STORAGE_ALLOW_HTTP", "TRUE"); } pub fn create_lock_table() -> std::io::Result { From e9446337ae19fe40e985d646207282c876039f43 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 26 Jan 2023 20:26:40 -0500 Subject: [PATCH 03/13] Remove unused imports --- rust/tests/command_filesystem_check.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index 06a2e17a2a..099ecdba2c 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -1,11 +1,9 @@ #![cfg(all(feature = "integration_test"))] -use arrow::array::Int64Array; use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; use deltalake::DeltaOps; use deltalake::Path; use serial_test::serial; -use std::sync::Arc; mod common; From 18e7974e15bead18b5ff7e1de8e05eca11acce36 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 26 Jan 2023 21:57:03 -0500 Subject: [PATCH 04/13] Allow unsafe rename for s3 test --- rust/src/test_utils.rs | 4 ++-- rust/tests/command_filesystem_check.rs | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index fd389b5490..e104508a49 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -224,7 +224,8 @@ impl TestTables { } } -fn set_env_if_not_set(key: impl AsRef, value: impl AsRef) { +/// Set environment variable if it is not set +pub fn set_env_if_not_set(key: impl AsRef, value: impl AsRef) { if std::env::var(key.as_ref()).is_err() { std::env::set_var(key.as_ref(), value.as_ref()) }; @@ -336,7 +337,6 @@ pub mod s3_cli { set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table"); set_env_if_not_set("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100"); set_env_if_not_set("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100"); - set_env_if_not_set("AWS_STORAGE_ALLOW_HTTP", "TRUE"); } pub fn create_lock_table() -> std::io::Result { diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index 099ecdba2c..67412d38cf 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -1,6 +1,8 @@ #![cfg(all(feature = "integration_test"))] -use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; +use deltalake::test_utils::{ + set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables, +}; use deltalake::DeltaOps; use deltalake::Path; use serial_test::serial; @@ -17,6 +19,7 @@ async fn test_filesystem_check_local() -> TestResult { #[tokio::test] #[serial] async fn test_filesystem_check_aws() -> TestResult { + set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true"); Ok(test_filesystem_check(StorageIntegration::Amazon).await?) } From e90ba5390967285d0169549640cd9a904fde1f8f Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 26 Jan 2023 22:15:19 -0500 Subject: [PATCH 05/13] Set lock client to none for s3 test --- rust/tests/command_filesystem_check.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index 67412d38cf..743bf41874 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -20,6 +20,7 @@ async fn test_filesystem_check_local() -> TestResult { #[serial] async fn test_filesystem_check_aws() -> TestResult { set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true"); + set_env_if_not_set("AWS_S3_LOCKING_PROVIDER", "none"); Ok(test_filesystem_check(StorageIntegration::Amazon).await?) } From d42eb78e9f2ae0f3789ea9d695a7af6705b4fc57 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 28 Jan 2023 15:30:57 -0500 Subject: [PATCH 06/13] Reconcile by using list instead of head --- rust/src/operations/filesystem_check.rs | 25 ++++++++++++------- rust/tests/command_filesystem_check.rs | 33 +++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 00d675f386..a76e151d0e 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -10,11 +10,12 @@ use crate::operations::transaction::commit; use crate::storage::DeltaObjectStore; use crate::table_state::DeltaTableState; use crate::DeltaDataTypeVersion; -use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; +use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable}; use futures::future::BoxFuture; +use futures::StreamExt; pub use object_store::path::Path; -use object_store::Error as ObjectStoreError; use object_store::ObjectStore; +use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use std::time::SystemTime; @@ -68,18 +69,24 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_to_remove = Vec::new(); + let mut files_to_check: HashMap = HashMap::new(); let version = self.state.version(); let store = self.store.clone(); - for active in self.state.files() { - let res = self.store.head(&Path::from(active.path.as_str())).await; - if let Err(ObjectStoreError::NotFound { path: _, source: _ }) = res { - files_to_remove.push(active.to_owned()); - } else { - res.map_err(DeltaTableError::from)?; - } + self.state.files().iter().for_each(|active| { + files_to_check.insert(active.path.to_owned(), active); + }); + + let mut files = self.store.list(None).await?; + while let Some(result) = files.next().await { + let file = result?; + files_to_check.remove(file.location.as_ref()); } + files_to_check + .into_iter() + .for_each(|(_, file)| files_to_remove.push(file.to_owned())); + Ok(FileSystemCheckPlan { files_to_remove, version, diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index 743bf41874..1c45652544 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -77,3 +77,36 @@ async fn test_filesystem_check(storage: StorageIntegration) -> TestResult { Ok(()) } + +#[tokio::test] +#[serial] +async fn test_filesystem_check_partitioned() -> TestResult { + let storage = StorageIntegration::Local; + let context = IntegrationContext::new(storage)?; + context + .load_table(TestTables::Delta0_8_0Partitioned) + .await?; + let file = "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet"; + let path = Path::parse(TestTables::Delta0_8_0Partitioned.as_name() + "/" + file).unwrap(); + + // Delete an active file from underlying storage without an update to the log to simulate an external fault + context.object_store().delete(&path).await?; + + let table = context + .table_builder(TestTables::Delta0_8_0Partitioned) + .load() + .await?; + let version = table.state.version(); + let active = table.state.files().len(); + + // Validate a run updates the table version with proper remove actions + let op = DeltaOps::from(table); + let (table, metrics) = op.filesystem_check().await?; + assert_eq!(version + 1, table.state.version()); + assert_eq!(active - 1, table.state.files().len()); + assert_eq!(vec![file.to_string()], metrics.files_removed); + + let remove = table.state.all_tombstones().get(file).unwrap(); + assert_eq!(remove.data_change, true); + Ok(()) +} From 48a460a96e251cb9bab398a715197463459b6839 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 28 Jan 2023 15:40:00 -0500 Subject: [PATCH 07/13] Update test utils to include partitioned table --- rust/src/test_utils.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index e104508a49..13a350095c 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -191,6 +191,7 @@ pub enum TestTables { Simple, SimpleCommit, Golden, + Delta0_8_0Partitioned, Custom(String), } @@ -209,6 +210,11 @@ impl TestTables { .to_str() .unwrap() .to_owned(), + Self::Delta0_8_0Partitioned => data_path + .join("delta-0.8.0-partitioned") + .to_str() + .unwrap() + .to_owned(), // the data path for upload does not apply to custom tables. Self::Custom(_) => todo!(), } @@ -219,6 +225,7 @@ impl TestTables { Self::Simple => "simple".into(), Self::SimpleCommit => "simple_commit".into(), Self::Golden => "golden".into(), + Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(), Self::Custom(name) => name.to_owned(), } } From 85886a572f6f9fb1244ecc046d4da0cde5b91d39 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 29 Jan 2023 20:53:14 -0500 Subject: [PATCH 08/13] Apply suggestions from code review Add wjones127 suggestions Co-authored-by: Will Jones --- rust/src/operations/filesystem_check.rs | 28 ++++++++++++++----------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index a76e151d0e..23e81ca0d4 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -1,5 +1,10 @@ -//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them +//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them. //! +//! Active files are ones that have an add action in the log, but no corresponding remove action. +//! This operation creates a new transaction containing a remove action for each of the missing files. +//! +//! This can be used to repair tables where a data file has been deleted accidentally or +//! purposefully, if the file was corrupted. //! # Example //! ```rust ignore //! let mut table = open_table("../path/to/table")?; @@ -69,23 +74,22 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_to_remove = Vec::new(); - let mut files_to_check: HashMap = HashMap::new(); + let mut files_to_check: HashMap = self.state.files().iter() + .map(|active| (active.path.to_owned(), active)) + .collect(); let version = self.state.version(); let store = self.store.clone(); - self.state.files().iter().for_each(|active| { - files_to_check.insert(active.path.to_owned(), active); - }); - let mut files = self.store.list(None).await?; while let Some(result) = files.next().await { let file = result?; files_to_check.remove(file.location.as_ref()); } - files_to_check + let files_to_remove: Vec = files_to_check .into_iter() - .for_each(|(_, file)| files_to_remove.push(file.to_owned())); + .map(|(_, file)| file.to_owned()) + .collect(); Ok(FileSystemCheckPlan { files_to_remove, @@ -104,8 +108,8 @@ impl FileSystemCheckPlan { }); } - let mut actions = Vec::new(); - let mut removed_file_paths = Vec::new(); + let mut actions = Vec::with_capacity(self.files_to_remove.len()); + let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len()); let version = self.version; let store = &self.store; @@ -157,8 +161,8 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { FileSystemCheckMetrics { files_removed: plan .files_to_remove - .iter() - .map(|f| f.path.clone()) + .into_iter() + .map(|f| f.path) .collect(), dry_run: true, }, From 3dc4b6754008b80830df61d9724bcaae75e3d82d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 29 Jan 2023 19:31:22 -0800 Subject: [PATCH 09/13] Update rust/src/operations/filesystem_check.rs --- rust/src/operations/filesystem_check.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 23e81ca0d4..c4ac73111d 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -73,7 +73,6 @@ impl FileSystemCheckBuilder { } async fn create_fsck_plan(&self) -> DeltaResult { - let mut files_to_remove = Vec::new(); let mut files_to_check: HashMap = self.state.files().iter() .map(|active| (active.path.to_owned(), active)) .collect(); From 7cbe4fa16ed7521bbd2844a87164af629f530e55 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 29 Jan 2023 22:58:38 -0500 Subject: [PATCH 10/13] Resolve merge conflicts --- rust/src/operations/filesystem_check.rs | 78 +++++++++++++++++-------- rust/tests/command_filesystem_check.rs | 25 ++++++++ 2 files changed, 78 insertions(+), 25 deletions(-) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index c4ac73111d..8c643b7aac 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -3,7 +3,7 @@ //! Active files are ones that have an add action in the log, but no corresponding remove action. //! This operation creates a new transaction containing a remove action for each of the missing files. //! -//! This can be used to repair tables where a data file has been deleted accidentally or +//! This can be used to repair tables where a data file has been deleted accidentally or //! purposefully, if the file was corrupted. //! # Example //! ```rust ignore @@ -15,16 +15,18 @@ use crate::operations::transaction::commit; use crate::storage::DeltaObjectStore; use crate::table_state::DeltaTableState; use crate::DeltaDataTypeVersion; -use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable}; +use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; use futures::future::BoxFuture; use futures::StreamExt; pub use object_store::path::Path; +use object_store::Error as ObjectStoreError; use object_store::ObjectStore; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use url::Url; /// Audit the Delta Table's active files with the underlying file system. /// See this module's documentaiton for more information @@ -73,23 +75,55 @@ impl FileSystemCheckBuilder { } async fn create_fsck_plan(&self) -> DeltaResult { - let mut files_to_check: HashMap = self.state.files().iter() - .map(|active| (active.path.to_owned(), active)) - .collect(); + let mut files_relative: HashMap<&str, &Add> = + HashMap::with_capacity(self.state.files().len()); + let mut files_absolute: HashMap<&str, &Add> = + HashMap::with_capacity(self.state.files().len()); + + for active in self.state.files() { + let url = Url::parse(&active.path).map_err(|_| { + DeltaTableError::Generic(format!( + "Unable to parse path from Delta log: {}", + &active.path + )) + })?; + if url.scheme().len() == 0 { + files_relative.insert(active.path.as_str(), active); + } else { + files_absolute.insert(active.path.as_str(), active); + } + } let version = self.state.version(); let store = self.store.clone(); - let mut files = self.store.list(None).await?; - while let Some(result) = files.next().await { - let file = result?; - files_to_check.remove(file.location.as_ref()); + if !files_relative.is_empty() { + let mut files = self.store.list(None).await?; + while let Some(result) = files.next().await { + let file = result?; + files_relative.remove(file.location.as_ref()); + + if files_relative.is_empty() { + break; + } + } } - let files_to_remove: Vec = files_to_check + let mut files_to_remove: Vec = files_relative .into_iter() .map(|(_, file)| file.to_owned()) .collect(); + if !files_absolute.is_empty() { + for (_, file) in files_absolute { + let res = self.store.head(&Path::from(file.path.as_str())).await; + if let Err(ObjectStoreError::NotFound { .. }) = res { + files_to_remove.push(file.to_owned()); + } else { + res.map_err(DeltaTableError::from)?; + } + } + } + Ok(FileSystemCheckPlan { files_to_remove, version, @@ -127,16 +161,14 @@ impl FileSystemCheckPlan { })); } - if !actions.is_empty() { - commit( - store, - version + 1, - actions, - DeltaOperation::FileSystemCheck {}, - None, - ) - .await?; - } + commit( + store, + version + 1, + actions, + DeltaOperation::FileSystemCheck {}, + None, + ) + .await?; Ok(FileSystemCheckMetrics { dry_run: false, @@ -158,11 +190,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { return Ok(( DeltaTable::new_with_state(this.store, this.state), FileSystemCheckMetrics { - files_removed: plan - .files_to_remove - .into_iter() - .map(|f| f.path) - .collect(), + files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, }, )); diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index 1c45652544..0df84013c7 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -110,3 +110,28 @@ async fn test_filesystem_check_partitioned() -> TestResult { assert_eq!(remove.data_change, true); Ok(()) } + +#[tokio::test] +#[serial] +async fn test_filesystem_check_outdated() -> TestResult { + // Validate failure when a non dry only executes on the latest version + let context = IntegrationContext::new(StorageIntegration::Local)?; + context.load_table(TestTables::Simple).await?; + let file = "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet"; + let path = Path::from_iter([&TestTables::Simple.as_name(), file]); + + // Delete an active file from underlying storage without an update to the log to simulate an external fault + context.object_store().delete(&path).await?; + + let table = context + .table_builder(TestTables::Simple) + .with_version(2) + .load() + .await?; + + let op = DeltaOps::from(table); + let res = op.filesystem_check().with_dry_run(false).await; + assert!(res.is_err()); + + Ok(()) +} From 558e89090ee192a9ee2dc084f79cf0f1350f0c58 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 29 Jan 2023 23:02:21 -0500 Subject: [PATCH 11/13] Resolve clippy errors --- rust/src/operations/filesystem_check.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 8c643b7aac..70016087ec 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -87,7 +87,7 @@ impl FileSystemCheckBuilder { &active.path )) })?; - if url.scheme().len() == 0 { + if url.scheme().is_empty() { files_relative.insert(active.path.as_str(), active); } else { files_absolute.insert(active.path.as_str(), active); From 46841cc1adbb61ef5ad54ced1d6171e58c773208 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 29 Jan 2023 23:10:44 -0500 Subject: [PATCH 12/13] Resolve clippy errors 2 --- rust/src/operations/filesystem_check.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 70016087ec..80115ae3c5 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -109,8 +109,8 @@ impl FileSystemCheckBuilder { } let mut files_to_remove: Vec = files_relative - .into_iter() - .map(|(_, file)| file.to_owned()) + .into_values() + .map(|file| file.to_owned()) .collect(); if !files_absolute.is_empty() { From 9d19ef355277beee1323fbf87118fa208eb1d6d6 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 31 Jan 2023 21:34:53 -0500 Subject: [PATCH 13/13] Error on existence of absolute paths --- rust/src/operations/filesystem_check.rs | 90 +++++++++++++++---------- rust/tests/command_filesystem_check.rs | 9 ++- 2 files changed, 60 insertions(+), 39 deletions(-) diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 80115ae3c5..279748c19d 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -5,10 +5,11 @@ //! //! This can be used to repair tables where a data file has been deleted accidentally or //! purposefully, if the file was corrupted. +//! //! # Example //! ```rust ignore //! let mut table = open_table("../path/to/table")?; -//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(). table.state).await?; +//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(), table.state).await?; //! ```` use crate::action::{Action, Add, DeltaOperation, Remove}; use crate::operations::transaction::commit; @@ -19,14 +20,13 @@ use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; use futures::future::BoxFuture; use futures::StreamExt; pub use object_store::path::Path; -use object_store::Error as ObjectStoreError; use object_store::ObjectStore; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; -use url::Url; +use url::{ParseError, Url}; /// Audit the Delta Table's active files with the underlying file system. /// See this module's documentaiton for more information @@ -58,6 +58,17 @@ struct FileSystemCheckPlan { pub files_to_remove: Vec, } +fn is_absolute_path(path: &str) -> DeltaResult { + match Url::parse(path) { + Ok(_) => Ok(true), + Err(ParseError::RelativeUrlWithoutBase) => Ok(false), + Err(_) => Err(DeltaTableError::Generic(format!( + "Unable to parse path: {}", + &path + ))), + } +} + impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] pub fn new(store: Arc, state: DeltaTableState) -> Self { @@ -68,7 +79,7 @@ impl FileSystemCheckBuilder { } } - /// Only determine which add actions should be removed + /// Only determine which add actions should be removed. A dry run will not commit actions to the Delta log pub fn with_dry_run(mut self, dry_run: bool) -> Self { self.dry_run = dry_run; self @@ -77,53 +88,34 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap<&str, &Add> = HashMap::with_capacity(self.state.files().len()); - let mut files_absolute: HashMap<&str, &Add> = - HashMap::with_capacity(self.state.files().len()); + let version = self.state.version(); + let store = self.store.clone(); for active in self.state.files() { - let url = Url::parse(&active.path).map_err(|_| { - DeltaTableError::Generic(format!( - "Unable to parse path from Delta log: {}", - &active.path - )) - })?; - if url.scheme().is_empty() { - files_relative.insert(active.path.as_str(), active); + if is_absolute_path(&active.path)? { + return Err(DeltaTableError::Generic( + "Filesystem check does not support absolute paths".to_string(), + )); } else { - files_absolute.insert(active.path.as_str(), active); + files_relative.insert(&active.path, active); } } - let version = self.state.version(); - let store = self.store.clone(); - if !files_relative.is_empty() { - let mut files = self.store.list(None).await?; - while let Some(result) = files.next().await { - let file = result?; - files_relative.remove(file.location.as_ref()); + let mut files = self.store.list(None).await?; + while let Some(result) = files.next().await { + let file = result?; + files_relative.remove(file.location.as_ref()); - if files_relative.is_empty() { - break; - } + if files_relative.is_empty() { + break; } } - let mut files_to_remove: Vec = files_relative + let files_to_remove: Vec = files_relative .into_values() .map(|file| file.to_owned()) .collect(); - if !files_absolute.is_empty() { - for (_, file) in files_absolute { - let res = self.store.head(&Path::from(file.path.as_str())).await; - if let Err(ObjectStoreError::NotFound { .. }) = res { - files_to_remove.push(file.to_owned()); - } else { - res.map_err(DeltaTableError::from)?; - } - } - } - Ok(FileSystemCheckPlan { files_to_remove, version, @@ -203,3 +195,27 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn absolute_path() { + assert!(!is_absolute_path( + "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet" + ) + .unwrap()); + assert!(!is_absolute_path( + "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet" + ) + .unwrap()); + + assert!(is_absolute_path("abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet").unwrap()); + assert!(is_absolute_path("file:///C:/my_table/windows.parquet").unwrap()); + assert!(is_absolute_path("file:///home/my_table/unix.parquet").unwrap()); + assert!(is_absolute_path("s3://container/path/file.parquet").unwrap()); + assert!(is_absolute_path("gs://container/path/file.parquet").unwrap()); + assert!(is_absolute_path("scheme://table/file.parquet").unwrap()); + } +} diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs index 0df84013c7..602371f99f 100644 --- a/rust/tests/command_filesystem_check.rs +++ b/rust/tests/command_filesystem_check.rs @@ -3,8 +3,8 @@ use deltalake::test_utils::{ set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables, }; -use deltalake::DeltaOps; use deltalake::Path; +use deltalake::{DeltaOps, DeltaTableError}; use serial_test::serial; mod common; @@ -131,7 +131,12 @@ async fn test_filesystem_check_outdated() -> TestResult { let op = DeltaOps::from(table); let res = op.filesystem_check().with_dry_run(false).await; - assert!(res.is_err()); + + if let Err(DeltaTableError::VersionAlreadyExists(version)) = res { + assert!(version == 3); + } else { + assert!(false); + } Ok(()) }