diff --git a/aws/delta-checkpoint/src/main.rs b/aws/delta-checkpoint/src/main.rs index 6b60381e76..26321bc0ce 100644 --- a/aws/delta-checkpoint/src/main.rs +++ b/aws/delta-checkpoint/src/main.rs @@ -10,7 +10,6 @@ use deltalake::checkpoints; use deltalake::checkpoints::CheckpointError; -use deltalake::DeltaDataTypeVersion; use lambda_runtime::{service_fn, Error, LambdaEvent}; use lazy_static::lazy_static; use log::*; @@ -78,9 +77,7 @@ fn bucket_and_key_from_event(event: &Value) -> Result<(String, String), CheckPoi Ok((bucket, key)) } -fn table_path_and_version_from_key( - key: &str, -) -> Result<(String, DeltaDataTypeVersion), CheckPointLambdaError> { +fn table_path_and_version_from_key(key: &str) -> Result<(String, i64), CheckPointLambdaError> { lazy_static! { static ref JSON_LOG_ENTRY_REGEX: Regex = Regex::new(r#"(.*)/_delta_log/0*(\d+)\.json$"#).unwrap(); @@ -97,7 +94,7 @@ fn table_path_and_version_from_key( .get(2) .ok_or_else(|| CheckPointLambdaError::ObjectKeyParseFailed(key.to_string()))? .as_str(); - let version = version_str.parse::()?; + let version = version_str.parse::()?; Ok((table_path, version)) } diff --git a/python/src/lib.rs b/python/src/lib.rs index c09917d278..cdcaefa2cc 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -20,7 +20,7 @@ use deltalake::operations::optimize::OptimizeBuilder; use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::partitions::PartitionFilter; -use deltalake::{DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaOps, Invariant, Schema}; +use deltalake::{DeltaOps, Invariant, Schema}; use pyo3::create_exception; use pyo3::exceptions::PyException; use pyo3::exceptions::PyValueError; @@ -107,7 +107,7 @@ struct RawDeltaTableMetaData { #[pyo3(get)] partition_columns: Vec, #[pyo3(get)] - created_time: Option, + created_time: Option, #[pyo3(get)] configuration: HashMap>, } @@ -118,7 +118,7 @@ impl RawDeltaTable { #[pyo3(signature = (table_uri, version = None, storage_options = None, without_files = false))] fn new( table_uri: &str, - version: Option, + version: Option, storage_options: Option>, without_files: bool, ) -> PyResult { @@ -198,7 +198,7 @@ impl RawDeltaTable { )) } - pub fn load_version(&mut self, version: deltalake::DeltaDataTypeVersion) -> PyResult<()> { + pub fn load_version(&mut self, version: i64) -> PyResult<()> { rt()? .block_on(self._table.load_version(version)) .map_err(PyDeltaTableError::from_raw) @@ -317,7 +317,7 @@ impl RawDeltaTable { pub fn optimize( &mut self, partition_filters: Option>, - target_size: Option, + target_size: Option, ) -> PyResult { let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()); if let Some(size) = target_size { @@ -717,7 +717,7 @@ fn save_mode_from_str(value: &str) -> PyResult { } } -fn current_timestamp() -> DeltaDataTypeTimestamp { +fn current_timestamp() -> i64 { let start = SystemTime::now(); let since_the_epoch = start .duration_since(UNIX_EPOCH) @@ -728,9 +728,9 @@ fn current_timestamp() -> DeltaDataTypeTimestamp { #[derive(FromPyObject)] pub struct PyAddAction { path: String, - size: DeltaDataTypeLong, + size: i64, partition_values: HashMap>, - modification_time: DeltaDataTypeTimestamp, + modification_time: i64, data_change: bool, stats: Option, } diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 732b2c24c2..e246bfee97 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -96,7 +96,7 @@ pub enum ColumnCountStat { /// Composite HashMap representation of statistics. Column(HashMap), /// Json representation of statistics. - Value(DeltaDataTypeLong), + Value(i64), } impl ColumnCountStat { @@ -109,7 +109,7 @@ impl ColumnCountStat { } /// Returns the serde_json representation of the ColumnCountStat. - pub fn as_value(&self) -> Option { + pub fn as_value(&self) -> Option { match self { ColumnCountStat::Value(v) => Some(*v), _ => None, @@ -122,7 +122,7 @@ impl ColumnCountStat { #[serde(rename_all = "camelCase")] pub struct Stats { /// Number of records in the file associated with the log action. - pub num_records: DeltaDataTypeLong, + pub num_records: i64, // start of per column stats /// Contains a value smaller than all values present in the file for all columns. @@ -137,7 +137,7 @@ pub struct Stats { #[derive(Debug, Default)] pub struct StatsParsed { /// Number of records in the file associated with the log action. - pub num_records: DeltaDataTypeLong, + pub num_records: i64, // start of per column stats /// Contains a value smaller than all values present in the file for all columns. @@ -154,7 +154,7 @@ pub struct StatsParsed { /// Contains a value larger than all values present in the file for all columns. pub max_values: HashMap, /// The number of null values for all columns. - pub null_count: HashMap, + pub null_count: HashMap, } /// Delta AddCDCFile action that describes a parquet CDC data file. @@ -165,7 +165,7 @@ pub struct AddCDCFile { /// absolute path to a CDC file pub path: String, /// The size of this file in bytes - pub size: DeltaDataTypeLong, + pub size: i64, /// A map from partition column to value for this file pub partition_values: HashMap>, /// Should always be set to false because they do not change the underlying data of the table @@ -181,7 +181,7 @@ pub struct Add { /// A relative path, from the root of the table, to a file that should be added to the table pub path: String, /// The size of this file in bytes - pub size: DeltaDataTypeLong, + pub size: i64, /// A map from partition column to value for this file pub partition_values: HashMap>, /// Partition values stored in raw parquet struct format. In this struct, the column names @@ -205,7 +205,7 @@ pub struct Add { #[serde(skip_serializing, skip_deserializing)] pub partition_values_parsed: Option, /// The time this file was created, as milliseconds since the epoch - pub modification_time: DeltaDataTypeTimestamp, + pub modification_time: i64, /// When false the file must already be present in the table or the records in the added file /// must be contained in one or more remove actions in the same version /// @@ -327,7 +327,7 @@ pub struct MetaData { /// An array containing the names of columns by which the data should be partitioned pub partition_columns: Vec, /// The time when this metadata action is created, in milliseconds since the Unix epoch - pub created_time: Option, + pub created_time: Option, /// A map containing configuration options for the table pub configuration: HashMap>, } @@ -367,7 +367,7 @@ pub struct Remove { /// The path of the file that is removed from the table. pub path: String, /// The timestamp when the remove was added to table state. - pub deletion_timestamp: Option, + pub deletion_timestamp: Option, /// Whether data is changed by the remove. A table optimize will report this as false for /// example, since it adds and removes files by combining many files into one. pub data_change: bool, @@ -379,7 +379,7 @@ pub struct Remove { /// A map from partition column to value for this file. pub partition_values: Option>>, /// Size of this file in bytes - pub size: Option, + pub size: Option, /// Map containing metadata about this file pub tags: Option>>, } @@ -425,9 +425,9 @@ pub struct Txn { /// A unique identifier for the application performing the transaction. pub app_id: String, /// An application-specific numeric identifier for this transaction. - pub version: DeltaDataTypeVersion, + pub version: i64, /// The time when this transaction action was created in milliseconds since the Unix epoch. - pub last_updated: Option, + pub last_updated: Option, } /// Action used to increase the version of the Delta protocol required to read or write to the @@ -437,10 +437,10 @@ pub struct Txn { pub struct Protocol { /// Minimum version of the Delta read protocol a client must implement to correctly read the /// table. - pub min_reader_version: DeltaDataTypeInt, + pub min_reader_version: i32, /// Minimum version of the Delta write protocol a client must implement to correctly read the /// table. - pub min_writer_version: DeltaDataTypeInt, + pub min_writer_version: i32, } /// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored. @@ -451,7 +451,7 @@ pub struct Protocol { pub struct CommitInfo { /// Timestamp in millis when the commit was created #[serde(skip_serializing_if = "Option::is_none")] - pub timestamp: Option, + pub timestamp: Option, /// Id of the user invoking the commit #[serde(skip_serializing_if = "Option::is_none")] pub user_id: Option, @@ -569,7 +569,7 @@ pub enum DeltaOperation { /// The filter used to determine which partitions to filter predicate: Option, /// Target optimize size - target_size: DeltaDataTypeLong, + target_size: i64, }, #[serde(rename_all = "camelCase")] /// Represents a `FileSystemCheck` operation diff --git a/rust/src/action/parquet2_read/mod.rs b/rust/src/action/parquet2_read/mod.rs index ddfd2a9e55..7c7519344a 100644 --- a/rust/src/action/parquet2_read/mod.rs +++ b/rust/src/action/parquet2_read/mod.rs @@ -19,9 +19,7 @@ mod string; mod validity; use crate::action::{Action, Add, CommitInfo, MetaData, Protocol, Remove, Txn}; -use crate::schema::{ - DeltaDataTypeInt, DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaDataTypeVersion, Guid, -}; +use crate::schema::Guid; use boolean::for_each_boolean_field_value; use map::for_each_map_field_value; use primitive::for_each_primitive_field_value; @@ -253,7 +251,7 @@ fn deserialize_txn_column_page( page, dict, descriptor, - |action: &mut Txn, v: DeltaDataTypeVersion| action.version = v, + |action: &mut Txn, v: i64| action.version = v, )?; } "appId" => { @@ -271,7 +269,7 @@ fn deserialize_txn_column_page( page, dict, descriptor, - |action: &mut Txn, v: DeltaDataTypeTimestamp| action.last_updated = Some(v), + |action: &mut Txn, v: i64| action.last_updated = Some(v), )?; } _ => { @@ -309,7 +307,7 @@ fn deserialize_add_column_page( page, dict, descriptor, - |action: &mut Add, v: DeltaDataTypeLong| action.size = v, + |action: &mut Add, v: i64| action.size = v, )?; } "partitionValues" => { @@ -363,7 +361,7 @@ fn deserialize_add_column_page( page, dict, descriptor, - |action: &mut Add, v: DeltaDataTypeTimestamp| action.modification_time = v, + |action: &mut Add, v: i64| action.modification_time = v, )?; } _ => { @@ -398,9 +396,7 @@ fn deserialize_remove_column_page( page, dict, descriptor, - |action: &mut Remove, v: DeltaDataTypeTimestamp| { - action.deletion_timestamp = Some(v) - }, + |action: &mut Remove, v: i64| action.deletion_timestamp = Some(v), )?; } "size" => { @@ -409,7 +405,7 @@ fn deserialize_remove_column_page( page, dict, descriptor, - |action: &mut Remove, v: DeltaDataTypeLong| action.size = Some(v), + |action: &mut Remove, v: i64| action.size = Some(v), )?; } // FIXME suport partitionValueParsed @@ -556,7 +552,7 @@ fn deserialize_metadata_column_page( page, dict, descriptor, - |action: &mut MetaData, v: DeltaDataTypeTimestamp| action.created_time = Some(v), + |action: &mut MetaData, v: i64| action.created_time = Some(v), )?; } "configuration" => { @@ -595,7 +591,7 @@ fn deserialize_protocol_column_page( page, dict, descriptor, - |action: &mut Protocol, v: DeltaDataTypeInt| action.min_reader_version = v, + |action: &mut Protocol, v: i32| action.min_reader_version = v, )?; } "minWriterVersion" => { @@ -604,7 +600,7 @@ fn deserialize_protocol_column_page( page, dict, descriptor, - |action: &mut Protocol, v: DeltaDataTypeInt| action.min_writer_version = v, + |action: &mut Protocol, v: i32| action.min_writer_version = v, )?; } _ => { diff --git a/rust/src/builder.rs b/rust/src/builder.rs index 1dd8e22f5f..641aca6aaf 100644 --- a/rust/src/builder.rs +++ b/rust/src/builder.rs @@ -5,7 +5,6 @@ use std::path::PathBuf; use std::sync::Arc; use crate::delta::{DeltaResult, DeltaTable, DeltaTableError}; -use crate::schema::DeltaDataTypeVersion; use crate::storage::config::StorageOptions; use crate::storage::{DeltaObjectStore, ObjectStoreRef}; @@ -44,7 +43,7 @@ pub enum DeltaVersion { #[default] Newest, /// specify the version to load - Version(DeltaDataTypeVersion), + Version(i64), /// specify the timestamp in UTC Timestamp(DateTime), } @@ -148,7 +147,7 @@ impl DeltaTableBuilder { } /// Sets `version` to the builder - pub fn with_version(mut self, version: DeltaDataTypeVersion) -> Self { + pub fn with_version(mut self, version: i64) -> Self { self.options.version = DeltaVersion::Version(version); self } diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs index 534e00c400..98a4e790c1 100644 --- a/rust/src/checkpoints.rs +++ b/rust/src/checkpoints.rs @@ -115,7 +115,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result, ) -> Result<(), CheckpointError> { let table = open_table_with_version(table_uri, version).await?; @@ -133,7 +133,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup( } async fn create_checkpoint_for( - version: DeltaDataTypeVersion, + version: i64, state: &DeltaTableState, storage: &DeltaObjectStore, ) -> Result<(), CheckpointError> { @@ -164,10 +164,10 @@ async fn create_checkpoint_for( Ok(()) } -async fn flush_delete_files bool>( +async fn flush_delete_files bool>( storage: &DeltaObjectStore, - maybe_delete_files: &mut Vec<(DeltaDataTypeVersion, ObjectMeta)>, - files_to_delete: &mut Vec<(DeltaDataTypeVersion, ObjectMeta)>, + maybe_delete_files: &mut Vec<(i64, ObjectMeta)>, + files_to_delete: &mut Vec<(i64, ObjectMeta)>, should_delete_file: T, ) -> Result { if !maybe_delete_files.is_empty() && should_delete_file(maybe_delete_files.last().unwrap()) { @@ -199,7 +199,7 @@ async fn flush_delete_files bool>( /// exposed only for integration testing - DO NOT USE otherwise pub async fn cleanup_expired_logs_for( - until_version: DeltaDataTypeVersion, + until_version: i64, storage: &DeltaObjectStore, log_retention_timestamp: i64, ) -> Result { @@ -211,7 +211,7 @@ pub async fn cleanup_expired_logs_for( let mut deleted_log_num = 0; // Get file objects from table. - let mut candidates: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new(); + let mut candidates: Vec<(i64, ObjectMeta)> = Vec::new(); let mut stream = storage.list(Some(storage.log_path())).await?; while let Some(obj_meta) = stream.next().await { let obj_meta = obj_meta?; @@ -220,7 +220,7 @@ pub async fn cleanup_expired_logs_for( if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { let log_ver_str = captures.get(1).unwrap().as_str(); - let log_ver: DeltaDataTypeVersion = log_ver_str.parse().unwrap(); + let log_ver: i64 = log_ver_str.parse().unwrap(); if log_ver < until_version && ts <= log_retention_timestamp { candidates.push((log_ver, obj_meta)); } @@ -248,8 +248,8 @@ pub async fn cleanup_expired_logs_for( file.1.last_modified.timestamp() <= log_retention_timestamp && file.0 < until_version }; - let mut maybe_delete_files: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new(); - let mut files_to_delete: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new(); + let mut maybe_delete_files: Vec<(i64, ObjectMeta)> = Vec::new(); + let mut files_to_delete: Vec<(i64, ObjectMeta)> = Vec::new(); // Init if !candidates.is_empty() { @@ -258,7 +258,7 @@ pub async fn cleanup_expired_logs_for( maybe_delete_files.push(removed); } - let mut current_file: (DeltaDataTypeVersion, ObjectMeta); + let mut current_file: (i64, ObjectMeta); loop { if candidates.is_empty() { deleted_log_num += flush_delete_files( diff --git a/rust/src/delta.rs b/rust/src/delta.rs index a8ea919fe1..cdbf1cc52f 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -40,14 +40,14 @@ pub use crate::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion}; #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)] pub struct CheckPoint { /// Delta table version - pub(crate) version: DeltaDataTypeVersion, // 20 digits decimals - size: DeltaDataTypeLong, + pub(crate) version: i64, // 20 digits decimals + size: i64, parts: Option, // 10 digits decimals } impl CheckPoint { /// Creates a new checkpoint from the given parameters. - pub fn new(version: DeltaDataTypeVersion, size: DeltaDataTypeLong, parts: Option) -> Self { + pub fn new(version: i64, size: i64, parts: Option) -> Self { Self { version, size, @@ -123,7 +123,7 @@ pub enum DeltaTableError { /// invalid log entry content. line: String, /// corresponding table version for the log file. - version: DeltaDataTypeVersion, + version: i64, }, /// Error returned when the log contains invalid stats JSON. #[error("Invalid JSON in file stats: {}", .json_err)] @@ -141,7 +141,7 @@ pub enum DeltaTableError { }, /// Error returned when the DeltaTable has an invalid version. #[error("Invalid table version: {0}")] - InvalidVersion(DeltaDataTypeVersion), + InvalidVersion(i64), /// Error returned when the DeltaTable has no data files. #[error("Corrupted table, cannot read data file {}: {}", .path, .source)] MissingDataFile { @@ -225,10 +225,10 @@ pub enum DeltaTableError { }, /// Error returned when transaction is failed to be committed because given version already exists. #[error("Delta transaction failed, version {0} already exists.")] - VersionAlreadyExists(DeltaDataTypeVersion), + VersionAlreadyExists(i64), /// Error returned when user attempts to commit actions that don't belong to the next version. #[error("Delta transaction failed, version {0} does not follow {1}")] - VersionMismatch(DeltaDataTypeVersion, DeltaDataTypeVersion), + VersionMismatch(i64, i64), /// A Feature is missing to perform operation #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")] MissingFeature { @@ -287,7 +287,7 @@ pub struct DeltaTableMetaData { /// An array containing the names of columns by which the data should be partitioned pub partition_columns: Vec, /// The time when this metadata action is created, in milliseconds since the Unix epoch - pub created_time: Option, + pub created_time: Option, /// table properties pub configuration: HashMap>, } @@ -458,7 +458,7 @@ impl From for LoadCheckpointError { #[derive(Debug)] pub enum PeekCommit { /// The next commit version and associated actions - New(DeltaDataTypeVersion, Vec), + New(i64, Vec), /// Provided DeltaVersion is up to date UpToDate, } @@ -474,7 +474,7 @@ pub struct DeltaTable { /// file metadata for latest checkpoint last_check_point: Option, /// table versions associated with timestamps - version_timestamp: HashMap, + version_timestamp: HashMap, } impl Serialize for DeltaTable { @@ -609,16 +609,14 @@ impl DeltaTable { } /// This method scans delta logs to find the earliest delta log version - async fn get_earliest_delta_log_version( - &self, - ) -> Result { + async fn get_earliest_delta_log_version(&self) -> Result { // TODO check if regex matches against path lazy_static! { static ref DELTA_LOG_REGEX: Regex = Regex::new(r#"^_delta_log/(\d{20})\.(json|checkpoint)*$"#).unwrap(); } - let mut current_delta_log_ver = DeltaDataTypeVersion::MAX; + let mut current_delta_log_ver = i64::MAX; // Get file objects from table. let mut stream = self.storage.list(Some(self.storage.log_path())).await?; @@ -627,7 +625,7 @@ impl DeltaTable { if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { let log_ver_str = captures.get(1).unwrap().as_str(); - let log_ver: DeltaDataTypeVersion = log_ver_str.parse().unwrap(); + let log_ver: i64 = log_ver_str.parse().unwrap(); if log_ver < current_delta_log_ver { current_delta_log_ver = log_ver; } @@ -642,10 +640,7 @@ impl DeltaTable { match self.storage.get(&last_checkpoint_path).await { Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?), Err(ObjectStoreError::NotFound { .. }) => { - match self - .find_latest_check_point_for_version(DeltaDataTypeVersion::MAX) - .await - { + match self.find_latest_check_point_for_version(i64::MAX).await { Ok(Some(cp)) => Ok(cp), _ => Err(LoadCheckpointError::NotFound), } @@ -656,7 +651,7 @@ impl DeltaTable { async fn find_latest_check_point_for_version( &self, - version: DeltaDataTypeVersion, + version: i64, ) -> Result, DeltaTableError> { lazy_static! { static ref CHECKPOINT_REGEX: Regex = @@ -681,7 +676,7 @@ impl DeltaTable { }?; if let Some(captures) = CHECKPOINT_REGEX.captures(obj_meta.location.as_ref()) { let curr_ver_str = captures.get(1).unwrap().as_str(); - let curr_ver: DeltaDataTypeVersion = curr_ver_str.parse().unwrap(); + let curr_ver: i64 = curr_ver_str.parse().unwrap(); if curr_ver > version { // skip checkpoints newer than max version continue; @@ -698,7 +693,7 @@ impl DeltaTable { if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(obj_meta.location.as_ref()) { let curr_ver_str = captures.get(1).unwrap().as_str(); - let curr_ver: DeltaDataTypeVersion = curr_ver_str.parse().unwrap(); + let curr_ver: i64 = curr_ver_str.parse().unwrap(); if curr_ver > version { // skip checkpoints newer than max version continue; @@ -726,7 +721,7 @@ impl DeltaTable { Ok(()) } - async fn get_latest_version(&mut self) -> Result { + async fn get_latest_version(&mut self) -> Result { let mut version = match self.get_last_checkpoint().await { Ok(last_check_point) => last_check_point.version + 1, Err(LoadCheckpointError::NotFound) => { @@ -772,7 +767,7 @@ impl DeltaTable { } /// Currently loaded version of the table - pub fn version(&self) -> DeltaDataTypeVersion { + pub fn version(&self) -> i64 { self.state.version() } @@ -786,7 +781,7 @@ impl DeltaTable { /// Get the list of actions for the next commit pub async fn peek_next_commit( &self, - current_version: DeltaDataTypeVersion, + current_version: i64, ) -> Result { let next_version = current_version + 1; let commit_uri = commit_uri_from_version(next_version); @@ -848,7 +843,7 @@ impl DeltaTable { /// It assumes that the table is already updated to the current version `self.version`. pub async fn update_incremental( &mut self, - max_version: Option, + max_version: Option, ) -> Result<(), DeltaTableError> { debug!( "incremental update with version({}) and max_version({max_version:?})", @@ -879,10 +874,7 @@ impl DeltaTable { } /// Loads the DeltaTable state for the given version. - pub async fn load_version( - &mut self, - version: DeltaDataTypeVersion, - ) -> Result<(), DeltaTableError> { + pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> { // check if version is valid let commit_uri = commit_uri_from_version(version); match self.storage.head(&commit_uri).await { @@ -916,7 +908,7 @@ impl DeltaTable { pub(crate) async fn get_version_timestamp( &mut self, - version: DeltaDataTypeVersion, + version: i64, ) -> Result { match self.version_timestamp.get(&version) { Some(ts) => Ok(*ts), @@ -944,7 +936,7 @@ impl DeltaTable { None => self.get_earliest_delta_log_version().await?, }; let mut commit_infos_list = vec![]; - let mut earliest_commit: Option = None; + let mut earliest_commit: Option = None; loop { match DeltaTableState::from_commit(self, version).await { @@ -1074,7 +1066,7 @@ impl DeltaTable { } /// Returns the current version of the DeltaTable based on the loaded metadata. - pub fn get_app_transaction_version(&self) -> &HashMap { + pub fn get_app_transaction_version(&self) -> &HashMap { self.state.app_transaction_version() } @@ -1162,8 +1154,8 @@ impl DeltaTable { pub async fn try_commit_transaction( &mut self, commit: &PreparedCommit, - version: DeltaDataTypeVersion, - ) -> Result { + version: i64, + ) -> Result { // move temporary commit file to delta log directory // rely on storage to fail if the file already exists - self.storage @@ -1382,7 +1374,7 @@ impl<'a> DeltaTransaction<'a> { &mut self, operation: Option, app_metadata: Option>, - ) -> Result { + ) -> Result { // TODO: stubbing `operation` parameter (which will be necessary for writing the CommitInfo action), // but leaving it unused for now. `CommitInfo` is a fairly dynamic data structure so we should work // out the data structure approach separately. @@ -1454,10 +1446,7 @@ impl<'a> DeltaTransaction<'a> { } #[allow(deprecated)] - async fn try_commit_loop( - &mut self, - commit: &PreparedCommit, - ) -> Result { + async fn try_commit_loop(&mut self, commit: &PreparedCommit) -> Result { let mut attempt_number: u32 = 0; loop { self.delta_table.update().await?; @@ -1540,7 +1529,7 @@ pub async fn open_table_with_storage_options( /// Infers the storage backend to use from the scheme in the given table path. pub async fn open_table_with_version( table_uri: impl AsRef, - version: DeltaDataTypeVersion, + version: i64, ) -> Result { let table = DeltaTableBuilder::from_uri(table_uri) .with_version(version) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 477104208f..83ac45ec45 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -27,7 +27,6 @@ use crate::operations::write::write_execution_plan; use crate::storage::DeltaObjectStore; use crate::storage::ObjectStoreRef; use crate::table_state::DeltaTableState; -use crate::DeltaDataTypeVersion; use crate::DeltaTable; use crate::DeltaTableError; @@ -485,7 +484,7 @@ async fn execute( state: SessionState, writer_properties: Option, app_metadata: Option>, -) -> DeltaResult<((Vec, DeltaDataTypeVersion), DeleteMetrics)> { +) -> DeltaResult<((Vec, i64), DeleteMetrics)> { let mut metrics = DeleteMetrics::default(); let exec_start = Instant::now(); diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index 265b6aee50..0448de228c 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -15,7 +15,7 @@ use crate::action::{Action, Add, DeltaOperation, Remove}; use crate::operations::transaction::commit; use crate::storage::DeltaObjectStore; use crate::table_state::DeltaTableState; -use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; use futures::future::BoxFuture; use futures::StreamExt; pub use object_store::path::Path; @@ -133,7 +133,7 @@ impl FileSystemCheckPlan { for file in self.files_to_remove { let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong; + let deletion_time = deletion_time.as_millis() as i64; removed_file_paths.push(file.path.clone()); actions.push(Action::remove(Remove { path: file.path, diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs index b1ffd1dbe2..e0853d298d 100644 --- a/rust/src/operations/optimize.rs +++ b/rust/src/operations/optimize.rs @@ -27,10 +27,7 @@ use crate::storage::ObjectStoreRef; use crate::table_state::DeltaTableState; use crate::writer::utils::arrow_schema_without_partitions; use crate::writer::utils::PartitionPath; -use crate::DeltaDataTypeVersion; -use crate::{ - DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError, ObjectMeta, PartitionFilter, -}; +use crate::{DeltaResult, DeltaTable, DeltaTableError, ObjectMeta, PartitionFilter}; use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use futures::future::BoxFuture; use futures::StreamExt; @@ -73,21 +70,21 @@ pub struct Metrics { #[serde(rename_all = "camelCase")] pub struct MetricDetails { /// Minimum file size of a operation - pub min: DeltaDataTypeLong, + pub min: i64, /// Maximum file size of a operation - pub max: DeltaDataTypeLong, + pub max: i64, /// Average file size of a operation pub avg: f64, /// Number of files encountered during operation pub total_files: usize, /// Sum of file sizes of a operation - pub total_size: DeltaDataTypeLong, + pub total_size: i64, } impl Default for MetricDetails { fn default() -> Self { MetricDetails { - min: DeltaDataTypeLong::MAX, + min: i64::MAX, max: 0, avg: 0.0, total_files: 0, @@ -136,7 +133,7 @@ impl<'a> OptimizeBuilder<'a> { } /// Set the target file size - pub fn with_target_size(mut self, target: DeltaDataTypeLong) -> Self { + pub fn with_target_size(mut self, target: i64) -> Self { self.target_size = Some(target); self } @@ -184,7 +181,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { #[derive(Debug)] struct OptimizeInput { - target_size: DeltaDataTypeLong, + target_size: i64, } impl From for DeltaOperation { @@ -200,7 +197,7 @@ impl From for DeltaOperation { #[derive(Debug)] struct MergeBin { files: Vec, - size_bytes: DeltaDataTypeLong, + size_bytes: i64, } #[derive(Debug)] @@ -234,7 +231,7 @@ impl MergeBin { fn create_remove( path: &str, partitions: &HashMap>, - size: DeltaDataTypeLong, + size: i64, ) -> Result { // NOTE unwrap is safe since UNIX_EPOCH will always be earlier then now. let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); @@ -266,7 +263,7 @@ pub struct MergePlan { /// Properties passed to parquet writer writer_properties: WriterProperties, /// Version of the table at beginning of optimization. Used for conflict resolution. - read_table_version: DeltaDataTypeVersion, + read_table_version: i64, } impl MergePlan { @@ -389,7 +386,7 @@ impl MergePlan { pub fn create_merge_plan( snapshot: &DeltaTableState, filters: &[PartitionFilter<'_, &str>], - target_size: Option, + target_size: Option, writer_properties: WriterProperties, ) -> Result { let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size()); diff --git a/rust/src/operations/transaction/conflict_checker.rs b/rust/src/operations/transaction/conflict_checker.rs index 6cc15fc751..dd27a71cd2 100644 --- a/rust/src/operations/transaction/conflict_checker.rs +++ b/rust/src/operations/transaction/conflict_checker.rs @@ -8,7 +8,7 @@ use super::CommitInfo; use crate::action::{Action, Add, DeltaOperation, MetaData, Protocol, Remove}; use crate::delta_config::IsolationLevel; use crate::storage::commit_uri_from_version; -use crate::{table_state::DeltaTableState, DeltaDataTypeVersion, DeltaResult, DeltaTableError}; +use crate::{table_state::DeltaTableState, DeltaResult, DeltaTableError}; #[cfg(feature = "datafusion")] use super::state::AddContainer; @@ -205,8 +205,8 @@ pub(crate) struct WinningCommitSummary { impl WinningCommitSummary { pub async fn try_new( object_store: &dyn ObjectStore, - read_version: DeltaDataTypeVersion, - winning_commit_version: DeltaDataTypeVersion, + read_version: i64, + winning_commit_version: i64, ) -> DeltaResult { // NOTE using asser, since a wrong version would right now mean a bug in our code. assert_eq!(winning_commit_version, read_version + 1); diff --git a/rust/src/operations/transaction/mod.rs b/rust/src/operations/transaction/mod.rs index c60b6fe5cb..cae0f9e610 100644 --- a/rust/src/operations/transaction/mod.rs +++ b/rust/src/operations/transaction/mod.rs @@ -8,7 +8,7 @@ use serde_json::{Map, Value}; use crate::action::{Action, CommitInfo, DeltaOperation}; use crate::storage::commit_uri_from_version; use crate::table_state::DeltaTableState; -use crate::{crate_version, DeltaDataTypeVersion, DeltaResult, DeltaTableError}; +use crate::{crate_version, DeltaResult, DeltaTableError}; mod conflict_checker; #[cfg(feature = "datafusion")] @@ -25,7 +25,7 @@ const DELTA_LOG_FOLDER: &str = "_delta_log"; pub enum TransactionError { /// Version already exists #[error("Tried committing existing table version: {0}")] - VersionAlreadyExists(DeltaDataTypeVersion), + VersionAlreadyExists(i64), /// Error returned when reading the delta log object failed. #[error("Error serializing commit log to json: {json_err}")] @@ -134,8 +134,8 @@ pub(crate) async fn prepare_commit<'a>( async fn try_commit_transaction( storage: &dyn ObjectStore, tmp_commit: &Path, - version: DeltaDataTypeVersion, -) -> Result { + version: i64, +) -> Result { // move temporary commit file to delta log directory // rely on storage to fail if the file already exists - storage @@ -160,7 +160,7 @@ pub async fn commit( operation: DeltaOperation, read_snapshot: &DeltaTableState, app_metadata: Option>, -) -> DeltaResult { +) -> DeltaResult { let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?; let max_attempts = 5; diff --git a/rust/src/operations/vacuum.rs b/rust/src/operations/vacuum.rs index ca253edbad..f381d282b1 100644 --- a/rust/src/operations/vacuum.rs +++ b/rust/src/operations/vacuum.rs @@ -23,7 +23,7 @@ use crate::storage::DeltaObjectStore; use crate::table_state::DeltaTableState; -use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError}; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; use chrono::{Duration, Utc}; use futures::future::BoxFuture; use futures::{StreamExt, TryStreamExt}; @@ -41,9 +41,9 @@ enum VacuumError { )] InvalidVacuumRetentionPeriod { /// User provided retention on vacuum call - provided: DeltaDataTypeLong, + provided: i64, /// Minimal retention configured in delta table config - min: DeltaDataTypeLong, + min: i64, }, /// Error returned diff --git a/rust/src/schema.rs b/rust/src/schema.rs index 931a225301..cd3022fd90 100644 --- a/rust/src/schema.rs +++ b/rust/src/schema.rs @@ -10,14 +10,6 @@ use crate::DeltaTableError; /// Type alias for a string expected to match a GUID/UUID format pub type Guid = String; -/// Type alias for i64/Delta long -pub type DeltaDataTypeLong = i64; -/// Type alias representing the expected type (i64) of a Delta table version. -pub type DeltaDataTypeVersion = DeltaDataTypeLong; -/// Type alias representing the expected type (i64/ms since Unix epoch) of a Delta timestamp. -pub type DeltaDataTypeTimestamp = DeltaDataTypeLong; -/// Type alias for i32/Delta int -pub type DeltaDataTypeInt = i32; static STRUCT_TAG: &str = "struct"; static ARRAY_TAG: &str = "array"; diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index c9fe842f93..8cd744a99e 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -5,7 +5,7 @@ pub mod file; pub mod utils; use self::config::StorageOptions; -use crate::{DeltaDataTypeVersion, DeltaResult}; +use crate::DeltaResult; use bytes::Bytes; use futures::{stream::BoxStream, StreamExt}; @@ -38,7 +38,7 @@ lazy_static! { } /// Return the uri of commit version. -pub(crate) fn commit_uri_from_version(version: DeltaDataTypeVersion) -> Path { +pub(crate) fn commit_uri_from_version(version: i64) -> Path { let version = format!("{version:020}.json"); DELTA_LOG_PATH.child(version.as_str()) } diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 889e2eb36a..631b41781e 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -6,10 +6,7 @@ use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::schema::SchemaDataType; use crate::storage::commit_uri_from_version; use crate::Schema; -use crate::{ - ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError, - DeltaTableMetaData, -}; +use crate::{ApplyLogError, DeltaTable, DeltaTableError, DeltaTableMetaData}; use chrono::Utc; use lazy_static::lazy_static; use object_store::{path::Path, ObjectStore}; @@ -27,7 +24,7 @@ use super::{CheckPoint, DeltaTableConfig}; #[serde(rename_all = "camelCase")] pub struct DeltaTableState { // current table version represented by this table state - version: DeltaDataTypeVersion, + version: i64, // A remove action should remain in the state of the table as a tombstone until it has expired. // A tombstone expires when the creation timestamp of the delta file exceeds the expiration tombstones: HashSet, @@ -35,21 +32,21 @@ pub struct DeltaTableState { files: Vec, // Information added to individual commits commit_infos: Vec, - app_transaction_version: HashMap, + app_transaction_version: HashMap, min_reader_version: i32, min_writer_version: i32, // table metadata corresponding to current version current_metadata: Option, // retention period for tombstones in milli-seconds - tombstone_retention_millis: DeltaDataTypeLong, + tombstone_retention_millis: i64, // retention period for log entries in milli-seconds - log_retention_millis: DeltaDataTypeLong, + log_retention_millis: i64, enable_expired_log_cleanup: bool, } impl DeltaTableState { /// Create Table state with specified version - pub fn with_version(version: DeltaDataTypeVersion) -> Self { + pub fn with_version(version: i64) -> Self { Self { version, ..Self::default() @@ -57,15 +54,12 @@ impl DeltaTableState { } /// Return table version - pub fn version(&self) -> DeltaDataTypeVersion { + pub fn version(&self) -> i64 { self.version } /// Construct a delta table state object from commit version. - pub async fn from_commit( - table: &DeltaTable, - version: DeltaDataTypeVersion, - ) -> Result { + pub async fn from_commit(table: &DeltaTable, version: i64) -> Result { let commit_uri = commit_uri_from_version(version); let commit_log_bytes = table.storage.get(&commit_uri).await?.bytes().await?; let reader = BufReader::new(Cursor::new(commit_log_bytes)); @@ -84,10 +78,7 @@ impl DeltaTableState { } /// Construct a delta table state object from a list of actions - pub fn from_actions( - actions: Vec, - version: DeltaDataTypeVersion, - ) -> Result { + pub fn from_actions(actions: Vec, version: i64) -> Result { let mut new_state = DeltaTableState::with_version(version); for action in actions { new_state.process_action(action, true, true)?; @@ -169,12 +160,12 @@ impl DeltaTableState { } /// Retention of tombstone in milliseconds. - pub fn tombstone_retention_millis(&self) -> DeltaDataTypeLong { + pub fn tombstone_retention_millis(&self) -> i64 { self.tombstone_retention_millis } /// Retention of logs in milliseconds. - pub fn log_retention_millis(&self) -> DeltaDataTypeLong { + pub fn log_retention_millis(&self) -> i64 { self.log_retention_millis } @@ -211,7 +202,7 @@ impl DeltaTableState { /// HashMap containing the last txn version stored for every app id writing txn /// actions. - pub fn app_transaction_version(&self) -> &HashMap { + pub fn app_transaction_version(&self) -> &HashMap { &self.app_transaction_version } diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index 74a6492248..2bedfc76a8 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -4,7 +4,6 @@ use crate::action::{ColumnCountStat, ColumnValueStat, Stats}; use crate::table_state::DeltaTableState; -use crate::DeltaDataTypeLong; use crate::DeltaTableError; use crate::SchemaDataType; use crate::SchemaTypeStruct; @@ -335,7 +334,7 @@ impl DeltaTableState { .as_ref() .map(|stat| resolve_column_count_stat(&stat.null_count, &path)) }) - .collect::>>(); + .collect::>>(); let null_count = Some(value_vec_to_array(null_count, |values| { Ok(Arc::new(arrow::array::Int64Array::from(values))) })?); @@ -532,7 +531,7 @@ fn resolve_column_value_stat<'a>( fn resolve_column_count_stat( values: &HashMap, path: &[&str], -) -> Option { +) -> Option { let mut current = values; let (&name, path) = path.split_last()?; for &segment in path { diff --git a/rust/src/writer/mod.rs b/rust/src/writer/mod.rs index 3c3c40d0aa..522ab3614d 100644 --- a/rust/src/writer/mod.rs +++ b/rust/src/writer/mod.rs @@ -2,7 +2,7 @@ //! Abstractions and implementations for writing data to delta tables use crate::action::{Action, Add, ColumnCountStat}; -use crate::{DeltaDataTypeVersion, DeltaTable, DeltaTableError}; +use crate::{DeltaTable, DeltaTableError}; use arrow::{datatypes::SchemaRef, datatypes::*, error::ArrowError}; use async_trait::async_trait; @@ -123,10 +123,7 @@ pub trait DeltaWriter { /// Flush the internal write buffers to files in the delta table folder structure. /// and commit the changes to the Delta log, creating a new table version. - async fn flush_and_commit( - &mut self, - table: &mut DeltaTable, - ) -> Result { + async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result { let mut adds = self.flush().await?; #[allow(deprecated)] let mut tx = table.create_transaction(None); diff --git a/rust/src/writer/stats.rs b/rust/src/writer/stats.rs index 779486dc4a..a120c39b3a 100644 --- a/rust/src/writer/stats.rs +++ b/rust/src/writer/stats.rs @@ -2,7 +2,6 @@ use super::*; use crate::{ action::{Add, ColumnValueStat, Stats}, time_utils::timestamp_to_delta_stats_string, - DeltaDataTypeLong, }; use arrow::{ array::{ @@ -65,7 +64,7 @@ pub(crate) fn apply_null_counts( match col_struct { ColumnCountStat::Value(n) => { - let null_count = column.null_count() as DeltaDataTypeLong; + let null_count = column.null_count() as i64; let n = null_count + *n; null_counts.insert(key, ColumnCountStat::Value(n)); }