Skip to content

Commit

Permalink
Implement filesystem check
Browse files Browse the repository at this point in the history
  • Loading branch information
Blajda committed Jan 26, 2023
1 parent 750f400 commit 877c4c6
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 2 deletions.
7 changes: 6 additions & 1 deletion rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,11 @@ pub enum DeltaOperation {
predicate: Option<String>,
/// Target optimize size
target_size: DeltaDataTypeLong,
}, // TODO: Add more operations
},
#[serde(rename_all = "camelCase")]
/// Represents a `FileSystemCheck` operation
FileSystemCheck {},
// TODO: Add more operations
}

impl DeltaOperation {
Expand All @@ -502,6 +506,7 @@ impl DeltaOperation {
DeltaOperation::Write { .. } => "delta-rs.Write",
DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck",
};
commit_info.insert(
"operation".to_string(),
Expand Down
167 changes: 167 additions & 0 deletions rust/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them
//!
//! # Example
//! ```rust ignore
//! let mut table = open_table("../path/to/table")?;
//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(). table.state).await?;
//! ````
use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::commit;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::DeltaDataTypeVersion;
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError};
use futures::future::BoxFuture;
pub use object_store::path::Path;
use object_store::Error as ObjectStoreError;
use object_store::ObjectStore;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

/// Audit the Delta Table's active files with the underlying file system.
/// See this module's documentaiton for more information
#[derive(Debug)]
pub struct FileSystemCheckBuilder {
/// A snapshot of the to-be-checked table's state
state: DeltaTableState,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Don't remove actions to the table log. Just determine which files can be removed
dry_run: bool,
}

/// Details of the FSCK operation including which files were removed from the log
#[derive(Debug)]
pub struct FileSystemCheckMetrics {
/// Was this a dry run
pub dry_run: bool,
/// Files that wrere removed successfully
pub files_removed: Vec<String>,
}

struct FileSystemCheckPlan {
/// Version of the snapshot provided
version: DeltaDataTypeVersion,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Files that no longer exists in undlying ObjectStore but have active add actions
pub files_to_remove: Vec<Add>,
}

impl FileSystemCheckBuilder {
/// Create a new [`VacuumBuilder`]
pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self {
FileSystemCheckBuilder {
state,
store,
dry_run: false,
}
}

/// Only determine which add actions should be removed
pub fn with_dry_run(mut self, dry_run: bool) -> Self {
self.dry_run = dry_run;
self
}

async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_to_remove = Vec::new();
let version = self.state.version();
let store = self.store.clone();

for active in self.state.files() {
let res = self.store.head(&Path::from(active.path.as_str())).await;
if let Err(ObjectStoreError::NotFound { path: _, source: _ }) = res {
files_to_remove.push(active.to_owned());
} else {
res.map_err(DeltaTableError::from)?;
}
}

Ok(FileSystemCheckPlan {
files_to_remove,
version,
store,
})
}
}

impl FileSystemCheckPlan {
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> {
if self.files_to_remove.is_empty() {
return Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: Vec::new(),
});
}

let mut actions = Vec::new();
let mut removed_file_paths = Vec::new();
let version = self.version;
let store = &self.store;

for file in self.files_to_remove {
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong;
removed_file_paths.push(file.path.clone());
actions.push(Action::remove(Remove {
path: file.path,
deletion_timestamp: Some(deletion_time),
data_change: true,
extended_file_metadata: None,
partition_values: Some(file.partition_values),
size: Some(file.size),
tags: file.tags,
}));
}

if !actions.is_empty() {
commit(
store,
version + 1,
actions,
DeltaOperation::FileSystemCheck {},
None,
)
.await?;
}

Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
})
}
}

impl std::future::IntoFuture for FileSystemCheckBuilder {
type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let plan = this.create_fsck_plan().await?;
if this.dry_run {
return Ok((
DeltaTable::new_with_state(this.store, this.state),
FileSystemCheckMetrics {
files_removed: plan
.files_to_remove
.iter()
.map(|f| f.path.clone())
.collect(),
dry_run: true,
},
));
}

let metrics = plan.execute().await?;
let mut table = DeltaTable::new_with_state(this.store, this.state);
table.update().await?;
Ok((table, metrics))
})
}
}
8 changes: 8 additions & 0 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
//! if the operation returns data as well.
use self::create::CreateBuilder;
use self::filesystem_check::FileSystemCheckBuilder;
use self::vacuum::VacuumBuilder;
use crate::builder::DeltaTableBuilder;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

pub mod create;
pub mod filesystem_check;
pub mod transaction;
pub mod vacuum;

Expand Down Expand Up @@ -115,6 +117,12 @@ impl DeltaOps {
pub fn vacuum(self) -> VacuumBuilder {
VacuumBuilder::new(self.0.object_store(), self.0.state)
}

/// Audit active files with files present on the filesystem
#[must_use]
pub fn filesystem_check(self) -> FileSystemCheckBuilder {
FileSystemCheckBuilder::new(self.0.object_store(), self.0.state)
}
}

impl From<DeltaTable> for DeltaOps {
Expand Down
7 changes: 6 additions & 1 deletion rust/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl IntegrationContext {
StorageIntegration::Google => format!("gs://{}", &bucket),
StorageIntegration::Local => format!("file://{}", &bucket),
};

// the "storage_backend" will always point to the root ofg the object store.
// TODO should we provide the store via object_Store builders?
let store = match integration {
Expand Down Expand Up @@ -89,6 +88,12 @@ impl IntegrationContext {
}
}

pub fn table_builder(&self, table: TestTables) -> DeltaTableBuilder {
let name = table.as_name();
let table_uri = format!("{}/{}", self.root_uri(), &name);
DeltaTableBuilder::from_uri(table_uri).with_allow_http(true)
}

pub fn uri_for_table(&self, table: TestTables) -> String {
format!("{}/{}", self.root_uri(), table.as_name())
}
Expand Down
77 changes: 77 additions & 0 deletions rust/tests/command_filesystem_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![cfg(all(feature = "integration_test"))]

use arrow::array::Int64Array;
use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
use deltalake::DeltaOps;
use deltalake::Path;
use serial_test::serial;
use std::sync::Arc;

mod common;

#[tokio::test]
#[serial]
async fn test_filesystem_check_local() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Local).await?)
}

#[cfg(any(feature = "s3", feature = "s3-rustls"))]
#[tokio::test]
#[serial]
async fn test_filesystem_check_aws() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Amazon).await?)
}

#[cfg(feature = "azure")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_azure() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Microsoft).await?)
}

#[cfg(feature = "gcs")]
#[tokio::test]
#[serial]
async fn test_filesystem_check_gcp() -> TestResult {
Ok(test_filesystem_check(StorageIntegration::Google).await?)
}

async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
let context = IntegrationContext::new(storage)?;
context.load_table(TestTables::Simple).await?;
let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet";
let path = Path::from_iter([&TestTables::Simple.as_name(), file]);

// Delete an active file from underlying storage without an update to the log to simulate an external fault
context.object_store().delete(&path).await?;

let table = context.table_builder(TestTables::Simple).load().await?;
let version = table.state.version();
let active = table.state.files().len();

// Validate a Dry run does not mutate the table log and indentifies orphaned add actions
let op = DeltaOps::from(table);
let (table, metrics) = op.filesystem_check().with_dry_run(true).await?;
assert_eq!(version, table.state.version());
assert_eq!(active, table.state.files().len());
assert_eq!(vec![file.to_string()], metrics.files_removed);

// Validate a run updates the table version with proper remove actions
let op = DeltaOps::from(table);
let (table, metrics) = op.filesystem_check().await?;
assert_eq!(version + 1, table.state.version());
assert_eq!(active - 1, table.state.files().len());
assert_eq!(vec![file.to_string()], metrics.files_removed);

let remove = table.state.all_tombstones().get(file).unwrap();
assert_eq!(remove.data_change, true);

// An additonal run should return an empty list of orphaned actions
let op = DeltaOps::from(table);
let (table, metrics) = op.filesystem_check().await?;
assert_eq!(version + 1, table.state.version());
assert_eq!(active - 1, table.state.files().len());
assert!(metrics.files_removed.is_empty());

Ok(())
}

0 comments on commit 877c4c6

Please sign in to comment.