From 6853bab47315d47f93f3a8675bd1536bbd428f61 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sun, 9 Feb 2025 15:42:22 -0500 Subject: [PATCH 01/10] feat: Add existing parquet files --- crates/iceberg/src/arrow/reader.rs | 4 +- crates/iceberg/src/scan.rs | 2 +- crates/iceberg/src/spec/datatypes.rs | 23 ++++ crates/iceberg/src/transaction.rs | 195 ++++++++++++++++++++++++++- 4 files changed, 216 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 6fcd59297..22982fb02 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -1117,14 +1117,14 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> { /// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer. /// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`]. /// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`]. -struct ArrowFileReader { +pub struct ArrowFileReader { meta: FileMetadata, r: R, } impl ArrowFileReader { /// Create a new ArrowFileReader - fn new(meta: FileMetadata, r: R) -> Self { + pub fn new(meta: FileMetadata, r: R) -> Self { Self { meta, r } } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 7e05da59a..9383749d7 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -981,7 +981,7 @@ pub mod tests { use crate::TableIdent; pub struct TableTestFixture { - table_location: String, + pub table_location: String, pub table: Table, } diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index c806d16ea..0856280df 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -25,6 +25,7 @@ use std::ops::Index; use std::sync::{Arc, OnceLock}; use ::serde::de::{MapAccess, Visitor}; +use ordered_float::OrderedFloat; use serde::de::{Error, IntoDeserializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use serde_json::Value as JsonValue; @@ -266,6 +267,28 @@ impl PrimitiveType { | (PrimitiveType::Binary, PrimitiveLiteral::Binary(_)) ) } + + /// Used to convert `PrimitiveType` to `Literal`. + pub fn type_to_literal(&self) -> Literal { + Literal::Primitive(match self { + PrimitiveType::Boolean => PrimitiveLiteral::Boolean(false), + PrimitiveType::Int => PrimitiveLiteral::Int(0), + PrimitiveType::Long => PrimitiveLiteral::Long(0), + PrimitiveType::Float => PrimitiveLiteral::Float(OrderedFloat(0.0)), + PrimitiveType::Double => PrimitiveLiteral::Double(OrderedFloat(0.0)), + PrimitiveType::Decimal { .. } => PrimitiveLiteral::Int128(0), + PrimitiveType::Date => PrimitiveLiteral::Int(0), + PrimitiveType::Time => PrimitiveLiteral::Long(0), + PrimitiveType::Timestamp => PrimitiveLiteral::Long(0), + PrimitiveType::Timestamptz => PrimitiveLiteral::Long(0), + PrimitiveType::TimestampNs => PrimitiveLiteral::Long(0), + PrimitiveType::TimestamptzNs => PrimitiveLiteral::Long(0), + PrimitiveType::String => PrimitiveLiteral::String(String::new()), + PrimitiveType::Uuid => PrimitiveLiteral::UInt128(0), + PrimitiveType::Fixed(_size) => PrimitiveLiteral::Binary(Vec::new()), + PrimitiveType::Binary => PrimitiveLiteral::Binary(Vec::new()), + }) + } } impl Serialize for Type { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index c27a107da..76f6345f9 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -18,19 +18,22 @@ //! 
This module contains transaction api. use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::mem::discriminant; use std::ops::RangeFrom; +use parquet::arrow::async_reader::AsyncFileReader; use uuid::Uuid; +use crate::arrow::ArrowFileReader; use crate::error::Result; -use crate::io::OutputFile; +use crate::io::{FileIO, OutputFile}; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, - ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention, - SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH, + DataContentType, DataFile, DataFileBuilder, DataFileFormat, FormatVersion, ManifestEntry, + ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation, Snapshot, + SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, + Summary, TableMetadata, Transform, MAIN_BRANCH, }; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; @@ -169,6 +172,110 @@ impl<'a> Transaction<'a> { catalog.update_table(table_commit).await } + + /// Adds existing parquet files + pub async fn add_parquet_files( + self, + file_paths: Vec, + check_duplicate_files: bool, + ) -> Result> { + if check_duplicate_files { + let unique_paths: HashSet<_> = file_paths.iter().collect(); + if unique_paths.len() != file_paths.len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Duplicate file paths provided", + )); + } + } + let table_metadata = self.table.metadata(); + + let data_files = Transaction::parquet_files_to_data_files( + &self, + self.table.file_io(), + file_paths, + table_metadata, + ) + .await?; + + let mut fast_append_action = self.fast_append(Some(Uuid::new_v4()), Vec::new())?; + fast_append_action.add_data_files(data_files)?; + + fast_append_action.apply().await + } + + async fn parquet_files_to_data_files( + &self, + file_io: &FileIO, + file_paths: Vec, + table_metadata: &TableMetadata, + ) -> Result> { + let mut data_files: Vec = Vec::new(); + + for file_path in file_paths { + let input_file = file_io.new_input(&file_path)?; + if !input_file.exists().await? 
{ + return Err(Error::new( + ErrorKind::DataInvalid, + format!("File does not exist."), + )); + } + let file_metadata = input_file.metadata().await?; + let file_size_in_bytes = file_metadata.size; + let reader = input_file.reader().await?; + + let mut parquet_reader = ArrowFileReader::new(file_metadata, reader); + let parquet_metadata = parquet_reader.get_metadata().await.map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Error reading Parquet metadata: {}", err), + ) + })?; + let record_count: u64 = parquet_metadata + .file_metadata() + .num_rows() + .try_into() + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + "Negative row count in Parquet metadata", + ) + })?; + let partition_value = + self.create_default_partition_value(&table_metadata.default_partition_type)?; + + // TODO: Add more metadata from parquet's metadata to the `DataFile` + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(file_path) + .file_format(DataFileFormat::Parquet) + .partition(partition_value) + .record_count(record_count) + .file_size_in_bytes(file_size_in_bytes) + .build() + .unwrap(); + data_files.push(data_file); + } + Ok(data_files) + } + + fn create_default_partition_value(&self, partition_type: &StructType) -> Result { + let literals = partition_type + .fields() + .iter() + .map(|field| { + let primitive_type = field.field_type.as_primitive_type().ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Partition field should only be a primitive type.", + ) + })?; + Ok(Some(primitive_type.type_to_literal())) + }) + .collect::>>()?; + + Ok(Struct::from_iter(literals)) + } } /// FastAppendAction is a transaction action for fast append data files to the table. @@ -607,6 +714,7 @@ mod tests { use std::io::BufReader; use crate::io::FileIOBuilder; + use crate::scan::tests::TableTestFixture; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct, TableMetadata, @@ -847,6 +955,7 @@ mod tests { .sequence_number() .expect("Inherit sequence number by load manifest") ); + assert_eq!( new_snapshot.snapshot_id(), manifest.entries()[0].snapshot_id().unwrap() @@ -869,4 +978,80 @@ mod tests { "Should not allow to do same kinds update in same transaction" ); } + + #[tokio::test] + async fn test_add_existing_parquet_files() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + let tx = crate::transaction::Transaction::new(&fixture.table); + + let file_paths = vec![ + format!("{}/1.parquet", &fixture.table_location), + format!("{}/2.parquet", &fixture.table_location), + format!("{}/3.parquet", &fixture.table_location), + ]; + + // attempt to add the existing Parquet files with fast append + let new_tx = tx + .add_parquet_files(file_paths.clone(), true) + .await + .expect("Adding existing Parquet files should succeed"); + + let mut found_add_snapshot = false; + let mut found_set_snapshot_ref = false; + for update in new_tx.updates.iter() { + match update { + TableUpdate::AddSnapshot { .. 
} => { + found_add_snapshot = true; + } + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => { + found_set_snapshot_ref = true; + assert_eq!(ref_name, crate::transaction::MAIN_BRANCH); + assert!(reference.snapshot_id > 0); + } + _ => {} + } + } + assert!(found_add_snapshot); + assert!(found_set_snapshot_ref); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] { + snapshot + } else { + panic!("Expected the first update to be an AddSnapshot update"); + }; + + let manifest_list = new_snapshot + .load_manifest_list(fixture.table.file_io(), fixture.table.metadata()) + .await + .expect("Failed to load manifest list"); + + assert_eq!(manifest_list.entries().len(), 2); + + // Load the manifest from the manifest list + let manifest = manifest_list.entries()[0] + .load_manifest(fixture.table.file_io()) + .await + .expect("Failed to load manifest"); + + // Since we added three files with add_parquet_files, check that the manifest contains three entries + assert_eq!(manifest.entries().len(), 3); + + // Verify each file path appears in manifest + let manifest_paths: Vec = manifest + .entries() + .iter() + .map(|entry| entry.data_file().file_path.clone()) + .collect(); + for path in file_paths { + assert!( + manifest_paths.contains(&path), + "Manifest does not contain expected file path: {}", + path + ); + } + } } From 3f258ad11a8e2f03928a96a162a9822bc2b63208 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sun, 9 Feb 2025 18:06:51 -0500 Subject: [PATCH 02/10] clippy fix --- crates/iceberg/src/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 76f6345f9..246753332 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -217,7 +217,7 @@ impl<'a> Transaction<'a> { if !input_file.exists().await? 
{ return Err(Error::new( ErrorKind::DataInvalid, - format!("File does not exist."), + "File does not exist".to_string(), )); } let file_metadata = input_file.metadata().await?; From 4cb5b1e782dd9770e5d1a77df5471bb1ed7489c4 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 11 Feb 2025 19:41:59 -0500 Subject: [PATCH 03/10] change data file builder --- crates/iceberg/src/transaction.rs | 121 ++++++++++++++---- crates/iceberg/src/writer/file_writer/mod.rs | 2 +- .../src/writer/file_writer/parquet_writer.rs | 16 ++- 3 files changed, 105 insertions(+), 34 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 246753332..40ee8f01c 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -22,20 +22,23 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::mem::discriminant; use std::ops::RangeFrom; +use std::sync::Arc; use parquet::arrow::async_reader::AsyncFileReader; +use parquet::file::metadata::ParquetMetaData; use uuid::Uuid; use crate::arrow::ArrowFileReader; use crate::error::Result; use crate::io::{FileIO, OutputFile}; use crate::spec::{ - DataContentType, DataFile, DataFileBuilder, DataFileFormat, FormatVersion, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, - Summary, TableMetadata, Transform, MAIN_BRANCH, + visit_schema, DataContentType, DataFile, DataFileBuilder, DataFileFormat, FormatVersion, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation, + SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + Struct, StructType, Summary, TableMetadata, Transform, MAIN_BRANCH, }; use crate::table::Table; +use crate::writer::file_writer::parquet_writer::{IndexByParquetPathName, MinMaxColAggregator}; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; @@ -211,6 +214,8 @@ impl<'a> Transaction<'a> { table_metadata: &TableMetadata, ) -> Result> { let mut data_files: Vec = Vec::new(); + let partition_value = + self.create_default_partition_value(&table_metadata.default_partition_type)?; for file_path in file_paths { let input_file = file_io.new_input(&file_path)?; @@ -221,7 +226,7 @@ impl<'a> Transaction<'a> { )); } let file_metadata = input_file.metadata().await?; - let file_size_in_bytes = file_metadata.size; + let file_size_in_bytes = file_metadata.size as usize; let reader = input_file.reader().await?; let mut parquet_reader = ArrowFileReader::new(file_metadata, reader); @@ -231,34 +236,94 @@ impl<'a> Transaction<'a> { format!("Error reading Parquet metadata: {}", err), ) })?; - let record_count: u64 = parquet_metadata - .file_metadata() - .num_rows() - .try_into() - .map_err(|_| { - Error::new( - ErrorKind::DataInvalid, - "Negative row count in Parquet metadata", - ) - })?; - let partition_value = - self.create_default_partition_value(&table_metadata.default_partition_type)?; - - // TODO: Add more metadata from parquet's metadata to the `DataFile` - let data_file = DataFileBuilder::default() - .content(DataContentType::Data) - .file_path(file_path) - .file_format(DataFileFormat::Parquet) - .partition(partition_value) - .record_count(record_count) - .file_size_in_bytes(file_size_in_bytes) - .build() - .unwrap(); + let builder = self.parquet_to_data_file_builder( + 
table_metadata.current_schema().clone(), + parquet_metadata, + &partition_value, + file_size_in_bytes, + file_path, + )?; + let data_file = builder.build().unwrap(); data_files.push(data_file); } Ok(data_files) } + /// `ParquetMetadata` to data file builder + pub fn parquet_to_data_file_builder( + &self, + schema: SchemaRef, + metadata: Arc, + partition: &Struct, + written_size: usize, + file_path: String, + ) -> Result { + let index_by_parquet_path = { + let mut visitor = IndexByParquetPathName::new(); + visit_schema(&schema, &mut visitor)?; + visitor + }; + + let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = { + let mut per_col_size: HashMap = HashMap::new(); + let mut per_col_val_num: HashMap = HashMap::new(); + let mut per_col_null_val_num: HashMap = HashMap::new(); + let mut min_max_agg = MinMaxColAggregator::new(schema); + + for row_group in metadata.row_groups() { + for column_chunk_metadata in row_group.columns() { + let parquet_path = column_chunk_metadata.column_descr().path().string(); + + let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else { + continue; + }; + + *per_col_size.entry(field_id).or_insert(0) += + column_chunk_metadata.compressed_size() as u64; + *per_col_val_num.entry(field_id).or_insert(0) += + column_chunk_metadata.num_values() as u64; + + if let Some(statistics) = column_chunk_metadata.statistics() { + if let Some(null_count) = statistics.null_count_opt() { + *per_col_null_val_num.entry(field_id).or_insert(0) += null_count; + } + + min_max_agg.update(field_id, statistics.clone())?; + } + } + } + ( + per_col_size, + per_col_val_num, + per_col_null_val_num, + min_max_agg.produce(), + ) + }; + + let mut builder = DataFileBuilder::default(); + builder + .content(DataContentType::Data) + .file_path(file_path) + .file_format(DataFileFormat::Parquet) + .partition(partition.clone()) + .record_count(metadata.file_metadata().num_rows() as u64) + .file_size_in_bytes(written_size as u64) + .column_sizes(column_sizes) + .value_counts(value_counts) + .null_value_counts(null_value_counts) + .lower_bounds(lower_bounds) + .upper_bounds(upper_bounds) + .split_offsets( + metadata + .row_groups() + .iter() + .filter_map(|group| group.file_offset()) + .collect(), + ); + + Ok(builder) + } + fn create_default_partition_value(&self, partition_type: &StructType) -> Result { let literals = partition_type .fields() diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 4a0fffcc1..7237aff6c 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -24,7 +24,7 @@ use super::CurrentFileStatus; use crate::spec::DataFileBuilder; use crate::Result; -mod parquet_writer; +pub mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; mod track_writer; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 5561b1913..c5ee58ac4 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -105,7 +105,8 @@ impl FileWriterBuilder for ParquetWr } } -struct IndexByParquetPathName { +/// A mapping from Parquet column path names to internal field id +pub struct IndexByParquetPathName { name_to_id: HashMap, field_names: Vec, @@ -114,6 +115,7 @@ struct IndexByParquetPathName { } impl IndexByParquetPathName { + /// Creates a new, empty `IndexByParquetPathName` pub fn new() -> 
Self { Self { name_to_id: HashMap::new(), @@ -122,6 +124,7 @@ impl IndexByParquetPathName { } } + /// Retrieves the internal field ID pub fn get(&self, name: &str) -> Option<&i32> { self.name_to_id.get(name) } @@ -219,14 +222,15 @@ pub struct ParquetWriter { } /// Used to aggregate min and max value of each column. -struct MinMaxColAggregator { +pub struct MinMaxColAggregator { lower_bounds: HashMap, upper_bounds: HashMap, schema: SchemaRef, } impl MinMaxColAggregator { - fn new(schema: SchemaRef) -> Self { + /// Creates new and empty `MinMaxColAggregator` + pub fn new(schema: SchemaRef) -> Self { Self { lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), @@ -256,7 +260,8 @@ impl MinMaxColAggregator { .or_insert(datum); } - fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> { + /// Update statistics + pub fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> { let Some(ty) = self .schema .field_by_id(field_id) @@ -301,7 +306,8 @@ impl MinMaxColAggregator { Ok(()) } - fn produce(self) -> (HashMap, HashMap) { + /// Returns lower and upper bounds + pub fn produce(self) -> (HashMap, HashMap) { (self.lower_bounds, self.upper_bounds) } } From e1dd3559dd911996b41a7e70d7e9582b7e6d5754 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 11 Feb 2025 20:11:06 -0500 Subject: [PATCH 04/10] fmt fix --- crates/iceberg/src/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 40ee8f01c..b4ccc9a4a 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -215,7 +215,7 @@ impl<'a> Transaction<'a> { ) -> Result> { let mut data_files: Vec = Vec::new(); let partition_value = - self.create_default_partition_value(&table_metadata.default_partition_type)?; + self.create_default_partition_value(&table_metadata.default_partition_type)?; for file_path in file_paths { let input_file = file_io.new_input(&file_path)?; From 8756a71183d85ac60132ab54eb40e711567125a8 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 11 Feb 2025 20:28:40 -0500 Subject: [PATCH 05/10] clippy fix --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index c5ee58ac4..aac431060 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -130,6 +130,12 @@ impl IndexByParquetPathName { } } +impl Default for IndexByParquetPathName { + fn default() -> Self { + Self::new() + } +} + impl SchemaVisitor for IndexByParquetPathName { type T = (); From a9c6b94969c7ace2edd466d48b6f1a90ce6d252e Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Thu, 13 Feb 2025 16:06:47 -0500 Subject: [PATCH 06/10] switch to unpartitioned --- crates/iceberg/src/scan.rs | 261 ++++++++++++++++++++++++++- crates/iceberg/src/spec/datatypes.rs | 23 --- crates/iceberg/src/transaction.rs | 101 +++++++---- 3 files changed, 320 insertions(+), 65 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 6a9515ef1..61f06035e 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -1139,8 +1139,8 @@ pub mod tests { use crate::scan::FileScanTask; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, - ManifestListWriter, ManifestStatus, ManifestWriterBuilder, 
NestedField, PrimitiveType, - Schema, Struct, TableMetadata, Type, + ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec, + PrimitiveType, Schema, Struct, StructType, TableMetadata, Type, }; use crate::table::Table; use crate::TableIdent; @@ -1194,6 +1194,55 @@ pub mod tests { } } + pub fn new_unpartitioned() -> Self { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().join("table1"); + let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); + let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); + let table_metadata1_location = table_location.join("metadata/v1.json"); + + let file_io = FileIO::from_path(table_location.to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + let mut table_metadata = { + let template_json_str = fs::read_to_string(format!( + "{}/testdata/example_table_metadata_v2.json", + env!("CARGO_MANIFEST_DIR") + )) + .unwrap(); + let mut context = Context::new(); + context.insert("table_location", &table_location); + context.insert("manifest_list_1_location", &manifest_list1_location); + context.insert("manifest_list_2_location", &manifest_list2_location); + context.insert("table_metadata_1_location", &table_metadata1_location); + + let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap(); + serde_json::from_str::(&metadata_json).unwrap() + }; + + table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec()); + table_metadata.partition_specs.clear(); + table_metadata.default_partition_type = StructType::new(vec![]); + table_metadata + .partition_specs + .insert(0, table_metadata.default_spec.clone()); + + let table = Table::builder() + .metadata(table_metadata) + .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) + .file_io(file_io.clone()) + .metadata_location(table_metadata1_location.to_str().unwrap()) + .build() + .unwrap(); + + Self { + table_location: table_location.to_str().unwrap().to_string(), + table, + } + } + fn next_manifest_file(&self) -> OutputFile { self.table .file_io() @@ -1413,6 +1462,214 @@ pub mod tests { writer.close().unwrap(); } } + + pub async fn setup_unpartitioned_manifest_files(&mut self) { + let current_snapshot = self.table.metadata().current_snapshot().unwrap(); + let parent_snapshot = current_snapshot + .parent_snapshot(self.table.metadata()) + .unwrap(); + let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); + let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec()); + + // Write data files using an empty partition for unpartitioned tables. + let mut writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(current_snapshot.snapshot_id()), + vec![], + current_schema.clone(), + current_partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + // Create an empty partition value. 
+ let empty_partition = Struct::empty(); + + writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/1.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + writer + .add_delete_entry( + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(parent_snapshot.snapshot_id()) + .sequence_number(parent_snapshot.sequence_number()) + .file_sequence_number(parent_snapshot.sequence_number()) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/2.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(empty_partition.clone()) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(parent_snapshot.snapshot_id()) + .sequence_number(parent_snapshot.sequence_number()) + .file_sequence_number(parent_snapshot.sequence_number()) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/3.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(empty_partition.clone()) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + let data_file_manifest = writer.write_manifest_file().await.unwrap(); + + // Write to manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(current_snapshot.manifest_list()) + .unwrap(), + current_snapshot.snapshot_id(), + current_snapshot.parent_snapshot_id(), + current_snapshot.sequence_number(), + ); + manifest_list_write + .add_manifests(vec![data_file_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + + // prepare data for parquet files + let schema = { + let fields = vec![ + arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "4".to_string(), + )])), + arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 
"8".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + // Build the arrays for the RecordBatch + let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; + + let mut values = vec![2; 512]; + values.append(vec![3; 200].as_mut()); + values.append(vec![4; 300].as_mut()); + values.append(vec![5; 12].as_mut()); + let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + + let mut values = vec![3; 512]; + values.append(vec![4; 512].as_mut()); + let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + + let mut values = vec!["Apache"; 512]; + values.append(vec!["Iceberg"; 512].as_mut()); + let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef; + + let mut values = vec![100.0f64; 512]; + values.append(vec![150.0f64; 12].as_mut()); + values.append(vec![200.0f64; 500].as_mut()); + let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef; + + let mut values = vec![100i32; 512]; + values.append(vec![150i32; 12].as_mut()); + values.append(vec![200i32; 500].as_mut()); + let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef; + + let mut values = vec![100i64; 512]; + values.append(vec![150i64; 12].as_mut()); + values.append(vec![200i64; 500].as_mut()); + let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + + let mut values = vec![false; 512]; + values.append(vec![true; 512].as_mut()); + let values: BooleanArray = values.into(); + let col8 = Arc::new(values) as ArrayRef; + + let to_write = RecordBatch::try_new(schema.clone(), vec![ + col1, col2, col3, col4, col5, col6, col7, col8, + ]) + .unwrap(); + + // Write the Parquet files + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for n in 1..=3 { + let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap(); + let mut writer = + ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + } + } } #[test] diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 0856280df..c806d16ea 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -25,7 +25,6 @@ use std::ops::Index; use std::sync::{Arc, OnceLock}; use ::serde::de::{MapAccess, Visitor}; -use ordered_float::OrderedFloat; use serde::de::{Error, IntoDeserializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use serde_json::Value as JsonValue; @@ -267,28 +266,6 @@ impl PrimitiveType { | (PrimitiveType::Binary, PrimitiveLiteral::Binary(_)) ) } - - /// Used to convert `PrimitiveType` to `Literal`. - pub fn type_to_literal(&self) -> Literal { - Literal::Primitive(match self { - PrimitiveType::Boolean => PrimitiveLiteral::Boolean(false), - PrimitiveType::Int => PrimitiveLiteral::Int(0), - PrimitiveType::Long => PrimitiveLiteral::Long(0), - PrimitiveType::Float => PrimitiveLiteral::Float(OrderedFloat(0.0)), - PrimitiveType::Double => PrimitiveLiteral::Double(OrderedFloat(0.0)), - PrimitiveType::Decimal { .. 
} => PrimitiveLiteral::Int128(0), - PrimitiveType::Date => PrimitiveLiteral::Int(0), - PrimitiveType::Time => PrimitiveLiteral::Long(0), - PrimitiveType::Timestamp => PrimitiveLiteral::Long(0), - PrimitiveType::Timestamptz => PrimitiveLiteral::Long(0), - PrimitiveType::TimestampNs => PrimitiveLiteral::Long(0), - PrimitiveType::TimestamptzNs => PrimitiveLiteral::Long(0), - PrimitiveType::String => PrimitiveLiteral::String(String::new()), - PrimitiveType::Uuid => PrimitiveLiteral::UInt128(0), - PrimitiveType::Fixed(_size) => PrimitiveLiteral::Binary(Vec::new()), - PrimitiveType::Binary => PrimitiveLiteral::Binary(Vec::new()), - }) - } } impl Serialize for Type { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index b4ccc9a4a..22cc4f14e 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -24,6 +24,8 @@ use std::mem::discriminant; use std::ops::RangeFrom; use std::sync::Arc; +use arrow_array::StringArray; +use futures::TryStreamExt; use parquet::arrow::async_reader::AsyncFileReader; use parquet::file::metadata::ParquetMetaData; use uuid::Uuid; @@ -183,14 +185,42 @@ impl<'a> Transaction<'a> { check_duplicate_files: bool, ) -> Result> { if check_duplicate_files { - let unique_paths: HashSet<_> = file_paths.iter().collect(); - if unique_paths.len() != file_paths.len() { + let new_files: HashSet<&str> = file_paths.iter().map(|s| s.as_str()).collect(); + + let mut manifest_stream = self.table.inspect().manifests().scan().await?; + let mut referenced_files = Vec::new(); + + while let Some(batch) = manifest_stream.try_next().await? { + let file_path_array = batch + .column(1) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Failed to downcast file_path column to StringArray", + ) + })?; + + for i in 0..batch.num_rows() { + let file_path = file_path_array.value(i); + if new_files.contains(file_path) { + referenced_files.push(file_path.to_string()); + } + } + } + + if !referenced_files.is_empty() { return Err(Error::new( ErrorKind::DataInvalid, - "Duplicate file paths provided", + format!( + "Cannot add files that are already referenced by table, files: {}", + referenced_files.join(", ") + ), )); } } + let table_metadata = self.table.metadata(); let data_files = Transaction::parquet_files_to_data_files( @@ -214,17 +244,17 @@ impl<'a> Transaction<'a> { table_metadata: &TableMetadata, ) -> Result> { let mut data_files: Vec = Vec::new(); - let partition_value = - self.create_default_partition_value(&table_metadata.default_partition_type)?; + + // TODO: support adding to partitioned table + if !table_metadata.default_spec.fields().is_empty() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Appending to partitioned tables is not supported", + )); + } for file_path in file_paths { let input_file = file_io.new_input(&file_path)?; - if !input_file.exists().await? 
{ - return Err(Error::new( - ErrorKind::DataInvalid, - "File does not exist".to_string(), - )); - } let file_metadata = input_file.metadata().await?; let file_size_in_bytes = file_metadata.size as usize; let reader = input_file.reader().await?; @@ -239,7 +269,6 @@ impl<'a> Transaction<'a> { let builder = self.parquet_to_data_file_builder( table_metadata.current_schema().clone(), parquet_metadata, - &partition_value, file_size_in_bytes, file_path, )?; @@ -254,7 +283,6 @@ impl<'a> Transaction<'a> { &self, schema: SchemaRef, metadata: Arc, - partition: &Struct, written_size: usize, file_path: String, ) -> Result { @@ -305,7 +333,7 @@ impl<'a> Transaction<'a> { .content(DataContentType::Data) .file_path(file_path) .file_format(DataFileFormat::Parquet) - .partition(partition.clone()) + .partition(Struct::empty()) .record_count(metadata.file_metadata().num_rows() as u64) .file_size_in_bytes(written_size as u64) .column_sizes(column_sizes) @@ -323,24 +351,6 @@ impl<'a> Transaction<'a> { Ok(builder) } - - fn create_default_partition_value(&self, partition_type: &StructType) -> Result { - let literals = partition_type - .fields() - .iter() - .map(|field| { - let primitive_type = field.field_type.as_primitive_type().ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "Partition field should only be a primitive type.", - ) - })?; - Ok(Some(primitive_type.type_to_literal())) - }) - .collect::>>()?; - - Ok(Struct::from_iter(literals)) - } } /// FastAppendAction is a transaction action for fast append data files to the table. @@ -491,6 +501,7 @@ impl<'a> SnapshotProduceAction<'a> { "Partition value is not compatible with partition type", )); } + for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { if !field .field_type @@ -1045,9 +1056,9 @@ mod tests { } #[tokio::test] - async fn test_add_existing_parquet_files() { - let mut fixture = TableTestFixture::new(); - fixture.setup_manifest_files().await; + async fn test_add_existing_parquet_files_to_unpartitioned_table() { + let mut fixture = TableTestFixture::new_unpartitioned(); + fixture.setup_unpartitioned_manifest_files().await; let tx = crate::transaction::Transaction::new(&fixture.table); let file_paths = vec![ @@ -1056,7 +1067,7 @@ mod tests { format!("{}/3.parquet", &fixture.table_location), ]; - // attempt to add the existing Parquet files with fast append + // Attempt to add the existing Parquet files with fast append. let new_tx = tx .add_parquet_files(file_paths.clone(), true) .await @@ -1094,7 +1105,12 @@ mod tests { .await .expect("Failed to load manifest list"); - assert_eq!(manifest_list.entries().len(), 2); + assert_eq!( + manifest_list.entries().len(), + 2, + "Expected 2 manifest list entries, got {}", + manifest_list.entries().len() + ); // Load the manifest from the manifest list let manifest = manifest_list.entries()[0] @@ -1102,10 +1118,15 @@ mod tests { .await .expect("Failed to load manifest"); - // Since we added three files with add_parquet_files, check that the manifest contains three entries - assert_eq!(manifest.entries().len(), 3); + // Check that the manifest contains three entries. + assert_eq!( + manifest.entries().len(), + 3, + "Expected 3 manifest entries, got {}", + manifest.entries().len() + ); - // Verify each file path appears in manifest + // Verify each file path appears in manifest. 
let manifest_paths: Vec = manifest .entries() .iter() From e01ead812b7945dbf2a7809c4688ef9ddf20d21e Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 17 Feb 2025 20:40:23 -0500 Subject: [PATCH 07/10] code organization fixes --- crates/iceberg/src/transaction.rs | 209 ++++++------------ .../src/writer/file_writer/parquet_writer.rs | 78 ++++++- 2 files changed, 142 insertions(+), 145 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 22cc4f14e..4a34a5c18 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -22,25 +22,23 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::mem::discriminant; use std::ops::RangeFrom; -use std::sync::Arc; use arrow_array::StringArray; use futures::TryStreamExt; use parquet::arrow::async_reader::AsyncFileReader; -use parquet::file::metadata::ParquetMetaData; use uuid::Uuid; use crate::arrow::ArrowFileReader; use crate::error::Result; use crate::io::{FileIO, OutputFile}; use crate::spec::{ - visit_schema, DataContentType, DataFile, DataFileBuilder, DataFileFormat, FormatVersion, - ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation, - SchemaRef, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, - Struct, StructType, Summary, TableMetadata, Transform, MAIN_BRANCH, + DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, + ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention, + SortDirection, SortField, SortOrder, Struct, StructType, Summary, TableMetadata, Transform, + MAIN_BRANCH, }; use crate::table::Table; -use crate::writer::file_writer::parquet_writer::{IndexByParquetPathName, MinMaxColAggregator}; +use crate::writer::file_writer::ParquetWriter; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; @@ -178,65 +176,6 @@ impl<'a> Transaction<'a> { catalog.update_table(table_commit).await } - /// Adds existing parquet files - pub async fn add_parquet_files( - self, - file_paths: Vec, - check_duplicate_files: bool, - ) -> Result> { - if check_duplicate_files { - let new_files: HashSet<&str> = file_paths.iter().map(|s| s.as_str()).collect(); - - let mut manifest_stream = self.table.inspect().manifests().scan().await?; - let mut referenced_files = Vec::new(); - - while let Some(batch) = manifest_stream.try_next().await? 
{ - let file_path_array = batch - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "Failed to downcast file_path column to StringArray", - ) - })?; - - for i in 0..batch.num_rows() { - let file_path = file_path_array.value(i); - if new_files.contains(file_path) { - referenced_files.push(file_path.to_string()); - } - } - } - - if !referenced_files.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add files that are already referenced by table, files: {}", - referenced_files.join(", ") - ), - )); - } - } - - let table_metadata = self.table.metadata(); - - let data_files = Transaction::parquet_files_to_data_files( - &self, - self.table.file_io(), - file_paths, - table_metadata, - ) - .await?; - - let mut fast_append_action = self.fast_append(Some(Uuid::new_v4()), Vec::new())?; - fast_append_action.add_data_files(data_files)?; - - fast_append_action.apply().await - } - async fn parquet_files_to_data_files( &self, file_io: &FileIO, @@ -266,7 +205,7 @@ impl<'a> Transaction<'a> { format!("Error reading Parquet metadata: {}", err), ) })?; - let builder = self.parquet_to_data_file_builder( + let builder = ParquetWriter::parquet_to_data_file_builder( table_metadata.current_schema().clone(), parquet_metadata, file_size_in_bytes, @@ -277,80 +216,6 @@ impl<'a> Transaction<'a> { } Ok(data_files) } - - /// `ParquetMetadata` to data file builder - pub fn parquet_to_data_file_builder( - &self, - schema: SchemaRef, - metadata: Arc, - written_size: usize, - file_path: String, - ) -> Result { - let index_by_parquet_path = { - let mut visitor = IndexByParquetPathName::new(); - visit_schema(&schema, &mut visitor)?; - visitor - }; - - let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = { - let mut per_col_size: HashMap = HashMap::new(); - let mut per_col_val_num: HashMap = HashMap::new(); - let mut per_col_null_val_num: HashMap = HashMap::new(); - let mut min_max_agg = MinMaxColAggregator::new(schema); - - for row_group in metadata.row_groups() { - for column_chunk_metadata in row_group.columns() { - let parquet_path = column_chunk_metadata.column_descr().path().string(); - - let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else { - continue; - }; - - *per_col_size.entry(field_id).or_insert(0) += - column_chunk_metadata.compressed_size() as u64; - *per_col_val_num.entry(field_id).or_insert(0) += - column_chunk_metadata.num_values() as u64; - - if let Some(statistics) = column_chunk_metadata.statistics() { - if let Some(null_count) = statistics.null_count_opt() { - *per_col_null_val_num.entry(field_id).or_insert(0) += null_count; - } - - min_max_agg.update(field_id, statistics.clone())?; - } - } - } - ( - per_col_size, - per_col_val_num, - per_col_null_val_num, - min_max_agg.produce(), - ) - }; - - let mut builder = DataFileBuilder::default(); - builder - .content(DataContentType::Data) - .file_path(file_path) - .file_format(DataFileFormat::Parquet) - .partition(Struct::empty()) - .record_count(metadata.file_metadata().num_rows() as u64) - .file_size_in_bytes(written_size as u64) - .column_sizes(column_sizes) - .value_counts(value_counts) - .null_value_counts(null_value_counts) - .lower_bounds(lower_bounds) - .upper_bounds(upper_bounds) - .split_offsets( - metadata - .row_groups() - .iter() - .filter_map(|group| group.file_offset()) - .collect(), - ); - - Ok(builder) - } } /// FastAppendAction is a transaction action for fast append data files to the table. 
@@ -387,6 +252,64 @@ impl<'a> FastAppendAction<'a> { Ok(self) } + /// Adds existing parquet files + pub async fn add_parquet_files( + transaction: Transaction<'a>, + file_paths: Vec, + ) -> Result> { + // Checks duplicate files + let new_files: HashSet<&str> = file_paths.iter().map(|s| s.as_str()).collect(); + + let mut manifest_stream = transaction.table.inspect().manifests().scan().await?; + let mut referenced_files = Vec::new(); + + while let Some(batch) = manifest_stream.try_next().await? { + let file_path_array = batch + .column(1) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Failed to downcast file_path column to StringArray", + ) + })?; + + for i in 0..batch.num_rows() { + let file_path = file_path_array.value(i); + if new_files.contains(file_path) { + referenced_files.push(file_path.to_string()); + } + } + } + + if !referenced_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add files that are already referenced by table, files: {}", + referenced_files.join(", ") + ), + )); + } + + let table_metadata = transaction.table.metadata(); + + let data_files = Transaction::parquet_files_to_data_files( + &transaction, + transaction.table.file_io(), + file_paths, + table_metadata, + ) + .await?; + + let mut fast_append_action = + Transaction::fast_append(transaction, Some(Uuid::new_v4()), Vec::new())?; + fast_append_action.add_data_files(data_files)?; + + fast_append_action.apply().await + } + /// Finished building the action and apply it to the transaction. pub async fn apply(self) -> Result> { self.snapshot_produce_action @@ -789,6 +712,7 @@ mod tests { use std::fs::File; use std::io::BufReader; + use super::*; use crate::io::FileIOBuilder; use crate::scan::tests::TableTestFixture; use crate::spec::{ @@ -1068,8 +992,7 @@ mod tests { ]; // Attempt to add the existing Parquet files with fast append. 
- let new_tx = tx - .add_parquet_files(file_paths.clone(), true) + let new_tx = FastAppendAction::add_parquet_files(tx, file_paths.clone()) .await .expect("Adding existing Parquet files should succeed"); diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index aac431060..273b68874 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -27,6 +27,7 @@ use futures::future::BoxFuture; use itertools::Itertools; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; use parquet::arrow::AsyncArrowWriter; +use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use parquet::file::statistics::{from_thrift, Statistics}; use parquet::format::FileMetaData; @@ -39,8 +40,8 @@ use crate::arrow::{ }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ - visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef, - PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type, + visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, + NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, StructType, Type, }; use crate::writer::CurrentFileStatus; use crate::{Error, ErrorKind, Result}; @@ -403,6 +404,79 @@ impl ParquetWriter { ); Ok(builder) } + + /// `ParquetMetadata` to data file builder + pub fn parquet_to_data_file_builder( + schema: SchemaRef, + metadata: Arc, + written_size: usize, + file_path: String, + ) -> Result { + let index_by_parquet_path = { + let mut visitor = IndexByParquetPathName::new(); + visit_schema(&schema, &mut visitor)?; + visitor + }; + + let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = { + let mut per_col_size: HashMap = HashMap::new(); + let mut per_col_val_num: HashMap = HashMap::new(); + let mut per_col_null_val_num: HashMap = HashMap::new(); + let mut min_max_agg = MinMaxColAggregator::new(schema); + + for row_group in metadata.row_groups() { + for column_chunk_metadata in row_group.columns() { + let parquet_path = column_chunk_metadata.column_descr().path().string(); + + let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else { + continue; + }; + + *per_col_size.entry(field_id).or_insert(0) += + column_chunk_metadata.compressed_size() as u64; + *per_col_val_num.entry(field_id).or_insert(0) += + column_chunk_metadata.num_values() as u64; + + if let Some(statistics) = column_chunk_metadata.statistics() { + if let Some(null_count) = statistics.null_count_opt() { + *per_col_null_val_num.entry(field_id).or_insert(0) += null_count; + } + + min_max_agg.update(field_id, statistics.clone())?; + } + } + } + ( + per_col_size, + per_col_val_num, + per_col_null_val_num, + min_max_agg.produce(), + ) + }; + + let mut builder = DataFileBuilder::default(); + builder + .content(DataContentType::Data) + .file_path(file_path) + .file_format(DataFileFormat::Parquet) + .partition(Struct::empty()) + .record_count(metadata.file_metadata().num_rows() as u64) + .file_size_in_bytes(written_size as u64) + .column_sizes(column_sizes) + .value_counts(value_counts) + .null_value_counts(null_value_counts) + .lower_bounds(lower_bounds) + .upper_bounds(upper_bounds) + .split_offsets( + metadata + .row_groups() + .iter() + .filter_map(|group| group.file_offset()) + .collect(), + ); + + Ok(builder) + } } impl FileWriter for ParquetWriter { From 
b38496ebab18a13c95379dc4864d76927d956316 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 18 Feb 2025 15:00:04 -0500 Subject: [PATCH 08/10] Update crates/iceberg/src/transaction.rs Co-authored-by: Renjie Liu --- crates/iceberg/src/transaction.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 4a34a5c18..56a6740f6 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -254,8 +254,8 @@ impl<'a> FastAppendAction<'a> { /// Adds existing parquet files pub async fn add_parquet_files( - transaction: Transaction<'a>, - file_paths: Vec, + &mut self, + file_path: &str ) -> Result> { // Checks duplicate files let new_files: HashSet<&str> = file_paths.iter().map(|s| s.as_str()).collect(); From 86ea8eb5d1260c77b8b181df686ba653bc178743 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 21 Feb 2025 01:16:17 -0500 Subject: [PATCH 09/10] fixes --- crates/iceberg/src/catalog/mod.rs | 2 +- crates/iceberg/src/transaction.rs | 80 +++++++++---------- crates/iceberg/src/writer/file_writer/mod.rs | 2 +- .../src/writer/file_writer/parquet_writer.rs | 2 +- 4 files changed, 42 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index cbda6c905..eb478e304 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -285,7 +285,7 @@ impl TableCommit { } /// TableRequirement represents a requirement for a table in the catalog. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(tag = "type")] pub enum TableRequirement { /// The table must not already exist; used for create transactions diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 56a6740f6..534585e72 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -45,6 +45,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat const META_ROOT_PATH: &str = "metadata"; /// Table transaction. +#[derive(Clone)] pub struct Transaction<'a> { table: &'a Table, updates: Vec, @@ -137,7 +138,7 @@ impl<'a> Transaction<'a> { /// Creates a fast append action. 
pub fn fast_append( - self, + &self, commit_uuid: Option, key_metadata: Vec, ) -> Result> { @@ -182,9 +183,9 @@ impl<'a> Transaction<'a> { file_paths: Vec, table_metadata: &TableMetadata, ) -> Result> { + // TODO: support adding to partitioned table let mut data_files: Vec = Vec::new(); - // TODO: support adding to partitioned table if !table_metadata.default_spec.fields().is_empty() { return Err(Error::new( ErrorKind::FeatureUnsupported, @@ -214,6 +215,7 @@ impl<'a> Transaction<'a> { let data_file = builder.build().unwrap(); data_files.push(data_file); } + Ok(data_files) } } @@ -226,7 +228,7 @@ pub struct FastAppendAction<'a> { impl<'a> FastAppendAction<'a> { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - tx: Transaction<'a>, + tx: &Transaction<'a>, snapshot_id: i64, commit_uuid: Uuid, key_metadata: Vec, @@ -254,11 +256,35 @@ impl<'a> FastAppendAction<'a> { /// Adds existing parquet files pub async fn add_parquet_files( - &mut self, - file_path: &str + transaction: Transaction<'a>, + file_path: Vec, ) -> Result> { + let table_metadata = transaction.table.metadata(); + + let data_file = Transaction::parquet_files_to_data_files( + &transaction, + transaction.table.file_io(), + file_path, + table_metadata, + ) + .await?; + + let mut fast_append_action = + Transaction::fast_append(&transaction, Some(Uuid::new_v4()), Vec::new())?; + fast_append_action.add_data_files(data_file)?; + + fast_append_action.apply(transaction).await + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self, transaction: Transaction<'a>) -> Result> { // Checks duplicate files - let new_files: HashSet<&str> = file_paths.iter().map(|s| s.as_str()).collect(); + let new_files: HashSet<&str> = self + .snapshot_produce_action + .added_data_files + .iter() + .map(|df| df.file_path.as_str()) + .collect(); let mut manifest_stream = transaction.table.inspect().manifests().scan().await?; let mut referenced_files = Vec::new(); @@ -293,25 +319,6 @@ impl<'a> FastAppendAction<'a> { )); } - let table_metadata = transaction.table.metadata(); - - let data_files = Transaction::parquet_files_to_data_files( - &transaction, - transaction.table.file_io(), - file_paths, - table_metadata, - ) - .await?; - - let mut fast_append_action = - Transaction::fast_append(transaction, Some(Uuid::new_v4()), Vec::new())?; - fast_append_action.add_data_files(data_files)?; - - fast_append_action.apply().await - } - - /// Finished building the action and apply it to the transaction. 
- pub async fn apply(self) -> Result> { self.snapshot_produce_action .apply(FastAppendOperation, DefaultManifestProcess) .await @@ -396,14 +403,14 @@ struct SnapshotProduceAction<'a> { impl<'a> SnapshotProduceAction<'a> { pub(crate) fn new( - tx: Transaction<'a>, + tx: &Transaction<'a>, snapshot_id: i64, key_metadata: Vec, commit_uuid: Uuid, snapshot_properties: HashMap, ) -> Result { Ok(Self { - tx, + tx: tx.clone(), snapshot_id, commit_uuid, snapshot_properties, @@ -884,7 +891,7 @@ mod tests { async fn test_fast_append_action() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); + let mut action = tx.clone().fast_append(None, vec![]).unwrap(); // check add data file with incompatible partition value let data_file = DataFileBuilder::default() @@ -908,7 +915,7 @@ mod tests { .build() .unwrap(); action.add_data_files(vec![data_file.clone()]).unwrap(); - let tx = action.apply().await.unwrap(); + let tx = action.apply(tx).await.unwrap(); // check updates and requirements assert!( @@ -983,7 +990,7 @@ mod tests { async fn test_add_existing_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); fixture.setup_unpartitioned_manifest_files().await; - let tx = crate::transaction::Transaction::new(&fixture.table); + let mut tx = crate::transaction::Transaction::new(&fixture.table); let file_paths = vec![ format!("{}/1.parquet", &fixture.table_location), @@ -1042,12 +1049,7 @@ mod tests { .expect("Failed to load manifest"); // Check that the manifest contains three entries. - assert_eq!( - manifest.entries().len(), - 3, - "Expected 3 manifest entries, got {}", - manifest.entries().len() - ); + assert_eq!(manifest.entries().len(), 3); // Verify each file path appears in manifest. let manifest_paths: Vec = manifest @@ -1056,11 +1058,7 @@ mod tests { .map(|entry| entry.data_file().file_path.clone()) .collect(); for path in file_paths { - assert!( - manifest_paths.contains(&path), - "Manifest does not contain expected file path: {}", - path - ); + assert!(manifest_paths.contains(&path)); } } } diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 7237aff6c..4a0fffcc1 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -24,7 +24,7 @@ use super::CurrentFileStatus; use crate::spec::DataFileBuilder; use crate::Result; -pub mod parquet_writer; +mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; mod track_writer; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 273b68874..3c21c350b 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -229,7 +229,7 @@ pub struct ParquetWriter { } /// Used to aggregate min and max value of each column. 
-pub struct MinMaxColAggregator { +struct MinMaxColAggregator { lower_bounds: HashMap, upper_bounds: HashMap, schema: SchemaRef, From c0cfb5602d7b61cdafcee105a80f00e871f15f3b Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 21 Feb 2025 01:33:51 -0500 Subject: [PATCH 10/10] clippy --- crates/iceberg/src/transaction.rs | 2 +- .../tests/shared_tests/append_data_file_test.rs | 4 ++-- .../tests/shared_tests/append_partition_data_file_test.rs | 2 +- .../tests/shared_tests/conflict_commit_test.rs | 5 +++-- crates/integration_tests/tests/shared_tests/scan_all_type.rs | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 534585e72..a8f3fadb5 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -990,7 +990,7 @@ mod tests { async fn test_add_existing_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); fixture.setup_unpartitioned_manifest_files().await; - let mut tx = crate::transaction::Transaction::new(&fixture.table); + let tx = crate::transaction::Transaction::new(&fixture.table); let file_paths = vec![ format!("{}/1.parquet", &fixture.table_location), diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index a24b88634..ae959a9e2 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -114,7 +114,7 @@ async fn test_append_data_file() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(tx).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result @@ -134,7 +134,7 @@ async fn test_append_data_file() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(tx).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result again diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 42cc596f5..4da42d666 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -123,7 +123,7 @@ async fn test_append_partition_data_file() { append_action .add_data_files(data_file_valid.clone()) .unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(tx).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 2686a1d26..be5d9d4e9 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -92,11 +92,12 @@ async fn test_append_data_file_conflict() { let tx1 = Transaction::new(&table); let mut append_action = tx1.fast_append(None, vec![]).unwrap(); 
append_action.add_data_files(data_file.clone()).unwrap(); - let tx1 = append_action.apply().await.unwrap(); + let tx1 = append_action.apply(tx1).await.unwrap(); + let tx2 = Transaction::new(&table); let mut append_action = tx2.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx2 = append_action.apply().await.unwrap(); + let tx2 = append_action.apply(tx2).await.unwrap(); let table = tx2 .commit(&rest_catalog) .await diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 673a78ac0..2d738c2bb 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -311,7 +311,7 @@ async fn test_scan_all_type() { let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let tx = append_action.apply(tx).await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result
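
Taken together, the final shape of this series (patches 07-10) exposes the feature through `FastAppendAction::add_parquet_files`: it reads each file's Parquet footer, builds a `DataFile` via `ParquetWriter::parquet_to_data_file_builder` with an empty partition struct (partitioned tables are rejected with `FeatureUnsupported`), stages the files on a fast-append action, and the apply step refuses files already referenced by an existing manifest. A minimal caller-side sketch of that API, not an authoritative example: `table`, `catalog`, and `table_location` are assumed bindings, the `iceberg::transaction` import path assumes the crate's public module layout, and errors are simply propagated from an async context.

use iceberg::transaction::{FastAppendAction, Transaction};

// Parquet files that already live in the table's storage but are not yet
// referenced by any manifest entry (already-referenced paths are rejected).
let file_paths = vec![
    format!("{}/data/existing-1.parquet", table_location),
    format!("{}/data/existing-2.parquet", table_location),
];

// Stage the files and produce a fast-append snapshot on the transaction;
// the table must be unpartitioned, so each DataFile carries an empty partition.
let tx = Transaction::new(&table);
let tx = FastAppendAction::add_parquet_files(tx, file_paths).await?;

// Commit the staged AddSnapshot / SetSnapshotRef updates through the catalog.
let table = tx.commit(&catalog).await?;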