
Commit 8acdd12

Refactor Ballista planner to support RepartitionExec
1 parent 23b7898 commit 8acdd12

3 files changed (+41 -94)

ballista/rust/core/src/utils.rs (+1 -10)

@@ -227,16 +227,7 @@ fn build_exec_plan_diagram(
 
 /// Create a DataFusion context that is compatible with Ballista
 pub fn create_datafusion_context() -> ExecutionContext {
-    // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(CoalesceBatches::new()),
-        Arc::new(AddCoalescePartitionsExec::new()),
-    ];
-    let config = ExecutionConfig::new()
-        .with_concurrency(1)
-        .with_repartition_joins(false)
-        .with_repartition_aggregations(false)
-        .with_physical_optimizer_rules(rules);
+    let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins
     ExecutionContext::with_config(config)
 }
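
The rewritten helper keeps DataFusion's default physical optimizer rules instead of stripping the repartition-related ones. As an editorial aside (not part of the commit), here is a minimal sketch of the configuration knobs involved, using only the ExecutionConfig builder methods that appear in the removed lines above; the helper name is hypothetical:

use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

// With more than one target partition and repartitioning left enabled
// (shown explicitly here), the physical optimizer may insert
// RepartitionExec nodes, which the Ballista planner below turns into
// shuffle stages.
fn example_ballista_context() -> ExecutionContext {
    let config = ExecutionConfig::new()
        .with_concurrency(2)
        .with_repartition_joins(true)
        .with_repartition_aggregations(true);
    ExecutionContext::with_config(config)
}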

ballista/rust/scheduler/src/planner.rs (+38 -68)

@@ -28,15 +28,11 @@ use ballista_core::{
     execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
     serde::scheduler::PartitionLocation,
 };
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::execution::context::ExecutionContext;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::windows::WindowAggExec;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use log::info;
 
 type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -71,13 +67,18 @@ impl DistributedPlanner {
         info!("planning query stages");
         let (new_plan, mut stages) =
             self.plan_query_stages_internal(job_id, execution_plan)?;
-        stages.push(create_query_stage(job_id, self.next_stage_id(), new_plan)?);
+        stages.push(create_shuffle_writer(
+            job_id,
+            self.next_stage_id(),
+            new_plan,
+            None,
+        )?);
         Ok(stages)
     }
 
     /// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
     /// This function is needed because the input execution_plan might need to be modified, but it might not hold a
-    /// compelte query stage (its parent might also belong to the same stage)
+    /// complete query stage (its parent might also belong to the same stage)
     fn plan_query_stages_internal(
         &mut self,
         job_id: &str,
@@ -98,58 +99,44 @@ impl DistributedPlanner {
         }
 
         if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
-            // remove Repartition rule because that isn't supported yet
-            let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-                Arc::new(CoalesceBatches::new()),
-                Arc::new(AddCoalescePartitionsExec::new()),
-            ];
-            let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
-            let ctx = ExecutionContext::with_config(config);
+            let ctx = ExecutionContext::new();
             Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
-        } else if let Some(merge) = execution_plan
+        } else if let Some(coalesce) = execution_plan
             .as_any()
             .downcast_ref::<CoalescePartitionsExec>()
         {
-            let query_stage = create_query_stage(
+            let query_stage = create_shuffle_writer(
                 job_id,
                 self.next_stage_id(),
-                merge.children()[0].clone(),
+                coalesce.children()[0].clone(),
+                None,
             )?;
             let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
                 vec![query_stage.stage_id()],
                 query_stage.schema(),
                 query_stage.output_partitioning().partition_count(),
             ));
             stages.push(query_stage);
-            Ok((merge.with_new_children(vec![unresolved_shuffle])?, stages))
-        } else if let Some(agg) =
-            execution_plan.as_any().downcast_ref::<HashAggregateExec>()
+            Ok((
+                coalesce.with_new_children(vec![unresolved_shuffle])?,
+                stages,
+            ))
+        } else if let Some(repart) =
+            execution_plan.as_any().downcast_ref::<RepartitionExec>()
         {
-            //TODO should insert query stages in more generic way based on partitioning metadata
-            // and not specifically for this operator
-            match agg.mode() {
-                AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                    let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
-                    for child in &children {
-                        let new_stage = create_query_stage(
-                            job_id,
-                            self.next_stage_id(),
-                            child.clone(),
-                        )?;
-                        new_children.push(Arc::new(UnresolvedShuffleExec::new(
-                            vec![new_stage.stage_id()],
-                            new_stage.schema().clone(),
-                            new_stage.output_partitioning().partition_count(),
-                        )));
-                        stages.push(new_stage);
-                    }
-                    Ok((agg.with_new_children(new_children)?, stages))
-                }
-                AggregateMode::Partial => Ok((agg.with_new_children(children)?, stages)),
-            }
-        } else if let Some(join) = execution_plan.as_any().downcast_ref::<HashJoinExec>()
-        {
-            Ok((join.with_new_children(children)?, stages))
+            let query_stage = create_shuffle_writer(
+                job_id,
+                self.next_stage_id(),
+                repart.children()[0].clone(),
+                Some(repart.partitioning().to_owned()),
+            )?;
+            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                vec![query_stage.stage_id()],
+                query_stage.schema(),
+                query_stage.output_partitioning().partition_count(),
+            ));
+            stages.push(query_stage);
+            Ok((unresolved_shuffle, stages))
         } else if let Some(window) =
            execution_plan.as_any().downcast_ref::<WindowAggExec>()
         {
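
(Editorial aside, not part of the diff.) The Option<Partitioning> captured from the RepartitionExec above and passed to the shuffle writer is DataFusion's physical Partitioning enum. A small hedged sketch of what the shuffle stage now receives, assuming the three variants the enum carries in this era of DataFusion; the describe helper is hypothetical:

use datafusion::physical_plan::Partitioning;

// Describe the partitioning scheme a shuffle writer was handed.
fn describe(partitioning: &Partitioning) -> String {
    match partitioning {
        Partitioning::RoundRobinBatch(n) => format!("round-robin into {} partitions", n),
        Partitioning::Hash(exprs, n) => {
            format!("hash on {} expressions into {} partitions", exprs.len(), n)
        }
        Partitioning::UnknownPartitioning(n) => format!("unknown partitioning ({} partitions)", n),
    }
}
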
@@ -158,25 +145,7 @@ impl DistributedPlanner {
                 window
             )))
         } else {
-            // TODO check for compatible partitioning schema, not just count
-            if execution_plan.output_partitioning().partition_count()
-                != children[0].output_partitioning().partition_count()
-            {
-                let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
-                for child in &children {
-                    let new_stage =
-                        create_query_stage(job_id, self.next_stage_id(), child.clone())?;
-                    new_children.push(Arc::new(UnresolvedShuffleExec::new(
-                        vec![new_stage.stage_id()],
-                        new_stage.schema().clone(),
-                        new_stage.output_partitioning().partition_count(),
-                    )));
-                    stages.push(new_stage);
-                }
-                Ok((execution_plan.with_new_children(new_children)?, stages))
-            } else {
-                Ok((execution_plan.with_new_children(children)?, stages))
-            }
+            Ok((execution_plan.with_new_children(children)?, stages))
         }
     }

@@ -224,17 +193,18 @@ pub fn remove_unresolved_shuffles(
     Ok(stage.with_new_children(new_children)?)
 }
 
-fn create_query_stage(
+fn create_shuffle_writer(
     job_id: &str,
     stage_id: usize,
     plan: Arc<dyn ExecutionPlan>,
+    partitioning: Option<Partitioning>,
 ) -> Result<Arc<ShuffleWriterExec>> {
     Ok(Arc::new(ShuffleWriterExec::try_new(
         job_id.to_owned(),
         stage_id,
         plan,
         "".to_owned(), // executor will decide on the work_dir path
-        None,
+        partitioning,
     )?))
 }
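
With these changes, every RepartitionExec (like every CoalescePartitionsExec) boundary becomes its own ShuffleWriterExec stage, and the parent plan reads the stage output through an UnresolvedShuffleExec placeholder. As a hedged illustration of how a caller might drive the planner, assuming it runs where DistributedPlanner is in scope and that plan_query_stages has the signature suggested by the hunks above (the DistributedPlanner::new constructor and the job id are assumptions, not shown in this diff):

use std::sync::Arc;
use ballista_core::error::Result;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical driver: plan a physical plan into shuffle stages and report
// what each stage writes. stage_id() and output_partitioning() are used on
// ShuffleWriterExec exactly as in the planner code above.
fn plan_distributed(job_id: &str, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    let mut planner = DistributedPlanner::new();
    let stages = planner.plan_query_stages(job_id, plan)?;
    for stage in &stages {
        println!(
            "stage {} writes {} partitions",
            stage.stage_id(),
            stage.output_partitioning().partition_count()
        );
    }
    Ok(())
}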

ballista/rust/scheduler/src/test_utils.rs (+2 -16)

@@ -15,32 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
 use ballista_core::error::Result;
 
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::execution::context::ExecutionContext;
 use datafusion::physical_plan::csv::CsvReadOptions;
 
 pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
 ];
 
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
-    // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(AddCoalescePartitionsExec::new()),
-        Arc::new(CoalesceBatches::new()),
-    ];
-    let config = ExecutionConfig::new()
-        .with_physical_optimizer_rules(rules)
-        .with_repartition_aggregations(false);
-    let mut ctx = ExecutionContext::with_config(config);
-
+    let mut ctx = ExecutionContext::new();
     for table in TPCH_TABLES {
         let schema = get_tpch_schema(table);
         let options = CsvReadOptions::new()
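
Because the test context now uses DataFusion's defaults, TPC-H plans built in tests may contain RepartitionExec nodes. A hedged usage sketch, assuming datafusion_test_context is in scope; the path literal and the SQL text are placeholders, and the DataFrame/optimize calls are the standard DataFusion API of this era rather than anything added by this commit:

use ballista_core::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;

fn tpch_physical_plan() -> Result<()> {
    // "testdata" is a placeholder path to the TPC-H CSV files.
    let mut ctx = datafusion_test_context("testdata")?;
    let df = ctx.sql("select l_returnflag, count(*) from lineitem group by l_returnflag")?;
    let plan = ctx.optimize(&df.to_logical_plan())?;
    // With repartitioning enabled, this plan may now contain RepartitionExec nodes,
    // which the distributed planner converts into shuffle stages.
    let physical = ctx.create_physical_plan(&plan)?;
    println!(
        "output partitions: {}",
        physical.output_partitioning().partition_count()
    );
    Ok(())
}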
