Stream CSV file during schema inference #4661

Merged · 7 commits · Dec 23, 2022

Conversation

@Jefffrey (Contributor) commented on Dec 18, 2022

Which issue does this PR close?

Closes #3658.

Rationale for this change

What changes are included in this PR?

During CSV schema inference, instead of reading the entire object/file into memory before taking as many rows as configured to infer the schema, the CSV is now streamed in (using a helper method from object_store) and fed into newline_delimited_stream to produce a stream of CSV rows, which can be read as needed rather than requiring the entire file in memory.

Certain adjustments were needed for this:

  • Only read the header from the first chunk (if a header is present)
  • Since chunks of the file are inferred separately, the inferred datatypes of the chunks must be merged together, akin to what arrow::csv::reader::infer_reader_schema(...) already does (see the sketch after this list)
  • If there is a header row and only that header row was read in the first chunk (i.e. no data rows), the inferred datatype must be ignored (it would default to Utf8, which can cause issues when merging the schemas of the chunks)
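
For illustration, a minimal sketch of the kind of per-column type merging described above; the merge rules are simplified and merge_inferred_type is an illustrative name, not code from this PR or from arrow-rs:

```rust
use arrow::datatypes::DataType;

/// Simplified illustration: combine the types inferred for one column from two
/// different chunks. Equal types pass through, a chunk that produced no data
/// rows (tracked here as Null) contributes nothing, Int64 widens to Float64,
/// and anything else falls back to Utf8.
fn merge_inferred_type(left: DataType, right: DataType) -> DataType {
    use DataType::*;
    match (left, right) {
        (l, r) if l == r => l,
        (Null, other) | (other, Null) => other,
        (Int64, Float64) | (Float64, Int64) => Float64,
        _ => Utf8,
    }
}
```

Folding such a function over the per-chunk inferences for each column (e.g. with Iterator::reduce) yields a single schema without ever holding the whole file in memory.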

Are these changes tested?

A new test uses a mocked ObjectStore that provides an infinite stream, to prove that inference does not depend on consuming the entire stream first.

Are there any user-facing changes?

@github-actions bot added the core (Core DataFusion crate) label on Dec 18, 2022
@Jefffrey (author) commented:

There's a bit of ugly duplication with code ripped from arrow-rs (mainly the build_schema_helper(...) method), so maybe some of this could be upstreamed into arrow-rs instead?

@metesynnada (Contributor) left a comment:

Thanks for the work. In general the idea is quite cool, and it removes a redundant materialization of the whole file.

However, I think you should find a better way to deal with the first chunk of the byte stream. It seems there is some code duplication that could be handled better.

// The first chunk may contain the header; initialize the names & types vecs here,
// since the header supplies column names and the types vec records all inferred types across chunks
let (column_names, mut column_types): (Vec<_>, Vec<_>) =
    if let Some(data) = stream.next().await.transpose()? {
@metesynnada commented on this hunk:

The first chunk does not need to be handled outside of the stream consumer below. I understand what makes it different from the other chunks, but that can be handled using a bit of state. Merging the two paths could reduce the code change significantly.
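
A toy sketch of the state-based shape being suggested here, with plain string chunks standing in for the real byte stream; none of these names come from the PR:

```rust
/// Toy illustration: fold header handling into the main loop with a small
/// piece of state (`names`) instead of a separate code path for the first chunk.
fn infer_column_names(chunks: &[&str], has_header: bool) -> Vec<String> {
    let mut names: Option<Vec<String>> = None; // None until the first row is seen
    for chunk in chunks {
        for line in chunk.lines() {
            if names.is_none() {
                let fields: Vec<&str> = line.split(',').collect();
                names = Some(if has_header {
                    fields.iter().map(|f| f.trim().to_string()).collect()
                } else {
                    (0..fields.len()).map(|i| format!("column_{i}")).collect()
                });
                if has_header {
                    continue; // the header row contributes no data values
                }
            }
            // per-row type inference over `line` would go here
        }
    }
    names.unwrap_or_default()
}
```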

@Jefffrey (author) replied:

Thanks for the feedback; I was initially back and forth on how to handle this special case. I'll give refactoring a shot to reduce the duplication here.

@alamb (Contributor) left a comment:

Hi @Jefffrey -- thank you for this contribution.

Is there any way to test this code? Perhaps with a mocked-out object store, to verify that only the first file is fetched?

Also, I can't help but wonder if we could implement the logic using futures::Stream more efficiently, but I haven't studied the logic in detail enough to really know for sure.

@Jefffrey (author) commented on Dec 23, 2022:

> Hi @Jefffrey -- thank you for this contribution.
>
> Is there any way to test this code? Perhaps with a mocked-out object store, to verify that only the first file is fetched?
>
> Also, I can't help but wonder if we could implement the logic using futures::Stream more efficiently, but I haven't studied the logic in detail enough to really know for sure.

Hi @alamb, I've pushed a test which uses a mocked ObjectStore that returns an infinite stream of bytes; by finishing at all, the test proves inference won't try to consume the entire stream before inferring (which was the previous behaviour).

Could you elaborate on the futures::Stream part?

Edit: though on second thought, maybe this test is a bad design, since 'failure' effectively means letting it run infinitely... I might need to rethink the test case.

Edit 2: I've modified the test case so that the mocked object store keeps track of how many repeats were requested (and terminates at a configured value). This ensures CSV inference only reads as many rows as were configured, and the test's failure case no longer relies on an infinite loop.
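
For context, a rough sketch of that kind of counting byte stream; the names and chunk contents here are illustrative, not the PR's actual test code:

```rust
use std::sync::{Arc, Mutex};

use bytes::Bytes;
use futures::{stream, Stream, StreamExt};

/// Illustrative "infinite" CSV body whose poll count is observable, so a test
/// can assert that schema inference stopped after a bounded number of rows.
/// The `.take(max_chunks)` acts as a safety valve so a regression fails the
/// test instead of hanging it.
fn counting_csv_stream(
    counter: Arc<Mutex<usize>>,
    max_chunks: usize,
) -> impl Stream<Item = Bytes> {
    stream::repeat_with(move || {
        *counter.lock().unwrap() += 1;
        Bytes::from_static(b"1,2.5,hello\n")
    })
    .take(max_chunks)
}
```

A test would clone the Arc before building the stream, run inference, and then assert that the counter stayed below the configured limit.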

@alamb (Contributor) left a comment:

Thank you @Jefffrey -- the test looks great.

> Could you elaborate on the futures::Stream part?

I guess I was thinking that rather than using two nested loops and break 'iterating_objects, it would be possible to use futures::Stream to do the iteration.

I tried a few ways and nothing really looked all that much better.

Ok(GetResult::Stream(
    futures::stream::repeat_with(move || {
        let arc_inner = arc.clone();
        *arc_inner.lock().unwrap() += 1;
@alamb commented on this hunk:

👍

    actual_fields
);
// ensure that CSV inference won't try to read the entire file;
// it should only read as many rows as were configured in the CsvFormat
@alamb commented on this hunk:

Awesome

@alamb merged commit 720bdb0 into apache:master on Dec 23, 2022
@alamb (Contributor) commented on Dec 23, 2022:

Here is a follow-on PR to reduce the nesting: #4717

@ursabot commented on Dec 23, 2022:

Benchmark runs are scheduled for baseline = 6a4e0df and contender = 720bdb0. 720bdb0 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
  • [Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
  • [Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
  • [Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
  • [Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q

Buildkite builds:

Supported benchmarks:
  • ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
  • test-mac-arm: Supported benchmark langs: C++, Python, R
  • ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
  • ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@Jefffrey deleted the 3658_stream_csv_infer branch on December 23, 2022 20:34