
Support for executing infinite files and boundedness-aware join reordering rule #4694

Merged: 17 commits, Dec 28, 2022

Conversation

@metesynnada (Contributor) commented Dec 21, 2022

Note: Approximately 1100 lines of the changes come from tests and test utilities.

Which issue does this PR close?

Makes progress on #4285.
Closes #4692

Rationale for this change

This PR adds support for FIFO files, along with the relevant basic infrastructure, to DataFusion so that others can build more complex streaming systems on top of it.

Key features and benefits of this addition include:

  • Infrastructure support for data streams in CSV, JSON, and AVRO formats.
  • Enabling incremental/streaming data processing use cases for tools that build on Datafusion.

What changes are included in this PR?

We will discuss the changes in multiple categories:

  • Data source changes: We covered CSV, JSON, and AVRO formats for this PR.
  • Execution plan changes: A basic API addition to accommodate infinite streams at the plan level when writing custom operators.
  • Physical optimization changes: A new rule checks whether a given query can handle its infinite inputs, and reorders join inputs to transform pipeline-breaking queries into runnable ones where possible.

Data source changes

Currently, DataFusion provides support for working with regular (finite)

  • CSV files
  • JSON files
  • Avro files
  • Parquet files

Within this PR, we worked on the first three file types so that unbounded sources involving such formats will be supported.

Changes in CsvReadOptions, AvroReadOptions, and NdJsonReadOptions

For each file type, we added a new attribute to its options so that a user can "mark" their data source as infinite. This information is then propagated appropriately: ListingOptions passes the necessary information into FileScanConfig, which in turn affects each file format's execution plan (CsvExec, NdJsonExec, and AvroExec).

struct FileScanConfig {
	..
	/// Indicates whether this plan may produce an infinite stream of records.
	pub infinite_source: bool,
}
struct ListingOptions {
	..
	/// DataFusion may optimize or adjust query plans (e.g. joins) to
	/// accommodate infinite data sources and run the given query in full
	/// pipelining mode. This flag lets DataFusion know that this file
	/// is potentially infinite. Currently, CSV, JSON, and AVRO formats
	/// are supported.
	pub infinite_source: bool,
}

We did not include ParquetReadOptions in this PR.
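
For illustration, here is a minimal sketch of constructing listing options flagged as infinite via the public field shown above. The helper function name is hypothetical; ListingOptions::new and CsvFormat are assumed to be the existing DataFusion APIs:

use std::sync::Arc;

use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::ListingOptions;

// Hypothetical helper: CSV listing options whose source is marked as infinite.
fn infinite_csv_listing_options() -> ListingOptions {
    ListingOptions {
        // The new flag introduced by this PR; it propagates into FileScanConfig.
        infinite_source: true,
        ..ListingOptions::new(Arc::new(CsvFormat::default()))
    }
}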

ExecutionPlan changes

The following addition to the ExecutionPlan API is powerful enough to provide a foundation for custom streaming operators:

pub trait ExecutionPlan: Debug + Send + Sync {
    ..
    /// Specifies whether this plan generates an infinite stream of records.
    /// If the plan does not support pipelining, but its input(s) are
    /// infinite, returns an error to indicate this.
    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
        Ok(false)
    }
}

With this change, each ExecutionPlan now knows whether its inputs are potentially infinite. For FilterExec, the implementation is

impl ExecutionPlan for FilterExec { 
    ..
    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
        Ok(children[0])
    }
}

since FilterExec does not need to collect its whole input to calculate results. However, SortExec does not override the default Ok(false) setting, since it has to collect its entire input before generating any result at all. Thus, it is a pipeline breaker.
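
For contrast, the error path described in the API documentation can be illustrated with a hedged sketch of how a pipeline-breaking, single-child operator (such as a sort) might implement this method. This is a sketch, not necessarily the merged SortExec code:

use datafusion::error::{DataFusionError, Result};

// Sketch: a pipeline-breaking operator rejecting unbounded input, following
// the `unbounded_output` contract shown above.
fn pipeline_breaker_unbounded_output(children: &[bool]) -> Result<bool> {
    if children[0] {
        // The operator must consume its entire input before emitting anything,
        // so an infinite input can never produce a result.
        Err(DataFusionError::Plan(
            "This operator cannot run on unbounded (infinite) inputs".to_string(),
        ))
    } else {
        // A finite input yields a finite output.
        Ok(false)
    }
}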

Let’s define a simple INNER JOIN query:

SELECT t2.c1 FROM infinite as t1 JOIN finite as t2 ON t1.c1 = t2.c1

As we know, a HashJoinExec collects the left side and streams the right side. Thus, the left side has to be a finite source.

        +--------------+              +--------------+
        |              |  unbounded   |              |                                            
 Left   | Infinite     |    true      | Hash         |\true                                       
        | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
        |              |              |              |  \  |              |       |              |
        +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |
                                                         - |              |       |              |
        +--------------+              +--------------+  /  +--------------+       +--------------+
        |              |  unbounded   |              | /                                          
 Right  | Finite       |    false     | Hash         |/false                                      
        | Data Source  |--------------| Repartition  |                                            
        |              |              |              |                                            
        +--------------+              +--------------+

Given this new simple API, we now have the ability to write a rule that swaps join inputs to transform this query into a runnable query.

Physical optimization changes

Before this PR, we could not execute the query SELECT t2.c1 FROM infinite as t1 JOIN finite as t2 ON t1.c1 = t2.c1, since the left side comes from an infinite data source. However, we can save this query and make it executable by simply swapping the join sides. Therefore, we introduce a new physical optimizer rule named PipelineChecker that coordinates such “execution saver” subrules.

Swapping join sides depending on statistical properties is not new, but we add an additional swap rule depending on the boundedness properties of the inputs (hash_join_swap_subrule). It basically transforms the above physical plan into this:

        +--------------+              +--------------+
        |              |  unbounded   |              |
 Left   | Finite       |    false     | Hash         |\false
        | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
        |              |              |              |  \  |              | true  |              | true
        +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |-----
                                                         - |              |       |              |
        +--------------+              +--------------+  /  +--------------+       +--------------+
        |              |  unbounded   |              | /
 Right  | Infinite     |    true      | Hash         |/true
        | Data Source  |--------------| Repartition  |
        |              |              |              |
        +--------------+              +--------------+
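
To make the swap decision concrete, here is a minimal sketch of the boundedness check at the heart of a rule like hash_join_swap_subrule. The struct and function names are illustrative; the real subrule operates on PipelineStatePropagator and also accounts for the join type:

/// Illustrative input: whether each join side produces an unbounded stream.
struct JoinBoundedness {
    left_unbounded: bool,
    right_unbounded: bool,
}

/// HashJoinExec buffers its left (build) side, so that side must be finite;
/// the streamed right (probe) side may be infinite. Swapping therefore helps
/// exactly when the left side is infinite and the right side is finite.
fn should_swap_for_boundedness(b: &JoinBoundedness) -> bool {
    b.left_unbounded && !b.right_unbounded
}

fn main() {
    // The example plan above: infinite left, finite right => swap.
    let sides = JoinBoundedness { left_unbounded: true, right_unbounded: false };
    assert!(should_swap_for_boundedness(&sides));
}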

Obviously, not all queries can be "saved". Therefore, we introduce a checker that leverages the unbounded_output(children: &[bool]) API to output a useful error indicating exactly why the query cannot run (i.e. it shows where the pipeline breaks). The rule simply applies the following logic via the transform_up API:

type PipelineCheckerSubrule =
    dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;

fn apply_subrules_and_check_finiteness_requirements(
    mut input: PipelineStatePropagator,
    physical_optimizer_subrules: &[Box<PipelineCheckerSubrule>],
) -> Result<Option<PipelineStatePropagator>> {
    // Apply each subrule; a subrule may rewrite the plan or report an error.
    for sub_rule in physical_optimizer_subrules {
        match sub_rule(&input) {
            Some(Ok(value)) => input = value,
            Some(Err(e)) => return Err(e),
            None => {}
        }
    }

    // Ask the operator whether it supports its children's boundedness; an
    // unsupported configuration surfaces here as a planning error.
    let plan = input.plan;
    let unbounded = plan.unbounded_output(&input.children_unbounded)?;
    Ok(Some(PipelineStatePropagator {
        plan,
        unbounded,
        children_unbounded: input.children_unbounded,
    }))
}

As seen above, the checker retrieves the boundedness properties of an operator's children and checks whether the operator supports that configuration.

Are these changes tested?

  • PipelineChecker tests
  • hash_join_swap_subrule tests
  • File reading API attribute addition tests:
    • CsvReadOptions, NdJsonReadOptions, and AvroReadOptions creation tests
    • CsvReadOptions, NdJsonReadOptions, and AvroReadOptions to ListingTable conversion tests
    • Tests ensuring that schema is required for infinite files
  • SQL tests that verify end-to-end execution

Are there any user-facing changes?

There is one new optional flag users can supply when reading files to mark them as infinite; e.g.

// CSV
ctx.register_csv(
    "unbounded",
    path.as_str(),
    CsvReadOptions::new()
        .schema(unbounded_schema.as_ref()) // already exists, optional
        .mark_infinite(true), // added, optional
)
.await?;
// JSON
ctx.register_json(
    "unbounded",
    path.as_str(),
    NdJsonReadOptions::default()
        .schema(unbounded_schema.as_ref()) // added, optional
        .mark_infinite(true), // added, optional
)
.await?;
// AVRO
ctx.register_avro(
    "unbounded",
    path.as_str(),
    AvroReadOptions::default()
        .schema(unbounded_schema.as_ref()) // added, optional
        .mark_infinite(true), // added, optional
)
.await?;

We also unified the APIs so that all of them now support schema injection from options; this was previously missing for the JSON and AVRO formats. There are no breaking API changes.

@github-actions bot added the core (Core DataFusion crate) label on Dec 21, 2022
@ozankabak (Contributor) commented Dec 21, 2022

As we all agreed when discussing the streaming roadmap a few weeks ago, our initial focus was to identify the small-but-powerful infrastructural improvements and hooks we can introduce in DataFusion so that more complex streaming use cases can be supported either outside or inside DataFusion.

IMO, this PR takes a huge step towards this goal. It enables DataFusion to process infinite files like FIFOs, presents an API for factoring boundedness into planning and optimization, and even gives DataFusion the power to deduce whether it can run a given query with the given finite/infinite inputs.

As @metesynnada mentions, the PR looks big, but the LOC comes mostly from tests. Other than those, changes are mostly localized to the file defining the PipelineChecker rule.

Looking forward to your comments and feedback!

@alamb (Contributor) commented Dec 23, 2022

Thank you for this PR -- I hope to find time to review it in the next day or two

@alamb (Contributor) commented Dec 26, 2022

I am sorry for the delay in review. It is in my queue; I just haven't had the time yet.

@ozankabak (Contributor) commented:
No worries, thank you!

@alamb (Contributor) left a review:

I went through this code and tests carefully and I found it well written, well documented, and straightforward to follow 🏆 . Very impressive work.

Overall Feedback

Thank you for the wonderful PR description.

I left some minor comments, but I don't think they need to block merging this PR. Two that I think are worth addressing (with comments, not necessarily code) are:

  1. The new nix dependency
  2. Running BasicEnforcement twice

End to end tests

I also recommend creating an even higher-level test in datafusion/core/tests/fifo.rs that:

  1. Creates a FIFO file
  2. Plans a query with infinite input
  3. Starts running the query
  4. Feeds data into the FIFO
  5. Ensures the query produces results

to test this feature end-to-end (see the sketch below).
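
A rough sketch of the FIFO plumbing such a test would need, assuming the nix crate that this PR adds as a dependency; the path, data, and overall structure here are illustrative:

use std::{fs::OpenOptions, io::Write, path::Path, thread};

use nix::{sys::stat::Mode, unistd::mkfifo};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: create the FIFO that will act as the infinite CSV source.
    let fifo = Path::new("/tmp/unbounded.csv");
    mkfifo(fifo, Mode::S_IRWXU)?;

    // Steps 2-3 (elided): register the FIFO with mark_infinite(true) and
    // start executing a query against it.

    // Step 4: feed rows from a separate thread; opening a FIFO for writing
    // blocks until the reader (the running query) attaches.
    let writer = thread::spawn(|| -> std::io::Result<()> {
        let mut pipe = OpenOptions::new().write(true).open("/tmp/unbounded.csv")?;
        writeln!(pipe, "1,hello")?;
        writeln!(pipe, "2,world")?;
        Ok(())
    });

    // Step 5 (elided): assert on the record batches the query produces.
    writer.join().unwrap()?;
    Ok(())
}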

Documentation

It would be great to document the capability somewhere in the docs to help it become more discoverable. Somewhere in

https://github.com/apache/arrow-datafusion/tree/master/docs

I added a note to the "documentation epic" #3058

I recommend eventually moving the end-to-end (SQL) FIFO tests into their own test file, perhaps something like

datafusion/core/tests/fifo.rs

Doing so would help discoverability.

_,
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
) => Err(DataFusionError::Internal(format!(
"{} join cannot be swapped.",
@alamb (Contributor) suggested a change:

Suggested change:
- "{} join cannot be swapped.",
+ "{} join cannot be swapped for unbounded input.",

@alamb (Contributor): I recommend making these messages more specific to aid grepping for them if they are hit in end-to-end tests.

Comment on lines 53 to 55
let physical_optimizer_subrules: Vec<Box<PipelineCheckerSubrule>> =
vec![Box::new(hash_join_swap_subrule)];
let state = pipeline.transform_up(&|p| {
@alamb (Contributor): I don't understand the need for a Vec of subrules. Does this mean you intend to add more such rules in a follow-on PR?

@metesynnada (Contributor, Author) commented Dec 28, 2022

Yes, we are planning to expand this set of subrules. Separating the execution plan changes (there will eventually be more than one 😀) into multiple PhysicalOptimizerRules would add redundant plan traversals (and possibly a fixed-point iteration), so we plan to make all the necessary executor changes within a single traversal.

@alamb (Contributor) commented Dec 27, 2022

cc @hntd187

@metesynnada (Contributor, Author) commented:

Thanks for the detailed review. We are also excited about adding such functionality to DataFusion. We will work on documentation as well 😀.

I'd like to ask about the "end-to-end" test: I provided the unbounded_file_with_swapped_join test, which looks quite like an end-to-end test. Do you suggest moving it to a better place? Maybe a version in datafusion-examples?

@alamb (Contributor) commented Dec 28, 2022

> I'd like to ask about the "end-to-end" test: I provided the unbounded_file_with_swapped_join test, which looks quite like an end-to-end test. Do you suggest moving it to a better place? Maybe a version in datafusion-examples?

I think unbounded_file_with_swapped_join is good, but it involves low-level manipulation of FIFOs and doesn't verify the output record batch content (though it does verify that the plumbing is working well).

What I was imagining was something that showed this feature working end to end -- with SQL and a FIFO file as input and record batches as output.

@alamb (Contributor) commented Dec 28, 2022

@metesynnada please let me know when you are ready to merge this PR and I will do so. We can then continue to iterate on the API in subsequent PRs, I think.

@ozankabak (Contributor) commented:
@alamb, feel free to merge after CI passes. The last commit I just sent only has some minor code simplifications and comment improvements. We agree to address the API stuff in a follow-on. Thank you!

@alamb merged commit cd4fd80 into apache:master on Dec 28, 2022
@alamb (Contributor) commented Dec 28, 2022

Thanks again @ozankabak

@ursabot commented Dec 28, 2022

Benchmark runs are scheduled for baseline = 760f108 and contender = cd4fd80. cd4fd80 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
