diff --git a/Cargo.lock b/Cargo.lock index d089d80016..f79ec8e131 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,9 +96,9 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" -version = "4.0.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "014c7490f839d9dd4ce28f8232d4731f8b1fb93fa477d69e6fe881814ac2b6bb" +checksum = "93811be1c0f60f4b29d80b34dad4e59fdc397a9e580f849df9e2635701498663" dependencies = [ "cfg_aliases", "chrono", @@ -108,6 +108,7 @@ dependencies = [ "indexmap", "lazy_static", "lexical-core", + "multiversion", "num", "prettytable-rs", "rand 0.7.3", @@ -636,7 +637,7 @@ dependencies = [ [[package]] name = "deltalake-python" -version = "0.4.9" +version = "0.5.0" dependencies = [ "arrow", "deltalake", @@ -1438,6 +1439,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "multiversion" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "025c962a3dd3cc5e0e520aa9c612201d127dcdf28616974961a649dca64f5373" +dependencies = [ + "multiversion-macros", +] + +[[package]] +name = "multiversion-macros" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8a3e2bde382ebf960c1f3e79689fa5941625fe9bf694a1cb64af3e85faff3af" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "native-tls" version = "0.2.7" @@ -1479,9 +1500,9 @@ dependencies = [ [[package]] name = "num" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b7a8e9be5e039e2ff869df49155f1c06bd01ade2117ec783e56ab0932b67a8f" +checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" dependencies = [ "num-bigint", "num-complex", @@ -1493,9 +1514,9 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d0a3d5e207573f948a9e5376662aa743a2ea13f7c50a554d7af443a73fbfeba" +checksum = "4e0d047c1062aa51e256408c560894e5251f08925980e53cf1aa5bd00eec6512" dependencies = [ "autocfg", "num-integer", @@ -1504,9 +1525,9 @@ dependencies = [ [[package]] name = "num-complex" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "747d632c0c558b87dbabbe6a82f3b4ae03720d0646ac5b7b4dae89394be5f2c5" +checksum = "26873667bbbb7c5182d4a37c1add32cdf09f841af72da53318fdb81543c15085" dependencies = [ "num-traits", ] @@ -1534,9 +1555,9 @@ dependencies = [ [[package]] name = "num-rational" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07" +checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a" dependencies = [ "autocfg", "num-bigint", @@ -1702,12 +1723,12 @@ dependencies = [ [[package]] name = "parquet" -version = "4.0.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32858ae16bd61fda406be4b76af617d2f632fed4f0093810245aa4f5316ac865" +checksum = "9275a7f8eab04e6ab6918b4fdd50e00aeba3c288e0f91bdc5da87a2c8ff288a6" dependencies = [ "arrow", - "base64 0.12.3", + "base64 0.13.0", "brotli", "byteorder", "chrono", @@ -3268,18 +3289,18 @@ checksum = "81a974bcdd357f0dca4d41677db03436324d45a4c9ed2d0b873a5a360ce41c36" [[package]] name = "zstd" -version = "0.7.0+zstd.1.4.9" +version = "0.8.2+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9428752481d8372e15b1bf779ea518a179ad6c771cca2d2c60e4fbff3cc2cd52" +checksum = "c83508bcbbdc9c3abcf77e8e56773d3ffcd2479e0933caab2e7d6b5a9e183aae" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "3.1.0+zstd.1.4.9" +version = "4.1.0+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aa1926623ad7fe406e090555387daf73db555b948134b4d73eac5eb08fb666d" +checksum = "d30375f78e185ca4c91930f42ea2c0162f9aa29737032501f93b79266d985ae7" dependencies = [ "libc", "zstd-sys", @@ -3287,9 +3308,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.5.0+zstd.1.4.9" +version = "1.6.0+zstd.1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e6c094340240369025fc6b731b054ee2a834328fa584310ac96aa4baebdc465" +checksum = "2141bed8922b427761470e6bbfeff255da94fa20b0bbeab0d9297fcaf71e3aa7" dependencies = [ "cc", "libc", diff --git a/rust/src/action.rs b/rust/src/action.rs index 73a7715d3d..de5b9a10bb 100644 --- a/rust/src/action.rs +++ b/rust/src/action.rs @@ -364,7 +364,7 @@ pub struct Format { /// Action that describes the metadata of the table. /// This is a top-level action in Delta log entries. -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(rename_all = "camelCase")] pub struct MetaData { /// Unique identifier for this table @@ -612,7 +612,7 @@ pub struct Txn { /// An application-specific numeric identifier for this transaction. pub version: DeltaDataTypeVersion, /// The time when this transaction action was created in milliseconds since the Unix epoch. - pub last_updated: DeltaDataTypeTimestamp, + pub last_updated: Option, } impl Txn { @@ -635,9 +635,11 @@ impl Txn { .map_err(|_| gen_action_type_error("txn", "version", "long"))?; } "lastUpdated" => { - re.last_updated = record - .get_long(i) - .map_err(|_| gen_action_type_error("txn", "lastUpdated", "long"))?; + re.last_updated = Some( + record + .get_long(i) + .map_err(|_| gen_action_type_error("txn", "lastUpdated", "long"))?, + ); } _ => { log::warn!( diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs new file mode 100644 index 0000000000..5977903c36 --- /dev/null +++ b/rust/src/checkpoints.rs @@ -0,0 +1,198 @@ +//! Implementation for writing delta checkpoints. + +use arrow::datatypes::Schema as ArrowSchema; +use arrow::error::ArrowError; +use arrow::json::reader::Decoder; +use log::*; +use parquet::arrow::ArrowWriter; +use parquet::errors::ParquetError; +use parquet::file::writer::InMemoryWriteableCursor; +use std::convert::TryFrom; + +use super::action; +use super::delta_arrow::delta_log_schema_for_table; +use super::open_table_with_version; +use super::schema::*; +use super::storage::{StorageBackend, StorageError}; +use super::{CheckPoint, DeltaTableError, DeltaTableState}; + +/// Error returned when the CheckPointWriter is unable to write a checkpoint. +#[derive(thiserror::Error, Debug)] +pub enum CheckPointWriterError { + /// Error returned when the DeltaTableState does not contain a metadata action. + #[error("DeltaTableMetadata not present in DeltaTableState")] + MissingMetaData, + /// Passthrough error returned when calling DeltaTable. + #[error("DeltaTableError: {source}")] + DeltaTable { + /// The source DeltaTableError. + #[from] + source: DeltaTableError, + }, + /// Error returned when the parquet writer fails while writing the checkpoint. + #[error("Failed to write parquet: {}", .source)] + ParquetError { + /// Parquet error details returned when writing the checkpoint failed. + #[from] + source: ParquetError, + }, + /// Error returned when converting the schema to Arrow format failed. + #[error("Failed to convert into Arrow schema: {}", .source)] + ArrowError { + /// Arrow error details returned when converting the schema in Arrow format failed + #[from] + source: ArrowError, + }, + /// Passthrough error returned when calling StorageBackend. + #[error("StorageError: {source}")] + Storage { + /// The source StorageError. + #[from] + source: StorageError, + }, + /// Passthrough error returned by serde_json. + #[error("serde_json::Error: {source}")] + JSONSerialization { + /// The source serde_json::Error. + #[from] + source: serde_json::Error, + }, +} + +/// Struct for writing checkpoints to the delta log. +pub struct CheckPointWriter { + table_uri: String, + delta_log_uri: String, + last_checkpoint_uri: String, + storage: Box, +} + +impl CheckPointWriter { + /// Creates a new CheckPointWriter. + pub fn new(table_uri: &str, storage: Box) -> Self { + let delta_log_uri = storage.join_path(table_uri, "_delta_log"); + let last_checkpoint_uri = storage.join_path(delta_log_uri.as_str(), "_last_checkpoint"); + + Self { + table_uri: table_uri.to_string(), + delta_log_uri, + last_checkpoint_uri, + storage, + } + } + + /// Creates a new checkpoint at the specified version. + /// NOTE: This method loads a new instance of delta table to determine the state to + /// checkpoint. + pub async fn create_checkpoint_for_version( + &self, + version: DeltaDataTypeVersion, + ) -> Result<(), CheckPointWriterError> { + let table = open_table_with_version(self.table_uri.as_str(), version).await?; + + self.create_checkpoint_from_state(version, table.get_state()) + .await + } + + /// Creates a new checkpoint at the specified version from the given DeltaTableState. + pub async fn create_checkpoint_from_state( + &self, + version: DeltaDataTypeVersion, + state: &DeltaTableState, + ) -> Result<(), CheckPointWriterError> { + // TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for + // an appropriate split point yet though so only writing a single part currently. + // See https://github.com/delta-io/delta-rs/issues/288 + + info!("Writing parquet bytes to checkpoint buffer."); + let parquet_bytes = self.parquet_bytes_from_state(state)?; + + let size = parquet_bytes.len() as i64; + + let checkpoint = CheckPoint::new(version, size, None); + + let file_name = format!("{:020}.checkpoint.parquet", version); + let checkpoint_uri = self.storage.join_path(&self.delta_log_uri, &file_name); + + info!("Writing checkpoint to {:?}.", checkpoint_uri); + self.storage + .put_obj(&checkpoint_uri, &parquet_bytes) + .await?; + + let last_checkpoint_content: serde_json::Value = serde_json::to_value(&checkpoint)?; + let last_checkpoint_content = serde_json::to_string(&last_checkpoint_content)?; + + info!( + "Writing _last_checkpoint to {:?}.", + self.last_checkpoint_uri + ); + self.storage + .put_obj( + self.last_checkpoint_uri.as_str(), + last_checkpoint_content.as_bytes(), + ) + .await?; + + Ok(()) + } + + fn parquet_bytes_from_state( + &self, + state: &DeltaTableState, + ) -> Result, CheckPointWriterError> { + let current_metadata = state + .current_metadata() + .ok_or(CheckPointWriterError::MissingMetaData)?; + + let mut jsons = std::iter::once(action::Action::protocol(action::Protocol { + min_reader_version: state.min_reader_version(), + min_writer_version: state.min_writer_version(), + })) + .chain(std::iter::once(action::Action::metaData( + action::MetaData::try_from(current_metadata.clone())?, + ))) + .chain(state.files().iter().map(|f| action::Action::add(f.clone()))) + .chain( + state + .tombstones() + .iter() + .map(|f| action::Action::remove(f.clone())), + ) + .chain( + state + .app_transaction_version() + .iter() + .map(|(app_id, version)| { + action::Action::txn(action::Txn { + app_id: app_id.clone(), + version: *version, + last_updated: None, + }) + }), + ) + .map(|a| serde_json::to_value(a).map_err(ArrowError::from)); + + debug!("Preparing checkpoint parquet buffer."); + let arrow_schema = delta_log_schema_for_table( + >::try_from(¤t_metadata.schema)?, + current_metadata.partition_columns.as_slice(), + ); + let writeable_cursor = InMemoryWriteableCursor::default(); + let mut writer = + ArrowWriter::try_new(writeable_cursor.clone(), arrow_schema.clone(), None)?; + + debug!("Writing to checkpoint parquet buffer..."); + let batch_size = state.app_transaction_version().len() + + state.tombstones().len() + + state.files().len() + + 2; + let decoder = Decoder::new(arrow_schema, batch_size, None); + while let Some(batch) = decoder.next_batch(&mut jsons)? { + writer.write(&batch)?; + } + let _ = writer.close()?; + debug!("Finished writing checkpoint parquet buffer."); + + Ok(writeable_cursor.data()) + } +} diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 506d95de14..b39d348d8a 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1,17 +1,13 @@ //! Delta Table read and write implementation // Reference: https://github.com/delta-io/delta/blob/master/PROTOCOL.md - -use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; -use std::fmt; -use std::io::{BufRead, BufReader, Cursor}; +// use arrow::error::ArrowError; use chrono::{DateTime, FixedOffset, Utc}; use futures::StreamExt; use lazy_static::lazy_static; -use log::debug; +use log::*; use parquet::errors::ParquetError; use parquet::file::{ reader::{FileReader, SerializedFileReader}, @@ -20,8 +16,13 @@ use parquet::file::{ use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::collections::HashMap; use std::convert::TryFrom; +use std::fmt; +use std::io::{BufRead, BufReader, Cursor}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{cmp::Ordering, collections::HashSet}; +use uuid::Uuid; use super::action; use super::action::{Action, DeltaOperation}; @@ -29,7 +30,6 @@ use super::partitions::{DeltaTablePartition, PartitionFilter}; use super::schema::*; use super::storage; use super::storage::{parse_uri, StorageBackend, StorageError, UriError}; -use uuid::Uuid; /// Metadata for a checkpoint file #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)] @@ -40,6 +40,21 @@ pub struct CheckPoint { parts: Option, // 10 digits decimals } +impl CheckPoint { + /// Creates a new checkpoint from the given parameters. + pub(crate) fn new( + version: DeltaDataTypeVersion, + size: DeltaDataTypeLong, + parts: Option, + ) -> Self { + Self { + version, + size, + parts, + } + } +} + impl PartialEq for CheckPoint { fn eq(&self, other: &Self) -> bool { self.version == other.version @@ -199,6 +214,42 @@ impl fmt::Display for DeltaTableMetaData { } } +impl TryFrom for DeltaTableMetaData { + type Error = serde_json::error::Error; + + fn try_from(action_metadata: action::MetaData) -> Result { + let schema = action_metadata.get_schema()?; + Ok(Self { + id: action_metadata.id, + name: action_metadata.name, + description: action_metadata.description, + format: action_metadata.format, + schema, + partition_columns: action_metadata.partition_columns, + created_time: action_metadata.created_time, + configuration: action_metadata.configuration, + }) + } +} + +impl TryFrom for action::MetaData { + type Error = serde_json::error::Error; + + fn try_from(metadata: DeltaTableMetaData) -> Result { + let schema_string = serde_json::to_string(&metadata.schema)?; + Ok(Self { + id: metadata.id, + name: metadata.name, + description: metadata.description, + format: metadata.format, + schema_string, + partition_columns: metadata.partition_columns, + created_time: metadata.created_time, + configuration: metadata.configuration, + }) + } +} + /// Error related to Delta log application #[derive(thiserror::Error, Debug)] pub enum ApplyLogError { @@ -266,8 +317,9 @@ impl From for LoadCheckpointError { } } -#[derive(Default, Debug)] -struct DeltaTableState { +/// State snapshot currently held by the Delta Table instance. +#[derive(Default, Debug, Clone)] +pub struct DeltaTableState { // A remove action should remain in the state of the table as a tombstone until it has expired. // A tombstone expires when the creation timestamp of the delta file exceeds the expiration tombstones: Vec, @@ -279,6 +331,40 @@ struct DeltaTableState { current_metadata: Option, } +impl DeltaTableState { + /// Full list of tombstones (remove actions) representing files removed from table state). + pub fn tombstones(&self) -> &Vec { + self.tombstones.as_ref() + } + + /// Full list of add actions representing all parquet files that are part of the current + /// delta table state. + pub fn files(&self) -> &Vec { + self.files.as_ref() + } + + /// HashMap containing the last txn version stored for every app id writing txn + /// actions. + pub fn app_transaction_version(&self) -> &HashMap { + &self.app_transaction_version + } + + /// The min reader version required by the protocol. + pub fn min_reader_version(&self) -> i32 { + self.min_reader_version + } + + /// The min writer version required by the protocol. + pub fn min_writer_version(&self) -> i32 { + self.min_writer_version + } + + /// The most recent metadata of the table. + pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> { + self.current_metadata.as_ref() + } +} + #[inline] /// Return path relative to parent_path fn extract_rel_path<'a, 'b>( @@ -423,7 +509,7 @@ impl DeltaTable { ) -> Result<(), ApplyLogError> { for line in reader.lines() { let action: Action = serde_json::from_str(line?.as_str())?; - process_action(&mut self.state, &action)?; + process_action(&mut self.state, action)?; } Ok(()) @@ -453,7 +539,7 @@ impl DeltaTable { for record in preader.get_row_iter(None)? { process_action( &mut self.state, - &Action::from_parquet_record(&schema, &record)?, + Action::from_parquet_record(&schema, &record)?, )?; } } @@ -740,6 +826,11 @@ impl DeltaTable { .collect() } + /// Returns the currently loaded state snapshot. + pub fn get_state(&self) -> &DeltaTableState { + &self.state + } + /// Returns the metadata associated with the loaded state. pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> { self.state @@ -1357,40 +1448,31 @@ fn log_entry_from_actions(actions: &[Action]) -> Result Result<(), serde_json::error::Error> { match action { Action::add(v) => { - state.files.push(v.clone()); + state.files.push(v); } Action::remove(v) => { state.files.retain(|a| *a.path != v.path); - state.tombstones.push(v.clone()); + state.tombstones.push(v); } Action::protocol(v) => { state.min_reader_version = v.min_reader_version; state.min_writer_version = v.min_writer_version; } Action::metaData(v) => { - state.current_metadata = Some(DeltaTableMetaData { - id: v.id.clone(), - name: v.name.clone(), - description: v.description.clone(), - format: v.format.clone(), - schema: v.get_schema()?, - partition_columns: v.partition_columns.clone(), - created_time: v.created_time, - configuration: v.configuration.clone(), - }); + state.current_metadata = Some(DeltaTableMetaData::try_from(v)?); } Action::txn(v) => { *state .app_transaction_version - .entry(v.app_id.clone()) + .entry(v.app_id) .or_insert(v.version) = v.version; } Action::commitInfo(v) => { - state.commit_infos.push(v.clone()); + state.commit_infos.push(v); } } @@ -1432,7 +1514,7 @@ pub async fn open_table_with_ds(table_uri: &str, ds: &str) -> Result &'static str { env!("CARGO_PKG_VERSION") } @@ -1462,10 +1544,10 @@ mod tests { let txn_action = Action::txn(action::Txn { app_id: "abc".to_string(), version: 2, - last_updated: 0, + last_updated: Some(0), }); - let _ = process_action(&mut state, &txn_action).unwrap(); + let _ = process_action(&mut state, txn_action).unwrap(); assert_eq!(2, *state.app_transaction_version.get("abc").unwrap()); assert_eq!(1, *state.app_transaction_version.get("xyz").unwrap()); diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index b654e2aa57..79af411088 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -2,7 +2,7 @@ use crate::schema; use arrow::datatypes::{ - DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit, + DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, TimeUnit, }; use arrow::error::ArrowError; use lazy_static::lazy_static; @@ -116,6 +116,11 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType { a )?))) } + // NOTE: this doesn't currently support maps with string keys + // See below arrow-rs issues for adding arrow::datatypes::DataType::Map to support a + // more general map type: + // https://github.com/apache/arrow-rs/issues/395 + // https://github.com/apache/arrow-rs/issues/396 schema::SchemaDataType::map(m) => Ok(ArrowDataType::Dictionary( Box::new( >::try_from( @@ -131,3 +136,193 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType { } } } + +pub(crate) fn delta_log_schema_for_table( + table_schema: ArrowSchema, + partition_columns: &[String], +) -> SchemaRef { + lazy_static! { + static ref SCHEMA_FIELDS: Vec = vec![ + ArrowField::new( + "metaData", + ArrowDataType::Struct(vec![ + ArrowField::new("id", ArrowDataType::Utf8, true), + ArrowField::new("name", ArrowDataType::Utf8, true), + ArrowField::new("description", ArrowDataType::Utf8, true), + ArrowField::new("schemaString", ArrowDataType::Utf8, true), + ArrowField::new("createdTime", ArrowDataType::Int64, true), + ArrowField::new("partitionColumns", ArrowDataType::List(Box::new( + ArrowField::new("element", ArrowDataType::Utf8, true))), true), + ArrowField::new("format", ArrowDataType::Struct(vec![ + ArrowField::new("provider", ArrowDataType::Utf8, true), + // TODO: Add "options" after ArrowDataType::Map support + ]), true), + ]), + true + ), + ArrowField::new( + "protocol", + ArrowDataType::Struct(vec![ + ArrowField::new("minReaderVersion", ArrowDataType::Int32, true), + ArrowField::new("minWriterVersion", ArrowDataType::Int32, true), + ]), + true + ), + ArrowField::new( + "txn", + ArrowDataType::Struct(vec![ + ArrowField::new("appId", ArrowDataType::Utf8, true), + ArrowField::new("version", ArrowDataType::Int64, true), + ]), + true + ), + ArrowField::new( + "remove", + ArrowDataType::Struct(vec![ + ArrowField::new("path", ArrowDataType::Utf8, true), + ArrowField::new("deletionTimestamp", ArrowDataType::Int64, true), + ArrowField::new("dataChange", ArrowDataType::Boolean, true), + ArrowField::new("extendedFileMetadata", ArrowDataType::Boolean, true), + ArrowField::new("size", ArrowDataType::Int64, true), + // TODO: Add "partitionValues" after ArrowDataType::Map support + // TODO: Add "tags" after ArrowDataType::Map support + ]), + true + ) + ]; + static ref ADD_FIELDS: Vec = vec![ + ArrowField::new("path", ArrowDataType::Utf8, true), + ArrowField::new("size", ArrowDataType::Int64, true), + ArrowField::new("modificationTime", ArrowDataType::Int64, true), + ArrowField::new("dataChange", ArrowDataType::Boolean, true), + ArrowField::new("stats", ArrowDataType::Utf8, true), + // TODO: Add "partitionValues" after ArrowDataType::Map support + // TODO: Add "tags" after ArrowDataType::Map support + ]; + } + + let (partition_fields, non_partition_fields): (Vec, Vec) = table_schema + .fields() + .iter() + .map(|f| f.to_owned()) + .partition(|field| partition_columns.contains(&field.name())); + + let mut stats_parsed_fields: Vec = + vec![ArrowField::new("numRecords", ArrowDataType::Int64, true)]; + + if !non_partition_fields.is_empty() { + stats_parsed_fields.extend(["minValues", "maxValues", "nullCounts"].iter().map(|name| { + ArrowField::new( + name, + ArrowDataType::Struct(non_partition_fields.clone()), + true, + ) + })); + } + + let mut add_fields = ADD_FIELDS.clone(); + + add_fields.push(ArrowField::new( + "stats_parsed", + ArrowDataType::Struct(stats_parsed_fields), + true, + )); + + if !partition_fields.is_empty() { + add_fields.push(ArrowField::new( + "partitionValues_parsed", + ArrowDataType::Struct(partition_fields), + true, + )); + } + + let mut schema_fields = SCHEMA_FIELDS.clone(); + schema_fields.push(ArrowField::new( + "add", + ArrowDataType::Struct(add_fields), + true, + )); + + let arrow_schema = ArrowSchema::new(schema_fields); + + std::sync::Arc::new(arrow_schema) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + #[test] + fn delta_log_schema_for_table_test() { + // NOTE: We should future proof the checkpoint schema in case action schema changes. + // See https://github.com/delta-io/delta-rs/issues/287 + + let table_schema = ArrowSchema::new(vec![ + ArrowField::new("pcol", ArrowDataType::Int32, true), + ArrowField::new("col1", ArrowDataType::Int32, true), + ]); + let partition_columns = vec!["pcol".to_string()]; + let log_schema = delta_log_schema_for_table(table_schema, partition_columns.as_slice()); + + let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"]; + for f in log_schema.fields().iter() { + assert!(expected_fields.contains(&f.name().as_str())); + } + let add_fields: Vec<_> = log_schema + .fields() + .iter() + .filter(|f| f.name() == "add") + .map(|f| { + if let ArrowDataType::Struct(fields) = f.data_type() { + fields.iter().map(|f| f.clone()) + } else { + unreachable!(); + } + }) + .flatten() + .collect(); + assert_eq!(7, add_fields.len()); + + let add_field_map: HashMap<_, _> = add_fields + .iter() + .map(|f| (f.name().to_owned(), f.clone())) + .collect(); + + let partition_values_parsed = add_field_map.get("partitionValues_parsed").unwrap(); + if let ArrowDataType::Struct(fields) = partition_values_parsed.data_type() { + assert_eq!(1, fields.len()); + let field = fields.get(0).unwrap().to_owned(); + assert_eq!(ArrowField::new("pcol", ArrowDataType::Int32, true), field); + } else { + unreachable!(); + } + + let stats_parsed = add_field_map.get("stats_parsed").unwrap(); + if let ArrowDataType::Struct(fields) = stats_parsed.data_type() { + assert_eq!(4, fields.len()); + + let field_map: HashMap<_, _> = fields + .iter() + .map(|f| (f.name().to_owned(), f.clone())) + .collect(); + + for (k, v) in field_map.iter() { + match k.as_ref() { + "minValues" | "maxValues" | "nullCounts" => match v.data_type() { + ArrowDataType::Struct(fields) => { + assert_eq!(1, fields.len()); + let field = fields.get(0).unwrap().to_owned(); + assert_eq!(ArrowField::new("col1", ArrowDataType::Int32, true), field); + } + _ => unreachable!(), + }, + "numRecords" => {} + _ => panic!(), + } + } + } else { + unreachable!(); + } + } +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 226306fc7d..0bc128e73a 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -58,6 +58,7 @@ extern crate serde_json; extern crate thiserror; pub mod action; +pub mod checkpoints; mod delta; pub mod delta_arrow; pub mod partitions; diff --git a/rust/src/schema.rs b/rust/src/schema.rs index 0a71ecdce1..098f66caed 100644 --- a/rust/src/schema.rs +++ b/rust/src/schema.rs @@ -1,8 +1,7 @@ #![allow(non_snake_case, non_camel_case_types)] -use std::collections::HashMap; - use serde::{Deserialize, Serialize}; +use std::collections::HashMap; /// Type alias for a string expected to match a GUID/UUID format pub type Guid = String; @@ -17,7 +16,7 @@ pub type DeltaDataTypeInt = i32; /// Represents a struct field defined in the Delta table schema. // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Schema-Serialization-Format -#[derive(Serialize, Deserialize, Debug, Default, Clone)] +#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)] pub struct SchemaTypeStruct { // type field is always the string "struct", so we are ignoring it here r#type: String, @@ -32,7 +31,7 @@ impl SchemaTypeStruct { } /// Describes a specific field of the Delta table schema. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] pub struct SchemaField { // Name of this (possibly nested) column name: String, @@ -67,7 +66,7 @@ impl SchemaField { } /// Schema definition for array type fields. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] pub struct SchemaTypeArray { // type field is always the string "array", so we are ignoring it here r#type: String, @@ -91,7 +90,7 @@ impl SchemaTypeArray { } /// Schema definition for map type fields. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] pub struct SchemaTypeMap { r#type: String, keyType: Box, @@ -133,7 +132,7 @@ impl SchemaTypeMap { * timestamp: Microsecond precision timestamp without a timezone */ /// Enum with variants for each top level schema data type. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] #[serde(untagged)] pub enum SchemaDataType { /// Variant representing non-array, non-map, non-struct fields. Wrapped value will contain the diff --git a/rust/src/writer.rs b/rust/src/writer.rs index d2eede8703..33a631a268 100644 --- a/rust/src/writer.rs +++ b/rust/src/writer.rs @@ -176,13 +176,13 @@ impl ParquetBuffer { } } -struct InMemValueIter<'a> { +pub(crate) struct InMemValueIter<'a> { buffer: &'a [Value], current_index: usize, } impl<'a> InMemValueIter<'a> { - fn from_vec(buffer: &'a [Value]) -> Self { + pub(crate) fn from_vec(buffer: &'a [Value]) -> Self { Self { buffer, current_index: 0, diff --git a/rust/tests/checkpoint_writer_test.rs b/rust/tests/checkpoint_writer_test.rs new file mode 100644 index 0000000000..a6e8edf584 --- /dev/null +++ b/rust/tests/checkpoint_writer_test.rs @@ -0,0 +1,82 @@ +extern crate deltalake; + +use deltalake::checkpoints::CheckPointWriter; +use deltalake::storage; +use std::fs; +use std::path::{Path, PathBuf}; + +// NOTE: The below is a useful external command for inspecting the written checkpoint schema visually: +// parquet-tools inspect tests/data/checkpoints/_delta_log/00000000000000000005.checkpoint.parquet + +#[tokio::test] +async fn write_simple_checkpoint() { + let table_location = "./tests/data/checkpoints"; + let table_path = PathBuf::from(table_location); + let log_path = table_path.join("_delta_log"); + + // Delete checkpoint files from previous runs + cleanup_checkpoint_files(log_path.as_path()); + + // Load the delta table at version 5 + let table = deltalake::open_table_with_version(table_location, 5) + .await + .unwrap(); + + // Write a checkpoint + let storage_backend = storage::get_backend_for_uri(table_location).unwrap(); + let checkpoint_writer = CheckPointWriter::new(table_location, storage_backend); + let _ = checkpoint_writer + .create_checkpoint_from_state(table.version, table.get_state()) + .await + .unwrap(); + + // checkpoint should exist + let checkpoint_path = log_path.join("00000000000000000005.checkpoint.parquet"); + assert!(checkpoint_path.as_path().exists()); + + // HACK: seems like a race condition exists reading the file back in. + // Without the sleep, frequently fails with: + // Error("EOF while parsing a value", line: 1, column: 0)' + std::thread::sleep(std::time::Duration::from_secs(1)); + + // _last_checkpoint should exist + let last_checkpoint_path = log_path.join("_last_checkpoint"); + assert!(last_checkpoint_path.as_path().exists()); + + // _last_checkpoint should point to the correct version + let last_checkpoint_content = fs::read_to_string(last_checkpoint_path.as_path()).unwrap(); + let last_checkpoint_content: serde_json::Value = + serde_json::from_str(last_checkpoint_content.trim()).unwrap(); + + let version = last_checkpoint_content + .get("version") + .unwrap() + .as_i64() + .unwrap(); + assert_eq!(5, version); + + // delta table should load just fine with the checkpoint in place + let table_result = deltalake::open_table(table_location).await.unwrap(); + let table = table_result; + let files = table.get_files(); + assert_eq!(11, files.len()); +} + +fn cleanup_checkpoint_files(log_path: &Path) { + let paths = fs::read_dir(log_path).unwrap(); + + for p in paths { + match p { + Ok(d) => { + let path = d.path(); + + if path.file_name().unwrap() == "_last_checkpoint" + || (path.extension().is_some() && path.extension().unwrap() == "parquet") + { + fs::remove_file(path).unwrap(); + } + } + _ => {} + } + } +} diff --git a/rust/tests/data/checkpoints/_delta_log/.gitignore b/rust/tests/data/checkpoints/_delta_log/.gitignore new file mode 100644 index 0000000000..8624856880 --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/.gitignore @@ -0,0 +1,3 @@ +*.parquet +_last_checkpoint + diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000000.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..90f4a993cf --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1615751699523,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"cf3741a3-5f93-434f-99ac-9a4bebcdf06c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"version\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1615751699422}} +{"add":{"path":"part-00000-3810fbe0-9892-431d-bcfd-7de5788dfe8d-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751699515,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000001.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000001.json new file mode 100644 index 0000000000..66aa3a4d49 --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751700281,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-1abe25d3-0da6-46c5-98c1-7a69872fd797-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751700275,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000002.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000002.json new file mode 100644 index 0000000000..c4430292c4 --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751701120,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-136c36f5-639d-4e95-bb0f-15cde3fb14eb-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751701112,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000003.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000003.json new file mode 100644 index 0000000000..e91353d69a --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751701854,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-8e7dc8c1-337b-40b8-a411-46d4295da531-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751701848,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000004.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000004.json new file mode 100644 index 0000000000..20fa6e4471 --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000004.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751702764,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":3,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-e93060ad-9c8c-4170-a9da-7c6f53f6406b-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751702758,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000005.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000005.json new file mode 100644 index 0000000000..12a3d009fe --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000005.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751703539,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":4,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-e9c6df9a-e585-4c70-bc1f-de9bd8ae025b-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751703532,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000006.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000006.json new file mode 100644 index 0000000000..97d497924e --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000006.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751704301,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":5,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-7d239c98-d74b-4b02-b3f6-9f256992c633-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751704295,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000007.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000007.json new file mode 100644 index 0000000000..f34c112437 --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000007.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751705073,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":6,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-72ecc4d6-2e44-4df4-99e6-23f1ac2b7b7c-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751705065,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000008.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000008.json new file mode 100644 index 0000000000..61314a9df0 --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000008.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751705959,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":7,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-3fa65c69-4e55-4b18-a195-5f1ae583e553-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751705952,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000009.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000009.json new file mode 100644 index 0000000000..4a58463074 --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000009.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751706703,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":8,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-9afd9224-729f-4420-a05e-8032113a6568-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751706698,"dataChange":true}} diff --git a/rust/tests/data/checkpoints/_delta_log/00000000000000000010.json b/rust/tests/data/checkpoints/_delta_log/00000000000000000010.json new file mode 100644 index 0000000000..3f6aaf85df --- /dev/null +++ b/rust/tests/data/checkpoints/_delta_log/00000000000000000010.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1615751716705,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":9,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}} +{"add":{"path":"part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751716698,"dataChange":true}}