Skip to content

Commit

Permalink
remove config param in writer builder
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 9, 2024
1 parent 4fba3f4 commit d0a2897
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 50 deletions.
31 changes: 8 additions & 23 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,27 @@ use crate::Result;
#[derive(Clone)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
inner: B,
partition_value: Option<Struct>,
}

impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B) -> Self {
Self { inner }
}
}

/// Config for `DataFileWriter`.
pub struct DataFileWriterConfig {
partition_value: Struct,
}

impl DataFileWriterConfig {
/// Create a new `DataFileWriterConfig` with partition value.
pub fn new(partition_value: Option<Struct>) -> Self {
pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
Self {
partition_value: partition_value.unwrap_or(Struct::empty()),
inner,
partition_value,
}
}
}

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
type R = DataFileWriter<B>;
type C = DataFileWriterConfig;

async fn build(self, config: Self::C) -> Result<Self::R> {
async fn build(self) -> Result<Self::R> {
Ok(DataFileWriter {
inner_writer: Some(self.inner.clone().build().await?),
partition_value: config.partition_value,
partition_value: self.partition_value.unwrap_or(Struct::empty()),
})
}
}
Expand Down Expand Up @@ -115,9 +104,7 @@ mod test {

use crate::io::FileIOBuilder;
use crate::spec::{DataContentType, DataFileFormat, Schema, Struct};
use crate::writer::base_writer::data_file_writer::{
DataFileWriterBuilder, DataFileWriterConfig,
};
use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::ParquetWriterBuilder;
Expand All @@ -140,9 +127,7 @@ mod test {
location_gen,
file_name_gen,
);
let mut data_file_writer = DataFileWriterBuilder::new(pw)
.build(DataFileWriterConfig::new(None))
.await?;
let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await?;

let data_file = data_file_writer.close().await.unwrap();
assert_eq!(data_file.len(), 1);
Expand Down
24 changes: 12 additions & 12 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ use crate::{Error, ErrorKind, Result};
#[derive(Clone)]
pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
inner: B,
config: EqualityDeleteWriterConfig,
}

impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
/// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B) -> Self {
Self { inner }
pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
Self { inner, config }
}
}

/// Config for `EqualityDeleteWriter`.
#[derive(Clone)]
pub struct EqualityDeleteWriterConfig {
// Field ids used to determine row equality in equality delete files.
equality_ids: Vec<i32>,
Expand Down Expand Up @@ -108,14 +110,13 @@ impl EqualityDeleteWriterConfig {
#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
type R = EqualityDeleteFileWriter<B>;
type C = EqualityDeleteWriterConfig;

async fn build(self, config: Self::C) -> Result<Self::R> {
async fn build(self) -> Result<Self::R> {
Ok(EqualityDeleteFileWriter {
inner_writer: Some(self.inner.clone().build().await?),
projector: config.projector,
equality_ids: config.equality_ids,
partition_value: config.partition_value,
projector: self.config.projector,
equality_ids: self.config.equality_ids,
partition_value: self.config.partition_value,
})
}
}
Expand Down Expand Up @@ -396,9 +397,8 @@ mod test {
location_gen,
file_name_gen,
);

let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
.build(equality_config)
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, equality_config)
.build()
.await?;

// write
Expand Down Expand Up @@ -561,8 +561,8 @@ mod test {
location_gen,
file_name_gen,
);
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
.build(config)
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, config)
.build()
.await?;

// prepare data
Expand Down
4 changes: 1 addition & 3 deletions crates/iceberg/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
{
/// The associated writer type.
type R: IcebergWriter<I, O>;
/// The associated writer config type used to build the writer.
type C;
/// Build the iceberg writer.
async fn build(self, config: Self::C) -> Result<Self::R>;
async fn build(self) -> Result<Self::R>;
}

/// The iceberg writer used to write data to iceberg table.
Expand Down
9 changes: 3 additions & 6 deletions crates/integration_tests/tests/append_data_file_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
Expand Down Expand Up @@ -97,11 +97,8 @@ async fn test_append_data_file() {
location_generator.clone(),
file_name_generator.clone(),
);
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
let mut data_file_writer = data_file_writer_builder
.build(DataFileWriterConfig::new(None))
.await
.unwrap();
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
Expand Down
9 changes: 3 additions & 6 deletions crates/integration_tests/tests/conflict_commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
Expand Down Expand Up @@ -96,11 +96,8 @@ async fn test_append_data_file_conflict() {
location_generator.clone(),
file_name_generator.clone(),
);
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
let mut data_file_writer = data_file_writer_builder
.build(DataFileWriterConfig::new(None))
.await
.unwrap();
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
Expand Down

0 comments on commit d0a2897

Please sign in to comment.