Update according to review comments
2010YOUY01 committed Jul 11, 2023
1 parent 329e5e8 commit 947d71a
Showing 6 changed files with 353 additions and 338 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -84,7 +84,6 @@ parquet = { workspace = true }
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
rand = "0.8"
regex = "1.5.4"
smallvec = { version = "1.6", features = ["union"] }
sqlparser = { workspace = true }
tempfile = "3"
@@ -107,6 +106,7 @@ env_logger = "0.10"
half = "2.2.1"
postgres-protocol = "0.6.4"
postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
regex = "1.5.4"
rstest = "0.17.0"
rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
sqllogictest = "0.14.0"
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -1202,7 +1202,7 @@ mod tests {
Ok(())
}

-/// Parappel scan on a csv file with only 1 byte in each line
+/// Parallel scan on a csv file with only 1 byte in each line
/// Testing partition byte range land on line boundaries
///
/// one_col.csv:
@@ -1254,7 +1254,7 @@ mod tests {
Ok(())
}

-/// Parappel scan on a csv file with 2 wide rows
+/// Parallel scan on a csv file with 2 wide rows
/// The byte range of a partition might be within some line
///
/// wide_rows.csv:
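Both tests above exercise the same boundary rule: a partition's raw byte range is extended to whole lines before reading. One common convention, and the one this sketch uses, is that a line belongs to the partition whose range contains the line's first byte. A minimal, self-contained illustration (hedged: `line_aligned` is not DataFusion's actual implementation; the fixture mirrors the one_col.csv layout described above):

/// Extend a raw byte range [start, end) so both endpoints land on line
/// boundaries: align(pos) returns the smallest line start at or after pos,
/// so adjacent partitions neither overlap nor skip a line.
fn line_aligned(data: &[u8], start: usize, end: usize) -> (usize, usize) {
    let align = |pos: usize| -> usize {
        if pos == 0 {
            return 0;
        }
        // The first '\n' at index >= pos - 1 terminates the line straddling
        // (or ending exactly at) pos; the next line starts one byte after it.
        data[pos - 1..]
            .iter()
            .position(|&b| b == b'\n')
            .map(|offset| pos + offset)
            .unwrap_or(data.len())
    };
    (align(start), align(end))
}

#[test]
fn each_line_is_read_exactly_once() {
    let data = b"5\n5\n5\n5\n"; // four 1-byte values, one per line
    // Three raw ranges of ceil(8 / 3) = 3 bytes (the last one truncated)...
    let aligned: Vec<_> = [(0, 3), (3, 6), (6, 8)]
        .iter()
        .map(|&(s, e)| line_aligned(data, s, e))
        .collect();
    // ...become line-aligned ranges covering lines 1-2, 3, and 4.
    assert_eq!(aligned, vec![(0, 4), (4, 6), (6, 8)]);
}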
7 changes: 7 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -106,11 +106,18 @@ impl CsvExec {

/// Redistribute files across partitions according to their size
/// See comments on `repartition_file_groups()` for more detail.
+///
+/// Return `None` if the file cannot be repartitioned (empty or compressed file).
pub fn get_repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
) -> Option<Self> {
+// Parallel execution on compressed CSV files is not supported yet.
+if self.file_compression_type.is_compressed() {
+return None;
+}
+
let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups(
self.base_config.file_groups.clone(),
target_partitions,
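With the new `Option` return, a caller keeps the original plan whenever `None` comes back. A minimal sketch of that call-site handling (hedged: `maybe_repartition` is illustrative, not the PR's actual optimizer-rule code, and the import paths assume this PR's module layout):

use std::sync::Arc;

use datafusion::datasource::physical_plan::CsvExec;
use datafusion::physical_plan::ExecutionPlan;

/// Illustrative only: try to split a CSV scan across `target_partitions`,
/// falling back to the unmodified plan for compressed or empty input.
fn maybe_repartition(
    csv_exec: CsvExec,
    target_partitions: usize,
    repartition_file_min_size: usize,
) -> Arc<dyn ExecutionPlan> {
    match csv_exec.get_repartitioned(target_partitions, repartition_file_min_size) {
        Some(repartitioned) => Arc::new(repartitioned),
        // Compressed/empty input cannot be split by byte range; keep the plan.
        None => Arc::new(csv_exec),
    }
}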
342 changes: 342 additions & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -1555,4 +1555,346 @@ mod tests {
extensions: None,
}
}

/// Unit tests for `repartition_file_groups()`
mod repartition_file_groups_test {
use super::*;

/// Empty file won't get partitioned
#[tokio::test]
async fn repartition_empty_file_only() {
let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0);
let file_group = vec![vec![partitioned_file_empty]];

let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: file_group,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

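// A zero-byte file cannot be split: the single group is returned
// unchanged and no byte range is assigned.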
let partitioned_file = parquet_exec
.get_repartitioned(4, 0)
.base_config()
.file_groups
.clone();

assert!(partitioned_file[0][0].range.is_none());
}

// Repartition when there is an empty file in the file groups
#[tokio::test]
async fn repartition_empty_files() {
let partitioned_file_a = PartitionedFile::new("a".to_string(), 10);
let partitioned_file_b = PartitionedFile::new("b".to_string(), 10);
let partitioned_file_empty = PartitionedFile::new("empty".to_string(), 0);

let empty_first = vec![
vec![partitioned_file_empty.clone()],
vec![partitioned_file_a.clone()],
vec![partitioned_file_b.clone()],
];
let empty_middle = vec![
vec![partitioned_file_a.clone()],
vec![partitioned_file_empty.clone()],
vec![partitioned_file_b.clone()],
];
let empty_last = vec![
vec![partitioned_file_a],
vec![partitioned_file_b],
vec![partitioned_file_empty],
];

// Repartition file groups into x partitions
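// The expected ranges follow from ceiling division over the non-empty
// bytes: e.g. for 3 partitions, ceil(20 / 3) = 7 bytes per range, walking
// the files in order: a[0..7], then a[7..10] + b[0..4], then b[4..10].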
let expected_2 =
vec![(0, "a".to_string(), 0, 10), (1, "b".to_string(), 0, 10)];
let expected_3 = vec![
(0, "a".to_string(), 0, 7),
(1, "a".to_string(), 7, 10),
(1, "b".to_string(), 0, 4),
(2, "b".to_string(), 4, 10),
];

let file_groups_testset = [empty_first, empty_middle, empty_last];

for fg in file_groups_testset {
for (n_partition, expected) in [(2, &expected_2), (3, &expected_3)] {
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: fg.clone(),
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(n_partition, 10)
.base_config()
.file_groups
.clone(),
);

assert_eq!(expected, &actual);
}
}
}

#[tokio::test]
async fn repartition_single_file() {
// Single file, single partition into multiple partitions
let partitioned_file = PartitionedFile::new("a".to_string(), 123);
let single_partition = vec![vec![partitioned_file]];
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: single_partition,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(4, 10)
.base_config()
.file_groups
.clone(),
);
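// 123 bytes into 4 target partitions: ceil(123 / 4) = 31-byte ranges,
// with the last partition taking the remaining 30 bytes.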
let expected = vec![
(0, "a".to_string(), 0, 31),
(1, "a".to_string(), 31, 62),
(2, "a".to_string(), 62, 93),
(3, "a".to_string(), 93, 123),
];
assert_eq!(expected, actual);
}

#[tokio::test]
async fn repartition_too_much_partitions() {
// Single file, single partition into 96 partitions
let partitioned_file = PartitionedFile::new("a".to_string(), 8);
let single_partition = vec![vec![partitioned_file]];
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: single_partition,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(96, 5)
.base_config()
.file_groups
.clone(),
);
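// Byte ranges cannot shrink below 1 byte, so an 8-byte file yields at
// most 8 single-byte partitions even with target_partitions = 96.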
let expected = vec![
(0, "a".to_string(), 0, 1),
(1, "a".to_string(), 1, 2),
(2, "a".to_string(), 2, 3),
(3, "a".to_string(), 3, 4),
(4, "a".to_string(), 4, 5),
(5, "a".to_string(), 5, 6),
(6, "a".to_string(), 6, 7),
(7, "a".to_string(), 7, 8),
];
assert_eq!(expected, actual);
}

#[tokio::test]
async fn repartition_multiple_partitions() {
// Multiple files in single partition after redistribution
let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
let source_partitions =
vec![vec![partitioned_file_1], vec![partitioned_file_2]];
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: source_partitions,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(3, 10)
.base_config()
.file_groups
.clone(),
);
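// 100 total bytes into 3 partitions: ceil(100 / 3) = 34-byte ranges that
// cross the file boundary: a[0..34], then a[34..40] + b[0..28], then b[28..60].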
let expected = vec![
(0, "a".to_string(), 0, 34),
(1, "a".to_string(), 34, 40),
(1, "b".to_string(), 0, 28),
(2, "b".to_string(), 28, 60),
];
assert_eq!(expected, actual);
}

#[tokio::test]
async fn repartition_same_num_partitions() {
// "Rebalance" files across partitions
let partitioned_file_1 = PartitionedFile::new("a".to_string(), 40);
let partitioned_file_2 = PartitionedFile::new("b".to_string(), 60);
let source_partitions =
vec![vec![partitioned_file_1], vec![partitioned_file_2]];
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: source_partitions,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

let actual = file_groups_to_vec(
parquet_exec
.get_repartitioned(2, 10)
.base_config()
.file_groups
.clone(),
);
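// The partition count stays at 2, but the uneven 40/60 byte split is
// rebalanced into two 50-byte partitions.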
let expected = vec![
(0, "a".to_string(), 0, 40),
(0, "b".to_string(), 0, 10),
(1, "b".to_string(), 10, 60),
];
assert_eq!(expected, actual);
}

#[tokio::test]
async fn repartition_no_action_ranges() {
// No action due to Some(range) in second file
let partitioned_file_1 = PartitionedFile::new("a".to_string(), 123);
let mut partitioned_file_2 = PartitionedFile::new("b".to_string(), 144);
partitioned_file_2.range = Some(FileRange { start: 1, end: 50 });
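// A pre-assigned range marks the file as already repartitioned, so
// repartition_file_groups() leaves every group untouched.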

let source_partitions =
vec![vec![partitioned_file_1], vec![partitioned_file_2]];
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: source_partitions,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

let actual = parquet_exec
.get_repartitioned(65, 10)
.base_config()
.file_groups
.clone();
assert_eq!(2, actual.len());
}

#[tokio::test]
async fn repartition_no_action_min_size() {
// No action because the file is smaller than repartition_file_min_size
let partitioned_file = PartitionedFile::new("a".to_string(), 123);
let single_partition = vec![vec![partitioned_file]];
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: single_partition,
file_schema: Arc::new(Schema::empty()),
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
);

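// The 123-byte file is smaller than repartition_file_min_size (500),
// so the single original group is kept as-is.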
let actual = parquet_exec
.get_repartitioned(65, 500)
.base_config()
.file_groups
.clone();
assert_eq!(1, actual.len());
}

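/// Flatten file groups into (partition index, file path, range start,
/// range end) tuples for concise assertions; panics if a file has no range.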
fn file_groups_to_vec(
file_groups: Vec<Vec<PartitionedFile>>,
) -> Vec<(usize, String, i64, i64)> {
file_groups
.iter()
.enumerate()
.flat_map(|(part_idx, files)| {
files
.iter()
.map(|f| {
(
part_idx,
f.object_meta.location.to_string(),
f.range.as_ref().unwrap().start,
f.range.as_ref().unwrap().end,
)
})
.collect_vec()
})
.collect_vec()
}
}
}