From 87b3d1b5e9a30dbb7855b3e638ad1e5a7c19f949 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Thu, 16 Jan 2025 18:47:02 +0800 Subject: [PATCH] test: Added tests for repartitions from n to m --- .../src/repartition/on_demand_repartition.rs | 111 +++++++++++++----- 1 file changed, 83 insertions(+), 28 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/on_demand_repartition.rs b/datafusion/physical-plan/src/repartition/on_demand_repartition.rs index d57040252c08..3a9c592bf309 100644 --- a/datafusion/physical-plan/src/repartition/on_demand_repartition.rs +++ b/datafusion/physical-plan/src/repartition/on_demand_repartition.rs @@ -709,6 +709,34 @@ mod tests { use datafusion_execution::runtime_env::RuntimeEnvBuilder; use tokio::task::JoinSet; + use arrow_schema::SortOptions; + + use crate::coalesce_partitions::CoalescePartitionsExec; + use crate::union::UnionExec; + use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + + /// Asserts that the plan is as expected + /// + /// `$EXPECTED_PLAN_LINES`: input plan + /// `$PLAN`: the plan to optimized + /// + macro_rules! assert_plan { + ($EXPECTED_PLAN_LINES: expr, $PLAN: expr) => { + let physical_plan = $PLAN; + let formatted = crate::displayable(&physical_plan).indent(true).to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + + let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES + .iter().map(|s| *s).collect(); + + assert_eq!( + expected_plan_lines, actual, + "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + }; + } + fn test_schema() -> Arc { Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) } @@ -775,6 +803,61 @@ mod tests { Ok(()) } + #[tokio::test] + async fn many_to_many_on_demand_with_coalesce() -> Result<()> { + let schema = test_schema(); + let partition: Vec = create_vec_batches(1); + let partitions = vec![partition.clone(), partition.clone()]; + let input = Arc::new( + MemoryExec::try_new(&partitions, Arc::clone(&schema), None).unwrap(), + ); + let exec = + OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(3)).unwrap(); + + let coalesce_exec = + CoalescePartitionsExec::new(Arc::new(exec) as Arc); + + let expected_plan = [ + "CoalescePartitionsExec", + " OnDemandRepartitionExec: partitioning=OnDemand(3), input_partitions=2", + " MemoryExec: partitions=2, partition_sizes=[1, 1]", + ]; + assert_plan!(expected_plan, coalesce_exec.clone()); + + // execute the plan + let task_ctx = Arc::new(TaskContext::default()); + let stream = coalesce_exec.execute(0, task_ctx)?; + let batches = crate::common::collect(stream).await?; + + #[rustfmt::skip] + let expected = vec![ + "+----+", + "| c0 |", + "+----+", + "| 1 |", + "| 1 |", + "| 2 |", + "| 2 |", + "| 3 |", + "| 3 |", + "| 4 |", + "| 4 |", + "| 5 |", + "| 5 |", + "| 6 |", + "| 6 |", + "| 7 |", + "| 7 |", + "| 8 |", + "| 8 |", + "+----+", + ]; + + assert_batches_sorted_eq!(&expected, &batches); + + Ok(()) + } + #[tokio::test] async fn unsupported_partitioning() { let task_ctx = Arc::new(TaskContext::default()); @@ -1105,34 +1188,6 @@ mod tests { .unwrap() } - use arrow_schema::SortOptions; - - use crate::coalesce_partitions::CoalescePartitionsExec; - use crate::union::UnionExec; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; - - /// Asserts that the plan is as expected - /// - /// `$EXPECTED_PLAN_LINES`: input plan - /// `$PLAN`: the plan to optimized - /// - macro_rules! assert_plan { - ($EXPECTED_PLAN_LINES: expr, $PLAN: expr) => { - let physical_plan = $PLAN; - let formatted = crate::displayable(&physical_plan).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - - let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES - .iter().map(|s| *s).collect(); - - assert_eq!( - expected_plan_lines, actual, - "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" - ); - }; - } - #[tokio::test] async fn test_preserve_order() -> Result<()> { let schema = test_schema();