feat: Add existing parquet files #960

Open
wants to merge 21 commits into main

Conversation

@jonathanc-n (Contributor)

Completes #932

Allows adding existing Parquet files by using the Parquet metadata to create DataFiles, which are then fast-appended using the existing API.
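
A rough usage sketch of the intended flow. The method name add_parquet_files and its exact signature are assumptions based on this description, and the paths are placeholders; the final API may differ.

```rust
// Rough sketch only: `add_parquet_files` and the file paths are illustrative,
// not the merged API. The idea: read each file's footer, build a DataFile,
// then fast-append and commit through the existing transaction flow.
let tx = Transaction::new(&table);
let tx = tx
    .add_parquet_files(vec![
        "s3://bucket/warehouse/tbl/data/part-00000.parquet".to_string(),
        "s3://bucket/warehouse/tbl/data/part-00001.parquet".to_string(),
    ])
    .await?;
let _table = tx.commit(&catalog).await?;
```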

@liurenjie1024 (Contributor) left a comment

Thanks @jonathanc-n for this PR. This seems to duplicate the fast_append API; is there anything missing in that API?

@jonathanc-n (Contributor, Author)

@liurenjie1024 I believe FastAppend is for directly appending DataFiles to the snapshot, whereas this PR takes existing Parquet file paths, parses the Parquet metadata, and converts it into DataFiles, which are then fast-appended. @ZENOTME Would you like to verify this?

@liurenjie1024 (Contributor)

> @liurenjie1024 I believe FastAppend is for directly appending DataFiles to the snapshot, whereas this PR takes existing Parquet file paths, parses the Parquet metadata, and converts it into DataFiles, which are then fast-appended. @ZENOTME Would you like to verify this?

I have concerns about putting this into the transaction API. It seems that what's necessary is to build a DataFile from an existing Parquet file; the user could then call fast append to add it. But this is typically a dangerous operation because the schema in the Parquet file is not verified. Is it possible to use FileWriter to write data to Parquet in your case?

jonathanc-n marked this pull request as draft on February 10, 2025, 04:12
@ZENOTME (Contributor) commented Feb 10, 2025

> @liurenjie1024 I believe FastAppend is for directly appending DataFiles to the snapshot, whereas this PR takes existing Parquet file paths, parses the Parquet metadata, and converts it into DataFiles, which are then fast-appended. @ZENOTME Would you like to verify this?

> I have concerns about putting this into the transaction API. It seems that what's necessary is to build a DataFile from an existing Parquet file; the user could then call fast append to add it. But this is typically a dangerous operation because the schema in the Parquet file is not verified. Is it possible to use FileWriter to write data to Parquet in your case?

This API is different from FastAppend:

  • FastAppend is used to append DataFiles.
  • This API extracts DataFiles from existing files and then appends them.

In iceberg-python, it's an API on the transaction: https://github.com/apache/iceberg-python/blob/dd175aadfdf03df707bed37008f217258a916369/pyiceberg/table/__init__.py#L671.
But interestingly, I can't seem to find it in iceberg-java. cc @Fokko

> But this is typically a dangerous operation because the schema in the Parquet file is not verified. Is it possible to use FileWriter to write data to Parquet in your case?

Yes, we should verify the schema. And it's also done in iceberg-python: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L2431.

@jonathanc-n (Contributor, Author)

Yes, I will put this out for review after I add some schema validation. It should be out tomorrow.

@jonathanc-n (Contributor, Author)

@ZENOTME Do you think the API should be kept in transactions or somewhere else?

@ZENOTME (Contributor) commented Feb 10, 2025

> @ZENOTME Do you think the API should be kept in transactions or somewhere else?

It's OK to keep it in the transaction, since it's a safe operation if we have schema validation, and that's what iceberg-python does. Are there other concerns about this? @liurenjie1024

@Fokko (Contributor) commented Feb 10, 2025

PyIceberg and Iceberg-Java are a bit different. Where PyIceberg is used by end-users, Iceberg-Java is often embedded in a query engine. I think this is the reason why it isn't part of the transaction API. Spark does have an add_files procedure.

To successfully add files to a table, I think three things are essential:

  • Schema: As already mentioned, the schema should be either the same or compatible. I would start with the former to keep it simple and robust (a minimal version of that check is sketched after this list).
  • Name Mapping: Since the Parquet file probably doesn't contain field IDs for column tracking, we need to fall back on name mapping.
  • Metrics: When adding a file to the table, we should extract the upper/lower bounds, number of nulls, etc. from the Parquet footer and store them in the Iceberg metadata. This is important for Iceberg to keep its promise of efficient scans; without this information, the file would always be included when planning a query.
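
Not part of this PR, but a minimal sketch of the strict "same schema" check, assuming the comparison is done at the Arrow level (field-ID metadata and compatibility rules are ignored here):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow_schema::Schema;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

/// Returns true when the Parquet file's Arrow schema is exactly equal to the
/// expected schema. Deliberately strict: no schema-evolution handling at all.
fn file_matches_schema(
    path: &str,
    expected: &Arc<Schema>,
) -> Result<bool, Box<dyn std::error::Error>> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(File::open(path)?)?;
    // `schema()` is the Arrow schema derived from the Parquet footer.
    Ok(builder.schema() == expected)
}
```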

@jonathanc-n (Contributor, Author)

@Fokko I was looking to do the name mapping and more metrics in another PR. Would you rather I include them in this one?

@Fokko (Contributor) commented Feb 10, 2025

@jonathanc-n Let's do that in a separate PR 👍

@jonathanc-n (Contributor, Author) commented Feb 11, 2025

For metadata retrieval, it seems I can use ParquetWriter::to_data_file_builder to avoid duplicating more code. However, the file metadata it takes in is the thrift FileMetaData, while the ArrowFileReader provides the parsed metadata, which doesn't contain enough information. I was planning on submitting a PR to arrow-rs to allow ParquetMetaDataReader to return the thrift FileMetaData format. Are there any other alternatives to this? cc @ZENOTME @Fokko @liurenjie1024
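
For readers following along, these are the two metadata types being discussed (paths as in the arrow-rs parquet crate; a clarifying note, not code from this PR):

```rust
// Thrift-generated struct, returned by the Arrow writer when a file is closed;
// this is what ParquetWriter::to_data_file_builder currently consumes.
use parquet::format::FileMetaData as ThriftFileMetaData;

// Parsed, in-memory representation, which is what the read path
// (ArrowFileReader / the arrow reader builders) exposes.
use parquet::file::metadata::ParquetMetaData;
```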

@ZENOTME (Contributor) commented Feb 11, 2025

> while the ArrowFileReader provides the parsed metadata, which doesn't contain enough information

Thanks for your investigation @jonathanc-n! But it seems the parsed metadata does contain enough information (such as statistics).

> For metadata retrieval, it seems I can use ParquetWriter::to_data_file_builder to avoid duplicating more code.
> I was planning on submitting a PR to arrow-rs to allow ParquetMetaDataReader to return the thrift FileMetaData format.

I think the parsed format (the in-memory representation) may be friendlier for us to extract information from. (Actually, I would convert the thrift format to the parsed format anyway.) So I think we should refine to_data_file_builder to take the parsed format. 🤔
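
A small sketch (not from this PR) of what the parsed representation already exposes:

```rust
use parquet::file::metadata::ParquetMetaData;

/// Walks the parsed footer: the total row count and the per-column-chunk
/// statistics (null counts, min/max) are all reachable without the thrift structs.
fn dump_footer_stats(metadata: &ParquetMetaData) {
    let total_rows: i64 = metadata.row_groups().iter().map(|rg| rg.num_rows()).sum();
    println!("total rows: {total_rows}");

    for rg in metadata.row_groups() {
        for col in rg.columns() {
            // `statistics()` is None when the writer did not record stats.
            if let Some(stats) = col.statistics() {
                println!("column {}: {:?}", col.column_path(), stats);
            }
        }
    }
}
```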

@ZENOTME (Contributor) commented Feb 11, 2025

> @Fokko I was looking to do the name mapping and more metrics in another PR. Would you rather I include them in this one?

def pyarrow_to_schema(
    schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
) -> Schema:
    has_ids = visit_pyarrow(schema, _HasIds())
    if has_ids:
        return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
    elif name_mapping is not None:
        schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
        return apply_name_mapping(schema_without_ids, name_mapping)
    else:
        raise ValueError(
            "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
        )

Applying the name mapping relies on SchemaVisitorWithPartner, which will be introduced in #731. Maybe we can skip schemas without field IDs for now and support applying the name mapping after #731 is complete.

jonathanc-n marked this pull request as ready for review on February 12, 2025, 01:29
@jonathanc-n (Contributor, Author) commented Feb 12, 2025

I have changed the data file builder and reimplemented the original. I couldn't change the parameter passed to to_data_file_builder, as the ParquetWriter returns the unparsed metadata. I can make a follow-up PR for metadata validation.

@liurenjie1024 (Contributor)

> PyIceberg and Iceberg-Java are a bit different. Where PyIceberg is used by end-users, Iceberg-Java is often embedded in a query engine.

I think iceberg-rust's position is more like iceberg-java's, since there are already SQL engines written in Rust (DataFusion, Databend, Polars, etc.), and integrating them with iceberg-rust makes things easier. I think it's a good idea to have methods to convert a Parquet file to a DataFile, including the work mentioned by @Fokko. But it's better to leave the rest to query engines, since it involves IO and parallelism management.

@ZENOTME (Contributor) commented Feb 12, 2025

> PyIceberg and Iceberg-Java are a bit different. Where PyIceberg is used by end-users, Iceberg-Java is often embedded in a query engine.

> I think iceberg-rust's position is more like iceberg-java's, since there are already SQL engines written in Rust (DataFusion, Databend, Polars, etc.), and integrating them with iceberg-rust makes things easier. I think it's a good idea to have methods to convert a Parquet file to a DataFile, including the work mentioned by @Fokko. But it's better to leave the rest to query engines, since it involves IO and parallelism management.

So in summary, for this feature, what we would like to provide is a parquet_files_to_data_files function in the arrow module (or a new parquet module). parquet_files_to_data_files actually does two things (a rough sketch of this split follows below):

  1. schema compatibility check
  2. metrics collection (we can extract a function data_file_statistics_from_parquet_metadata for this, which can be reused in the parquet file writer)

What do you think, @liurenjie1024 @jonathanc-n @Fokko?
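
A rough sketch of the proposed split. The two function names come from the comment above, but the signatures and the FileStats placeholder are illustrative assumptions, not the final iceberg-rust API (the real version would return DataFiles and take the table schema for the compatibility check):

```rust
use std::sync::Arc;

use parquet::file::metadata::ParquetMetaData;

/// Placeholder for the footer-derived pieces of a DataFile (illustrative only).
struct FileStats {
    record_count: u64,
    row_groups: usize,
}

/// Step 2: metrics collection, factored out so the parquet file writer can reuse it.
fn data_file_statistics_from_parquet_metadata(metadata: &ParquetMetaData) -> FileStats {
    FileStats {
        record_count: metadata
            .row_groups()
            .iter()
            .map(|rg| rg.num_rows() as u64)
            .sum(),
        row_groups: metadata.num_row_groups(),
    }
}

/// Steps 1 + 2 per file. The schema compatibility check is elided here; it would
/// compare each file's schema against the table schema before accepting the file.
fn parquet_files_to_data_files(
    files: &[(String, Arc<ParquetMetaData>)],
) -> Vec<(String, FileStats)> {
    files
        .iter()
        .map(|(path, md)| (path.clone(), data_file_statistics_from_parquet_metadata(md)))
        .collect()
}
```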

@jonathanc-n (Contributor, Author)

@ZENOTME Yes, that sounds fine to me. This PR contains the parquet-files-to-data-files conversion. However, I plan on making a follow-up PR with the schema compatibility check.

@liurenjie1024 (Contributor)

> So in summary, for this feature, what we would like to provide is a parquet_files_to_data_files function in the arrow module (or a new parquet module). parquet_files_to_data_files actually does two things:
>
> 1. schema compatibility check
> 2. metrics collection (we can extract a function data_file_statistics_from_parquet_metadata for this, which can be reused in the parquet file writer)

This sounds reasonable to me. After we have this function, we could extend FastAppendAction to add existing Parquet files, as sketched below.
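
A hypothetical sketch of that extension. fast_append and add_data_files exist on the transaction API today, but the argument lists shown here and the assumption that parquet_files_to_data_files yields ready DataFiles are illustrative only:

```rust
// Hypothetical glue code; names and argument lists are illustrative.
let data_files = parquet_files_to_data_files(&table_schema, &parquet_paths)?;

let mut append = tx.fast_append(None, vec![])?;
append.add_data_files(data_files)?;
let tx = append.apply().await?;
let _table = tx.commit(&catalog).await?;
```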

"Partition field should only be a primitive type.",
)
})?;
Ok(Some(primitive_type.type_to_literal()))
Contributor:

Looking at the type_to_literal function, I think this is correct. We don't want to set default values in the partition spec. I think there are two options:

Contributor (Author):

Yeah, this makes sense. We can go with the first option and add a TODO for the check on whether the partition exists.

@liurenjie1024 (Contributor) left a comment

Thanks @jonathanc-n for this PR. Generally I'm fine with the current direction, but I have some suggestions about code organization.

}

/// `ParquetMetadata` to data file builder
pub fn parquet_to_data_file_builder(
Contributor:

I have some suggestions for this method:

  1. It has a lot of duplication with the parquet file writer; we should reuse that code.
  2. This method should not be part of Transaction; it should live in the parquet module.

Contributor (Author):

I think I mentioned the problem with reusing the function here earlier: the writer returns the raw metadata, which is what the original ParquetWriter::to_data_file_builder takes. In this case we are doing a read with the ArrowFileReader, which can only return the parsed metadata. This could be fixed if there were a conversion function from raw to parsed, but I haven't been able to find one 🤔

@liurenjie1024 (Contributor) left a comment

Thanks @jonathanc-n, I just finished a first round of review and left some suggestions for improvement.

Comment on lines 257 to 258
transaction: Transaction<'a>,
file_paths: Vec<String>,
Contributor:

Suggested change:
- transaction: Transaction<'a>,
- file_paths: Vec<String>,
+ &mut self,
+ file_path: &str

As an API, I would suggest asking the user to provide only one filename, so that users can do this concurrently.

Contributor (Author):

I believe support for this should be added after #949; as of right now, duplicate actions are not allowed.

/// `ParquetMetadata` to data file builder
pub fn parquet_to_data_file_builder(
schema: SchemaRef,
metadata: Arc<ParquetMetaData>,
Contributor:

I don't get why we need to duplicate so much code from ParquetWriter::to_data_file_builder; how about simply extracting FileMetaData from ParquetMetaData and calling to_data_file_builder?

Contributor (Author):

I'm a bit confused here. To clarify, the FileMetaData inside ParquetMetaData is different from the thrift FileMetaData that to_data_file_builder takes.

If I were to convert the ParquetMetaData to the thrift FileMetaData, I do not think the conversion overhead would be worth it.

Contributor:

You are right: the currently used FileMetaData is the auto-generated data structure from the thrift definition, while the FileMetaData I'm suggesting is an in-memory representation of it. I think we should use the in-memory representation in the original one as well.

Contributor:

Tracked in #1004

Contributor (Author):

I can do that after the merge. Does anything else need to be changed?
