bench: Replace RoundRobinBatch with OnDemandRepartition #60
base: apache_main
Conversation
#[tokio::test]
async fn test_preserve_order_with_coalesce() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "my_awesome_field",
        DataType::UInt32,
        false,
    )]));
    let options = SortOptions::default();
    let sort_exprs = LexOrdering::new(vec![PhysicalSortExpr {
        expr: col("my_awesome_field", &schema).unwrap(),
        options,
    }]);

    let batch = RecordBatch::try_from_iter(vec![(
        "my_awesome_field",
        Arc::new(UInt32Array::from(vec![1, 2, 3, 4])) as ArrayRef,
    )])?;

    let source = Arc::new(
        MemoryExec::try_new(&[vec![batch.clone()]], Arc::clone(&schema), None)
            .unwrap()
            .try_with_sort_information(vec![sort_exprs.clone()])
            .unwrap(),
    );

    // output has multiple partitions, and is sorted
    let union = UnionExec::new(vec![source.clone(), source]);
    let repartition_exec =
        OnDemandRepartitionExec::try_new(Arc::new(union), Partitioning::OnDemand(5))
            .unwrap()
            .with_preserve_order();

    let coalesce_exec = CoalescePartitionsExec::new(
        Arc::new(repartition_exec) as Arc<dyn ExecutionPlan>
    );
Unit test for the order-preserving scenario.
[2025-01-16T14:26:24Z DEBUG on_demand_repartition] OnDemandRepartitionExec, with preserve order: true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Pending
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Start OnDemandRepartitionExec::execute for partition: 1
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Start OnDemandRepartitionExec::execute for partition: 0
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Start OnDemandRepartitionExec::execute for partition: 2
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::union] Start UnionExec::execute for partition 0 of context session_id DEFAULT and task_id None
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::union] Start UnionExec::execute for partition 1 of context session_id DEFAULT and task_id None
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::union] Found a Union partition to execute
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Before returning stream in OnDemandRepartitionExec::execute for partition: 1
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::union] Found a Union partition to execute
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Before returning stream in OnDemandRepartitionExec::execute for partition: 0
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 0
Are Output channels empty? false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 1
Are Output channels empty? false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Before returning stream in OnDemandRepartitionExec::execute for partition: 2
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
]], row_count: 4 })))
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
]], row_count: 4 })))
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 0
Are Output channels empty? false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 2, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 1
Are Output channels empty? false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 2, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, return batch
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::memory] Memory stream exhausted!
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(None)
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, return batch
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::memory] Memory stream exhausted!
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(None)
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 2, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 2, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 2, input partitions finished
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 2, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 2, input partitions finished
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested true
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, input partitions finished
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, input partitions finished
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Pending
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, start, is_requested false
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 0, input partitions finished
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, send partition number
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition per partition poll 1, input partitions finished
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
]], row_count: 4 })))
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
]], row_count: 4 })))
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:26:24Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(None)
#[tokio::test]
async fn many_to_many_on_demand_with_coalesce() -> Result<()> {
    let schema = test_schema();
    let partition: Vec<RecordBatch> = create_vec_batches(1);
    let partitions = vec![partition.clone(), partition.clone()];
    let input = Arc::new(
        MemoryExec::try_new(&partitions, Arc::clone(&schema), None).unwrap(),
    );
    let exec =
        OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(3)).unwrap();

    let coalesce_exec =
        CoalescePartitionsExec::new(Arc::new(exec) as Arc<dyn ExecutionPlan>);

    let expected_plan = [
        "CoalescePartitionsExec",
        " OnDemandRepartitionExec: partitioning=OnDemand(3), input_partitions=2",
        " MemoryExec: partitions=2, partition_sizes=[1, 1]",
    ];
    assert_plan!(expected_plan, coalesce_exec.clone());
Unit test for the scenario in which the repartition goes from 2 input partitions to 3 output partitions.
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:31:01Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Start OnDemandRepartitionExec::execute for partition: 0
[2025-01-16T14:31:01Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Start OnDemandRepartitionExec::execute for partition: 2
[2025-01-16T14:31:01Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Start OnDemandRepartitionExec::execute for partition: 1
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Pending
[2025-01-16T14:31:01Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Before returning stream in OnDemandRepartitionExec::execute for partition: 0
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, start, is_requested false
[2025-01-16T14:31:01Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Before returning stream in OnDemandRepartitionExec::execute for partition: 2
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, start, is_requested false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, send partition number
[2025-01-16T14:31:01Z TRACE datafusion_physical_plan::repartition::on_demand_repartition] Before returning stream in OnDemandRepartitionExec::execute for partition: 1
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 0
Are Output channels empty? false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 2
Are Output channels empty? false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, start, is_requested false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 1
Are Output channels empty? false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, start, is_requested true
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, return batch
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, start, is_requested false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, start, is_requested true
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, return batch
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 2
Are Output channels empty? false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, start, is_requested false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, start, is_requested true
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, start, is_requested true
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, return batch
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, return batch
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, start, is_requested false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, start, is_requested false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 1
Are Output channels empty? false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory stream exhausted!
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On demand pull from input Part: 0
Are Output channels empty? false
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory Stream poll
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::memory] Memory stream exhausted!
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
5,
6,
7,
8,
]], row_count: 8 })))
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, start, is_requested true
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, start, is_requested true
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, start, is_requested true
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
5,
6,
7,
8,
]], row_count: 8 })))
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, input partitions processed: 1
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, input partitions processed: 1
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 2, input partitions processed: 2
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, input partitions processed: 1
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, send partition number
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 1, input partitions processed: 2
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::repartition::on_demand_repartition] On Demand Repartition poll 0, input partitions processed: 2
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
5,
6,
7,
8,
]], row_count: 8 })))
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(Some(Ok(RecordBatch { schema: Schema { fields: [Field { name: "c0", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<UInt32>
[
1,
2,
3,
4,
5,
6,
7,
8,
]], row_count: 8 })))
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll ObservedStream
[2025-01-16T14:31:01Z DEBUG datafusion_physical_plan::stream] Coalesce partitions poll result Ready(None)
// TODO: make sure that this is only called for hash partitioning
// match self.partitioning() {
//     Partitioning::Hash(_, _) => {}
//     _ => {
//         panic!(
//             "RepartitionExec::execute should never be called directly. \
//             Partition type: {:?}",
//             self.partitioning()
//         );
//     }
// }
Uncomment it to make sure there is no RoundRobinBatch running.
if preserve_order {
    let (txs, rxs) = (0..num_input_partitions)
        .map(|_| async_channel::unbounded())
        .unzip::<_, _, Vec<_>, Vec<_>>();
    // TODO: this approach is not ideal, as it pre-fetches too many partitions
    for i in 0..num_output_partitions {
        txs.iter().for_each(|tx| {
            tx.send_blocking(i).expect("send partition number");
        });
    }
    Mutex::new((txs, rxs))
} else {
    let (tx, rx) = async_channel::unbounded();
    for i in num_input_partitions..num_output_partitions {
        tx.send(i).await.expect("send partition number");
    }
    Mutex::new((vec![tx], vec![rx]))
}
Pre-fetching only one RecordBatch per partition.
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 96.72ms │ 104.88ms │ 1.08x slower │
│ QQuery 2 │ 22.39ms │ 22.36ms │ no change │
│ QQuery 3 │ 40.81ms │ 41.56ms │ no change │
│ QQuery 4 │ 40.74ms │ 37.96ms │ +1.07x faster │
│ QQuery 5 │ 110.69ms │ 93.65ms │ +1.18x faster │
│ QQuery 6 │ 18.20ms │ 19.09ms │ no change │
│ QQuery 7 │ 114.26ms │ 127.37ms │ 1.11x slower │
│ QQuery 8 │ 81.05ms │ 78.30ms │ no change │
│ QQuery 9 │ 113.23ms │ 102.60ms │ +1.10x faster │
│ QQuery 10 │ 88.45ms │ 88.70ms │ no change │
│ QQuery 11 │ 15.69ms │ 16.52ms │ 1.05x slower │
│ QQuery 12 │ 63.47ms │ 56.40ms │ +1.13x faster │
│ QQuery 13 │ 63.23ms │ 51.80ms │ +1.22x faster │
│ QQuery 14 │ 29.90ms │ 27.67ms │ +1.08x faster │
│ QQuery 15 │ 44.76ms │ 45.70ms │ no change │
│ QQuery 16 │ 21.72ms │ 20.37ms │ +1.07x faster │
│ QQuery 17 │ 91.00ms │ 90.60ms │ no change │
│ QQuery 18 │ 239.67ms │ 205.91ms │ +1.16x faster │
│ QQuery 19 │ 48.20ms │ 46.97ms │ no change │
│ QQuery 20 │ 61.66ms │ 61.29ms │ no change │
│ QQuery 21 │ 148.19ms │ 145.16ms │ no change │
│ QQuery 22 │ 16.31ms │ 22.17ms │ 1.36x slower │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 1570.34ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 1507.04ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 71.38ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 68.50ms │
│ Queries Faster │ 8 │
│ Queries Slower │ 4 │
│ Queries with No Change │ 10 │
└─────────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 860.45ms │ 855.31ms │ no change │
│ QQuery 2 │ 177.13ms │ 175.01ms │ no change │
│ QQuery 3 │ 701.53ms │ 719.69ms │ no change │
│ QQuery 4 │ 348.10ms │ 398.75ms │ 1.15x slower │
│ QQuery 5 │ 971.04ms │ 1104.76ms │ 1.14x slower │
│ QQuery 6 │ 138.41ms │ 138.01ms │ no change │
│ QQuery 7 │ 1500.41ms │ 1642.55ms │ 1.09x slower │
│ QQuery 8 │ 1124.79ms │ 1224.42ms │ 1.09x slower │
│ QQuery 9 │ 1705.72ms │ 1862.73ms │ 1.09x slower │
│ QQuery 10 │ 863.52ms │ 885.08ms │ no change │
│ QQuery 11 │ 86.06ms │ 91.63ms │ 1.06x slower │
│ QQuery 12 │ 465.17ms │ 528.57ms │ 1.14x slower │
│ QQuery 13 │ 678.71ms │ 788.26ms │ 1.16x slower │
│ QQuery 14 │ 229.66ms │ 227.71ms │ no change │
│ QQuery 15 │ 405.37ms │ 397.42ms │ no change │
│ QQuery 16 │ 150.66ms │ 121.65ms │ +1.24x faster │
│ QQuery 17 │ 1079.53ms │ 1070.55ms │ no change │
│ QQuery 18 │ 2694.29ms │ 2818.74ms │ no change │
│ QQuery 19 │ 390.06ms │ 389.29ms │ no change │
│ QQuery 20 │ 604.29ms │ 601.79ms │ no change │
│ QQuery 21 │ 2405.13ms │ 2599.56ms │ 1.08x slower │
│ QQuery 22 │ 225.75ms │ 245.89ms │ 1.09x slower │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 17805.76ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 18887.35ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 809.35ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 858.52ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 10 │
│ Queries with No Change │ 11 │
└─────────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf50.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 6395.05ms │ 5656.67ms │ +1.13x faster │
│ QQuery 2 │ 1424.15ms │ 1226.77ms │ +1.16x faster │
│ QQuery 3 │ 4086.06ms │ 3581.65ms │ +1.14x faster │
│ QQuery 4 │ 1982.80ms │ 1752.00ms │ +1.13x faster │
│ QQuery 5 │ 6637.08ms │ 5842.49ms │ +1.14x faster │
│ QQuery 6 │ 645.05ms │ 677.16ms │ no change │
│ QQuery 7 │ 22379.44ms │ 20693.95ms │ +1.08x faster │
│ QQuery 8 │ 6473.06ms │ 5886.33ms │ +1.10x faster │
│ QQuery 9 │ 11610.86ms │ 10531.03ms │ +1.10x faster │
│ QQuery 10 │ 5445.47ms │ 5108.82ms │ +1.07x faster │
│ QQuery 11 │ 854.62ms │ 835.96ms │ no change │
│ QQuery 12 │ 2304.48ms │ 2265.38ms │ no change │
│ QQuery 13 │ 4314.10ms │ 4132.23ms │ no change │
│ QQuery 14 │ 1069.96ms │ 1069.89ms │ no change │
│ QQuery 15 │ 2429.48ms │ 2512.86ms │ no change │
│ QQuery 16 │ 633.26ms │ 680.85ms │ 1.08x slower │
│ QQuery 17 │ 5877.39ms │ 5889.50ms │ no change │
│ QQuery 18 │ 25553.80ms │ 23910.79ms │ +1.07x faster │
│ QQuery 19 │ 1707.11ms │ 1865.55ms │ 1.09x slower │
│ QQuery 20 │ 3323.36ms │ 3535.05ms │ 1.06x slower │
│ QQuery 21 │ 16934.77ms │ 15111.54ms │ +1.12x faster │
│ QQuery 22 │ 1225.45ms │ 1212.27ms │ no change │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 133306.77ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 123978.76ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 6059.40ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 5635.40ms │
│ Queries Faster │ 11 │
│ Queries Slower │ 3 │
│ Queries with No Change │ 8 │
└─────────────────────────────────────────────────────────┴─────────────┘
This optimization seems less effective on a large dataset like tpch_sf50.
As the number of partitions increases (e.g., 20 partitions), the pre-fetch impact becomes noticeable.
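For reference, the partition count is controlled by DataFusion's target_partitions setting; below is a rough sketch of how a 20-partition run could be configured. The exact builder methods depend on the DataFusion version, so treat this as an assumption rather than the actual benchmark harness. The corresponding results follow.

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

async fn run_query_with_partitions(sql: &str, partitions: usize) -> Result<()> {
    // Raise target_partitions above the default (11 on this machine) so every
    // repartition operator in the plan fans out to `partitions` outputs.
    let config = SessionConfig::new().with_target_partitions(partitions);
    let ctx = SessionContext::new_with_config(config);
    // Run the query and drain the results, e.g. for a TPC-H query string.
    ctx.sql(sql).await?.collect().await?;
    Ok(())
}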
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 95.31ms │ 98.31ms │ no change │
│ QQuery 2 │ 23.73ms │ 23.03ms │ no change │
│ QQuery 3 │ 39.42ms │ 42.96ms │ 1.09x slower │
│ QQuery 4 │ 34.76ms │ 44.47ms │ 1.28x slower │
│ QQuery 5 │ 111.28ms │ 104.75ms │ +1.06x faster │
│ QQuery 6 │ 18.35ms │ 18.19ms │ no change │
│ QQuery 7 │ 133.22ms │ 111.54ms │ +1.19x faster │
│ QQuery 8 │ 88.20ms │ 84.82ms │ no change │
│ QQuery 9 │ 107.83ms │ 109.24ms │ no change │
│ QQuery 10 │ 87.40ms │ 96.54ms │ 1.10x slower │
│ QQuery 11 │ 15.26ms │ 14.93ms │ no change │
│ QQuery 12 │ 55.97ms │ 59.03ms │ 1.05x slower │
│ QQuery 13 │ 50.08ms │ 54.24ms │ 1.08x slower │
│ QQuery 14 │ 29.82ms │ 29.79ms │ no change │
│ QQuery 15 │ 45.04ms │ 45.20ms │ no change │
│ QQuery 16 │ 23.53ms │ 23.14ms │ no change │
│ QQuery 17 │ 90.03ms │ 89.35ms │ no change │
│ QQuery 18 │ 192.57ms │ 195.34ms │ no change │
│ QQuery 19 │ 48.53ms │ 46.89ms │ no change │
│ QQuery 20 │ 65.51ms │ 61.19ms │ +1.07x faster │
│ QQuery 21 │ 143.90ms │ 139.34ms │ no change │
│ QQuery 22 │ 20.22ms │ 19.04ms │ +1.06x faster │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 1519.96ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 1511.33ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 69.09ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 68.70ms │
│ Queries Faster │ 4 │
│ Queries Slower │ 5 │
│ Queries with No Change │ 13 │
└─────────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 867.43ms │ 853.48ms │ no change │
│ QQuery 2 │ 158.14ms │ 175.43ms │ 1.11x slower │
│ QQuery 3 │ 705.16ms │ 626.26ms │ +1.13x faster │
│ QQuery 4 │ 326.22ms │ 349.08ms │ 1.07x slower │
│ QQuery 5 │ 1053.89ms │ 1116.19ms │ 1.06x slower │
│ QQuery 6 │ 145.22ms │ 143.41ms │ no change │
│ QQuery 7 │ 1267.09ms │ 1489.60ms │ 1.18x slower │
│ QQuery 8 │ 1155.62ms │ 1192.79ms │ no change │
│ QQuery 9 │ 1743.53ms │ 1702.39ms │ no change │
│ QQuery 10 │ 914.08ms │ 943.89ms │ no change │
│ QQuery 11 │ 97.70ms │ 96.26ms │ no change │
│ QQuery 12 │ 448.54ms │ 518.56ms │ 1.16x slower │
│ QQuery 13 │ 628.52ms │ 630.54ms │ no change │
│ QQuery 14 │ 243.18ms │ 246.24ms │ no change │
│ QQuery 15 │ 407.33ms │ 417.43ms │ no change │
│ QQuery 16 │ 148.11ms │ 153.41ms │ no change │
│ QQuery 17 │ 1100.94ms │ 1087.50ms │ no change │
│ QQuery 18 │ 2629.93ms │ 2665.17ms │ no change │
│ QQuery 19 │ 443.48ms │ 412.67ms │ +1.07x faster │
│ QQuery 20 │ 609.54ms │ 611.18ms │ no change │
│ QQuery 21 │ 2313.97ms │ 2321.72ms │ no change │
│ QQuery 22 │ 215.95ms │ 224.28ms │ no change │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 17623.56ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 17977.52ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 801.07ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 817.16ms │
│ Queries Faster │ 2 │
│ Queries Slower │ 5 │
│ Queries with No Change │ 15 │
└─────────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf50.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 5532.99ms │ 5390.94ms │ no change │
│ QQuery 2 │ 1236.69ms │ 1237.80ms │ no change │
│ QQuery 3 │ 3473.12ms │ 3899.69ms │ 1.12x slower │
│ QQuery 4 │ 1609.12ms │ 1701.40ms │ 1.06x slower │
│ QQuery 5 │ 5896.20ms │ 6344.38ms │ 1.08x slower │
│ QQuery 6 │ 692.00ms │ 715.17ms │ no change │
│ QQuery 7 │ 26702.46ms │ 21597.19ms │ +1.24x faster │
│ QQuery 8 │ 5985.10ms │ 6111.78ms │ no change │
│ QQuery 9 │ 26732.16ms │ 35092.38ms │ 1.31x slower │
│ QQuery 10 │ 4798.24ms │ 4673.95ms │ no change │
│ QQuery 11 │ 1126.43ms │ 1099.72ms │ no change │
│ QQuery 12 │ 2360.55ms │ 2334.11ms │ no change │
│ QQuery 13 │ 3915.00ms │ 3890.49ms │ no change │
│ QQuery 14 │ 1139.22ms │ 1107.63ms │ no change │
│ QQuery 15 │ 2487.07ms │ 2521.86ms │ no change │
│ QQuery 16 │ 696.90ms │ 694.05ms │ no change │
│ QQuery 17 │ 6101.21ms │ 6129.60ms │ no change │
│ QQuery 18 │ 41804.39ms │ 39732.32ms │ no change │
│ QQuery 19 │ 1859.20ms │ 1916.88ms │ no change │
│ QQuery 20 │ 3231.03ms │ 3264.09ms │ no change │
│ QQuery 21 │ 14863.56ms │ 14528.08ms │ no change │
│ QQuery 22 │ 1250.98ms │ 1203.56ms │ no change │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 163493.60ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 165187.05ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 7431.53ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 7508.50ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 4 │
│ Queries with No Change │ 17 │
└─────────────────────────────────────────────────────────┴─────────────┘
Thank you!
It seems each RoundRobin has been replaced with OnDemandRepartition and is working.
As far as I understand, this code pre-fetches for each partition only once; after those batches are consumed, it waits on the input poll response again. What we imagined instead is keeping a buffer (as in SPM) that stores one RecordBatch, so that each time a partition is requested we return the pre-fetched RecordBatch and asynchronously fetch the next one from the input into the buffer, where it is ready for the following poll. The current code shows the same behavior, but only once at the beginning, so it makes no big performance difference.
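Roughly what I have in mind (just a sketch with assumed names and channel layout, not code from this PR): keep a bounded(1) channel per output partition as the single-slot buffer and refill it asynchronously every time the consumer takes the stored batch.

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use futures::{Stream, StreamExt};

// Producer side of the single-slot buffer: `buffer_tx` comes from
// async_channel::bounded(1), so `send` completes only after the previously
// pre-fetched batch has been taken, keeping exactly one batch ahead.
async fn prefetch_one<S>(mut input: S, buffer_tx: async_channel::Sender<Result<RecordBatch>>)
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    while let Some(batch) = input.next().await {
        if buffer_tx.send(batch).await.is_err() {
            // The output stream was dropped; stop pulling from the input.
            break;
        }
    }
}

// Consumer side (sketch): each poll takes the buffered batch immediately,
// while `prefetch_one` refills the slot in the background.
// let (tx, rx) = async_channel::bounded(1);
// tokio::spawn(prefetch_one(input_stream, tx));
// let next_batch = rx.recv().await;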
It always performs a pre-fetch: when the prepared batch is consumed, it continues processing the next one. The channel used for sending partition numbers always holds an entry for every partition, because the poll function sends a new partition number as soon as its batch is consumed, which helps pre-fetch for the next poll.
pub(crate) async fn pull_from_input(
    input: Arc<dyn ExecutionPlan>,
    partition: usize,
    mut output_channels: HashMap<
        usize,
        (DistributionSender<MaybeBatch>, SharedMemoryReservation),
    >,
    partitioning: Partitioning,
    output_partition_rx: Receiver<usize>,
    metrics: RepartitionMetrics,
    context: Arc<TaskContext>,
) -> Result<()> {
    let mut partitioner =
        BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;

    // execute the child operator
    let timer = metrics.fetch_time.timer();
    let mut stream = input.execute(partition, context)?;
    timer.done();

    // While there are still outputs to send to, keep pulling inputs
    let mut batches_until_yield = partitioner.num_partitions();
    while !output_channels.is_empty() {
        // fetch the next batch
        let timer = metrics.fetch_time.timer();
        let result = stream.next().await;
        timer.done();

        // Input is done
        let batch = match result {
            Some(result) => result?,
            None => break,
        };

        // Get the partition number from the output partition
        let partition = output_partition_rx.recv().await.map_err(|e| {
            internal_datafusion_err!(
                "Error receiving partition number from output partition: {}",
                e
            )
        })?;

        // TODO: To be removed
        debug!(
            "On demand pull from input Part: {} \n Are Output channels empty? {}",
            partition,
            output_channels.is_empty()
        );
impl Stream for OnDemandRepartitionStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // TODO: To be removed
        debug!(
            "On Demand Repartition poll {}, start, is_requested {}",
            self.partition, self.is_requested,
        );
        loop {
            // Send partition number to input partitions
            if !self.sender.is_closed() && !self.is_requested {
                self.sender.send_blocking(self.partition).map_err(|e| {
                    internal_datafusion_err!(
                        "Error sending partition number to input partitions: {}",
                        e
                    )
                })?;
                debug!(
                    "On Demand Repartition poll {}, send partition number",
                    self.partition
                );
                self.is_requested = true;
            }
I will consider a buffer (like in SPM).
UPDATE: the performance has improved a bit when using one task for processing batches and another for distributing them.
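Roughly, the split looks like this (a simplified sketch with assumed names such as batch_rx and request_rx, not the exact code): one task keeps pulling and partitioning batches from the input, while a second task only matches each prepared batch with the next on-demand request and forwards it.

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

// Distribution task: consumes batches prepared by the processing task
// (`batch_rx`) and routes each one to whichever output partition asked
// for data next (`request_rx`).
async fn distribute(
    batch_rx: async_channel::Receiver<RecordBatch>,
    request_rx: async_channel::Receiver<usize>,
    outputs: Vec<async_channel::Sender<Result<RecordBatch>>>,
) {
    while let Ok(batch) = batch_rx.recv().await {
        let partition = match request_rx.recv().await {
            Ok(p) => p,
            Err(_) => break, // no more requests: all output streams finished
        };
        if outputs[partition].send(Ok(batch)).await.is_err() {
            break;
        }
    }
}

// The processing task would be a loop like `pull_from_input` above, but
// instead of writing into the per-partition output channels directly it
// only sends the produced batches to the sender half of `batch_rx`.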
Comparing on-demand-repartition-replace-roundrobin and on-demand-repartition-without-prefetch
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 101.05ms │ 95.22ms │ +1.06x faster │
│ QQuery 2 │ 22.17ms │ 22.07ms │ no change │
│ QQuery 3 │ 41.38ms │ 38.97ms │ +1.06x faster │
│ QQuery 4 │ 39.80ms │ 42.82ms │ 1.08x slower │
│ QQuery 5 │ 91.81ms │ 106.76ms │ 1.16x slower │
│ QQuery 6 │ 18.84ms │ 18.44ms │ no change │
│ QQuery 7 │ 128.56ms │ 131.67ms │ no change │
│ QQuery 8 │ 84.78ms │ 86.02ms │ no change │
│ QQuery 9 │ 117.57ms │ 118.42ms │ no change │
│ QQuery 10 │ 86.60ms │ 88.41ms │ no change │
│ QQuery 11 │ 16.38ms │ 14.87ms │ +1.10x faster │
│ QQuery 12 │ 59.67ms │ 63.42ms │ 1.06x slower │
│ QQuery 13 │ 56.23ms │ 57.17ms │ no change │
│ QQuery 14 │ 29.25ms │ 28.57ms │ no change │
│ QQuery 15 │ 45.30ms │ 45.31ms │ no change │
│ QQuery 16 │ 23.06ms │ 22.19ms │ no change │
│ QQuery 17 │ 95.49ms │ 89.47ms │ +1.07x faster │
│ QQuery 18 │ 204.34ms │ 233.89ms │ 1.14x slower │
│ QQuery 19 │ 48.39ms │ 48.35ms │ no change │
│ QQuery 20 │ 63.96ms │ 60.80ms │ no change │
│ QQuery 21 │ 105.41ms │ 96.60ms │ +1.09x faster │
│ QQuery 22 │ 20.14ms │ 20.60ms │ no change │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 1500.16ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 1530.05ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 68.19ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 69.55ms │
│ Queries Faster │ 5 │
│ Queries Slower │ 4 │
│ Queries with No Change │ 13 │
└─────────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 844.05ms │ 865.30ms │ no change │
│ QQuery 2 │ 181.66ms │ 171.34ms │ +1.06x faster │
│ QQuery 3 │ 759.58ms │ 822.10ms │ 1.08x slower │
│ QQuery 4 │ 377.12ms │ 365.09ms │ no change │
│ QQuery 5 │ 1134.34ms │ 1231.87ms │ 1.09x slower │
│ QQuery 6 │ 140.23ms │ 136.95ms │ no change │
│ QQuery 7 │ 1554.83ms │ 1700.98ms │ 1.09x slower │
│ QQuery 8 │ 1213.22ms │ 1350.11ms │ 1.11x slower │
│ QQuery 9 │ 1576.67ms │ 1985.24ms │ 1.26x slower │
│ QQuery 10 │ 919.58ms │ 1053.39ms │ 1.15x slower │
│ QQuery 11 │ 96.04ms │ 88.00ms │ +1.09x faster │
│ QQuery 12 │ 531.73ms │ 538.45ms │ no change │
│ QQuery 13 │ 785.54ms │ 844.42ms │ 1.07x slower │
│ QQuery 14 │ 238.57ms │ 231.57ms │ no change │
│ QQuery 15 │ 393.27ms │ 404.61ms │ no change │
│ QQuery 16 │ 112.30ms │ 146.72ms │ 1.31x slower │
│ QQuery 17 │ 1068.94ms │ 1065.28ms │ no change │
│ QQuery 18 │ 3046.81ms │ 3332.41ms │ 1.09x slower │
│ QQuery 19 │ 407.31ms │ 382.81ms │ +1.06x faster │
│ QQuery 20 │ 597.33ms │ 587.91ms │ no change │
│ QQuery 21 │ 2525.42ms │ 2847.28ms │ 1.13x slower │
│ QQuery 22 │ 255.02ms │ 278.30ms │ 1.09x slower │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 18759.57ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 20430.12ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 852.71ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 928.64ms │
│ Queries Faster │ 3 │
│ Queries Slower │ 11 │
│ Queries with No Change │ 8 │
└─────────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf50.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ on-demand-repartition-replace-roundrobin ┃ on-demand-repartition-without-prefetch ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 5630.77ms │ 6063.83ms │ 1.08x slower │
│ QQuery 2 │ 1332.75ms │ 1219.98ms │ +1.09x faster │
│ QQuery 3 │ 4195.18ms │ 3983.65ms │ +1.05x faster │
│ QQuery 4 │ 2020.46ms │ 1933.75ms │ no change │
│ QQuery 5 │ 6998.04ms │ 6555.28ms │ +1.07x faster │
│ QQuery 6 │ 678.68ms │ 680.64ms │ no change │
│ QQuery 7 │ 21129.42ms │ 22438.01ms │ 1.06x slower │
│ QQuery 8 │ 6424.30ms │ 6504.34ms │ no change │
│ QQuery 9 │ 10982.68ms │ 11596.16ms │ 1.06x slower │
│ QQuery 10 │ 5438.16ms │ 5133.71ms │ +1.06x faster │
│ QQuery 11 │ 881.07ms │ 832.57ms │ +1.06x faster │
│ QQuery 12 │ 2299.35ms │ 2349.00ms │ no change │
│ QQuery 13 │ 4234.12ms │ 4261.88ms │ no change │
│ QQuery 14 │ 1034.70ms │ 1049.49ms │ no change │
│ QQuery 15 │ 2445.49ms │ 2525.28ms │ no change │
│ QQuery 16 │ 696.63ms │ 557.66ms │ +1.25x faster │
│ QQuery 17 │ 5887.00ms │ 5932.39ms │ no change │
│ QQuery 18 │ 22858.52ms │ 25328.14ms │ 1.11x slower │
│ QQuery 19 │ 1707.48ms │ 1812.90ms │ 1.06x slower │
│ QQuery 20 │ 3274.13ms │ 3662.20ms │ 1.12x slower │
│ QQuery 21 │ 16277.60ms │ 10251.30ms │ +1.59x faster │
│ QQuery 22 │ 1206.50ms │ 1271.62ms │ 1.05x slower │
└──────────────┴──────────────────────────────────────────┴────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-replace-roundrobin) │ 127633.02ms │
│ Total Time (on-demand-repartition-without-prefetch) │ 125943.77ms │
│ Average Time (on-demand-repartition-replace-roundrobin) │ 5801.50ms │
│ Average Time (on-demand-repartition-without-prefetch) │ 5724.72ms │
│ Queries Faster │ 7 │
│ Queries Slower │ 7 │
│ Queries with No Change │ 8 │
└─────────────────────────────────────────────────────────┴─────────────┘
These new results are exciting. How long do you estimate it will take to get this PR ready?
This PR evaluates the performance of on-demand repartitioning after replacing RoundRobinBatch with OnDemandRepartition.
Round-robin repartitioning is a method to evenly distribute the load, based on the assumption that each execution path performs at similar rates. However, this may not always be the case, especially as the partition count increases. What I expect with this on-demand approach is that the work will actually be distributed more evenly. If one partition processes more data, it should receive more batches. This should improve overall performance. To make this even more performant, we should consider utilizing pre-fetching, which would certainly enhance the results. BTW, under how many partitions were these results obtained?
I use the default setting; on my side, it is 11.
I retested the benchmark by replacing
We are both on annual leave and will be back next week. I don’t want to block you from proceeding, so perhaps you can prepare this PR for upstream in the meantime. To do this:
I just quickly skimmed the changes and couldn't do a detailed review, so if any of my suggestions are not applicable, feel free to ignore them.
I am preparing the upstream PR now: apache#14411.
Which issue does this PR close?
Follows #55.
Replacing all RoundRobins with OnDemandRepartitions so that we can check every possible scenario on DataFusion.
What changes are included in this PR?
Replace RepartitionExec::Hash with OnDemandRepartitionExec
Are these changes tested?
Yes
Benchmark
Summary
On-demand repartitioning can partially eliminate the performance impact of pull-based SPM, particularly when processing large datasets like TPCH-50. The impact becomes more noticeable as the number of partitions increases beyond the default setting of 11 partitions, such as when using 20 partitions.