From d0a2897cb35385c781020dd5d69f098244295042 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Mon, 9 Dec 2024 18:45:17 +0800
Subject: [PATCH 1/2] remove config param in writer builder

---
 .../writer/base_writer/data_file_writer.rs    | 31 +++++--------------
 .../base_writer/equality_delete_writer.rs     | 24 +++++++-------
 crates/iceberg/src/writer/mod.rs              |  4 +--
 .../tests/append_data_file_test.rs            |  9 ++----
 .../tests/conflict_commit_test.rs             |  9 ++----
 5 files changed, 27 insertions(+), 50 deletions(-)

diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index c32c98bbc..c6573165e 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -29,25 +29,15 @@ use crate::Result;
 #[derive(Clone)]
 pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
     inner: B,
+    partition_value: Option<Struct>,
 }
 
 impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
     /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B) -> Self {
-        Self { inner }
-    }
-}
-
-/// Config for `DataFileWriter`.
-pub struct DataFileWriterConfig {
-    partition_value: Struct,
-}
-
-impl DataFileWriterConfig {
-    /// Create a new `DataFileWriterConfig` with partition value.
-    pub fn new(partition_value: Option<Struct>) -> Self {
+    pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
         Self {
-            partition_value: partition_value.unwrap_or(Struct::empty()),
+            inner,
+            partition_value,
         }
     }
 }
@@ -55,12 +45,11 @@ impl DataFileWriterConfig
 #[async_trait::async_trait]
 impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
     type R = DataFileWriter<B>;
-    type C = DataFileWriterConfig;
 
-    async fn build(self, config: Self::C) -> Result<Self::R> {
+    async fn build(self) -> Result<Self::R> {
         Ok(DataFileWriter {
             inner_writer: Some(self.inner.clone().build().await?),
-            partition_value: config.partition_value,
+            partition_value: self.partition_value.unwrap_or(Struct::empty()),
         })
     }
 }
@@ -115,9 +104,7 @@ mod test {
 
     use crate::io::FileIOBuilder;
     use crate::spec::{DataContentType, DataFileFormat, Schema, Struct};
-    use crate::writer::base_writer::data_file_writer::{
-        DataFileWriterBuilder, DataFileWriterConfig,
-    };
+    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
     use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
@@ -140,9 +127,7 @@ mod test {
             location_gen,
             file_name_gen,
         );
-        let mut data_file_writer = DataFileWriterBuilder::new(pw)
-            .build(DataFileWriterConfig::new(None))
-            .await?;
+        let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await?;
 
         let data_file = data_file_writer.close().await.unwrap();
         assert_eq!(data_file.len(), 1);
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 06d96f830..8e0a82e8e 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -35,16 +35,18 @@ use crate::{Error, ErrorKind, Result};
 #[derive(Clone)]
 pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
     inner: B,
+    config: EqualityDeleteWriterConfig,
 }
 
 impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
     /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B) -> Self {
-        Self { inner }
+    pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
+        Self { inner, config }
     }
 }
 
 /// Config for `EqualityDeleteWriter`.
+#[derive(Clone)]
 pub struct EqualityDeleteWriterConfig {
     // Field ids used to determine row equality in equality delete files.
     equality_ids: Vec<i32>,
@@ -108,14 +110,13 @@ impl EqualityDeleteWriterConfig
 #[async_trait::async_trait]
 impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
     type R = EqualityDeleteFileWriter<B>;
-    type C = EqualityDeleteWriterConfig;
 
-    async fn build(self, config: Self::C) -> Result<Self::R> {
+    async fn build(self) -> Result<Self::R> {
         Ok(EqualityDeleteFileWriter {
             inner_writer: Some(self.inner.clone().build().await?),
-            projector: config.projector,
-            equality_ids: config.equality_ids,
-            partition_value: config.partition_value,
+            projector: self.config.projector,
+            equality_ids: self.config.equality_ids,
+            partition_value: self.config.partition_value,
         })
     }
 }
@@ -396,9 +397,8 @@ mod test {
            location_gen,
            file_name_gen,
        );
-
-        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
-            .build(equality_config)
+        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, equality_config)
+            .build()
             .await?;
 
         // write
@@ -561,8 +561,8 @@ mod test {
            location_gen,
            file_name_gen,
        );
-        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
-            .build(config)
+        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, config)
+            .build()
             .await?;
 
         // prepare data
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 6cb9aaee6..64357a0fe 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -63,10 +63,8 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
 {
     /// The associated writer type.
     type R: IcebergWriter<I, O>;
-    /// The associated writer config type used to build the writer.
-    type C;
     /// Build the iceberg writer.
-    async fn build(self, config: Self::C) -> Result<Self::R>;
+    async fn build(self) -> Result<Self::R>;
 }
 
 /// The iceberg writer used to write data to iceberg table.
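
For reference, after this change a call site configures the writer entirely through its builder and `build()` takes no config argument. A rough usage sketch, reusing the names from the tests below (the schema, file_io and generator values are assumed to be set up as in those tests):

    // Partition value now goes into the builder constructor; `None` falls
    // back to the empty (unpartitioned) struct inside `build()`.
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        schema,              // Arc<iceberg::spec::Schema>, assumed set up earlier
        file_io,             // iceberg::io::FileIO, assumed set up earlier
        location_generator,  // DefaultLocationGenerator, assumed set up earlier
        file_name_generator, // DefaultFileNameGenerator, assumed set up earlier
    );
    let mut data_file_writer = DataFileWriterBuilder::new(parquet_writer_builder, None)
        .build()
        .await?;
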
diff --git a/crates/integration_tests/tests/append_data_file_test.rs b/crates/integration_tests/tests/append_data_file_test.rs
index 87e805c23..60d4f04c6 100644
--- a/crates/integration_tests/tests/append_data_file_test.rs
+++ b/crates/integration_tests/tests/append_data_file_test.rs
@@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
 use iceberg::transaction::Transaction;
-use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
@@ -97,11 +97,8 @@ async fn test_append_data_file() {
        location_generator.clone(),
        file_name_generator.clone(),
    );
-    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
-    let mut data_file_writer = data_file_writer_builder
-        .build(DataFileWriterConfig::new(None))
-        .await
-        .unwrap();
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
+    let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
     let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
     let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
     let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs
index f3dd70f16..52575d1ce 100644
--- a/crates/integration_tests/tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/conflict_commit_test.rs
@@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
 use iceberg::transaction::Transaction;
-use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
@@ -96,11 +96,8 @@ async fn test_append_data_file_conflict() {
        location_generator.clone(),
        file_name_generator.clone(),
    );
-    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
-    let mut data_file_writer = data_file_writer_builder
-        .build(DataFileWriterConfig::new(None))
-        .await
-        .unwrap();
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
+    let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
     let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
     let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
     let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);

From d51ab8544e027aeea014ddfb9663d7bc4aaac195 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Tue, 10 Dec 2024 00:36:48 +0800
Subject: [PATCH 2/2] add Debug derive

---
 crates/iceberg/src/arrow/record_batch_projector.rs           | 2 +-
 crates/iceberg/src/writer/base_writer/data_file_writer.rs    | 3 ++-
 .../iceberg/src/writer/base_writer/equality_delete_writer.rs | 5 +++--
 crates/iceberg/src/writer/file_writer/location_generator.rs  | 4 ++--
 crates/iceberg/src/writer/file_writer/parquet_writer.rs      | 2 +-
 5 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs
index f218983aa..e167eeedd 100644
--- a/crates/iceberg/src/arrow/record_batch_projector.rs
+++ b/crates/iceberg/src/arrow/record_batch_projector.rs
@@ -24,7 +24,7 @@ use crate::error::Result;
 use crate::{Error, ErrorKind};
 
 /// Help to project specific field from `RecordBatch`` according to the fields id.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub(crate) struct RecordBatchProjector {
     // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
     // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index c6573165e..6f9c0a892 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -26,7 +26,7 @@ use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
 use crate::Result;
 
 /// Builder for `DataFileWriter`.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
     inner: B,
     partition_value: Option<Struct>,
@@ -55,6 +55,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
 }
 
 /// A writer write data is within one spec/partition.
+#[derive(Debug)]
 pub struct DataFileWriter<B: FileWriterBuilder> {
     inner_writer: Option<B::R>,
     partition_value: Struct,
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 8e0a82e8e..328e2b93d 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -32,7 +32,7 @@ use crate::writer::{IcebergWriter, IcebergWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
 /// Builder for `EqualityDeleteWriter`.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
     inner: B,
     config: EqualityDeleteWriterConfig,
@@ -46,7 +46,7 @@ impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
 }
 
 /// Config for `EqualityDeleteWriter`.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct EqualityDeleteWriterConfig {
     // Field ids used to determine row equality in equality delete files.
     equality_ids: Vec<i32>,
@@ -122,6 +122,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuil
 }
 
 /// Writer used to write equality delete files.
+#[derive(Debug)]
 pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
     inner_writer: Option<B::R>,
     projector: RecordBatchProjector,
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs
index def18b580..eb255fc67 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -35,7 +35,7 @@ const WRITE_DATA_LOCATION: &str = "write.data.path";
 const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
 const DEFAULT_DATA_DIR: &str = "/data";
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 /// `DefaultLocationGenerator` used to generate the data dir location of data file.
 /// The location is generated based on the table location and the data location in table properties.
 pub struct DefaultLocationGenerator {
@@ -87,7 +87,7 @@ pub trait FileNameGenerator: Clone + Send + 'static {
 /// `DefaultFileNameGenerator` used to generate file name for data file. The file name can be
 /// passed to `LocationGenerator` to generate the location of the file.
 /// The file name format is "{prefix}-{file_count}[-{suffix}].{file_format}".
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct DefaultFileNameGenerator {
     prefix: String,
     suffix: String,
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 35c91e637..09f9a7057 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -46,7 +46,7 @@ use crate::writer::CurrentFileStatus;
 use crate::{Error, ErrorKind, Result};
 
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
     props: WriterProperties,
     schema: SchemaRef,
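
With Debug derived across the stack (RecordBatchProjector, the data and equality delete writer builders and writers, the default location and file name generators, and ParquetWriterBuilder), a fully composed builder can now be inspected directly. A minimal sketch, assuming the same setup names as in the integration tests above:

    // The derived Debug impl on DataFileWriterBuilder<ParquetWriterBuilder<_, _>>
    // is usable because every nested component now implements Debug.
    let builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
    println!("{builder:?}");
    // dbg!(&builder) works as well while debugging.
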