diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 2f7a6012f228..078facd6a1d6 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -84,7 +84,6 @@ parquet = { workspace = true }
 percent-encoding = "2.2.0"
 pin-project-lite = "^0.2.7"
 rand = "0.8"
-regex = "1.5.4"
 smallvec = { version = "1.6", features = ["union"] }
 sqlparser = { workspace = true }
 tempfile = "3"
@@ -107,6 +106,7 @@ env_logger = "0.10"
 half = "2.2.1"
 postgres-protocol = "0.6.4"
 postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
+regex = "1.5.4"
 rstest = "0.17.0"
 rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
 sqllogictest = "0.14.0"
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index a3021b954c73..b284079ec6e4 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -1202,7 +1202,7 @@ mod tests {
         Ok(())
     }
 
-    /// Parappel scan on a csv file with only 1 byte in each line
+    /// Parallel scan on a csv file with only 1 byte in each line
     /// Testing partition byte range land on line boundaries
     ///
     /// one_col.csv:
@@ -1254,7 +1254,7 @@ mod tests {
         Ok(())
     }
 
-    /// Parappel scan on a csv file with 2 wide rows
+    /// Parallel scan on a csv file with 2 wide rows
     /// The byte range of a partition might be within some line
     ///
     /// wode_rows.csv:
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 4283b50e93ca..e7c7ec21687c 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -106,11 +106,21 @@ impl CsvExec {
 
     /// Redistribute files across partitions according to their size
     /// See comments on `repartition_file_groups()` for more detail.
+    ///
+    /// Returns `None` if the scan can't be repartitioned (e.g. an empty or compressed file).
     pub fn get_repartitioned(
         &self,
         target_partitions: usize,
         repartition_file_min_size: usize,
     ) -> Option<Self> {
+        // Parallel execution on compressed CSV files is not supported yet.
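+        // The underlying CsvOpener fetches only a byte range of the file from the
+        // object store, and a compressed stream cannot be decompressed starting from
+        // an arbitrary offset, so the whole file must stay in a single partition.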
+        if self.file_compression_type.is_compressed() {
+            return None;
+        }
+
         let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups(
             self.base_config.file_groups.clone(),
             target_partitions,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index cde29a1e4483..0d799f44c0b6 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -1555,4 +1555,345 @@ mod tests {
             extensions: None,
         }
     }
+
+    /// Unit tests for `repartition_file_groups()`
+    mod repartition_file_groups_test {
+        use super::*;
+
+        /// Empty file won't get partitioned
+        #[tokio::test]
+        async fn repartition_empty_file_only() {
+            let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0);
+            let file_group = vec![vec![partitioned_file_empty]];
+
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: file_group,
+                    file_schema: Arc::new(Schema::empty()),
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: vec![],
+                    infinite_source: false,
+                },
+                None,
+                None,
+            );
+
+            let partitioned_file = parquet_exec
+                .get_repartitioned(4, 0)
+                .base_config()
+                .file_groups
+                .clone();
+
+            assert!(partitioned_file[0][0].range.is_none());
+        }
+
+        // Repartition when there is an empty file in file groups
+        #[tokio::test]
+        async fn repartition_empty_files() {
+            let partitioned_file_a = PartitionedFile::new("a".to_string(), 10);
+            let partitioned_file_b = PartitionedFile::new("b".to_string(), 10);
+            let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0);
+
+            let empty_first = vec![
+                vec![partitioned_file_empty.clone()],
+                vec![partitioned_file_a.clone()],
+                vec![partitioned_file_b.clone()],
+            ];
+            let empty_middle = vec![
+                vec![partitioned_file_a.clone()],
+                vec![partitioned_file_empty.clone()],
+                vec![partitioned_file_b.clone()],
+            ];
+            let empty_last = vec![
+                vec![partitioned_file_a],
+                vec![partitioned_file_b],
+                vec![partitioned_file_empty],
+            ];
+
+            // Repartition file groups into x partitions
+            let expected_2 =
+                vec![(0, "a".to_string(), 0, 10), (1, "b".to_string(), 0, 10)];
+            let expected_3 = vec![
+                (0, "a".to_string(), 0, 7),
+                (1, "a".to_string(), 7, 10),
+                (1, "b".to_string(), 0, 4),
+                (2, "b".to_string(), 4, 10),
+            ];
+
+            let file_groups_testset = [empty_first, empty_middle, empty_last];
+
+            for fg in file_groups_testset {
+                for (n_partition, expected) in [(2, &expected_2), (3, &expected_3)] {
+                    let parquet_exec = ParquetExec::new(
+                        FileScanConfig {
+                            object_store_url: ObjectStoreUrl::local_filesystem(),
+                            file_groups: fg.clone(),
+                            file_schema: Arc::new(Schema::empty()),
+                            statistics: Statistics::default(),
+                            projection: None,
+                            limit: None,
+                            table_partition_cols: vec![],
+                            output_ordering: vec![],
+                            infinite_source: false,
+                        },
+                        None,
+                        None,
+                    );
+
+                    let actual = file_groups_to_vec(
+                        parquet_exec
+                            .get_repartitioned(n_partition, 10)
+                            .base_config()
+                            .file_groups
+                            .clone(),
+                    );
+
+                    assert_eq!(expected, &actual);
+                }
+            }
+        }
+
+        #[tokio::test]
+        async fn repartition_single_file() {
+            // Single file, single partition into multiple partitions
+            let partitioned_file = PartitionedFile::new("a".to_string(), 123);
+            let single_partition = vec![vec![partitioned_file]];
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: single_partition,
+                    file_schema: Arc::new(Schema::empty()),
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: vec![],
+                    infinite_source: false,
+                },
+                None,
+                None,
+            );
+
+            let actual = file_groups_to_vec(
+                parquet_exec
+                    .get_repartitioned(4, 10)
+                    .base_config()
+                    .file_groups
+                    .clone(),
+            );
+            let expected = vec![
+                (0, "a".to_string(), 0, 31),
+                (1, "a".to_string(), 31, 62),
+                (2, "a".to_string(), 62, 93),
+                (3, "a".to_string(), 93, 123),
+            ];
+            assert_eq!(expected, actual);
+        }
+
+        #[tokio::test]
+        async fn repartition_too_much_partitions() {
+            // Single file, single partition into 96 partitions
+            let partitioned_file = PartitionedFile::new("a".to_string(), 8);
+            let single_partition = vec![vec![partitioned_file]];
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: single_partition,
+                    file_schema: Arc::new(Schema::empty()),
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: vec![],
+                    infinite_source: false,
+                },
+                None,
+                None,
+            );
+
+            let actual = file_groups_to_vec(
+                parquet_exec
+                    .get_repartitioned(96, 5)
+                    .base_config()
+                    .file_groups
+                    .clone(),
+            );
+            let expected = vec![
+                (0, "a".to_string(), 0, 1),
+                (1, "a".to_string(), 1, 2),
+                (2, "a".to_string(), 2, 3),
+                (3, "a".to_string(), 3, 4),
+                (4, "a".to_string(), 4, 5),
+                (5, "a".to_string(), 5, 6),
+                (6, "a".to_string(), 6, 7),
+                (7, "a".to_string(), 7, 8),
+            ];
+            assert_eq!(expected, actual);
+        }
+
+        #[tokio::test]
+        async fn repartition_multiple_partitions() {
+            // Multiple files in single partition after redistribution
+            let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
+            let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
+            let source_partitions =
+                vec![vec![partitioned_file_1], vec![partitioned_file_2]];
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: source_partitions,
+                    file_schema: Arc::new(Schema::empty()),
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: vec![],
+                    infinite_source: false,
+                },
+                None,
+                None,
+            );
+
+            let actual = file_groups_to_vec(
+                parquet_exec
+                    .get_repartitioned(3, 10)
+                    .base_config()
+                    .file_groups
+                    .clone(),
+            );
+            let expected = vec![
+                (0, "a".to_string(), 0, 34),
+                (1, "a".to_string(), 34, 40),
+                (1, "b".to_string(), 0, 28),
+                (2, "b".to_string(), 28, 60),
+            ];
+            assert_eq!(expected, actual);
+        }
+
+        #[tokio::test]
+        async fn repartition_same_num_partitions() {
+            // "Rebalance" files across partitions
+            let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
+            let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
+            let source_partitions =
+                vec![vec![partitioned_file_1], vec![partitioned_file_2]];
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: source_partitions,
+                    file_schema: Arc::new(Schema::empty()),
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: vec![],
+                    infinite_source: false,
+                },
+                None,
+                None,
+            );
+
+            let actual = file_groups_to_vec(
+                parquet_exec
+                    .get_repartitioned(2, 10)
+                    .base_config()
+                    .file_groups
+                    .clone(),
+            );
+            let expected = vec![
+                (0, "a".to_string(), 0, 40),
+                (0, "b".to_string(), 0, 10),
+                (1, "b".to_string(), 10, 60),
+            ];
+            assert_eq!(expected, actual);
+        }
+
+        #[tokio::test]
+        async fn repartition_no_action_ranges() {
+            // No action due to Some(range) in second file
+            let partitioned_file_1 = PartitionedFile::new("a".to_string(), 123);
+            let mut partitioned_file_2 = PartitionedFile::new("b".to_string(), 144);
+            partitioned_file_2.range = Some(FileRange { start: 1, end: 50 });
+
+            let source_partitions =
+                vec![vec![partitioned_file_1], vec![partitioned_file_2]];
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: source_partitions,
+                    file_schema: Arc::new(Schema::empty()),
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: vec![],
+                    infinite_source: false,
+                },
+                None,
+                None,
+            );
+
+            let actual = parquet_exec
+                .get_repartitioned(65, 10)
+                .base_config()
+                .file_groups
+                .clone();
+            assert_eq!(2, actual.len());
+        }
+
+        #[tokio::test]
+        async fn repartition_no_action_min_size() {
+            // No action due to repartition_file_min_size
+            let partitioned_file = PartitionedFile::new("a".to_string(), 123);
+            let single_partition = vec![vec![partitioned_file]];
+            let parquet_exec = ParquetExec::new(
+                FileScanConfig {
+                    object_store_url: ObjectStoreUrl::local_filesystem(),
+                    file_groups: single_partition,
+                    file_schema: Arc::new(Schema::empty()),
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    output_ordering: vec![],
+                    infinite_source: false,
+                },
+                None,
+                None,
+            );
+
+            let actual = parquet_exec
+                .get_repartitioned(65, 500)
+                .base_config()
+                .file_groups
+                .clone();
+            assert_eq!(1, actual.len());
+        }
+
+        fn file_groups_to_vec(
+            file_groups: Vec<Vec<PartitionedFile>>,
+        ) -> Vec<(usize, String, i64, i64)> {
+            file_groups
+                .iter()
+                .enumerate()
+                .flat_map(|(part_idx, files)| {
+                    files
+                        .iter()
+                        .map(|f| {
+                            (
+                                part_idx,
+                                f.object_meta.location.to_string(),
+                                f.range.as_ref().unwrap().start,
+                                f.range.as_ref().unwrap().end,
+                            )
+                        })
+                        .collect_vec()
+                })
+                .collect_vec()
+        }
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index a603f1a6ae9d..66fb12a9b8b7 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1861,339 +1861,6 @@ mod tests {
         assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
     }
 
-    /// Empty file won't get partitioned
-    #[tokio::test]
-    async fn parquet_exec_repartition_empty_file_only() {
-        let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0);
-        let file_group = vec![vec![partitioned_file_empty]];
-
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: file_group,
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-                infinite_source: false,
-            },
-            None,
-            None,
-        );
-
-        let partitioned_file = parquet_exec
-            .get_repartitioned(4, 0)
-            .base_config()
-            .file_groups
-            .clone();
-
-        assert!(partitioned_file[0][0].range.is_none());
-    }
-
-    // Repartition when there is a empty file in file groups
-    #[tokio::test]
-    async fn parquet_exec_repartition_empty_files() {
-        let partitioned_file_a = PartitionedFile::new("a".to_string(), 10);
PartitionedFile::new("a".to_string(), 10); - let partitioned_file_b = PartitionedFile::new("b".to_string(), 10); - let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0); - - let empty_first = vec![ - vec![partitioned_file_empty.clone()], - vec![partitioned_file_a.clone()], - vec![partitioned_file_b.clone()], - ]; - let empty_middle = vec![ - vec![partitioned_file_a.clone()], - vec![partitioned_file_empty.clone()], - vec![partitioned_file_b.clone()], - ]; - let empty_last = vec![ - vec![partitioned_file_a], - vec![partitioned_file_b], - vec![partitioned_file_empty], - ]; - - // Repartition file groups into x partitions - let expected_2 = vec![(0, "a".to_string(), 0, 10), (1, "b".to_string(), 0, 10)]; - let expected_3 = vec![ - (0, "a".to_string(), 0, 7), - (1, "a".to_string(), 7, 10), - (1, "b".to_string(), 0, 4), - (2, "b".to_string(), 4, 10), - ]; - - //let file_groups_testset = [empty_first, empty_middle, empty_last]; - let file_groups_testset = [empty_first, empty_middle, empty_last]; - - for fg in file_groups_testset { - for (n_partition, expected) in [(2, &expected_2), (3, &expected_3)] { - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: fg.clone(), - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(n_partition, 10) - .base_config() - .file_groups - .clone(), - ); - - assert_eq!(expected, &actual); - } - } - } - - #[tokio::test] - async fn parquet_exec_repartition_single_file() { - // Single file, single partition into multiple partitions - let partitioned_file = PartitionedFile::new("a".to_string(), 123); - let single_partition = vec![vec![partitioned_file]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: single_partition, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(4, 10) - .base_config() - .file_groups - .clone(), - ); - let expected = vec![ - (0, "a".to_string(), 0, 31), - (1, "a".to_string(), 31, 62), - (2, "a".to_string(), 62, 93), - (3, "a".to_string(), 93, 123), - ]; - assert_eq!(expected, actual); - } - - #[tokio::test] - async fn parquet_exec_repartition_too_much_partitions() { - // Single file, single parittion into 96 partitions - let partitioned_file = PartitionedFile::new("a".to_string(), 8); - let single_partition = vec![vec![partitioned_file]]; - let parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: single_partition, - file_schema: Arc::new(Schema::empty()), - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - None, - ); - - let actual = file_groups_to_vec( - parquet_exec - .get_repartitioned(96, 5) - .base_config() - .file_groups - .clone(), - ); - let expected = vec![ - (0, "a".to_string(), 0, 1), - (1, "a".to_string(), 1, 2), - (2, "a".to_string(), 2, 3), - (3, "a".to_string(), 3, 4), - (4, "a".to_string(), 
-            (5, "a".to_string(), 5, 6),
-            (6, "a".to_string(), 6, 7),
-            (7, "a".to_string(), 7, 8),
-        ];
-        assert_eq!(expected, actual);
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_repartition_multiple_partitions() {
-        // Multiple files in single partition after redistribution
-        let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
-        let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
-        let source_partitions = vec![vec![partitioned_file_1], vec![partitioned_file_2]];
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: source_partitions,
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-                infinite_source: false,
-            },
-            None,
-            None,
-        );
-
-        let actual = file_groups_to_vec(
-            parquet_exec
-                .get_repartitioned(3, 10)
-                .base_config()
-                .file_groups
-                .clone(),
-        );
-        let expected = vec![
-            (0, "a".to_string(), 0, 34),
-            (1, "a".to_string(), 34, 40),
-            (1, "b".to_string(), 0, 28),
-            (2, "b".to_string(), 28, 60),
-        ];
-        assert_eq!(expected, actual);
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_repartition_same_num_partitions() {
-        // "Rebalance" files across partitions
-        let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
-        let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
-        let source_partitions = vec![vec![partitioned_file_1], vec![partitioned_file_2]];
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: source_partitions,
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-                infinite_source: false,
-            },
-            None,
-            None,
-        );
-
-        let actual = file_groups_to_vec(
-            parquet_exec
-                .get_repartitioned(2, 10)
-                .base_config()
-                .file_groups
-                .clone(),
-        );
-        let expected = vec![
-            (0, "a".to_string(), 0, 40),
-            (0, "b".to_string(), 0, 10),
-            (1, "b".to_string(), 10, 60),
-        ];
-        assert_eq!(expected, actual);
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_repartition_no_action_ranges() {
-        // No action due to Some(range) in second file
-        let partitioned_file_1 = PartitionedFile::new("a".to_string(), 123);
-        let mut partitioned_file_2 = PartitionedFile::new("b".to_string(), 144);
-        partitioned_file_2.range = Some(FileRange { start: 1, end: 50 });
-
-        let source_partitions = vec![vec![partitioned_file_1], vec![partitioned_file_2]];
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: source_partitions,
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-                infinite_source: false,
-            },
-            None,
-            None,
-        );
-
-        let actual = parquet_exec
-            .get_repartitioned(65, 10)
-            .base_config()
-            .file_groups
-            .clone();
-        assert_eq!(2, actual.len());
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_repartition_no_action_min_size() {
-        // No action due to target_partition_size
-        let partitioned_file = PartitionedFile::new("a".to_string(), 123);
-        let single_partition = vec![vec![partitioned_file]];
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: single_partition,
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-                infinite_source: false,
-            },
-            None,
-            None,
-        );
-
-        let actual = parquet_exec
-            .get_repartitioned(65, 500)
-            .base_config()
-            .file_groups
-            .clone();
-        assert_eq!(1, actual.len());
-    }
-
-    fn file_groups_to_vec(
-        file_groups: Vec<Vec<PartitionedFile>>,
-    ) -> Vec<(usize, String, i64, i64)> {
-        file_groups
-            .iter()
-            .enumerate()
-            .flat_map(|(part_idx, files)| {
-                files
-                    .iter()
-                    .map(|f| {
-                        (
-                            part_idx,
-                            f.object_meta.location.to_string(),
-                            f.range.as_ref().unwrap().start,
-                            f.range.as_ref().unwrap().end,
-                        )
-                    })
-                    .collect_vec()
-            })
-            .collect_vec()
-    }
-
     /// returns the sum of all the metrics with the specified name
     /// the returned set.
     ///
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index d1079af29ae2..c5f30310784b 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -258,8 +258,7 @@ fn optimize_partitions(
     }
 
     if let Some(csv_exec) = new_plan.as_any().downcast_ref::<CsvExec>() {
-        // The underlying CsvOpener will only fetch certain part of csv file from the object store, which can't be decompressed separately
-        if repartition_file_scans && !csv_exec.file_compression_type.is_compressed() {
+        if repartition_file_scans {
             let repartitioned_exec_option =
                 csv_exec.get_repartitioned(target_partitions, repartition_file_min_size);
             if let Some(repartitioned_exec) = repartitioned_exec_option {