parallel csv scan #6801
Conversation
Thank you @2010YOUY01 -- I will review this over the coming days. This looks awesome
datafusion/core/Cargo.toml
@@ -84,6 +84,7 @@ parquet = { workspace = true }
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
rand = "0.8"
regex = "1.5.4"
since this is only used for tests, I think it can be put in the dev-dependencies section
I started running some tests with this branch. I will continue tomorrow
This is on my list to review, but I might not have time for it until later in the week (Wednesday) -- I am somewhat occupied with #6800 at the moment
Hi @2010YOUY01 -- I am having trouble reproducing the benchmark results you reported.
Results
Master:
This PR branch:
(I also merged up your branch from master and it still had the same performance)
Methodology:
I tested this branch out using the TPCH SF1 (6M rows, 725 MB) lineitem CSV file (created with the TPC-H data generator):
(arrow_dev) alamb@MacBook-Pro-8:~$ du -h /Users/alamb/Software/arrow-datafusion/benchmarks/data/lineitem.tbl
725M /Users/alamb/Software/arrow-datafusion/benchmarks/data/lineitem.tbl
(arrow_dev) alamb@MacBook-Pro-8:~$ wc -l /Users/alamb/Software/arrow-datafusion/benchmarks/data/lineitem.tbl
6001215 /Users/alamb/Software/arrow-datafusion/benchmarks/data/lineitem.tbl
And used:
CREATE EXTERNAL TABLE lineitem (
l_orderkey BIGINT,
l_partkey BIGINT,
l_suppkey BIGINT,
l_linenumber INTEGER,
l_quantity DECIMAL(15, 2),
l_extendedprice DECIMAL(15, 2),
l_discount DECIMAL(15, 2),
l_tax DECIMAL(15, 2),
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR,
l_rev VARCHAR,
) STORED AS CSV DELIMITER '|' LOCATION '/Users/alamb/Software/arrow-datafusion/benchmarks/data/lineitem.tbl';
--- Run a query that scans the entire CSV
select count(*) from lineitem where l_quantity < 10;
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 1079240 |
+-----------------+
let current_partition_size: usize = 0;

// Partition byte range evenly for all `PartitionedFile`s
let repartitioned_files = flattened_files
One thing I was thinking about for the range-based approach is that it isn't likely to work for streaming compressed files (as you need to decompress the data linearly).
I wonder if you have considered that 🤔
Yes. For now, the physical optimizer will check for that and won't repartition compressed CSV files.
@alamb Thank you for the feedback!
alternative
Main branch:
Thanks @2010YOUY01 -- I am starting to look at this again
Thank you @2010YOUY01 -- I tried this out again and it does indeed go (much!) faster -- 3x faster in my initial testing. 👏
I also reviewed the code, and I found it very easy to read, well structured and well tested. Thank you so much for this and I am sorry for the delay.
I have some thoughts on how to improve the byte range calculations to reduce the number of object store requests. I think this could be done as a follow-on PR as well.
My proposal is to:
- Review the comments on this PR and consider if you want to make any more changes
- Merge this PR
- File tickets for any remaining items
- File tickets for parallel reading of compressed CSV / JSON files (I will do this)
- File ticket for parallel reading of JSON files
Testing Results
This branch now goes about 3x faster (3.9s vs 13.4s) on my test:
$ datafusion-cli -f /tmp/test.sql
DataFusion CLI v27.0.0
0 rows in set. Query took 0.001 seconds.
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 10796819 |
+-----------------+
1 row in set. Query took 3.961 seconds.
main:
$ ~/Software/target-df/release/datafusion-cli -f /tmp/test.sql
DataFusion CLI v27.0.0
0 rows in set. Query took 0.001 seconds.
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 10796819 |
+-----------------+
1 row in set. Query took 13.364 seconds.
Here is the query:
$ cat /tmp/test.sql
CREATE EXTERNAL TABLE lineitem (
l_orderkey BIGINT,
l_partkey BIGINT,
l_suppkey BIGINT,
l_linenumber INTEGER,
l_quantity DECIMAL(15, 2),
l_extendedprice DECIMAL(15, 2),
l_discount DECIMAL(15, 2),
l_tax DECIMAL(15, 2),
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR,
l_rev VARCHAR,
) STORED AS CSV DELIMITER '|' LOCATION '/Users/alamb/Software/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem.tbl';
--- Run a query that scans the entire CSV
select count(*) from lineitem where l_quantity < 10;
Ok(())
}

/// Parappel scan on a csv file with only 1 byte in each line
Suggested change:
- /// Parappel scan on a csv file with only 1 byte in each line
+ /// Parallel scan on a csv file with only 1 byte in each line
Ok(())
}

/// Parappel scan on a csv file with 2 wide rows
Suggested change:
- /// Parappel scan on a csv file with 2 wide rows
+ /// Parallel scan on a csv file with 2 wide rows
if has_ranges {
    return self.clone();
}
let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups(
👍 for this refactor
// Repartition when there is an empty file in file groups
#[tokio::test]
async fn parquet_exec_repartition_empty_files()
maybe as a follow-on, the tests for file repartitioning could be moved to datafusion/core/src/datasource/physical_plan/mod.rs to be in the same module they are testing (as they are now no longer specific to parquet)
--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false

--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/core/tests/sqllogictests/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false
❤️
if repartition_file_scans && !csv_exec.file_compression_type.is_compressed() {
    let repartitioned_exec_option =
        csv_exec.get_repartitioned(target_partitions, repartition_file_min_size);
    if let Some(repartitioned_exec) = repartitioned_exec_option {
        return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
    }
I recommend moving the check for compression (csv_exec.file_compression_type.is_compressed()) into CsvExec::get_repartitioned() to keep the logic for handling CSV files together. I don't think this is required, however.
Suggested change:
- if repartition_file_scans && !csv_exec.file_compression_type.is_compressed() {
-     let repartitioned_exec_option =
-         csv_exec.get_repartitioned(target_partitions, repartition_file_min_size);
-     if let Some(repartitioned_exec) = repartitioned_exec_option {
-         return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
-     }
+ if repartition_file_scans {
+     let repartitioned_exec_option =
+         csv_exec.get_repartitioned(target_partitions, repartition_file_min_size);
+     if let Some(repartitioned_exec) = repartitioned_exec_option {
+         return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
+     }
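To illustrate this suggestion, here is a rough, hypothetical sketch (simplified struct and field names, not the real CsvExec implementation) of what folding the compression check into the method itself could look like; the actual byte-range splitting is elided:

// Hypothetical stand-in for CsvExec, reduced to the fields relevant here.
#[derive(Clone)]
struct CsvScan {
    is_compressed: bool,
    file_size: u64,
}

impl CsvScan {
    /// Returns a repartitioned copy, or None when repartitioning does not
    /// apply. Keeping the compression check here means callers such as the
    /// Repartition optimizer rule cannot forget it.
    fn get_repartitioned(&self, target_partitions: usize, min_size: u64) -> Option<CsvScan> {
        // Compressed CSV must be decompressed linearly from the start, so it
        // cannot be split by byte range.
        if self.is_compressed {
            return None;
        }
        if target_partitions < 2 || self.file_size < min_size {
            return None;
        }
        // The actual even byte-range split is elided in this sketch.
        Some(self.clone())
    }
}

fn main() {
    let plain = CsvScan { is_compressed: false, file_size: 1_000_000 };
    let gzipped = CsvScan { is_compressed: true, file_size: 1_000_000 };
    assert!(plain.get_repartitioned(4, 1024).is_some());
    assert!(gzipped.get_repartitioned(4, 1024).is_none());
}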
/// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies that the partition
/// corresponds to the byte range [start, end) within the file.
///
/// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
As I understand it, this code potentially makes several object store requests to adjust the initial ranges based on where the ends of the CSV lines actually fall:
Initial situation
CSV data with the next newlines (\n) after range marked
┌─────────────┬──┬────────────────────┬──┬────────────────┬──┬────────┬──┬────────┐
│ ... │\n│ ... │\n│ ... │\n│ ... │\n│ ... │
└─────────────┴──┴────────────────────┴──┴────────────────┴──┴────────┴──┴────────┘
▲ ▲ ▲ ▲
└─────────────┬──────┴─────────────────────┴───────────────┘
│
│
Initial file_meta.ranges
This PR
This PR: adjust the ranges prior to IO start, via
object store operations
┌ ─ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┌─────────────┬──┼────────────────────┬──┬────────────────┬──┬────────┬──┬────────┤
│ ... │\n│ ... │\n│ ... │\n│ ... │\n│ ... │
└─────────────┴──┼────────────────────┴──┴────────────────┴──┴────────┴──┴────────┤
│ │ │ │
─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
Partition 0 Partition 1 Partition 2 Partition 3
Read Read Read Read
This design has the nice property that each partition reads exactly the bytes it needs
This design has the downside that it requires several object store reads to find the newlines, and the overhead of each object store operation often is much larger than the overhead of reading extra data.
Alternate idea
Another approach that would reduce the number of object store requests would be to read past the initial range and stop at the next newline \n
like this:
Each partition reads *more* than its assigned range to find
the trailing new line, and ignores everything afterwards
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┌─────────────┬──┬───────┼────────────┬──┬────────────────┬──┬────────┬──┬────────┐
│ ... │\n│ ... │\n│ ... │\n│ ... │\n│ ... │
└─────────────┴──┴───────┼────────────┴──┴────────────────┴──┴────────┴──┴────────┘
│
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
Partition 0
Read
I think the tricky bit of this design would be to ensure enough extra data was read. Initially, maybe we could just pick something sufficiently large for most files, like 1MB, and error if the next newline can't be found. As a follow-on we could add some fanciness like making another object store request if necessary.
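For concreteness, a minimal sketch of this alternate idea, using hypothetical helper names (not actual DataFusion code): each partition fetches its range plus some padding in a single request, then trims to line boundaries in memory, so no extra object store round trips are needed:

/// Trim an over-read buffer to whole lines. `buf` starts at the partition's
/// nominal start offset and extends some padding past `nominal_len` (the size
/// of the assigned range). Returns None if the padding was too small to
/// contain the trailing newline, in which case a caller could fall back to
/// fetching more bytes.
fn trim_to_lines(
    buf: &[u8],
    nominal_len: usize,
    is_first_partition: bool,
    is_last_partition: bool,
) -> Option<&[u8]> {
    // Skip the partial first line; the previous partition reads it instead.
    let start = if is_first_partition {
        0
    } else {
        buf.iter().position(|&b| b == b'\n')? + 1
    };
    // Read past the nominal end up to (and including) the next newline.
    let end = if is_last_partition {
        buf.len()
    } else {
        nominal_len + buf[nominal_len..].iter().position(|&b| b == b'\n')? + 1
    };
    Some(&buf[start..end])
}

fn main() {
    // "aa\nbbb\ncc\n" split at byte 5, with the first partition over-reading
    // to the end of the file as padding.
    let file: &[u8] = b"aa\nbbb\ncc\n";
    assert_eq!(trim_to_lines(&file[0..10], 5, true, false), Some(&file[0..7])); // "aa\nbbb\n"
    assert_eq!(trim_to_lines(&file[5..10], 5, false, true), Some(&file[7..10])); // "cc\n"
}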
These text charts look awesome 😮 How did you draw that?
I use https://monodraw.helftone.com/ to make the ASCII art -- I haven't found a free alternative yet
Alternate idea
Another approach that would reduce the number of object store requests would be to read past the initial range and stop at the next newline \n like this: (see the diagram above)
I think the tricky bit of this design would be to ensure enough extra data was read. Initially, maybe we could just pick something sufficiently large for most files, like 1MB, and error if the next newline can't be found. As a follow-on we could add some fanciness like making another object store request if necessary.
@alamb
...working on #8502; I noticed the same thing.
So I did some testing.
I issued just a single GetRequest to the object store with an extended end_range (factor 1.2); then looped over the result byte stream and computed the start and end offset in a single pass.
I benchmarked it against 60mil rows of NDJSON - with no difference compared to the other approach. I could not gain any performance. Maybe my implementation is too naive? Or is the overhead for the object store requests too small when working with the local filesystem?
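// POC sketch from the comment above: issue one get request over an extended
// range, then scan the streamed bytes in a single pass, recording in
// start_delta the offset of the first newline after the requested start and
// in end_delta the offset of the first newline past the nominal end.
// `store`, `location`, `options`, `start`, `end`, and `file_size` are assumed
// to be defined in the enclosing function.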
let result = store.get_opts(location, options).await?;
let mut result_stream = result.into_stream();
let mut index = 0;
let mut buffer = Bytes::new();
let mut start_delta = 0;
let mut end_delta = 0;
'outer: loop {
    if buffer.is_empty() {
        match result_stream.next().await.transpose()? {
            Some(bytes) => buffer = bytes,
            None => break,
        }
    }
    for byte in &buffer {
        if *byte == b'\n' {
            if start != 0 && start_delta == 0 {
                start_delta = index;
                if end == file_size {
                    break 'outer;
                }
            }
            if start + index > end {
                end_delta = index;
                break 'outer;
            }
        }
        index += 1;
    }
    buffer.clear();
}
I benchmarked it against 60mil rows of NDJSON - with no difference compared to the other approach. I could not gain any performance. Maybe my implementation is too naive? Or is the overhead for the object store requests too small when working with the local filesystem?
Yes, I think this is the fundamental observation -- multiple requests to an actual remote object store are quite costly (like 10s of ms minimum), and fetching anything less than several MB in one request is likely less efficient than a single large request.
There is more about object storage in Exploiting Cloud Object Storage for High-Performance Analytics, a VLDB 2023 paper about running high-performance analytics on object stores.
@alamb
Thanks for linking the paper.
So I guess the alternate approach is still worth pursuing?
I would proceed by testing the POC against a remote store, and if that looks promising I'd create a separate issue to discuss and refine the approach further.
Sounds like a good plan to me
Thank you for the review ❤️ I will update this PR as per the comments next week
memo for myself: update comments/docs in configurations
Previous review comments should all be addressed in the recent commits; it's ready for review now.
Thanks @2010YOUY01 -- I took the liberty of merging up from main to resolve a conflict on this branch and I plan to merge it when the CI has passed
Thanks again @2010YOUY01
* parallel csv scan
* add max line length
* Update according to review comments
* Update Configuration doc
---------
Co-authored-by: Andrew Lamb <[email protected]>
Which issue does this PR close?
Closes #6325.
Rationale for this change
There are 3 steps to scan a CSV file in parallel (starting from a single-partition plan such as CsvExec: file_groups={1 group: [[a.csv:0..100]]}):
1. A PhysicalOptimizerRule (Repartition) will decide if the CsvExec node can be parallelized; it won't be parallelized if the parent node requires a certain order or has some other conditions.
2. Repartition splits the byte range evenly and stores the partitions in CsvExec.base_config.file_groups; this step doesn't care about how to separate lines correctly (partitions may contain half lines): CsvExec: file_groups={2 groups: [[a.csv:0..50], [a:50..100]]}
3. CsvOpener opens each partition (e.g. [a.csv:0..50]) and deals with partition boundaries so that only complete lines are read (e.g. [a.csv:0..50] -> Lines:1..5).
What changes are included in this PR?
Step 1
The parallel Parquet scan PR #5057 has already done this; for parallel CSV scan, this PR only adds a rule to not repartition if the CSV file is compressed.
Testing - Added a CsvExec case alongside the existing tests for the optimizer rule Repartition on ParquetExec.
Step 2
The parallel Parquet PR also did this, but inside ParquetExec; the byte-range repartitioning logic is now refactored out so that CsvExec can reuse it (a rough sketch of the even split is shown below).
Testing - Unit tests for partitioning the byte range are already well covered by the parallel Parquet PR: https://github.com/apache/arrow-datafusion/blob/e91af991c5ae4a6b4afab2cb1b0c9307a69e4046/datafusion/core/src/datasource/physical_plan/parquet.rs#L1924-L2128
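As a rough illustration of the even byte-range split described in this step (a single file and a hypothetical helper name are assumed; this is not the actual FileScanConfig::repartition_file_groups code):

/// Split a file of `file_size` bytes into `n` contiguous, roughly equal byte
/// ranges. Line boundaries are deliberately ignored at this stage; they are
/// fixed up later by the opener (Step 3).
fn split_byte_range(file_size: u64, n: u64) -> Vec<std::ops::Range<u64>> {
    let chunk = file_size / n;
    (0..n)
        .map(|i| {
            let start = i * chunk;
            // The last partition absorbs any remainder.
            let end = if i + 1 == n { file_size } else { start + chunk };
            start..end
        })
        .collect()
}

fn main() {
    // Mirrors the example above: a.csv:0..100 split into 2 groups.
    assert_eq!(split_byte_range(100, 2), vec![0..50, 50..100]);
}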
Step 3
The logic for handling line boundaries correctly is done in CsvOpener; the rule used is exactly the same as the one described in #6325. Unlike parallel Parquet scan, which can use metadata to decide which row groups to read according to the approximate byte range, CSV scan needs to inspect the file content around partition boundaries to determine the byte range of complete lines in each partition. In the implementation, it finds the offset of the first newline encountered from the approximate partition start or end, calculates the byte range for the complete lines of this partition, and reads that actual byte range from the object store (a simplified sketch of this boundary rule is shown below).
Testing - Added some integration tests in file_format/csv.rs (also manually ran these tests on S3). For more complex query tests, the existing TPCH correctness tests under sqllogictest are run against CSV files scanned with 4 partitions.
Some issues: the extra target_partitions requests used to find line boundaries might be a problem on cloud stores (billed by number of requests). This may be improved by some follow-on PR, but I haven't come up with a good solution yet: the arrow CSV reader can only handle valid CSV files. Maybe add a wrapper on the byte stream fetched from the object store, and let it skip the first line and read until the first newline after the range length?
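A simplified sketch of the boundary rule described above, operating on an in-memory byte slice for clarity (in the actual implementation each probe around a boundary is an object store range request; the function names here are hypothetical):

/// Byte offset just past the first newline at or after `pos`
/// (or the end of the data if there is no further newline).
fn after_next_newline(data: &[u8], pos: usize) -> usize {
    data[pos..]
        .iter()
        .position(|&b| b == b'\n')
        .map(|i| pos + i + 1)
        .unwrap_or(data.len())
}

/// Adjust an approximate byte range [start, end) so that it covers only
/// complete lines: a partition starting mid-line skips forward to the next
/// line start, and every partition reads up to the line boundary that the
/// next partition will skip to, so each line lands in exactly one partition.
fn complete_line_range(data: &[u8], start: usize, end: usize) -> (usize, usize) {
    let new_start = if start == 0 { 0 } else { after_next_newline(data, start) };
    let new_end = if end >= data.len() { data.len() } else { after_next_newline(data, end) };
    (new_start, new_end)
}

fn main() {
    let data: &[u8] = b"aa\nbbb\ncc\n"; // 10 bytes, 3 lines
    // Naive halves 0..5 and 5..10 are adjusted to whole-line ranges.
    assert_eq!(complete_line_range(data, 0, 5), (0, 7)); // "aa\nbbb\n"
    assert_eq!(complete_line_range(data, 5, 10), (7, 10)); // "cc\n"
}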
Are these changes tested?
See the section above.
A very simple benchmark: scan the TPC-H lineitem table (SF1) with a very selective predicate (select * from lineitem where column_1 = 1;). 6 is the number of physical cores on my machine.
Are there any user-facing changes?
No