Skip to content

Commit

Permalink
test: Added tests for repartitions from n to m
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Jan 16, 2025
1 parent 2c95ca3 commit 87b3d1b
Showing 1 changed file with 83 additions and 28 deletions.
111 changes: 83 additions & 28 deletions datafusion/physical-plan/src/repartition/on_demand_repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,34 @@ mod tests {
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use tokio::task::JoinSet;

use arrow_schema::SortOptions;

use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::union::UnionExec;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};

/// Asserts that the rendered form of a physical plan matches the expected lines.
///
/// `$EXPECTED_PLAN_LINES`: the expected plan, one `&str` per rendered line
/// `$PLAN`: the plan to render (via `crate::displayable(..).indent(true)`) and compare
///
macro_rules! assert_plan {
    ($EXPECTED_PLAN_LINES: expr, $PLAN: expr) => {
        let physical_plan = $PLAN;
        // `actual` borrows from `formatted`, so the rendered string has to stay
        // alive until the assertion below has run.
        let formatted = crate::displayable(&physical_plan).indent(true).to_string();
        let actual: Vec<&str> = formatted.trim().lines().collect();

        let expected_plan_lines: Vec<&str> =
            $EXPECTED_PLAN_LINES.iter().copied().collect();

        assert_eq!(
            expected_plan_lines, actual,
            "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
        );
    };
}

/// Builds the schema used by the tests: a single `UInt32` field named `c0`,
/// created with its nullable flag set to `false`.
fn test_schema() -> Arc<Schema> {
    let c0 = Field::new("c0", DataType::UInt32, false);
    let fields = vec![c0];
    Arc::new(Schema::new(fields))
}
Expand Down Expand Up @@ -775,6 +803,61 @@ mod tests {
Ok(())
}

/// Repartitions two input partitions into `OnDemand(3)`, coalesces the result
/// back into a single stream, and checks both the rendered plan and the
/// collected rows.
#[tokio::test]
async fn many_to_many_on_demand_with_coalesce() -> Result<()> {
    let schema = test_schema();
    let partition: Vec<RecordBatch> = create_vec_batches(1);
    // Two identical input partitions; the final use takes ownership of
    // `partition` instead of cloning it a second time.
    let partitions = vec![partition.clone(), partition];
    let input = Arc::new(MemoryExec::try_new(&partitions, Arc::clone(&schema), None)?);
    let exec = OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(3))?;

    let coalesce_exec =
        CoalescePartitionsExec::new(Arc::new(exec) as Arc<dyn ExecutionPlan>);

    // Child operators are indented by two spaces per nesting level in the
    // rendered plan, so the expected lines must carry that indentation.
    let expected_plan = [
        "CoalescePartitionsExec",
        "  OnDemandRepartitionExec: partitioning=OnDemand(3), input_partitions=2",
        "    MemoryExec: partitions=2, partition_sizes=[1, 1]",
    ];
    // `assert_plan!` takes the plan by value, so hand it a clone and keep
    // `coalesce_exec` for execution below.
    assert_plan!(expected_plan, coalesce_exec.clone());

    // execute the plan
    let task_ctx = Arc::new(TaskContext::default());
    let stream = coalesce_exec.execute(0, task_ctx)?;
    let batches = crate::common::collect(stream).await?;

    // Every value shows up twice because the same partition was supplied twice.
    // Cells are padded to the column width, so each row is as wide as the border.
    #[rustfmt::skip]
    let expected = vec![
        "+----+",
        "| c0 |",
        "+----+",
        "| 1  |",
        "| 1  |",
        "| 2  |",
        "| 2  |",
        "| 3  |",
        "| 3  |",
        "| 4  |",
        "| 4  |",
        "| 5  |",
        "| 5  |",
        "| 6  |",
        "| 6  |",
        "| 7  |",
        "| 7  |",
        "| 8  |",
        "| 8  |",
        "+----+",
    ];

    // Sorted comparison: the order in which rows arrive after repartitioning
    // is not part of what this test checks.
    assert_batches_sorted_eq!(&expected, &batches);

    Ok(())
}

#[tokio::test]
async fn unsupported_partitioning() {
let task_ctx = Arc::new(TaskContext::default());
Expand Down Expand Up @@ -1105,34 +1188,6 @@ mod tests {
.unwrap()
}

use arrow_schema::SortOptions;

use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::union::UnionExec;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};

/// Asserts that the rendered form of a physical plan matches the expected lines.
///
/// `$EXPECTED_PLAN_LINES`: the expected plan, one `&str` per rendered line
/// `$PLAN`: the plan to render (via `crate::displayable(..).indent(true)`) and compare
///
macro_rules! assert_plan {
    ($EXPECTED_PLAN_LINES: expr, $PLAN: expr) => {
        let physical_plan = $PLAN;
        // `actual` borrows from `formatted`, so the rendered string has to stay
        // alive until the assertion below has run.
        let formatted = crate::displayable(&physical_plan).indent(true).to_string();
        let actual: Vec<&str> = formatted.trim().lines().collect();

        let expected_plan_lines: Vec<&str> =
            $EXPECTED_PLAN_LINES.iter().copied().collect();

        assert_eq!(
            expected_plan_lines, actual,
            "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
        );
    };
}

#[tokio::test]
async fn test_preserve_order() -> Result<()> {
let schema = test_schema();
Expand Down

0 comments on commit 87b3d1b

Please sign in to comment.