Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce unified DataSourceExec for provided datasources, remove ParquetExec, CsvExec, etc #14224

Merged
merged 86 commits into from
Feb 6, 2025

Conversation

mertak-synnada
Copy link
Contributor

@mertak-synnada mertak-synnada commented Jan 21, 2025

Which issue does this PR close?

Rationale for this change

This PR merges all Data sources into one Execution Plan, named DataSourceExec, and a single trait named DataSource which is inspired by DataSink. File-related shared behaviors are merged into FileSourceConfig, and a MemorySourceConfig is created. MemoryExec, ArrowExec, AvroExec, CsvExec, NdJsonExec, and ParquetExec are deprecated and merged into DataSourceExec.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

fix csv_json example
# Conflicts:
#	datafusion/physical-plan/src/aggregates/mod.rs
# Conflicts:
#	datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
#	datafusion/physical-plan/src/memory.rs
# Conflicts:
#	datafusion/core/src/datasource/file_format/arrow.rs
#	datafusion/core/src/datasource/file_format/csv.rs
#	datafusion/core/src/datasource/file_format/json.rs
#	datafusion/core/src/datasource/file_format/parquet.rs
#	datafusion/core/src/datasource/memory.rs
#	datafusion/core/src/test/mod.rs
#	datafusion/physical-plan/src/memory.rs
# Conflicts:
#	datafusion/sqllogictest/test_files/aggregate.slt
#	datafusion/sqllogictest/test_files/limit.slt
#	datafusion/sqllogictest/test_files/union.slt
@mertak-synnada
Copy link
Contributor Author

@alamb The FileType enum was forgotten, I deleted it completely and returned just a &str for file_type()

Changed the ParquetSource initialization, now new() is only accepting TableParquetOptions , metadata_size_hint and predicate (also file_schema was related to it) moved into with_xxx functions.

Deprecated ParquetExecBuilder, and added link to ParquetSource example documentation. Also added a link from FileSource to ParquetSource and CsvSource examples.

Thanks for all the valuable feedback! I believe we're in a better state now 🚀

@ozankabak
Copy link
Contributor

@alamb The FileType enum was forgotten, I deleted it completely and returned just a &str for file_type()

Great - custom source creators will just implement some MyCoolFormatSource just like ParquetSource implements FileSource. No need for hardcoded enums or another level of traits. They inform the overall mechanism of their file type extension by simply overriding the file_type method.


I think we addressed all the feedback and made the PR even better. Migration path is simple, and two major downstream projects took a pass on it. Let's wait for a few hours in case anything else comes in, and then proceed with the merge. @alamb, can you take a final look to verify all is good? Thanks.

@alamb
Copy link
Contributor

alamb commented Feb 6, 2025

@alamb, can you take a final look to verify all is good? Thanks.

Doing now -- I am figuring out the pattern for migration where we have code that looks for ParquetExec in the plan . Will have an answer in a few minutes.

@alamb
Copy link
Contributor

alamb commented Feb 6, 2025

I found with this small change I could rewrite our optimizer passes fairly easily (it also seems to not change this particular PR). We can fix as a follow on PR

diff --git a/datafusion/physical-plan/src/source.rs b/datafusion/physical-plan/src/source.rs
index 0b7e6dd03..6f57870ce 100644
--- a/datafusion/physical-plan/src/source.rs
+++ b/datafusion/physical-plan/src/source.rs
@@ -176,8 +176,8 @@ impl DataSourceExec {

     /// Return the source object
     #[allow(unused)]
-    pub fn source(&self) -> Arc<dyn DataSource> {
-        Arc::clone(&self.source)
+    pub fn source(&self) -> &Arc<dyn DataSource> {
+        &self.source
     }

Rationale

We have a bunch of code like this in influxdb_iox for rewriting ParquetExec in various ways:

https://github.com/influxdata/influxdb3_core/blob/a5f6076c966f4940a67998e0b85d12c3e8596715/iox_query/src/physical_optimizer/cached_parquet_data.rs#L52-L88

To make this easier I am trying to replicate the pattern of plan.as_any().downcast_ref::<ParquetExec> and I came up with this:

(However, the rust lifetime rules prevented this from compiling without the change above

/// A view of a [`DataSourceExec`] that scans parquet files
pub struct ParquetExecWrapper<'a> {
    base_config: &'a FileScanConfig,
    parquet_source: &'a ParquetSource,
}

impl<'a> ParquetExecWrapper<'a> {
    /// Create a new wrapper from a [`ExecutionPlan`], returning `None` if the
    /// plan is not a [`DataSourceExec`] that scans Parquet files
    pub fn try_new(plan: &'a dyn ExecutionPlan) -> Option<Self> {
        let plan = plan.as_ref();
        let data_source_exec = plan.as_any().downcast_ref::<DataSourceExec>()?;
        let base_config = data_source_exec
            .source()
            .as_any()
            .downcast_ref::<FileScanConfig>()?;
        let parquet_source = base_config
            .file_source()
            .as_any()
            .downcast_ref::<ParquetSource>()?;
        Some(Self {
            base_config,
            parquet_source,
        })
    }

    /// Return the underlying [`FileScanConfig`]
    pub fn base_config(&self) -> &'a FileScanConfig {
        self.base_config
    }

    /// Return the underlying [`ParquetSource`]
    pub fn parquet_source(&self) -> &'a ParquetSource {
        self.parquet_source
    }
}

Otherwise things are going well -- I am making good progress

@ozankabak
Copy link
Contributor

ozankabak commented Feb 6, 2025

Otherwise things are going well -- I am making good progress

Great! The change you suggested makes sense. Since it is a very small change, we'll shortly add it to this PR and so migration smoother to others as well.

@alamb
Copy link
Contributor

alamb commented Feb 6, 2025

Something else I noticed is the import paths for ParquetExec are gone (so I got a lot of compile errors about missing ParquetExec):

error[E0432]: unresolved import `datafusion::datasource::physical_plan::ParquetExec`
 --> iox_query/src/physical_optimizer/limits.rs:4:5
  |
4 | use datafusion::datasource::physical_plan::ParquetExec;
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ no `ParquetExec` in `datasource::physical_plan`
  |
help: consider importing this struct instead
  |
4 | use datafusion::datasource::physical_plan::parquet::ParquetExec;
  |     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Suggested action is reexport the file here

--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -33,7 +33,7 @@ pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
 pub use self::parquet::source::ParquetSource;
 #[cfg(feature = "parquet")]
-pub use self::parquet::{ParquetFileMetrics, ParquetFileReaderFactory};
+pub use self::parquet::{ParquetFileMetrics, ParquetFileReaderFactory, ParquetExec};

@alamb
Copy link
Contributor

alamb commented Feb 6, 2025

Great! The change you suggested makes sense. Since it is a very small change, we'll shortly add it to this PR and so migration smoother to others as well.

To be clear, I think we can do these kinds of changes in a follow on PR too -- I haven't found any blockers to this PR being merged yet but I don't have our tests running yet to be sure.

But all in all things are going well from my perspective

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got enough of our code to compile and tests running that i think this PR is ok to merge. Thank you @mertak-synnada @ozankabak and @berkaysynnada -- this is pretty epic

I believe this will be a non trivial change for any system that directly creates and manipulates ParquetExec but I think the new structures seem to support everything we need.

It would be really great to write up a document (maybe a blog post) that explains this change and gives help for people upgrading. Specifically some examples of creating ParquetExec before and after (and something similar to CsvExec and AvroExec.

As I work through the rest of our upgrade, I may have additional ideas to make it easier.

I can't realistically review this PR in detail, bug I trust that other committers have done so 👍

@ozankabak
Copy link
Contributor

I got enough of our code to compile and tests running that i think this PR is ok to merge. Thank you @mertak-synnada @ozankabak and @berkaysynnada -- this is pretty epic

Great, let's go ahead 🚀

It would be really great to write up a document (maybe a blog post) that explains this change and gives help for people upgrading. Specifically some examples of creating ParquetExec before and after (and something similar to CsvExec and AvroExec.

This is a terrific idea -- we will be happy to write up a blog post that will serve a dual purpose: (1) help people with upgrading, and (2) brag about this epic PR 🙂

As I work through the rest of our upgrade, I may have additional ideas to make it easier.

Great, we will be happy to collaborate on any follow-on PRs.

@ozankabak ozankabak merged commit 5e1e693 into apache:main Feb 6, 2025
26 checks passed
@ozankabak
Copy link
Contributor

I just merged the PR -- I will check back in half an hour to see if there are any problems

@alamb
Copy link
Contributor

alamb commented Feb 6, 2025

This is a terrific idea -- we will be happy to write up a blog post that will serve a dual purpose: (1) help people with upgrading, and (2) brag about this epic PR 🙂

Yes 100% that would be perfect. I think such a post would be find to put on the datafusion blog but content marketing wise if you write it putting it on a synnada blog would make total sense too

@alamb
Copy link
Contributor

alamb commented Feb 14, 2025

Here is one small follow up from this PR:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate core Core DataFusion crate documentation Improvements or additions to documentation execution Related to the execution crate optimizer Optimizer rules physical-expr Physical Expressions proto Related to proto crate sqllogictest SQL Logic Tests (.slt) substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement fetch limit for MemoryExec [DISCUSS] Single Source ExecutionPlan Across All TableProviders
10 participants