Skip to content

Commit

Permalink
refactor!: remove DeltaDataType aliases
Browse files Browse the repository at this point in the history
Removing the aliases in favor of the actual type to improve readability
of the code and reduce confusion around the existence of the types.

`DeltaDataTypeLong` replaced with `i64`
`DeltaDataTypeVersion` replaced with `i64`
`DeltaDataTypeTimestamp` replaced with `i64`
`DeltaDataTypeInt` replaced with `i32`
  • Loading branch information
cmackenzie1 committed May 23, 2023
1 parent 6799d2e commit 0a3e530
Show file tree
Hide file tree
Showing 19 changed files with 124 additions and 169 deletions.
7 changes: 2 additions & 5 deletions aws/delta-checkpoint/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use deltalake::checkpoints;
use deltalake::checkpoints::CheckpointError;
use deltalake::DeltaDataTypeVersion;
use lambda_runtime::{service_fn, Error, LambdaEvent};
use lazy_static::lazy_static;
use log::*;
Expand Down Expand Up @@ -78,9 +77,7 @@ fn bucket_and_key_from_event(event: &Value) -> Result<(String, String), CheckPoi
Ok((bucket, key))
}

fn table_path_and_version_from_key(
key: &str,
) -> Result<(String, DeltaDataTypeVersion), CheckPointLambdaError> {
fn table_path_and_version_from_key(key: &str) -> Result<(String, i64), CheckPointLambdaError> {
lazy_static! {
static ref JSON_LOG_ENTRY_REGEX: Regex =
Regex::new(r#"(.*)/_delta_log/0*(\d+)\.json$"#).unwrap();
Expand All @@ -97,7 +94,7 @@ fn table_path_and_version_from_key(
.get(2)
.ok_or_else(|| CheckPointLambdaError::ObjectKeyParseFailed(key.to_string()))?
.as_str();
let version = version_str.parse::<DeltaDataTypeVersion>()?;
let version = version_str.parse::<i64>()?;

Ok((table_path, version))
}
Expand Down
16 changes: 8 additions & 8 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::{DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaOps, Invariant, Schema};
use deltalake::{DeltaOps, Invariant, Schema};
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyValueError;
Expand Down Expand Up @@ -107,7 +107,7 @@ struct RawDeltaTableMetaData {
#[pyo3(get)]
partition_columns: Vec<String>,
#[pyo3(get)]
created_time: Option<deltalake::DeltaDataTypeTimestamp>,
created_time: Option<i64>,
#[pyo3(get)]
configuration: HashMap<String, Option<String>>,
}
Expand All @@ -118,7 +118,7 @@ impl RawDeltaTable {
#[pyo3(signature = (table_uri, version = None, storage_options = None, without_files = false))]
fn new(
table_uri: &str,
version: Option<deltalake::DeltaDataTypeLong>,
version: Option<i64>,
storage_options: Option<HashMap<String, String>>,
without_files: bool,
) -> PyResult<Self> {
Expand Down Expand Up @@ -198,7 +198,7 @@ impl RawDeltaTable {
))
}

pub fn load_version(&mut self, version: deltalake::DeltaDataTypeVersion) -> PyResult<()> {
pub fn load_version(&mut self, version: i64) -> PyResult<()> {
rt()?
.block_on(self._table.load_version(version))
.map_err(PyDeltaTableError::from_raw)
Expand Down Expand Up @@ -317,7 +317,7 @@ impl RawDeltaTable {
pub fn optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<DeltaDataTypeLong>,
target_size: Option<i64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
if let Some(size) = target_size {
Expand Down Expand Up @@ -717,7 +717,7 @@ fn save_mode_from_str(value: &str) -> PyResult<SaveMode> {
}
}

fn current_timestamp() -> DeltaDataTypeTimestamp {
fn current_timestamp() -> i64 {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
Expand All @@ -728,9 +728,9 @@ fn current_timestamp() -> DeltaDataTypeTimestamp {
#[derive(FromPyObject)]
pub struct PyAddAction {
path: String,
size: DeltaDataTypeLong,
size: i64,
partition_values: HashMap<String, Option<String>>,
modification_time: DeltaDataTypeTimestamp,
modification_time: i64,
data_change: bool,
stats: Option<String>,
}
Expand Down
34 changes: 17 additions & 17 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub enum ColumnCountStat {
/// Composite HashMap representation of statistics.
Column(HashMap<String, ColumnCountStat>),
/// Json representation of statistics.
Value(DeltaDataTypeLong),
Value(i64),
}

impl ColumnCountStat {
Expand All @@ -109,7 +109,7 @@ impl ColumnCountStat {
}

/// Returns the count if this is a `ColumnCountStat::Value`, otherwise `None`.
pub fn as_value(&self) -> Option<DeltaDataTypeLong> {
pub fn as_value(&self) -> Option<i64> {
match self {
ColumnCountStat::Value(v) => Some(*v),
_ => None,
Expand All @@ -122,7 +122,7 @@ impl ColumnCountStat {
#[serde(rename_all = "camelCase")]
pub struct Stats {
/// Number of records in the file associated with the log action.
pub num_records: DeltaDataTypeLong,
pub num_records: i64,

// start of per column stats
/// Contains a value smaller than all values present in the file for all columns.
Expand All @@ -137,7 +137,7 @@ pub struct Stats {
#[derive(Debug, Default)]
pub struct StatsParsed {
/// Number of records in the file associated with the log action.
pub num_records: DeltaDataTypeLong,
pub num_records: i64,

// start of per column stats
/// Contains a value smaller than all values present in the file for all columns.
Expand All @@ -154,7 +154,7 @@ pub struct StatsParsed {
/// Contains a value larger than all values present in the file for all columns.
pub max_values: HashMap<String, String>,
/// The number of null values for all columns.
pub null_count: HashMap<String, DeltaDataTypeLong>,
pub null_count: HashMap<String, i64>,
}

/// Delta AddCDCFile action that describes a parquet CDC data file.
Expand All @@ -165,7 +165,7 @@ pub struct AddCDCFile {
/// absolute path to a CDC file
pub path: String,
/// The size of this file in bytes
pub size: DeltaDataTypeLong,
pub size: i64,
/// A map from partition column to value for this file
pub partition_values: HashMap<String, Option<String>>,
/// Should always be set to false because they do not change the underlying data of the table
Expand All @@ -181,7 +181,7 @@ pub struct Add {
/// A relative path, from the root of the table, to a file that should be added to the table
pub path: String,
/// The size of this file in bytes
pub size: DeltaDataTypeLong,
pub size: i64,
/// A map from partition column to value for this file
pub partition_values: HashMap<String, Option<String>>,
/// Partition values stored in raw parquet struct format. In this struct, the column names
Expand All @@ -205,7 +205,7 @@ pub struct Add {
#[serde(skip_serializing, skip_deserializing)]
pub partition_values_parsed: Option<String>,
/// The time this file was created, as milliseconds since the epoch
pub modification_time: DeltaDataTypeTimestamp,
pub modification_time: i64,
/// When false the file must already be present in the table or the records in the added file
/// must be contained in one or more remove actions in the same version
///
Expand Down Expand Up @@ -327,7 +327,7 @@ pub struct MetaData {
/// An array containing the names of columns by which the data should be partitioned
pub partition_columns: Vec<String>,
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: Option<DeltaDataTypeTimestamp>,
pub created_time: Option<i64>,
/// A map containing configuration options for the table
pub configuration: HashMap<String, Option<String>>,
}
Expand Down Expand Up @@ -367,7 +367,7 @@ pub struct Remove {
/// The path of the file that is removed from the table.
pub path: String,
/// The timestamp when the remove was added to table state.
pub deletion_timestamp: Option<DeltaDataTypeTimestamp>,
pub deletion_timestamp: Option<i64>,
/// Whether data is changed by the remove. A table optimize will report this as false for
/// example, since it adds and removes files by combining many files into one.
pub data_change: bool,
Expand All @@ -379,7 +379,7 @@ pub struct Remove {
/// A map from partition column to value for this file.
pub partition_values: Option<HashMap<String, Option<String>>>,
/// Size of this file in bytes
pub size: Option<DeltaDataTypeLong>,
pub size: Option<i64>,
/// Map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,
}
Expand Down Expand Up @@ -425,9 +425,9 @@ pub struct Txn {
/// A unique identifier for the application performing the transaction.
pub app_id: String,
/// An application-specific numeric identifier for this transaction.
pub version: DeltaDataTypeVersion,
pub version: i64,
/// The time when this transaction action was created in milliseconds since the Unix epoch.
pub last_updated: Option<DeltaDataTypeTimestamp>,
pub last_updated: Option<i64>,
}

/// Action used to increase the version of the Delta protocol required to read or write to the
Expand All @@ -437,10 +437,10 @@ pub struct Txn {
pub struct Protocol {
/// Minimum version of the Delta read protocol a client must implement to correctly read the
/// table.
pub min_reader_version: DeltaDataTypeInt,
pub min_reader_version: i32,
/// Minimum version of the Delta write protocol a client must implement to correctly write to the
/// table.
pub min_writer_version: DeltaDataTypeInt,
pub min_writer_version: i32,
}

/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored.
Expand All @@ -451,7 +451,7 @@ pub struct Protocol {
pub struct CommitInfo {
/// Timestamp in millis when the commit was created
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<DeltaDataTypeTimestamp>,
pub timestamp: Option<i64>,
/// Id of the user invoking the commit
#[serde(skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
Expand Down Expand Up @@ -569,7 +569,7 @@ pub enum DeltaOperation {
/// The filter used to determine which partitions to filter
predicate: Option<String>,
/// Target optimize size
target_size: DeltaDataTypeLong,
target_size: i64,
},
#[serde(rename_all = "camelCase")]
/// Represents a `FileSystemCheck` operation
Expand Down
24 changes: 10 additions & 14 deletions rust/src/action/parquet2_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ mod string;
mod validity;

use crate::action::{Action, Add, CommitInfo, MetaData, Protocol, Remove, Txn};
use crate::schema::{
DeltaDataTypeInt, DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaDataTypeVersion, Guid,
};
use crate::schema::Guid;
use boolean::for_each_boolean_field_value;
use map::for_each_map_field_value;
use primitive::for_each_primitive_field_value;
Expand Down Expand Up @@ -253,7 +251,7 @@ fn deserialize_txn_column_page(
page,
dict,
descriptor,
|action: &mut Txn, v: DeltaDataTypeVersion| action.version = v,
|action: &mut Txn, v: i64| action.version = v,
)?;
}
"appId" => {
Expand All @@ -271,7 +269,7 @@ fn deserialize_txn_column_page(
page,
dict,
descriptor,
|action: &mut Txn, v: DeltaDataTypeTimestamp| action.last_updated = Some(v),
|action: &mut Txn, v: i64| action.last_updated = Some(v),
)?;
}
_ => {
Expand Down Expand Up @@ -309,7 +307,7 @@ fn deserialize_add_column_page(
page,
dict,
descriptor,
|action: &mut Add, v: DeltaDataTypeLong| action.size = v,
|action: &mut Add, v: i64| action.size = v,
)?;
}
"partitionValues" => {
Expand Down Expand Up @@ -363,7 +361,7 @@ fn deserialize_add_column_page(
page,
dict,
descriptor,
|action: &mut Add, v: DeltaDataTypeTimestamp| action.modification_time = v,
|action: &mut Add, v: i64| action.modification_time = v,
)?;
}
_ => {
Expand Down Expand Up @@ -398,9 +396,7 @@ fn deserialize_remove_column_page(
page,
dict,
descriptor,
|action: &mut Remove, v: DeltaDataTypeTimestamp| {
action.deletion_timestamp = Some(v)
},
|action: &mut Remove, v: i64| action.deletion_timestamp = Some(v),
)?;
}
"size" => {
Expand All @@ -409,7 +405,7 @@ fn deserialize_remove_column_page(
page,
dict,
descriptor,
|action: &mut Remove, v: DeltaDataTypeLong| action.size = Some(v),
|action: &mut Remove, v: i64| action.size = Some(v),
)?;
}
// FIXME support partitionValueParsed
Expand Down Expand Up @@ -556,7 +552,7 @@ fn deserialize_metadata_column_page(
page,
dict,
descriptor,
|action: &mut MetaData, v: DeltaDataTypeTimestamp| action.created_time = Some(v),
|action: &mut MetaData, v: i64| action.created_time = Some(v),
)?;
}
"configuration" => {
Expand Down Expand Up @@ -595,7 +591,7 @@ fn deserialize_protocol_column_page(
page,
dict,
descriptor,
|action: &mut Protocol, v: DeltaDataTypeInt| action.min_reader_version = v,
|action: &mut Protocol, v: i32| action.min_reader_version = v,
)?;
}
"minWriterVersion" => {
Expand All @@ -604,7 +600,7 @@ fn deserialize_protocol_column_page(
page,
dict,
descriptor,
|action: &mut Protocol, v: DeltaDataTypeInt| action.min_writer_version = v,
|action: &mut Protocol, v: i32| action.min_writer_version = v,
)?;
}
_ => {
Expand Down
5 changes: 2 additions & 3 deletions rust/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::path::PathBuf;
use std::sync::Arc;

use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
use crate::schema::DeltaDataTypeVersion;
use crate::storage::config::StorageOptions;
use crate::storage::{DeltaObjectStore, ObjectStoreRef};

Expand Down Expand Up @@ -44,7 +43,7 @@ pub enum DeltaVersion {
#[default]
Newest,
/// specify the version to load
Version(DeltaDataTypeVersion),
Version(i64),
/// specify the timestamp in UTC
Timestamp(DateTime<Utc>),
}
Expand Down Expand Up @@ -148,7 +147,7 @@ impl DeltaTableBuilder {
}

/// Sets `version` to the builder
pub fn with_version(mut self, version: DeltaDataTypeVersion) -> Self {
pub fn with_version(mut self, version: i64) -> Self {
self.options.version = DeltaVersion::Version(version);
self
}
Expand Down
Loading

0 comments on commit 0a3e530

Please sign in to comment.