diff --git a/rust/src/delta_config.rs b/rust/src/delta_config.rs
index 4540dc587e..df0a13b455 100644
--- a/rust/src/delta_config.rs
+++ b/rust/src/delta_config.rs
@@ -1,31 +1,165 @@
 //! Delta Table configuration
+use std::time::Duration;
+use std::{collections::HashMap, str::FromStr};
 
-use crate::{DeltaDataTypeInt, DeltaDataTypeLong, DeltaTableMetaData};
 use lazy_static::lazy_static;
-use std::time::Duration;
+use serde::{Deserialize, Serialize};
+
+use crate::DeltaTableError;
+
+/// Typed property keys that can be defined on a delta table
+///
+///
+pub enum DeltaConfigKey {
+    /// true for this Delta table to be append-only. If append-only,
+    /// existing records cannot be deleted, and existing values cannot be updated.
+    AppendOnly,
+
+    /// true for Delta Lake to automatically optimize the layout of the files for this Delta table.
+    AutoOptimizeAutoCompact,
+
+    /// true for Delta Lake to automatically optimize the layout of the files for this Delta table during writes.
+    AutoOptimizeOptimizeWrite,
+
+    /// Interval (number of commits) after which a new checkpoint should be created
+    CheckpointInterval,
+
+    /// true for Delta Lake to write file statistics in checkpoints in JSON format for the stats column.
+    CheckpointWriteStatsAsJson,
+
+    /// true for Delta Lake to write file statistics to checkpoints in struct format for the
+    /// stats_parsed column and to write partition values as a struct for partitionValues_parsed.
+    CheckpointWriteStatsAsStruct,
+
+    /// Whether column mapping is enabled for Delta table columns and the corresponding
+    /// Parquet columns that use different names.
+    ColumnMappingMode,
+
+    /// The number of columns for Delta Lake to collect statistics about for data skipping.
+    /// A value of -1 means to collect statistics for all columns. Updating this property does
+    /// not automatically collect statistics again; instead, it redefines the statistics schema
+    /// of the Delta table. Specifically, it changes the behavior of future statistics collection
+    /// (such as during appends and optimizations) as well as data skipping (such as ignoring column
+    /// statistics beyond this number, even when such statistics exist).
+    DataSkippingNumIndexedCols,
+
+    /// The shortest duration for Delta Lake to keep logically deleted data files before deleting
+    /// them physically. This is to prevent failures in stale readers after compactions or partition overwrites.
+    ///
+    /// This value should be large enough to ensure that:
+    ///
+    /// * It is larger than the longest possible duration of a job if you run VACUUM when there are
+    ///   concurrent readers or writers accessing the Delta table.
+    /// * If you run a streaming query that reads from the table, that query does not stop for longer
+    ///   than this value. Otherwise, the query may not be able to restart, as it must still read old files.
+    DeletedFileRetentionDuration,
+
+    /// true to enable change data feed.
+    EnableChangeDataFeed,
+
+    /// The degree to which a transaction must be isolated from modifications made by concurrent transactions.
+    ///
+    /// Valid values are `Serializable` and `WriteSerializable`.
+    IsolationLevel,
+
+    /// How long the history for a Delta table is kept.
+    ///
+    /// Each time a checkpoint is written, Delta Lake automatically cleans up log entries older
+    /// than the retention interval. If you set this property to a large enough value, many log
+    /// entries are retained. This should not impact performance as operations against the log are
+    /// constant time. Operations on history are parallel but will become more expensive as the log size increases.
+    LogRetentionDuration,
+
+    /// TODO: this property could not be found in the documentation, but it was defined in the
+    /// previous implementation and seems worth keeping.
+    EnableExpiredLogCleanup,
+
+    /// The minimum required protocol reader version for a reader that allows to read from this Delta table.
+    MinReaderVersion,
+
+    /// The minimum required protocol writer version for a writer that allows to write to this Delta table.
+    MinWriterVersion,
+
+    /// true for Delta Lake to generate a random prefix for a file path instead of partition information.
+    ///
+    /// For example, this may improve Amazon S3 performance when Delta Lake needs to send very high volumes
+    /// of Amazon S3 calls to better partition across S3 servers.
+    RandomizeFilePrefixes,
+
+    /// When delta.randomizeFilePrefixes is set to true, the number of characters that Delta Lake generates for random prefixes.
+    RandomPrefixLength,
+
+    /// The shortest duration within which new snapshots will retain transaction identifiers (for example, SetTransactions).
+    /// When a new snapshot sees a transaction identifier older than or equal to the duration specified by this property,
+    /// the snapshot considers it expired and ignores it. The SetTransaction identifier is used when making the writes idempotent.
+    SetTransactionRetentionDuration,
+
+    /// The target file size in bytes or higher units for file tuning. For example, 104857600 (bytes) or 100mb.
+    TargetFileSize,
+
+    /// true for Delta Lake to use lower file sizes when rewriting files during data layout
+    /// optimization operations on this Delta table.
+    TuneFileSizesForRewrites,
+}
+
+impl AsRef<str> for DeltaConfigKey {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::AppendOnly => "delta.appendOnly",
+            Self::CheckpointInterval => "delta.checkpointInterval",
+            Self::AutoOptimizeAutoCompact => "delta.autoOptimize.autoCompact",
+            Self::AutoOptimizeOptimizeWrite => "delta.autoOptimize.optimizeWrite",
+            Self::CheckpointWriteStatsAsJson => "delta.checkpoint.writeStatsAsJson",
+            Self::CheckpointWriteStatsAsStruct => "delta.checkpoint.writeStatsAsStruct",
+            Self::ColumnMappingMode => "delta.columnMapping.mode",
+            Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols",
+            Self::DeletedFileRetentionDuration => "delta.deletedFileRetentionDuration",
+            Self::EnableChangeDataFeed => "delta.enableChangeDataFeed",
+            Self::IsolationLevel => "delta.isolationLevel",
+            Self::LogRetentionDuration => "delta.logRetentionDuration",
+            Self::EnableExpiredLogCleanup => "delta.enableExpiredLogCleanup",
+            Self::MinReaderVersion => "delta.minReaderVersion",
+            Self::MinWriterVersion => "delta.minWriterVersion",
+            Self::RandomizeFilePrefixes => "delta.randomizeFilePrefixes",
+            Self::RandomPrefixLength => "delta.randomPrefixLength",
+            Self::SetTransactionRetentionDuration => "delta.setTransactionRetentionDuration",
+            Self::TargetFileSize => "delta.targetFileSize",
+            Self::TuneFileSizesForRewrites => "delta.tuneFileSizesForRewrites",
+        }
+    }
+}
 
-lazy_static! {
-    /// How often to checkpoint the delta log.
-    pub static ref CHECKPOINT_INTERVAL: DeltaConfig = DeltaConfig::new("checkpointInterval", "10");
-
-    /// The shortest duration we have to keep logically deleted data files around before deleting
-    /// them physically.
-    /// Note: this value should be large enough:
-    /// - It should be larger than the longest possible duration of a job if you decide to run "VACUUM"
-    ///   when there are concurrent readers or writers accessing the table.
-    /// - If you are running a streaming query reading from the table, you should make sure the query
-    ///   doesn't stop longer than this value. Otherwise, the query may not be able to restart as it
-    ///   still needs to read old files.
-    pub static ref TOMBSTONE_RETENTION: DeltaConfig =
-        DeltaConfig::new("deletedFileRetentionDuration", "interval 1 week");
-
-    /// The shortest duration we have to keep delta files around before deleting them. We can only
-    /// delete delta files that are before a compaction. We may keep files beyond this duration until
-    /// the next calendar day.
-    pub static ref LOG_RETENTION: DeltaConfig = DeltaConfig::new("logRetentionDuration", "interval 30 day");
-
-    /// Whether to clean up expired checkpoints and delta logs.
-    pub static ref ENABLE_EXPIRED_LOG_CLEANUP: DeltaConfig = DeltaConfig::new("enableExpiredLogCleanup", "true");
+impl FromStr for DeltaConfigKey {
+    type Err = DeltaTableError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "delta.appendOnly" => Ok(Self::AppendOnly),
+            "delta.checkpointInterval" => Ok(Self::CheckpointInterval),
+            "delta.autoOptimize.autoCompact" => Ok(Self::AutoOptimizeAutoCompact),
+            "delta.autoOptimize.optimizeWrite" => Ok(Self::AutoOptimizeOptimizeWrite),
+            "delta.checkpoint.writeStatsAsJson" => Ok(Self::CheckpointWriteStatsAsJson),
+            "delta.checkpoint.writeStatsAsStruct" => Ok(Self::CheckpointWriteStatsAsStruct),
+            "delta.columnMapping.mode" => Ok(Self::ColumnMappingMode),
+            "delta.dataSkippingNumIndexedCols" => Ok(Self::DataSkippingNumIndexedCols),
+            "delta.deletedFileRetentionDuration" | "deletedFileRetentionDuration" => {
+                Ok(Self::DeletedFileRetentionDuration)
+            }
+            "delta.enableChangeDataFeed" => Ok(Self::EnableChangeDataFeed),
+            "delta.isolationLevel" => Ok(Self::IsolationLevel),
+            "delta.logRetentionDuration" | "logRetentionDuration" => Ok(Self::LogRetentionDuration),
+            "delta.enableExpiredLogCleanup" | "enableExpiredLogCleanup" => {
+                Ok(Self::EnableExpiredLogCleanup)
+            }
+            "delta.minReaderVersion" => Ok(Self::MinReaderVersion),
+            "delta.minWriterVersion" => Ok(Self::MinWriterVersion),
+            "delta.randomizeFilePrefixes" => Ok(Self::RandomizeFilePrefixes),
+            "delta.randomPrefixLength" => Ok(Self::RandomPrefixLength),
+            "delta.setTransactionRetentionDuration" => Ok(Self::SetTransactionRetentionDuration),
+            "delta.targetFileSize" => Ok(Self::TargetFileSize),
+            "delta.tuneFileSizesForRewrites" => Ok(Self::TuneFileSizesForRewrites),
+            _ => Err(DeltaTableError::Generic("unknown config key".into())),
+        }
+    }
 }
 
 /// Delta configuration error
@@ -36,70 +170,170 @@ pub enum DeltaConfigError {
     Validation(String),
 }
 
-/// Delta table's `metadata.configuration` entry.
-#[derive(Debug)]
-pub struct DeltaConfig {
-    /// The configuration name
-    pub key: String,
-    /// The default value if `key` is not set in `metadata.configuration`.
-    pub default: String,
+macro_rules! table_config {
+    ($(($key:expr, $name:ident, $ret:ty, $default:literal),)*) => {
+        $(
+            /// read property $key
+            pub fn $name(&self) -> $ret {
+                self.0
+                    .get($key.as_ref())
+                    .and_then(|opt| opt.as_ref().and_then(|value| value.parse().ok()))
+                    .unwrap_or($default)
+            }
+        )*
+    }
 }
 
-impl DeltaConfig {
-    fn new(key: &str, default: &str) -> Self {
-        Self {
-            key: key.to_string(),
-            default: default.to_string(),
+/// Well known delta table configuration
+pub struct TableConfig<'a>(pub(crate) &'a HashMap<String, Option<String>>);
+
+impl<'a> TableConfig<'a> {
+    table_config!(
+        (DeltaConfigKey::AppendOnly, append_only, bool, false),
+        (
+            DeltaConfigKey::CheckpointWriteStatsAsJson,
+            write_stats_as_json,
+            bool,
+            true
+        ),
+        (
+            DeltaConfigKey::CheckpointWriteStatsAsStruct,
+            write_stats_as_struct,
+            bool,
+            true
+        ),
+        (
+            DeltaConfigKey::TargetFileSize,
+            target_file_size,
+            i64,
+            // Databricks / spark defaults to 104857600 (bytes) or 100mb
+            104857600
+        ),
+        (
+            DeltaConfigKey::EnableChangeDataFeed,
+            enable_change_data_feed,
+            bool,
+            false
+        ),
+        (
+            DeltaConfigKey::DataSkippingNumIndexedCols,
+            num_indexed_cols,
+            i32,
+            32
+        ),
+        (
+            DeltaConfigKey::EnableExpiredLogCleanup,
+            enable_expired_log_cleanup,
+            bool,
+            true
+        ),
+        (
+            DeltaConfigKey::CheckpointInterval,
+            checkpoint_interval,
+            i32,
+            10
+        ),
+    );
+
+    /// The shortest duration for Delta Lake to keep logically deleted data files before deleting
+    /// them physically. This is to prevent failures in stale readers after compactions or partition overwrites.
+    ///
+    /// This value should be large enough to ensure that:
+    ///
+    /// * It is larger than the longest possible duration of a job if you run VACUUM when there are
+    ///   concurrent readers or writers accessing the Delta table.
+    /// * If you run a streaming query that reads from the table, that query does not stop for longer
+    ///   than this value. Otherwise, the query may not be able to restart, as it must still read old files.
+    pub fn deleted_file_retention_duration(&self) -> Duration {
+        lazy_static! {
+            static ref DEFAULT_DURATION: Duration = parse_interval("interval 1 week").unwrap();
         }
+        self.0
+            .get(DeltaConfigKey::DeletedFileRetentionDuration.as_ref())
+            .and_then(|o| o.as_ref().and_then(|v| parse_interval(v).ok()))
+            .unwrap_or_else(|| DEFAULT_DURATION.to_owned())
     }
 
-    /// Returns the value from `metadata.configuration` for `self.key` as DeltaDataTypeInt.
-    /// If it's missing in metadata then the `self.default` is used.
-    #[allow(dead_code)]
-    pub fn get_int_from_metadata(
-        &self,
-        metadata: &DeltaTableMetaData,
-    ) -> Result<DeltaDataTypeInt, DeltaConfigError> {
-        Ok(parse_int(&self.get_raw_from_metadata(metadata))? as i32)
+    /// How long the history for a Delta table is kept.
+    ///
+    /// Each time a checkpoint is written, Delta Lake automatically cleans up log entries older
+    /// than the retention interval. If you set this property to a large enough value, many log
+    /// entries are retained. This should not impact performance as operations against the log are
+    /// constant time. Operations on history are parallel but will become more expensive as the log size increases.
+    pub fn log_retention_duration(&self) -> Duration {
+        lazy_static! {
+            static ref DEFAULT_DURATION: Duration = parse_interval("interval 30 day").unwrap();
+        }
+        self.0
+            .get(DeltaConfigKey::LogRetentionDuration.as_ref())
+            .and_then(|o| o.as_ref().and_then(|v| parse_interval(v).ok()))
+            .unwrap_or_else(|| DEFAULT_DURATION.to_owned())
     }
 
-    /// Returns the value from `metadata.configuration` for `self.key` as DeltaDataTypeLong.
-    /// If it's missing in metadata then the `self.default` is used.
-    #[allow(dead_code)]
-    pub fn get_long_from_metadata(
-        &self,
-        metadata: &DeltaTableMetaData,
-    ) -> Result<DeltaDataTypeLong, DeltaConfigError> {
-        parse_int(&self.get_raw_from_metadata(metadata))
+    /// The degree to which a transaction must be isolated from modifications made by concurrent transactions.
+    ///
+    /// Valid values are `Serializable` and `WriteSerializable`.
+    pub fn isolation_level(&self) -> IsolationLevel {
+        self.0
+            .get(DeltaConfigKey::IsolationLevel.as_ref())
+            .and_then(|o| o.as_ref().and_then(|v| v.parse().ok()))
+            .unwrap_or_default()
+    }
+}
 
-    /// Returns the value from `metadata.configuration` for `self.key` as Duration type for the interval.
-    /// The string value of this config has to have the following format: interval <number> <unit>.
-    /// Where <unit> is either week, day, hour, second, millisecond, microsecond or nanosecond.
-    /// If it's missing in metadata then the `self.default` is used.
-    pub fn get_interval_from_metadata(
-        &self,
-        metadata: &DeltaTableMetaData,
-    ) -> Result<Duration, DeltaConfigError> {
-        parse_interval(&self.get_raw_from_metadata(metadata))
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
+/// The isolation level applied during transaction
+pub enum IsolationLevel {
+    /// The strongest isolation level. It ensures that committed write operations
+    /// and all reads are Serializable. Operations are allowed as long as there
+    /// exists a serial sequence of executing them one-at-a-time that generates
+    /// the same outcome as that seen in the table. For the write operations,
+    /// the serial sequence is exactly the same as that seen in the table’s history.
+    Serializable,
+
+    /// A weaker isolation level than Serializable. It ensures only that the write
+    /// operations (that is, not reads) are serializable. However, this is still stronger
+    /// than Snapshot isolation. WriteSerializable is the default isolation level because
+    /// it provides great balance of data consistency and availability for most common operations.
+    WriteSerializable,
+
+    /// SnapshotIsolation is a guarantee that all reads made in a transaction will see a consistent
+    /// snapshot of the database (in practice it reads the last committed values that existed at the
+    /// time it started), and the transaction itself will successfully commit only if no updates
+    /// it has made conflict with any concurrent updates made since that snapshot.
+    SnapshotIsolation,
+}
+
+// Spark assumes Serializable as default isolation level
+// https://github.com/delta-io/delta/blob/abb171c8401200e7772b27e3be6ea8682528ac72/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala#L1023
+impl Default for IsolationLevel {
+    fn default() -> Self {
+        Self::Serializable
+    }
 }
 
-    /// Returns the value from `metadata.configuration` for `self.key` as bool.
-    /// If it's missing in metadata then the `self.default` is used.
-    pub fn get_boolean_from_metadata(
-        &self,
-        metadata: &DeltaTableMetaData,
-    ) -> Result<bool, DeltaConfigError> {
-        parse_bool(&self.get_raw_from_metadata(metadata))
+impl AsRef<str> for IsolationLevel {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::Serializable => "Serializable",
+            Self::WriteSerializable => "WriteSerializable",
+            Self::SnapshotIsolation => "SnapshotIsolation",
+        }
     }
+}
 
-    fn get_raw_from_metadata(&self, metadata: &DeltaTableMetaData) -> String {
-        metadata
-            .configuration
-            .get(&self.key)
-            .and_then(|opt| opt.as_deref())
-            .unwrap_or(self.default.as_str())
-            .to_string()
+impl FromStr for IsolationLevel {
+    type Err = DeltaTableError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "serializable" => Ok(Self::Serializable),
+            "writeserializable" | "write_serializable" => Ok(Self::WriteSerializable),
+            "snapshotisolation" | "snapshot_isolation" => Ok(Self::SnapshotIsolation),
+            _ => Err(DeltaTableError::Generic(
+                "Invalid string for IsolationLevel".into(),
+            )),
+        }
     }
 }
 
@@ -149,15 +383,10 @@ fn parse_int(value: &str) -> Result<i64, DeltaConfigError> {
     })
 }
 
-fn parse_bool(value: &str) -> Result<bool, DeltaConfigError> {
-    value
-        .parse()
-        .map_err(|e| DeltaConfigError::Validation(format!("Cannot parse '{value}' as bool: {e}")))
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::DeltaTableMetaData;
     use crate::Schema;
     use std::collections::HashMap;
 
@@ -168,68 +397,55 @@ mod tests {
     #[test]
     fn get_interval_from_metadata_test() {
-        let mut md = dummy_metadata();
+        let md = dummy_metadata();
+        let config = TableConfig(&md.configuration);
 
         // default 1 week
         assert_eq!(
-            TOMBSTONE_RETENTION
-                .get_interval_from_metadata(&md)
-                .unwrap()
-                .as_secs(),
+            config.deleted_file_retention_duration().as_secs(),
             SECONDS_PER_WEEK,
         );
 
         // change to 2 day
+        let mut md = dummy_metadata();
         md.configuration.insert(
-            TOMBSTONE_RETENTION.key.to_string(),
+            DeltaConfigKey::DeletedFileRetentionDuration
+                .as_ref()
+                .to_string(),
             Some("interval 2 day".to_string()),
         );
+        let config = TableConfig(&md.configuration);
+
         assert_eq!(
-            TOMBSTONE_RETENTION
-                .get_interval_from_metadata(&md)
-                .unwrap()
-                .as_secs(),
+            config.deleted_file_retention_duration().as_secs(),
             2 * SECONDS_PER_DAY,
         );
     }
 
     #[test]
     fn get_long_from_metadata_test() {
-        assert_eq!(
-            CHECKPOINT_INTERVAL
-                .get_long_from_metadata(&dummy_metadata())
-                .unwrap(),
-            10,
-        )
-    }
-
-    #[test]
-    fn get_int_from_metadata_test() {
-        assert_eq!(
-            CHECKPOINT_INTERVAL
-                .get_int_from_metadata(&dummy_metadata())
-                .unwrap(),
-            10,
-        )
+        let md = dummy_metadata();
+        let config = TableConfig(&md.configuration);
+        assert_eq!(config.checkpoint_interval(), 10,)
     }
 
     #[test]
     fn get_boolean_from_metadata_test() {
-        let mut md = dummy_metadata();
+        let md = dummy_metadata();
+        let config = TableConfig(&md.configuration);
 
         // default value is true
-        assert!(ENABLE_EXPIRED_LOG_CLEANUP
-            .get_boolean_from_metadata(&md)
-            .unwrap(),);
+        assert!(config.enable_expired_log_cleanup());
 
         // change to false
+        let mut md = dummy_metadata();
         md.configuration.insert(
-            ENABLE_EXPIRED_LOG_CLEANUP.key.to_string(),
+            DeltaConfigKey::EnableExpiredLogCleanup.as_ref().into(),
            Some("false".to_string()),
         );
-        assert!(!ENABLE_EXPIRED_LOG_CLEANUP
-            .get_boolean_from_metadata(&md)
-            .unwrap());
+        let config = TableConfig(&md.configuration);
+
+        assert!(!config.enable_expired_log_cleanup());
     }
 
     #[test]
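As a reading aid, here is a minimal, crate-internal sketch of how the typed keys and the `TableConfig` accessors above fit together. It mirrors the unit tests in this file (so it assumes `use super::*`); the configuration values are made up for illustration.

    // Sketch only: TableConfig's field is pub(crate), so this lives next to the tests above.
    use std::collections::HashMap;

    let mut configuration: HashMap<String, Option<String>> = HashMap::new();
    configuration.insert(
        DeltaConfigKey::AppendOnly.as_ref().to_string(),
        Some("true".to_string()),
    );
    configuration.insert(
        DeltaConfigKey::DeletedFileRetentionDuration.as_ref().to_string(),
        Some("interval 2 day".to_string()),
    );

    let config = TableConfig(&configuration);
    assert!(config.append_only());
    assert_eq!(config.deleted_file_retention_duration().as_secs(), 2 * 24 * 60 * 60);
    // Unset keys fall back to their defaults: checkpoint interval 10, Serializable isolation.
    assert_eq!(config.checkpoint_interval(), 10);
    assert_eq!(config.isolation_level(), IsolationLevel::Serializable);

String keys round-trip through the `FromStr` impl, so `"delta.appendOnly".parse::<DeltaConfigKey>()` recovers the typed key.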
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 9aece98ff2..43a3277ae6 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -91,7 +91,6 @@
 pub mod operations;
 pub mod partitions;
 pub mod schema;
 pub mod storage;
-pub mod table_properties;
 pub mod table_state;
 pub mod time_utils;
 
diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs
index f10fb50bf5..864ef0164d 100644
--- a/rust/src/operations/create.rs
+++ b/rust/src/operations/create.rs
@@ -4,6 +4,7 @@ use super::transaction::commit;
 use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
 use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
 use crate::builder::ensure_table_uri;
+use crate::delta_config::DeltaConfigKey;
 use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
 use crate::storage::DeltaObjectStore;
 use crate::{DeltaResult, DeltaTable, DeltaTableError};
@@ -152,19 +153,25 @@ impl CreateBuilder {
     }
 
     /// Set configuration on created table
-    pub fn with_configuration(mut self, configuration: HashMap<String, Option<String>>) -> Self {
-        self.configuration = configuration;
+    pub fn with_configuration(
+        mut self,
+        configuration: HashMap<impl AsRef<str>, Option<impl Into<String>>>,
+    ) -> Self {
+        self.configuration = configuration
+            .into_iter()
+            .map(|(k, v)| (k.as_ref().into(), v.map(|s| s.into())))
+            .collect();
         self
     }
 
     /// Specify a table property in the table configuration
     pub fn with_configuration_property(
         mut self,
-        key: impl Into<String>,
+        key: DeltaConfigKey,
         value: Option<impl Into<String>>,
     ) -> Self {
         self.configuration
-            .insert(key.into(), value.map(|v| v.into()));
+            .insert(key.as_ref().into(), value.map(|v| v.into()));
         self
     }
 
@@ -311,8 +318,8 @@ impl std::future::IntoFuture for CreateBuilder {
 #[cfg(all(test, feature = "parquet"))]
 mod tests {
     use super::*;
+    use crate::delta_config::DeltaConfigKey;
     use crate::operations::DeltaOps;
-    use crate::table_properties::APPEND_ONLY;
     use crate::writer::test_utils::get_delta_schema;
     use tempdir::TempDir;
 
@@ -396,14 +403,14 @@ mod tests {
         let table = CreateBuilder::new()
             .with_location("memory://")
             .with_columns(schema.get_fields().clone())
-            .with_configuration_property(APPEND_ONLY, Some("true"))
+            .with_configuration_property(DeltaConfigKey::AppendOnly, Some("true"))
             .await
             .unwrap();
 
         let append = table
             .get_metadata()
             .unwrap()
             .configuration
-            .get(APPEND_ONLY)
+            .get(DeltaConfigKey::AppendOnly.as_ref())
             .unwrap()
             .as_ref()
             .unwrap()
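A sketch of the resulting builder usage with typed property keys, mirroring the append-only test above; the schema helper and in-memory location come from that test, and the final assertion assumes the existing `get_state` accessor on `DeltaTable`, which is not part of this diff.

    // Hypothetical test-style usage; `get_delta_schema` is the test utility imported above.
    let schema = get_delta_schema();
    let table = CreateBuilder::new()
        .with_location("memory://")
        .with_columns(schema.get_fields().clone())
        .with_configuration_property(DeltaConfigKey::AppendOnly, Some("true"))
        .with_configuration_property(
            DeltaConfigKey::DeletedFileRetentionDuration,
            Some("interval 2 day"),
        )
        .await
        .unwrap();
    // Assumption: DeltaTable::get_state() exposes the DeltaTableState used below.
    assert!(table.get_state().table_config().append_only());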
diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index a3a89ad998..d1cfaa7721 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -26,14 +26,14 @@ use crate::storage::ObjectStoreRef;
 use crate::table_state::DeltaTableState;
 use crate::writer::utils::arrow_schema_without_partitions;
 use crate::writer::utils::PartitionPath;
-use crate::{table_properties, DeltaDataTypeVersion};
+use crate::DeltaDataTypeVersion;
 use crate::{
     DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError, ObjectMeta, PartitionFilter,
 };
 use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use futures::future::BoxFuture;
 use futures::StreamExt;
-use log::{debug, error};
+use log::debug;
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
 use parquet::file::properties::WriterProperties;
 use serde::{Deserialize, Serialize};
@@ -380,26 +380,6 @@ impl MergePlan {
     }
 }
 
-fn get_target_file_size(snapshot: &DeltaTableState) -> DeltaDataTypeLong {
-    let mut target_size = 268_435_456;
-    if let Some(meta) = snapshot.current_metadata() {
-        let config_str = meta.configuration.get(table_properties::TARGET_FILE_SIZE);
-        if let Some(s) = config_str {
-            if let Some(s) = s {
-                let r = s.parse::<i64>();
-                if let Ok(size) = r {
-                    target_size = size;
-                } else {
-                    error!("Unable to parse value of 'delta.targetFileSize'. Using default value");
-                }
-            } else {
-                error!("Check your configuration of 'delta.targetFileSize'. Using default value");
-            }
-        }
-    }
-    target_size
-}
-
 /// Build a Plan on which files to merge together. See [OptimizeBuilder]
 pub fn create_merge_plan(
     snapshot: &DeltaTableState,
@@ -407,7 +387,7 @@
     target_size: Option<DeltaDataTypeLong>,
     writer_properties: WriterProperties,
 ) -> Result<MergePlan, DeltaTableError> {
-    let target_size = target_size.unwrap_or_else(|| get_target_file_size(snapshot));
+    let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
     let mut candidates = HashMap::new();
     let mut operations: HashMap = HashMap::new();
     let mut metrics = Metrics::default();
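One behavioral note that falls out of this change: the macro-generated `target_file_size` accessor silently falls back to its 104857600-byte default when the stored value does not parse as an integer, whereas the removed helper logged an error and used 268435456. A crate-internal sketch of that fallback, with an invented configuration value:

    // Sketch only: constructing TableConfig directly is possible inside the crate.
    use std::collections::HashMap;
    use crate::delta_config::{DeltaConfigKey, TableConfig};

    let mut configuration: HashMap<String, Option<String>> = HashMap::new();
    configuration.insert(
        DeltaConfigKey::TargetFileSize.as_ref().to_string(),
        Some("100mb".to_string()), // not a plain byte count, so parse::<i64>() fails
    );
    // Falls back to the default declared in the table_config! invocation above.
    assert_eq!(TableConfig(&configuration).target_file_size(), 104857600);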
diff --git a/rust/src/table_properties.rs b/rust/src/table_properties.rs
deleted file mode 100644
index 1c69d423fd..0000000000
--- a/rust/src/table_properties.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-//! properties defined on Delta Tables
-//!
-
-/// true for this Delta table to be append-only. If append-only,
-/// existing records cannot be deleted, and existing values cannot be updated.
-pub const APPEND_ONLY: &str = "delta.appendOnly";
-/// true for Delta Lake to automatically optimize the layout of the files for this Delta table.
-pub const AUTO_OPTIMIZE_AUTO_COMPACT: &str = "delta.autoOptimize.autoCompact";
-/// true for Delta Lake to automatically optimize the layout of the files for this Delta table during writes.
-pub const AUTO_OPTIMIZE_OPTIMIZE_WRITE: &str = "delta.autoOptimize.optimizeWrite";
-/// true for Delta Lake to write file statistics in checkpoints in JSON format for the stats column.
-pub const CHECKPOINT_WRITE_STATS_AS_JSON: &str = "delta.checkpoint.writeStatsAsJson";
-/// true for Delta Lake to write file statistics to checkpoints in struct format for the
-/// stats_parsed column and to write partition values as a struct for partitionValues_parsed.
-pub const CHECKPOINT_WRITE_STATS_AS_STRUCT: &str = "delta.checkpoint.writeStatsAsStruct";
-/// Whether column mapping is enabled for Delta table columns and the corresponding Parquet columns that use different names.
-pub const COLUMN_MAPPING_MODE: &str = "delta.columnMapping.mode";
-/// Whether column mapping is enabled for Delta table columns and the corresponding Parquet columns that use different names.
-pub const COMPATIBILITY_SYMLINK_FORMAT_MANIFEST_ENABLED: &str =
-    "delta.compatibility.symlinkFormatManifest.enabled";
-/// The number of columns for Delta Lake to collect statistics about for data skipping.
-/// A value of -1 means to collect statistics for all columns. Updating this property does
-/// not automatically collect statistics again; instead, it redefines the statistics schema
-/// of the Delta table. Specifically, it changes the behavior of future statistics collection
-/// (such as during appends and optimizations) as well as data skipping (such as ignoring column
-/// statistics beyond this number, even when such statistics exist).
-pub const DATA_SKIPPING_NUM_INDEXED_COLS: &str = "delta.dataSkippingNumIndexedCols";
-/// The shortest duration for Delta Lake to keep logically deleted data files before deleting
-/// them physically. This is to prevent failures in stale readers after compactions or partition overwrites.
-///
-/// This value should be large enough to ensure that:
-///
-/// * It is larger than the longest possible duration of a job if you run VACUUM when there are
-///   concurrent readers or writers accessing the Delta table.
-/// * If you run a streaming query that reads from the table, that query does not stop for longer
-///   than this value. Otherwise, the query may not be able to restart, as it must still read old files.
-pub const DELETED_FILE_RETENTION_DURATION: &str = "delta.deletedFileRetentionDuration";
-/// true to enable change data feed.
-pub const ENABLE_CHANGE_DATA_FEED: &str = "delta.enableChangeDataFeed";
-/// The degree to which a transaction must be isolated from modifications made by concurrent transactions.
-///
-/// Valid values are `Serializable` and `WriteSerializable`.
-pub const ISOLATION_LEVEL: &str = "delta.isolationLevel";
-/// How long the history for a Delta table is kept.
-///
-/// Each time a checkpoint is written, Delta Lake automatically cleans up log entries older
-/// than the retention interval. If you set this property to a large enough value, many log
-/// entries are retained. This should not impact performance as operations against the log are
-/// constant time. Operations on history are parallel but will become more expensive as the log size increases.
-pub const LOG_RETENTION_DURATION: &str = "delta.logRetentionDuration";
-/// The minimum required protocol reader version for a reader that allows to read from this Delta table.
-pub const MIN_READER_VERSION: &str = "delta.minReaderVersion";
-/// The minimum required protocol writer version for a writer that allows to write to this Delta table.
-pub const MIN_WRITER_VERSION: &str = "delta.minWriterVersion";
-/// true for Delta Lake to generate a random prefix for a file path instead of partition information.
-///
-/// For example, this may improve Amazon S3 performance when Delta Lake needs to send very high volumes
-/// of Amazon S3 calls to better partition across S3 servers.
-pub const RANDOMIZE_FILE_PREFIXES: &str = "delta.randomizeFilePrefixes";
-/// When delta.randomizeFilePrefixes is set to true, the number of characters that Delta Lake generates for random prefixes.
-pub const RANDOM_PREFIX_LENGTH: &str = "delta.randomPrefixLength";
-/// The shortest duration within which new snapshots will retain transaction identifiers (for example, SetTransactions).
-/// When a new snapshot sees a transaction identifier older than or equal to the duration specified by this property,
-/// the snapshot considers it expired and ignores it. The SetTransaction identifier is used when making the writes idempotent.
-pub const SET_TRANSACTION_RETENTION_DURATION: &str = "delta.setTransactionRetentionDuration";
-/// The target file size in bytes or higher units for file tuning. For example, 104857600 (bytes) or 100mb.
-pub const TARGET_FILE_SIZE: &str = "delta.targetFileSize";
-/// The target file size in bytes or higher units for file tuning. For example, 104857600 (bytes) or 100mb.
-pub const TUNE_FILE_SIZES_FOR_REWRITES: &str = "delta.tuneFileSizesForRewrites";
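For call sites that previously imported these string constants, the mapping to the new typed keys is mechanical: each constant becomes the matching `DeltaConfigKey` variant, and `.as_ref()` yields the same `delta.`-prefixed string. The one exception is `COMPATIBILITY_SYMLINK_FORMAT_MANIFEST_ENABLED`, which has no counterpart in the new enum. A small illustration:

    use deltalake::delta_config::DeltaConfigKey;

    // crate::table_properties::APPEND_ONLY       -> DeltaConfigKey::AppendOnly.as_ref()
    // crate::table_properties::TARGET_FILE_SIZE  -> DeltaConfigKey::TargetFileSize.as_ref()
    assert_eq!(DeltaConfigKey::AppendOnly.as_ref(), "delta.appendOnly");
    assert_eq!(DeltaConfigKey::TargetFileSize.as_ref(), "delta.targetFileSize");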
diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index 1eaf862398..341e3d8f75 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -1,14 +1,16 @@
 //! The module for delta table state.
 
 use crate::action::{self, Action, Add};
-use crate::delta_config;
+use crate::delta_config::TableConfig;
 use crate::partitions::{DeltaTablePartition, PartitionFilter};
 use crate::schema::SchemaDataType;
+use crate::Schema;
 use crate::{
     ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError,
     DeltaTableMetaData,
 };
 use chrono::Utc;
+use lazy_static::lazy_static;
 use object_store::{path::Path, ObjectStore};
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
@@ -228,6 +230,22 @@ impl DeltaTableState {
         self.current_metadata.as_ref()
     }
 
+    /// The table schema
+    pub fn schema(&self) -> Option<&Schema> {
+        self.current_metadata.as_ref().map(|m| &m.schema)
+    }
+
+    /// Well known table configuration
+    pub fn table_config(&self) -> TableConfig<'_> {
+        lazy_static! {
+            static ref DUMMY_CONF: HashMap<String, Option<String>> = HashMap::new();
+        }
+        self.current_metadata
+            .as_ref()
+            .map(|meta| TableConfig(&meta.configuration))
+            .unwrap_or_else(|| TableConfig(&DUMMY_CONF))
+    }
+
     /// Merges new state information into our state
     ///
     /// The DeltaTableState also carries the version information for the given state,
@@ -322,14 +340,12 @@ impl DeltaTableState {
             action::Action::metaData(v) => {
                 let md = DeltaTableMetaData::try_from(v)
                     .map_err(|e| ApplyLogError::InvalidJson { source: e })?;
-                self.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION
-                    .get_interval_from_metadata(&md)?
-                    .as_millis() as i64;
-                self.log_retention_millis = delta_config::LOG_RETENTION
-                    .get_interval_from_metadata(&md)?
-                    .as_millis() as i64;
-                self.enable_expired_log_cleanup =
-                    delta_config::ENABLE_EXPIRED_LOG_CLEANUP.get_boolean_from_metadata(&md)?;
+                let table_config = TableConfig(&md.configuration);
+                self.tombstone_retention_millis =
+                    table_config.deleted_file_retention_duration().as_millis() as i64;
+                self.log_retention_millis =
+                    table_config.log_retention_duration().as_millis() as i64;
+                self.enable_expired_log_cleanup = table_config.enable_expired_log_cleanup();
                 self.current_metadata = Some(md);
             }
             action::Action::txn(v) => {
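Downstream code can now pull well-known settings straight from a snapshot instead of re-parsing `metadata.configuration`. A hedged sketch, where the helper name is invented and the logic simply mirrors `process_action` above:

    use deltalake::table_state::DeltaTableState;

    // Hypothetical helper: derive the tombstone-retention cutoff the same way process_action does.
    fn tombstone_retention_millis(snapshot: &DeltaTableState) -> i64 {
        snapshot
            .table_config()
            .deleted_file_retention_duration()
            .as_millis() as i64
    }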
diff --git a/rust/tests/checkpoint_writer.rs b/rust/tests/checkpoint_writer.rs
index a494f238e2..80067814dd 100644
--- a/rust/tests/checkpoint_writer.rs
+++ b/rust/tests/checkpoint_writer.rs
@@ -91,6 +91,7 @@ mod delete_expired_delta_log_in_checkpoint {
     use ::object_store::path::Path as ObjectStorePath;
     use chrono::Utc;
+    use deltalake::delta_config::DeltaConfigKey;
     use deltalake::*;
     use maplit::hashmap;
 
@@ -99,8 +100,8 @@
         let mut table = fs_common::create_table(
             "./tests/data/checkpoints_with_expired_logs/expired",
             Some(hashmap! {
-                delta_config::LOG_RETENTION.key.clone() => Some("interval 10 minute".to_string()),
-                delta_config::ENABLE_EXPIRED_LOG_CLEANUP.key.clone() => Some("true".to_string())
+                DeltaConfigKey::LogRetentionDuration.as_ref().into() => Some("interval 10 minute".to_string()),
+                DeltaConfigKey::EnableExpiredLogCleanup.as_ref().into() => Some("true".to_string())
             }),
         )
         .await;
@@ -163,8 +164,8 @@
         let mut table = fs_common::create_table(
             "./tests/data/checkpoints_with_expired_logs/not_delete_expired",
             Some(hashmap! {
-                delta_config::LOG_RETENTION.key.clone() => Some("interval 1 second".to_string()),
-                delta_config::ENABLE_EXPIRED_LOG_CLEANUP.key.clone() => Some("false".to_string())
+                DeltaConfigKey::LogRetentionDuration.as_ref().into() => Some("interval 1 second".to_string()),
+                DeltaConfigKey::EnableExpiredLogCleanup.as_ref().into() => Some("false".to_string())
             }),
         )
         .await;
@@ -212,6 +213,7 @@ mod checkpoints_with_tombstones {
     use ::object_store::path::Path as ObjectStorePath;
     use chrono::Utc;
     use deltalake::action::*;
+    use deltalake::delta_config::DeltaConfigKey;
     use deltalake::*;
     use maplit::hashmap;
     use parquet::file::reader::{FileReader, SerializedFileReader};
@@ -237,7 +239,7 @@
     #[tokio::test]
     async fn test_expired_tombstones() {
         let mut table = fs_common::create_table("./tests/data/checkpoints_tombstones/expired", Some(hashmap! {
-            delta_config::TOMBSTONE_RETENTION.key.clone() => Some("interval 1 minute".to_string())
+            DeltaConfigKey::DeletedFileRetentionDuration.as_ref().into() => Some("interval 1 minute".to_string())
         })).await;
 
         let a1 = fs_common::add(3 * 60 * 1000); // 3 mins ago,