Ballista: Implement scalable distributed joins #634

Merged
merged 2 commits into from
Jul 4, 2021
11 changes: 1 addition & 10 deletions ballista/rust/core/src/utils.rs
@@ -227,16 +227,7 @@ fn build_exec_plan_diagram(

/// Create a DataFusion context that is compatible with Ballista
pub fn create_datafusion_context() -> ExecutionContext {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new()
.with_concurrency(1)
.with_repartition_joins(false)
.with_repartition_aggregations(false)
.with_physical_optimizer_rules(rules);
let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
Contributor: What is the idea here for later? I guess the repartitioning needs to be applied with concurrency=1 too to avoid inefficient plans?

ExecutionContext::with_config(config)
}

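For context, and not part of the PR itself: a minimal before/after sketch of the context construction, using only the DataFusion calls that already appear in this diff (the old_context/new_context names exist only for the sketch). The removed code pinned concurrency to 1, disabled repartition-based joins and aggregations, and restricted the physical optimizer rules; the one-liner that replaces it keeps the defaults and only raises concurrency, which, per the TODO, is the hack that lets the optimizer emit the RepartitionExec nodes the distributed planner splits on.

use std::sync::Arc;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;

// Previous behaviour: a single partition, repartitioning disabled, and a
// reduced physical optimizer rule set.
fn old_context() -> ExecutionContext {
    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        Arc::new(CoalesceBatches::new()),
        Arc::new(AddCoalescePartitionsExec::new()),
    ];
    let config = ExecutionConfig::new()
        .with_concurrency(1)
        .with_repartition_joins(false)
        .with_repartition_aggregations(false)
        .with_physical_optimizer_rules(rules);
    ExecutionContext::with_config(config)
}

// New behaviour: default rules and repartitioning left enabled; concurrency
// above 1 makes the optimizer insert RepartitionExec nodes, which the
// distributed planner below converts into shuffle stages.
fn new_context() -> ExecutionContext {
    let config = ExecutionConfig::new().with_concurrency(2);
    ExecutionContext::with_config(config)
}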
106 changes: 38 additions & 68 deletions ballista/rust/scheduler/src/planner.rs
@@ -28,15 +28,11 @@ use ballista_core::{
execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
serde::scheduler::PartitionLocation,
};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use log::info;

type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -71,13 +67,18 @@ impl DistributedPlanner {
info!("planning query stages");
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
stages.push(create_query_stage(job_id, self.next_stage_id(), new_plan)?);
stages.push(create_shuffle_writer(
job_id,
self.next_stage_id(),
new_plan,
None,
)?);
Ok(stages)
}

/// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
/// This function is needed because the input execution_plan might need to be modified, but it might not hold a
/// compelte query stage (its parent might also belong to the same stage)
/// complete query stage (its parent might also belong to the same stage)
fn plan_query_stages_internal(
&mut self,
job_id: &str,
@@ -98,58 +99,44 @@ impl DistributedPlanner {
}

if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
let ctx = ExecutionContext::with_config(config);
let ctx = ExecutionContext::new();
Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
} else if let Some(merge) = execution_plan
} else if let Some(coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let query_stage = create_query_stage(
let query_stage = create_shuffle_writer(
job_id,
self.next_stage_id(),
merge.children()[0].clone(),
coalesce.children()[0].clone(),
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
vec![query_stage.stage_id()],
query_stage.schema(),
query_stage.output_partitioning().partition_count(),
));
stages.push(query_stage);
Ok((merge.with_new_children(vec![unresolved_shuffle])?, stages))
} else if let Some(agg) =
execution_plan.as_any().downcast_ref::<HashAggregateExec>()
Ok((
coalesce.with_new_children(vec![unresolved_shuffle])?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
//TODO should insert query stages in more generic way based on partitioning metadata
// and not specifically for this operator
match agg.mode() {
AggregateMode::Final | AggregateMode::FinalPartitioned => {
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in &children {
let new_stage = create_query_stage(
job_id,
self.next_stage_id(),
child.clone(),
)?;
new_children.push(Arc::new(UnresolvedShuffleExec::new(
vec![new_stage.stage_id()],
new_stage.schema().clone(),
new_stage.output_partitioning().partition_count(),
)));
stages.push(new_stage);
}
Ok((agg.with_new_children(new_children)?, stages))
}
AggregateMode::Partial => Ok((agg.with_new_children(children)?, stages)),
}
} else if let Some(join) = execution_plan.as_any().downcast_ref::<HashJoinExec>()
{
Ok((join.with_new_children(children)?, stages))
let query_stage = create_shuffle_writer(
job_id,
self.next_stage_id(),
repart.children()[0].clone(),
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
vec![query_stage.stage_id()],
query_stage.schema(),
query_stage.output_partitioning().partition_count(),
));
stages.push(query_stage);
Ok((unresolved_shuffle, stages))
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
Expand All @@ -158,25 +145,7 @@ impl DistributedPlanner {
window
)))
} else {
// TODO check for compatible partitioning schema, not just count
if execution_plan.output_partitioning().partition_count()
!= children[0].output_partitioning().partition_count()
{
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in &children {
let new_stage =
create_query_stage(job_id, self.next_stage_id(), child.clone())?;
new_children.push(Arc::new(UnresolvedShuffleExec::new(
vec![new_stage.stage_id()],
new_stage.schema().clone(),
new_stage.output_partitioning().partition_count(),
)));
stages.push(new_stage);
}
Ok((execution_plan.with_new_children(new_children)?, stages))
} else {
Ok((execution_plan.with_new_children(children)?, stages))
}
Ok((execution_plan.with_new_children(children)?, stages))
}
}

@@ -224,17 +193,18 @@ pub fn remove_unresolved_shuffles(
Ok(stage.with_new_children(new_children)?)
}

fn create_query_stage(
fn create_shuffle_writer(
job_id: &str,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
partitioning: Option<Partitioning>,
) -> Result<Arc<ShuffleWriterExec>> {
Ok(Arc::new(ShuffleWriterExec::try_new(
job_id.to_owned(),
stage_id,
plan,
"".to_owned(), // executor will decide on the work_dir path
None,
partitioning,
)?))
}

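To make the new planning logic concrete, here is a self-contained sketch of the recursion above, using hypothetical simplified types (PlanNode, Stage) rather than the real DataFusion/Ballista ones: whenever a repartition boundary is reached, the subtree beneath it becomes its own shuffle-writer stage and the boundary is replaced by an UnresolvedShuffleExec-style placeholder, which remove_unresolved_shuffles later swaps for a ShuffleReaderExec once the stage's output locations are known. (CoalescePartitionsExec is handled analogously, except that the coalesce node itself stays in the parent stage above the placeholder.)

// Hypothetical simplified types; not the real ExecutionPlan API.
#[derive(Debug, Clone)]
enum PlanNode {
    // Leaf scan with a fixed number of input partitions.
    Scan { partitions: usize },
    // A hash-repartition (shuffle) boundary producing `partitions` partitions.
    Repartition { input: Box<PlanNode>, partitions: usize },
    // Any single-child operator that stays inside the current stage.
    Passthrough { input: Box<PlanNode> },
    // Placeholder standing in for the output of an earlier stage.
    UnresolvedShuffle { stage_id: usize, partitions: usize },
}

#[derive(Debug)]
struct Stage {
    id: usize,
    plan: PlanNode, // subtree that a ShuffleWriterExec would wrap
}

// Walk the plan bottom-up; every Repartition becomes its own stage whose
// output is read back through an UnresolvedShuffle placeholder.
fn plan_stages(plan: PlanNode, next_id: &mut usize, stages: &mut Vec<Stage>) -> PlanNode {
    match plan {
        PlanNode::Repartition { input, partitions } => {
            let child = plan_stages(*input, next_id, stages);
            let id = *next_id;
            *next_id += 1;
            stages.push(Stage { id, plan: child });
            PlanNode::UnresolvedShuffle { stage_id: id, partitions }
        }
        PlanNode::Passthrough { input } => PlanNode::Passthrough {
            input: Box::new(plan_stages(*input, next_id, stages)),
        },
        other => other,
    }
}

fn main() {
    // scan -> repartition(4) -> filter-like passthrough
    let plan = PlanNode::Passthrough {
        input: Box::new(PlanNode::Repartition {
            input: Box::new(PlanNode::Scan { partitions: 2 }),
            partitions: 4,
        }),
    };
    let mut stages = Vec::new();
    let mut next_id = 1;
    let final_plan = plan_stages(plan, &mut next_id, &mut stages);
    println!("stages: {:#?}", stages);
    println!("final plan: {:#?}", final_plan);
}

The real plan_query_stages additionally wraps the rewritten root in one more shuffle-writer stage with partitioning set to None, which is what the first hunk of planner.rs above does.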
18 changes: 2 additions & 16 deletions ballista/rust/scheduler/src/test_utils.rs
@@ -15,32 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use ballista_core::error::Result;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::csv::CsvReadOptions;

pub const TPCH_TABLES: &[&str] = &[
"part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
];

pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(AddCoalescePartitionsExec::new()),
Arc::new(CoalesceBatches::new()),
];
let config = ExecutionConfig::new()
.with_physical_optimizer_rules(rules)
.with_repartition_aggregations(false);
let mut ctx = ExecutionContext::with_config(config);

let mut ctx = ExecutionContext::new();
for table in TPCH_TABLES {
let schema = get_tpch_schema(table);
let options = CsvReadOptions::new()