diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 166baa521..6915ef920 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -1129,14 +1129,14 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
 /// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
 /// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
 /// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
-struct ArrowFileReader<R: FileRead> {
+pub struct ArrowFileReader<R: FileRead> {
     meta: FileMetadata,
     r: R,
 }
 
 impl<R: FileRead> ArrowFileReader<R> {
     /// Create a new ArrowFileReader
-    fn new(meta: FileMetadata, r: R) -> Self {
+    pub fn new(meta: FileMetadata, r: R) -> Self {
         Self { meta, r }
     }
 }
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index cbda6c905..eb478e304 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -285,7 +285,7 @@ impl TableCommit {
 }
 
 /// TableRequirement represents a requirement for a table in the catalog.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 #[serde(tag = "type")]
 pub enum TableRequirement {
     /// The table must not already exist; used for create transactions
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 11c1d8190..bfa1266dd 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -1139,14 +1139,14 @@ pub mod tests {
     use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
-        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PrimitiveType,
-        Schema, Struct, TableMetadata, Type,
+        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
+        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
     };
     use crate::table::Table;
     use crate::TableIdent;
 
     pub struct TableTestFixture {
-        table_location: String,
+        pub table_location: String,
         pub table: Table,
     }
@@ -1194,6 +1194,55 @@ pub mod tests {
             }
         }
 
+        pub fn new_unpartitioned() -> Self {
+            let tmp_dir = TempDir::new().unwrap();
+            let table_location = tmp_dir.path().join("table1");
+            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
+            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
+            let table_metadata1_location = table_location.join("metadata/v1.json");
+
+            let file_io = FileIO::from_path(table_location.to_str().unwrap())
+                .unwrap()
+                .build()
+                .unwrap();
+
+            let mut table_metadata = {
+                let template_json_str = fs::read_to_string(format!(
+                    "{}/testdata/example_table_metadata_v2.json",
+                    env!("CARGO_MANIFEST_DIR")
+                ))
+                .unwrap();
+                let mut context = Context::new();
+                context.insert("table_location", &table_location);
+                context.insert("manifest_list_1_location", &manifest_list1_location);
+                context.insert("manifest_list_2_location", &manifest_list2_location);
+                context.insert("table_metadata_1_location", &table_metadata1_location);
+
+                let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
+                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+            };
+
+            table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
+            table_metadata.partition_specs.clear();
+            table_metadata.default_partition_type = StructType::new(vec![]);
+            table_metadata
+                .partition_specs
+                .insert(0, table_metadata.default_spec.clone());
+
+            let table = Table::builder()
+                .metadata(table_metadata)
+                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+                .file_io(file_io.clone())
+                .metadata_location(table_metadata1_location.to_str().unwrap())
+                .build()
+                .unwrap();
+
+            Self {
+                table_location: table_location.to_str().unwrap().to_string(),
+                table,
+            }
+        }
+
         fn next_manifest_file(&self) -> OutputFile {
             self.table
                 .file_io()
                 .new_output(
@@ -1413,6 +1462,214 @@ pub mod tests {
                 writer.close().unwrap();
             }
         }
+
+        pub async fn setup_unpartitioned_manifest_files(&mut self) {
+            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
+            let parent_snapshot = current_snapshot
+                .parent_snapshot(self.table.metadata())
+                .unwrap();
+            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
+            let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
+
+            // Write data files using an empty partition for unpartitioned tables.
+            let mut writer = ManifestWriterBuilder::new(
+                self.next_manifest_file(),
+                Some(current_snapshot.snapshot_id()),
+                vec![],
+                current_schema.clone(),
+                current_partition_spec.as_ref().clone(),
+            )
+            .build_v2_data();
+
+            // Create an empty partition value.
+            let empty_partition = Struct::empty();
+
+            writer
+                .add_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Added)
+                        .data_file(
+                            DataFileBuilder::default()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/1.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(empty_partition.clone())
+                                .key_metadata(None)
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+
+            writer
+                .add_delete_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Deleted)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        .file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFileBuilder::default()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/2.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(empty_partition.clone())
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+
+            writer
+                .add_existing_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Existing)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        .file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFileBuilder::default()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/3.parquet", &self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(empty_partition.clone())
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+
+            let data_file_manifest = writer.write_manifest_file().await.unwrap();
+
+            // Write to manifest list
+            let mut manifest_list_write = ManifestListWriter::v2(
+                self.table
+                    .file_io()
+                    .new_output(current_snapshot.manifest_list())
+                    .unwrap(),
+                current_snapshot.snapshot_id(),
+                current_snapshot.parent_snapshot_id(),
+                current_snapshot.sequence_number(),
+            );
+            manifest_list_write
+                .add_manifests(vec![data_file_manifest].into_iter())
+                .unwrap();
+            manifest_list_write.close().await.unwrap();
+
+            // prepare data for parquet files
+            let schema = {
+                let fields = vec![
+                    arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "1".to_string(),
+                        )])),
+                    arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "2".to_string(),
+                        )])),
+                    arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "3".to_string(),
+                        )])),
+                    arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "4".to_string(),
+                        )])),
+                    arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "5".to_string(),
+                        )])),
+                    arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "6".to_string(),
+                        )])),
+                    arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "7".to_string(),
+                        )])),
+                    arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "8".to_string(),
+                        )])),
+                ];
+                Arc::new(arrow_schema::Schema::new(fields))
+            };
+
+            // Build the arrays for the RecordBatch
+            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+
+            let mut values = vec![2; 512];
+            values.append(vec![3; 200].as_mut());
+            values.append(vec![4; 300].as_mut());
+            values.append(vec![5; 12].as_mut());
+            let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
+
+            let mut values = vec![3; 512];
+            values.append(vec![4; 512].as_mut());
+            let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
+
+            let mut values = vec!["Apache"; 512];
+            values.append(vec!["Iceberg"; 512].as_mut());
+            let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
+
+            let mut values = vec![100.0f64; 512];
+            values.append(vec![150.0f64; 12].as_mut());
+            values.append(vec![200.0f64; 500].as_mut());
+            let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
+
+            let mut values = vec![100i32; 512];
+            values.append(vec![150i32; 12].as_mut());
+            values.append(vec![200i32; 500].as_mut());
+            let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
+
+            let mut values = vec![100i64; 512];
+            values.append(vec![150i64; 12].as_mut());
+            values.append(vec![200i64; 500].as_mut());
+            let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
+
+            let mut values = vec![false; 512];
+            values.append(vec![true; 512].as_mut());
+            let values: BooleanArray = values.into();
+            let col8 = Arc::new(values) as ArrayRef;
+
+            let to_write = RecordBatch::try_new(schema.clone(), vec![
+                col1, col2, col3, col4, col5, col6, col7, col8,
+            ])
+            .unwrap();
+
+            // Write the Parquet files
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+
+            for n in 1..=3 {
+                let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
+                let mut writer =
+                    ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
+
+                writer.write(&to_write).expect("Writing batch");
+
+                // writer must be closed to write footer
+                writer.close().unwrap();
+            }
+        }
     }
 
     #[test]
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index c27a107da..a8f3fadb5 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -18,27 +18,34 @@
 //! This module contains transaction api.
 
 use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::future::Future;
 use std::mem::discriminant;
 use std::ops::RangeFrom;
 
+use arrow_array::StringArray;
+use futures::TryStreamExt;
+use parquet::arrow::async_reader::AsyncFileReader;
 use uuid::Uuid;
 
+use crate::arrow::ArrowFileReader;
 use crate::error::Result;
-use crate::io::OutputFile;
+use crate::io::{FileIO, OutputFile};
 use crate::spec::{
     DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
     ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
-    SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
+    SortDirection, SortField, SortOrder, Struct, StructType, Summary, TableMetadata, Transform,
+    MAIN_BRANCH,
 };
 use crate::table::Table;
+use crate::writer::file_writer::ParquetWriter;
 use crate::TableUpdate::UpgradeFormatVersion;
 use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
 
 const META_ROOT_PATH: &str = "metadata";
 
 /// Table transaction.
+#[derive(Clone)]
 pub struct Transaction<'a> {
     table: &'a Table,
     updates: Vec<TableUpdate>,
@@ -131,7 +138,7 @@ impl<'a> Transaction<'a> {
 
     /// Creates a fast append action.
     pub fn fast_append(
-        self,
+        &self,
         commit_uuid: Option<Uuid>,
         key_metadata: Vec<u8>,
     ) -> Result<FastAppendAction<'a>> {
@@ -169,6 +176,48 @@ impl<'a> Transaction<'a> {
 
         catalog.update_table(table_commit).await
     }
+
+    async fn parquet_files_to_data_files(
+        &self,
+        file_io: &FileIO,
+        file_paths: Vec<String>,
+        table_metadata: &TableMetadata,
+    ) -> Result<Vec<DataFile>> {
+        // TODO: support adding to partitioned table
+        let mut data_files: Vec<DataFile> = Vec::new();
+
+        if !table_metadata.default_spec.fields().is_empty() {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Appending to partitioned tables is not supported",
+            ));
+        }
+
+        for file_path in file_paths {
+            let input_file = file_io.new_input(&file_path)?;
+            let file_metadata = input_file.metadata().await?;
+            let file_size_in_bytes = file_metadata.size as usize;
+            let reader = input_file.reader().await?;
+
+            let mut parquet_reader = ArrowFileReader::new(file_metadata, reader);
+            let parquet_metadata = parquet_reader.get_metadata().await.map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Error reading Parquet metadata: {}", err),
+                )
+            })?;
+            let builder = ParquetWriter::parquet_to_data_file_builder(
+                table_metadata.current_schema().clone(),
+                parquet_metadata,
+                file_size_in_bytes,
+                file_path,
+            )?;
+            let data_file = builder.build().unwrap();
+            data_files.push(data_file);
+        }
+
+        Ok(data_files)
+    }
 }
 
 /// FastAppendAction is a transaction action for fast append data files to the table.
@@ -179,7 +228,7 @@ pub struct FastAppendAction<'a> {
 impl<'a> FastAppendAction<'a> {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
-        tx: Transaction<'a>,
+        tx: &Transaction<'a>,
         snapshot_id: i64,
         commit_uuid: Uuid,
         key_metadata: Vec<u8>,
@@ -205,8 +254,71 @@ impl<'a> FastAppendAction<'a> {
         Ok(self)
     }
 
+    /// Adds existing parquet files
+    pub async fn add_parquet_files(
+        transaction: Transaction<'a>,
+        file_path: Vec<String>,
+    ) -> Result<Transaction<'a>> {
+        let table_metadata = transaction.table.metadata();
+
+        let data_file = Transaction::parquet_files_to_data_files(
+            &transaction,
+            transaction.table.file_io(),
+            file_path,
+            table_metadata,
+        )
+        .await?;
+
+        let mut fast_append_action =
+            Transaction::fast_append(&transaction, Some(Uuid::new_v4()), Vec::new())?;
+        fast_append_action.add_data_files(data_file)?;
+
+        fast_append_action.apply(transaction).await
+    }
+
     /// Finished building the action and apply it to the transaction.
-    pub async fn apply(self) -> Result<Transaction<'a>> {
+    pub async fn apply(self, transaction: Transaction<'a>) -> Result<Transaction<'a>> {
+        // Checks duplicate files
+        let new_files: HashSet<&str> = self
+            .snapshot_produce_action
+            .added_data_files
+            .iter()
+            .map(|df| df.file_path.as_str())
+            .collect();
+
+        let mut manifest_stream = transaction.table.inspect().manifests().scan().await?;
+        let mut referenced_files = Vec::new();
+
+        while let Some(batch) = manifest_stream.try_next().await? {
+            let file_path_array = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to downcast file_path column to StringArray",
+                    )
+                })?;
+
+            for i in 0..batch.num_rows() {
+                let file_path = file_path_array.value(i);
+                if new_files.contains(file_path) {
+                    referenced_files.push(file_path.to_string());
+                }
+            }
+        }
+
+        if !referenced_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add files that are already referenced by table, files: {}",
+                    referenced_files.join(", ")
+                ),
+            ));
+        }
+
         self.snapshot_produce_action
             .apply(FastAppendOperation, DefaultManifestProcess)
             .await
@@ -291,14 +403,14 @@ struct SnapshotProduceAction<'a> {
 
 impl<'a> SnapshotProduceAction<'a> {
     pub(crate) fn new(
-        tx: Transaction<'a>,
+        tx: &Transaction<'a>,
         snapshot_id: i64,
         key_metadata: Vec<u8>,
         commit_uuid: Uuid,
         snapshot_properties: HashMap<String, String>,
     ) -> Result<Self> {
         Ok(Self {
-            tx,
+            tx: tx.clone(),
             snapshot_id,
             commit_uuid,
             snapshot_properties,
@@ -319,6 +431,7 @@ impl<'a> SnapshotProduceAction<'a> {
                 "Partition value is not compatible with partition type",
             ));
         }
+
         for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
             if !field
                 .field_type
@@ -606,7 +719,9 @@ mod tests {
     use std::fs::File;
     use std::io::BufReader;
 
+    use super::*;
     use crate::io::FileIOBuilder;
+    use crate::scan::tests::TableTestFixture;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct,
         TableMetadata,
@@ -776,7 +891,7 @@ mod tests {
     async fn test_fast_append_action() {
         let table = make_v2_minimal_table();
         let tx = Transaction::new(&table);
-        let mut action = tx.fast_append(None, vec![]).unwrap();
+        let mut action = tx.clone().fast_append(None, vec![]).unwrap();
 
         // check add data file with incompatible partition value
         let data_file = DataFileBuilder::default()
@@ -800,7 +915,7 @@ mod tests {
             .build()
             .unwrap();
         action.add_data_files(vec![data_file.clone()]).unwrap();
-        let tx = action.apply().await.unwrap();
+        let tx = action.apply(tx).await.unwrap();
 
         // check updates and requirements
         assert!(
@@ -847,6 +962,7 @@ mod tests {
                 .sequence_number()
                 .expect("Inherit sequence number by load manifest")
         );
+
         assert_eq!(
             new_snapshot.snapshot_id(),
             manifest.entries()[0].snapshot_id().unwrap()
@@ -869,4 +985,80 @@ mod tests {
             "Should not allow to do same kinds update in same transaction"
         );
     }
+
+    #[tokio::test]
+    async fn test_add_existing_parquet_files_to_unpartitioned_table() {
+        let mut fixture = TableTestFixture::new_unpartitioned();
+        fixture.setup_unpartitioned_manifest_files().await;
+        let tx = crate::transaction::Transaction::new(&fixture.table);
+
+        let file_paths = vec![
+            format!("{}/1.parquet", &fixture.table_location),
+            format!("{}/2.parquet", &fixture.table_location),
+            format!("{}/3.parquet", &fixture.table_location),
+        ];
+
+        // Attempt to add the existing Parquet files with fast append.
+        let new_tx = FastAppendAction::add_parquet_files(tx, file_paths.clone())
+            .await
+            .expect("Adding existing Parquet files should succeed");
+
+        let mut found_add_snapshot = false;
+        let mut found_set_snapshot_ref = false;
+        for update in new_tx.updates.iter() {
+            match update {
+                TableUpdate::AddSnapshot { .. } => {
+                    found_add_snapshot = true;
+                }
+                TableUpdate::SetSnapshotRef {
+                    ref_name,
+                    reference,
+                } => {
+                    found_set_snapshot_ref = true;
+                    assert_eq!(ref_name, crate::transaction::MAIN_BRANCH);
+                    assert!(reference.snapshot_id > 0);
+                }
+                _ => {}
+            }
+        }
+        assert!(found_add_snapshot);
+        assert!(found_set_snapshot_ref);
+
+        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] {
+            snapshot
+        } else {
+            panic!("Expected the first update to be an AddSnapshot update");
+        };
+
+        let manifest_list = new_snapshot
+            .load_manifest_list(fixture.table.file_io(), fixture.table.metadata())
+            .await
+            .expect("Failed to load manifest list");
+
+        assert_eq!(
+            manifest_list.entries().len(),
+            2,
+            "Expected 2 manifest list entries, got {}",
+            manifest_list.entries().len()
+        );
+
+        // Load the manifest from the manifest list
+        let manifest = manifest_list.entries()[0]
+            .load_manifest(fixture.table.file_io())
+            .await
+            .expect("Failed to load manifest");
+
+        // Check that the manifest contains three entries.
+        assert_eq!(manifest.entries().len(), 3);
+
+        // Verify each file path appears in manifest.
+        let manifest_paths: Vec<String> = manifest
+            .entries()
+            .iter()
+            .map(|entry| entry.data_file().file_path.clone())
+            .collect();
+        for path in file_paths {
+            assert!(manifest_paths.contains(&path));
+        }
+    }
 }
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 5561b1913..3c21c350b 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -27,6 +27,7 @@ use futures::future::BoxFuture;
 use itertools::Itertools;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
 use parquet::arrow::AsyncArrowWriter;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::{from_thrift, Statistics};
 use parquet::format::FileMetaData;
@@ -39,8 +40,8 @@ use crate::arrow::{
 };
 use crate::io::{FileIO, FileWrite, OutputFile};
 use crate::spec::{
-    visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef,
-    PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type,
+    visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, MapType,
+    NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, StructType, Type,
 };
 use crate::writer::CurrentFileStatus;
 use crate::{Error, ErrorKind, Result};
@@ -105,7 +106,8 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
     }
 }
 
-struct IndexByParquetPathName {
+/// A mapping from Parquet column path names to internal field id
+pub struct IndexByParquetPathName {
     name_to_id: HashMap<String, i32>,
 
     field_names: Vec<String>,
@@ -114,6 +116,7 @@ struct IndexByParquetPathName {
 }
 
 impl IndexByParquetPathName {
+    /// Creates a new, empty `IndexByParquetPathName`
     pub fn new() -> Self {
         Self {
             name_to_id: HashMap::new(),
@@ -122,11 +125,18 @@ impl IndexByParquetPathName {
         }
     }
 
+    /// Retrieves the internal field ID
     pub fn get(&self, name: &str) -> Option<&i32> {
         self.name_to_id.get(name)
     }
 }
 
+impl Default for IndexByParquetPathName {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl SchemaVisitor for IndexByParquetPathName {
     type T = ();
 
@@ -226,7 +236,8 @@ struct MinMaxColAggregator {
 }
 
 impl MinMaxColAggregator {
-    fn new(schema: SchemaRef) -> Self {
+    /// Creates new and empty `MinMaxColAggregator`
+    pub fn new(schema: SchemaRef) -> Self {
         Self {
             lower_bounds: HashMap::new(),
             upper_bounds: HashMap::new(),
@@ -256,7 +267,8 @@ impl MinMaxColAggregator {
             .or_insert(datum);
     }
 
-    fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
+    /// Update statistics
+    pub fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
         let Some(ty) = self
             .schema
             .field_by_id(field_id)
@@ -301,7 +313,8 @@ impl MinMaxColAggregator {
         Ok(())
     }
 
-    fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
+    /// Returns lower and upper bounds
+    pub fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
         (self.lower_bounds, self.upper_bounds)
     }
 }
@@ -391,6 +404,79 @@ impl ParquetWriter {
         );
         Ok(builder)
     }
+
+    /// `ParquetMetadata` to data file builder
+    pub fn parquet_to_data_file_builder(
+        schema: SchemaRef,
+        metadata: Arc<ParquetMetaData>,
+        written_size: usize,
+        file_path: String,
+    ) -> Result<DataFileBuilder> {
+        let index_by_parquet_path = {
+            let mut visitor = IndexByParquetPathName::new();
+            visit_schema(&schema, &mut visitor)?;
+            visitor
+        };
+
+        let (column_sizes, value_counts, null_value_counts, (lower_bounds, upper_bounds)) = {
+            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut min_max_agg = MinMaxColAggregator::new(schema);
+
+            for row_group in metadata.row_groups() {
+                for column_chunk_metadata in row_group.columns() {
+                    let parquet_path = column_chunk_metadata.column_descr().path().string();
+
+                    let Some(&field_id) = index_by_parquet_path.get(&parquet_path) else {
+                        continue;
+                    };
+
+                    *per_col_size.entry(field_id).or_insert(0) +=
+                        column_chunk_metadata.compressed_size() as u64;
+                    *per_col_val_num.entry(field_id).or_insert(0) +=
+                        column_chunk_metadata.num_values() as u64;
+
+                    if let Some(statistics) = column_chunk_metadata.statistics() {
+                        if let Some(null_count) = statistics.null_count_opt() {
+                            *per_col_null_val_num.entry(field_id).or_insert(0) += null_count;
+                        }
+
+                        min_max_agg.update(field_id, statistics.clone())?;
+                    }
+                }
+            }
+            (
+                per_col_size,
+                per_col_val_num,
+                per_col_null_val_num,
+                min_max_agg.produce(),
+            )
+        };
+
+        let mut builder = DataFileBuilder::default();
+        builder
+            .content(DataContentType::Data)
+            .file_path(file_path)
+            .file_format(DataFileFormat::Parquet)
+            .partition(Struct::empty())
+            .record_count(metadata.file_metadata().num_rows() as u64)
+            .file_size_in_bytes(written_size as u64)
+            .column_sizes(column_sizes)
+            .value_counts(value_counts)
+            .null_value_counts(null_value_counts)
+            .lower_bounds(lower_bounds)
+            .upper_bounds(upper_bounds)
+            .split_offsets(
+                metadata
+                    .row_groups()
+                    .iter()
+                    .filter_map(|group| group.file_offset())
+                    .collect(),
+            );
+
+        Ok(builder)
+    }
 }
 
 impl FileWriter for ParquetWriter {
diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
index a24b88634..ae959a9e2 100644
--- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
+++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
@@ -114,7 +114,7 @@ async fn test_append_data_file() {
     let tx = Transaction::new(&table);
     let mut append_action = tx.fast_append(None, vec![]).unwrap();
     append_action.add_data_files(data_file.clone()).unwrap();
-    let tx = append_action.apply().await.unwrap();
+    let tx = append_action.apply(tx).await.unwrap();
     let table = tx.commit(&rest_catalog).await.unwrap();
 
     // check result
@@ -134,7 +134,7 @@ async fn test_append_data_file() {
     let tx = Transaction::new(&table);
     let mut append_action = tx.fast_append(None, vec![]).unwrap();
     append_action.add_data_files(data_file.clone()).unwrap();
-    let tx = append_action.apply().await.unwrap();
+    let tx = append_action.apply(tx).await.unwrap();
     let table = tx.commit(&rest_catalog).await.unwrap();
 
     // check result again
diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
index 42cc596f5..4da42d666 100644
--- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
+++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
@@ -123,7 +123,7 @@ async fn test_append_partition_data_file() {
     append_action
         .add_data_files(data_file_valid.clone())
         .unwrap();
-    let tx = append_action.apply().await.unwrap();
+    let tx = append_action.apply(tx).await.unwrap();
     let table = tx.commit(&rest_catalog).await.unwrap();
 
     // check result
diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
index 2686a1d26..be5d9d4e9 100644
--- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
@@ -92,11 +92,12 @@ async fn test_append_data_file_conflict() {
     let tx1 = Transaction::new(&table);
     let mut append_action = tx1.fast_append(None, vec![]).unwrap();
     append_action.add_data_files(data_file.clone()).unwrap();
-    let tx1 = append_action.apply().await.unwrap();
+    let tx1 = append_action.apply(tx1).await.unwrap();
+
     let tx2 = Transaction::new(&table);
     let mut append_action = tx2.fast_append(None, vec![]).unwrap();
     append_action.add_data_files(data_file.clone()).unwrap();
-    let tx2 = append_action.apply().await.unwrap();
+    let tx2 = append_action.apply(tx2).await.unwrap();
     let table = tx2
         .commit(&rest_catalog)
         .await
diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
index 673a78ac0..2d738c2bb 100644
--- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs
+++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
@@ -311,7 +311,7 @@ async fn test_scan_all_type() {
     let tx = Transaction::new(&table);
     let mut append_action = tx.fast_append(None, vec![]).unwrap();
     append_action.add_data_files(data_file.clone()).unwrap();
-    let tx = append_action.apply().await.unwrap();
+    let tx = append_action.apply(tx).await.unwrap();
     let table = tx.commit(&rest_catalog).await.unwrap();
 
     // check result