From a16fa6a626ae598a50e2e6e791ceb760c3dd7ae2 Mon Sep 17 00:00:00 2001
From: Jefffrey
Date: Mon, 8 Apr 2024 20:45:27 +1000
Subject: [PATCH 01/12] Refactor physical create_initial_plan to construct bottom up

---
 datafusion/core/src/physical_planner.rs | 990 ++++++++++++++----------
 1 file changed, 567 insertions(+), 423 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index c25523c5ae33..abb11c5cf010 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,6 +17,7 @@
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
 
+use std::borrow::Cow;
 use std::collections::HashMap;
 use std::fmt::Write;
 use std::sync::Arc;
@@ -35,12 +36,11 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
 use crate::logical_expr::{
-    Aggregate, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan, Unnest,
-    Window,
+    Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Window,
 };
 use crate::logical_expr::{
-    CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType,
-    Repartition, Union, UserDefinedLogicalNode,
+    Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType, Repartition, Union,
+    UserDefinedLogicalNode,
 };
 use crate::logical_expr::{Limit, Values};
 use crate::physical_expr::{create_physical_expr, create_physical_exprs};
@@ -87,8 +87,8 @@ use datafusion_expr::expr::{
 use datafusion_expr::expr_rewriter::unnormalize_cols;
 use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
 use datafusion_expr::{
-    DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
-    StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
+    DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery,
+    ScalarFunctionDefinition, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
 };
 use datafusion_physical_expr::expressions::Literal;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
@@ -97,8 +97,6 @@ use datafusion_sql::utils::window_expr_common_partition_keys;
 use async_trait::async_trait;
 use datafusion_common::config::FormatOptions;
 use datafusion_physical_expr::LexOrdering;
-use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryStreamExt};
 use itertools::{multiunzip, Itertools};
 use log::{debug, trace};
 use sqlparser::ast::NullTreatment;
@@ -496,59 +494,29 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
-    /// Create physical plans for multiple logical plans.
-    ///
-    /// This is the same as [`create_initial_plan`](Self::create_initial_plan) but runs the planning concurrently.
-    ///
-    /// The result order is the same as the input order.
-    fn create_initial_plan_multi<'a>(
-        &'a self,
-        logical_plans: impl IntoIterator<Item = &'a LogicalPlan> + Send + 'a,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Vec<Arc<dyn ExecutionPlan>>>> {
-        async move {
-            // First build futures with as few references as possible, then perform some stream magic.
-            // Otherwise rustc bails out w/:
-            //
-            //   error: higher-ranked lifetime error
-            //   ...
-            //   note: could not prove `[async block@...]: std::marker::Send`
-            let futures = logical_plans
-                .into_iter()
-                .enumerate()
-                .map(|(idx, lp)| async move {
-                    let plan = self.create_initial_plan(lp, session_state).await?;
-                    Ok((idx, plan)) as Result<_>
-                })
-                .collect::<Vec<_>>();
-
-            let mut physical_plans = futures::stream::iter(futures)
-                .buffer_unordered(
-                    session_state
-                        .config_options()
-                        .execution
-                        .planning_concurrency,
-                )
-                .try_collect::<Vec<(usize, Arc<dyn ExecutionPlan>)>>()
-                .await?;
-            physical_plans.sort_by_key(|(idx, _plan)| *idx);
-            let physical_plans = physical_plans
-                .into_iter()
-                .map(|(_idx, plan)| plan)
-                .collect::<Vec<_>>();
-            Ok(physical_plans)
-        }
-        .boxed()
-    }
-
     /// Create a physical plan from a logical plan
-    fn create_initial_plan<'a>(
-        &'a self,
-        logical_plan: &'a LogicalPlan,
-        session_state: &'a SessionState,
-    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
-        async move {
-            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
+    async fn create_initial_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // DFS the tree to flatten it into a Vec.
+        // This will allow us to build the Physical Plan from the leaves up
+        // to avoid recursion, and also to make it easier to build a valid
+        // Physical Plan from the start and not rely on some intermediate
+        // representation (since parents need to know their children at
+        // construction time).
+        let mut flat_tree = vec![];
+        let mut dfs_visit_stack = vec![logical_plan];
+        while let Some(visit) = dfs_visit_stack.pop() {
+            dfs_visit_stack.extend(visit.inputs());
+            flat_tree.push(visit);
+        }
+
+        let mut children = ChildrenStack::new();
+        while let Some(node) = flat_tree.pop() {
+            let exec_node: Arc<dyn ExecutionPlan> = match node {
+                // Leaves (no children)
                 LogicalPlan::TableScan(TableScan {
                     source,
                     projection,
@@ -561,16 +529,55 @@ impl DefaultPhysicalPlanner {
                     // doesn't know (nor should care) how the relation was
                     // referred to in the query
                     let filters = unnormalize_cols(filters.iter().cloned());
-                    source.scan(session_state, projection.as_ref(), &filters, *fetch).await
+                    source
+                        .scan(session_state, projection.as_ref(), &filters, *fetch)
+                        .await?
                 }
+                LogicalPlan::Values(Values { values, schema }) => {
+                    let exec_schema = schema.as_ref().to_owned().into();
+                    let exprs = values
+                        .iter()
+                        .map(|row| {
+                            row.iter()
+                                .map(|expr| {
+                                    self.create_physical_expr(expr, schema, session_state)
+                                })
+                                .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    let value_exec =
+                        ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?;
+                    Arc::new(value_exec)
+                }
+                LogicalPlan::EmptyRelation(EmptyRelation {
+                    produce_one_row: false,
+                    schema,
+                }) => Arc::new(EmptyExec::new(SchemaRef::new(
+                    schema.as_ref().to_owned().into(),
+                ))),
+                LogicalPlan::EmptyRelation(EmptyRelation {
+                    produce_one_row: true,
+                    schema,
+                }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new(
+                    schema.as_ref().to_owned().into(),
+                ))),
+                LogicalPlan::DescribeTable(DescribeTable {
+                    schema,
+                    output_schema,
+                }) => {
+                    let output_schema: Schema = output_schema.as_ref().into();
+                    self.plan_describe(schema.clone(), Arc::new(output_schema))?
+ } + + // 1 Child + LogicalPlan::Copy(CopyTo { input, output_url, format_options, partition_by, - options: source_option_tuples + options: source_option_tuples, }) => { - let input_exec = self.create_initial_plan(input, session_state).await?; + let input_exec = children.pop()?; let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); @@ -579,7 +586,8 @@ impl DefaultPhysicalPlanner { // Note: the DataType passed here is ignored for the purposes of writing and inferred instead // from the schema of the RecordBatch being written. This allows COPY statements to specify only // the column name rather than column name + explicit data type. - let table_partition_cols = partition_by.iter() + let table_partition_cols = partition_by + .iter() .map(|s| (s.to_string(), arrow_schema::DataType::Null)) .collect::>(); @@ -597,95 +605,83 @@ impl DefaultPhysicalPlanner { FormatOptions::CSV(options) => { table_options.csv = options.clone(); table_options.set_file_format(FileType::CSV); - table_options.alter_with_string_hash_map(source_option_tuples)?; + table_options + .alter_with_string_hash_map(source_option_tuples)?; Arc::new(CsvFormat::default().with_options(table_options.csv)) - }, + } FormatOptions::JSON(options) => { table_options.json = options.clone(); table_options.set_file_format(FileType::JSON); - table_options.alter_with_string_hash_map(source_option_tuples)?; - Arc::new(JsonFormat::default().with_options(table_options.json)) - }, + table_options + .alter_with_string_hash_map(source_option_tuples)?; + Arc::new( + JsonFormat::default().with_options(table_options.json), + ) + } #[cfg(feature = "parquet")] FormatOptions::PARQUET(options) => { table_options.parquet = options.clone(); table_options.set_file_format(FileType::PARQUET); - table_options.alter_with_string_hash_map(source_option_tuples)?; - Arc::new(ParquetFormat::default().with_options(table_options.parquet)) - }, - FormatOptions::AVRO => Arc::new(AvroFormat {} ), + table_options + .alter_with_string_hash_map(source_option_tuples)?; + Arc::new( + ParquetFormat::default() + .with_options(table_options.parquet), + ) + } + FormatOptions::AVRO => Arc::new(AvroFormat {}), FormatOptions::ARROW => Arc::new(ArrowFormat {}), }; - sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await + sink_format + .create_writer_physical_plan( + input_exec, + session_state, + config, + None, + ) + .await? } LogicalPlan::Dml(DmlStatement { table_name, op: WriteOp::InsertInto, - input, .. }) => { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name.clone())?; if let Some(provider) = schema.table(name).await? { - let input_exec = self.create_initial_plan(input, session_state).await?; - provider.insert_into(session_state, input_exec, false).await + let input_exec = children.pop()?; + provider + .insert_into(session_state, input_exec, false) + .await? } else { - return exec_err!( - "Table '{table_name}' does not exist" - ); + return exec_err!("Table '{table_name}' does not exist"); } } LogicalPlan::Dml(DmlStatement { table_name, op: WriteOp::InsertOverwrite, - input, .. }) => { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name.clone())?; if let Some(provider) = schema.table(name).await? { - let input_exec = self.create_initial_plan(input, session_state).await?; - provider.insert_into(session_state, input_exec, true).await + let input_exec = children.pop()?; + provider + .insert_into(session_state, input_exec, true) + .await? 
} else { - return exec_err!( - "Table '{table_name}' does not exist" - ); + return exec_err!("Table '{table_name}' does not exist"); } } - LogicalPlan::Values(Values { - values, - schema, - }) => { - let exec_schema = schema.as_ref().to_owned().into(); - let exprs = values.iter() - .map(|row| { - row.iter().map(|expr| { - self.create_physical_expr( - expr, - schema, - session_state, - ) - }) - .collect::>>>() - }) - .collect::>>()?; - let value_exec = ValuesExec::try_new( - SchemaRef::new(exec_schema), - exprs, - )?; - Ok(Arc::new(value_exec)) - } LogicalPlan::Window(Window { input, window_expr, .. }) => { if window_expr.is_empty() { - return internal_err!( - "Impossibly got empty window expression" - ); + return internal_err!("Impossibly got empty window expression"); } - let input_exec = self.create_initial_plan(input, session_state).await?; + let input_exec = children.pop()?; // at this moment we are guaranteed by the logical planner // to have all the window_expr to have equal sort key @@ -695,8 +691,7 @@ impl DefaultPhysicalPlanner { && session_state.config().target_partitions() > 1 && session_state.config().repartition_window_functions(); - let physical_partition_keys = if can_repartition - { + let physical_partition_keys = if can_repartition { partition_keys .iter() .map(|e| { @@ -712,18 +707,19 @@ impl DefaultPhysicalPlanner { }; let get_sort_keys = |expr: &Expr| match expr { - Expr::WindowFunction(WindowFunction{ + Expr::WindowFunction(WindowFunction { ref partition_by, ref order_by, .. }) => generate_sort_key(partition_by, order_by), - Expr::Alias(Alias{expr,..}) => { + Expr::Alias(Alias { expr, .. }) => { // Convert &Box to &T match &**expr { - Expr::WindowFunction(WindowFunction{ + Expr::WindowFunction(WindowFunction { ref partition_by, ref order_by, - ..}) => generate_sort_key(partition_by, order_by), + .. + }) => generate_sort_key(partition_by, order_by), _ => unreachable!(), } } @@ -739,7 +735,7 @@ impl DefaultPhysicalPlanner { ); } - let logical_schema = logical_plan.schema(); + let logical_schema = node.schema(); let window_expr = window_expr .iter() .map(|e| { @@ -751,12 +747,11 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; - let uses_bounded_memory = window_expr - .iter() - .all(|e| e.uses_bounded_memory()); + let uses_bounded_memory = + window_expr.iter().all(|e| e.uses_bounded_memory()); // If all window expressions can run with bounded memory, // choose the bounded window variant: - Ok(if uses_bounded_memory { + if uses_bounded_memory { Arc::new(BoundedWindowAggExec::try_new( window_expr, input_exec, @@ -769,7 +764,7 @@ impl DefaultPhysicalPlanner { input_exec, physical_partition_keys, )?) - }) + } } LogicalPlan::Aggregate(Aggregate { input, @@ -778,7 +773,7 @@ impl DefaultPhysicalPlanner { .. 
}) => { // Initially need to perform the aggregate and then merge the partitions - let input_exec = self.create_initial_plan(input, session_state).await?; + let input_exec = children.pop()?; let physical_input_schema = input_exec.schema(); let logical_input_schema = input.as_ref().schema(); @@ -786,7 +781,8 @@ impl DefaultPhysicalPlanner { group_expr, logical_input_schema, &physical_input_schema, - session_state)?; + session_state, + )?; let agg_filter = aggr_expr .iter() @@ -800,7 +796,8 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; - let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter); + let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) = + multiunzip(agg_filter); let initial_aggr = Arc::new(AggregateExec::try_new( AggregateMode::Partial, @@ -812,7 +809,8 @@ impl DefaultPhysicalPlanner { )?); // update group column indices based on partial aggregate plan evaluation - let final_group: Vec> = initial_aggr.output_group_expr(); + let final_group: Vec> = + initial_aggr.output_group_expr(); let can_repartition = !groups.is_empty() && session_state.config().target_partitions() > 1 @@ -839,92 +837,49 @@ impl DefaultPhysicalPlanner { .iter() .enumerate() .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone())) - .collect() + .collect(), ); - Ok(Arc::new(AggregateExec::try_new( + Arc::new(AggregateExec::try_new( next_partition_mode, final_grouping_set, updated_aggregates, filters, initial_aggr, physical_input_schema.clone(), - )?)) - } - LogicalPlan::Projection(Projection { input, expr, .. }) => { - let input_exec = self.create_initial_plan(input, session_state).await?; - let input_schema = input.as_ref().schema(); - - let physical_exprs = expr - .iter() - .map(|e| { - // For projections, SQL planner and logical plan builder may convert user - // provided expressions into logical Column expressions if their results - // are already provided from the input plans. Because we work with - // qualified columns in logical plane, derived columns involve operators or - // functions will contain qualifiers as well. This will result in logical - // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc. - // - // If we run these logical columns through physical_name function, we will - // get physical names with column qualifiers, which violates DataFusion's - // field name semantics. To account for this, we need to derive the - // physical name from physical input instead. - // - // This depends on the invariant that logical schema field index MUST match - // with physical schema field index. - let physical_name = if let Expr::Column(col) = e { - match input_schema.index_of_column(col) { - Ok(idx) => { - // index physical field using logical field index - Ok(input_exec.schema().field(idx).name().to_string()) - } - // logical column is not a derived column, safe to pass along to - // physical_name - Err(_) => physical_name(e), - } - } else { - physical_name(e) - }; - - tuple_err(( - self.create_physical_expr( - e, - input_schema, - session_state, - ), - physical_name, - )) - }) - .collect::>>()?; - - Ok(Arc::new(ProjectionExec::try_new( - physical_exprs, - input_exec, - )?)) + )?) } - LogicalPlan::Filter(filter) => { - let physical_input = self.create_initial_plan(&filter.input, session_state).await?; - let input_dfschema = filter.input.schema(); + LogicalPlan::Projection(Projection { input, expr, .. 
}) => self + .create_project_physical_exec( + session_state, + children.pop()?, + input, + expr, + )?, + LogicalPlan::Filter(Filter { + predicate, input, .. + }) => { + let physical_input = children.pop()?; + let input_dfschema = input.schema(); let runtime_expr = self.create_physical_expr( - &filter.predicate, + predicate, input_dfschema, session_state, )?; - let selectivity = session_state.config().options().optimizer.default_filter_selectivity; + let selectivity = session_state + .config() + .options() + .optimizer + .default_filter_selectivity; let filter = FilterExec::try_new(runtime_expr, physical_input)?; - Ok(Arc::new(filter.with_default_selectivity(selectivity)?)) - } - LogicalPlan::Union(Union { inputs, schema: _ }) => { - let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?; - - Ok(Arc::new(UnionExec::new(physical_plans))) + Arc::new(filter.with_default_selectivity(selectivity)?) } LogicalPlan::Repartition(Repartition { input, partitioning_scheme, }) => { - let physical_input = self.create_initial_plan(input, session_state).await?; + let physical_input = children.pop()?; let input_dfschema = input.as_ref().schema(); let physical_partitioning = match partitioning_scheme { LogicalPartitioning::RoundRobinBatch(n) => { @@ -947,19 +902,60 @@ impl DefaultPhysicalPlanner { return not_impl_err!("Physical plan does not support DistributeBy partitioning"); } }; - Ok(Arc::new(RepartitionExec::try_new( + Arc::new(RepartitionExec::try_new( physical_input, physical_partitioning, - )?)) + )?) } - LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => { - let physical_input = self.create_initial_plan(input, session_state).await?; + LogicalPlan::Sort(Sort { + expr, input, fetch, .. + }) => { + let physical_input = children.pop()?; let input_dfschema = input.as_ref().schema(); - let sort_expr = create_physical_sort_exprs(expr, input_dfschema, session_state.execution_props())?; - let new_sort = SortExec::new(sort_expr, physical_input) - .with_fetch(*fetch); - Ok(Arc::new(new_sort)) + let sort_expr = create_physical_sort_exprs( + expr, + input_dfschema, + session_state.execution_props(), + )?; + let new_sort = + SortExec::new(sort_expr, physical_input).with_fetch(*fetch); + Arc::new(new_sort) } + LogicalPlan::Subquery(_) => todo!(), + LogicalPlan::SubqueryAlias(_) => children.pop()?, + LogicalPlan::Limit(Limit { skip, fetch, .. }) => { + let input = children.pop()?; + + // GlobalLimitExec requires a single partition for input + let input = if input.output_partitioning().partition_count() == 1 { + input + } else { + // Apply a LocalLimitExec to each partition. The optimizer will also insert + // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec + if let Some(fetch) = fetch { + Arc::new(LocalLimitExec::new(input, *fetch + skip)) + } else { + input + } + }; + + Arc::new(GlobalLimitExec::new(input, *skip, *fetch)) + } + LogicalPlan::Unnest(Unnest { + column, + schema, + options, + .. 
+ }) => { + let input = children.pop()?; + let column_exec = schema + .index_of_column(column) + .map(|idx| Column::new(&column.name, idx))?; + let schema = SchemaRef::new(schema.as_ref().to_owned().into()); + Arc::new(UnnestExec::new(input, column_exec, schema, options.clone())) + } + + // 2 Children LogicalPlan::Join(Join { left, right, @@ -972,40 +968,137 @@ impl DefaultPhysicalPlanner { }) => { let null_equals_null = *null_equals_null; + let (physical_left, physical_right) = children.pop_2()?; + // If join has expression equijoin keys, add physical projection. let has_expr_join_key = keys.iter().any(|(l, r)| { - !(matches!(l, Expr::Column(_)) - && matches!(r, Expr::Column(_))) + !(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_))) }); - if has_expr_join_key { - // Logic extracted into a function here as subsequent recursive create_initial_plan() - // call can cause a stack overflow for a large number of joins. - // - // See #9962 and #1047 for detailed explanation. - let join_plan = project_expr_join_keys(keys,left,right,logical_plan,join_schema)?; - return self - .create_initial_plan(&join_plan, session_state) - .await; - } + let (new_logical, physical_left, physical_right) = + if has_expr_join_key { + // TODO: Can we extract this transformation to somewhere before physical plan + // creation? + let (left_keys, right_keys): (Vec<_>, Vec<_>) = + keys.iter().cloned().unzip(); + + let (left, left_col_keys, left_projected) = + wrap_projection_for_join_if_necessary( + &left_keys, + left.as_ref().clone(), + )?; + let (right, right_col_keys, right_projected) = + wrap_projection_for_join_if_necessary( + &right_keys, + right.as_ref().clone(), + )?; + let column_on = (left_col_keys, right_col_keys); + + let left = Arc::new(left); + let right = Arc::new(right); + let new_join = + LogicalPlan::Join(Join::try_new_with_project_input( + node, + left.clone(), + right.clone(), + column_on, + )?); + + // If inputs were projected then create ExecutionPlan for these new + // LogicalPlan nodes. + let physical_left = match (left_projected, left.as_ref()) { + // If left_projected is true we are guaranteed that left is a Projection + ( + true, + LogicalPlan::Projection(Projection { + input, + expr, + .. + }), + ) => self.create_project_physical_exec( + session_state, + physical_left, + input, + expr, + )?, + _ => physical_left, + }; + let physical_right = match (right_projected, right.as_ref()) { + // If right_projected is true we are guaranteed that right is a Projection + ( + true, + LogicalPlan::Projection(Projection { + input, + expr, + .. 
+ }), + ) => self.create_project_physical_exec( + session_state, + physical_right, + input, + expr, + )?, + _ => physical_right, + }; + + // Remove temporary projected columns + if left_projected || right_projected { + let final_join_result = join_schema + .iter() + .map(|(qualifier, field)| { + Expr::Column(datafusion_common::Column::from(( + qualifier, + field.as_ref(), + ))) + }) + .collect::>(); + let projection = + LogicalPlan::Projection(Projection::try_new( + final_join_result, + Arc::new(new_join), + )?); + // LogicalPlan mutated + (Cow::Owned(projection), physical_left, physical_right) + } else { + // LogicalPlan mutated + (Cow::Owned(new_join), physical_left, physical_right) + } + } else { + // LogicalPlan unchanged + (Cow::Borrowed(node), physical_left, physical_right) + }; + + // Retrieving new left/right and join keys (in case plan was mutated above) + let (left, right, keys, new_project) = match new_logical.as_ref() { + LogicalPlan::Projection(Projection { input, expr, .. }) => { + if let LogicalPlan::Join(Join { + left, right, on, .. + }) = input.as_ref() + { + (left, right, on, Some((input, expr))) + } else { + unreachable!() + } + } + LogicalPlan::Join(Join { + left, right, on, .. + }) => (left, right, on, None), + // Should either be the original Join, or Join with a Projection on top + _ => unreachable!(), + }; // All equi-join keys are columns now, create physical join plan - let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?; - let [physical_left, physical_right]: [Arc; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?; let left_df_schema = left.schema(); let right_df_schema = right.schema(); let execution_props = session_state.execution_props(); let join_on = keys .iter() .map(|(l, r)| { - let l = create_physical_expr( - l, - left_df_schema, - execution_props - )?; + let l = + create_physical_expr(l, left_df_schema, execution_props)?; let r = create_physical_expr( r, right_df_schema, - execution_props + execution_props, )?; Ok((l, r)) }) @@ -1017,47 +1110,70 @@ impl DefaultPhysicalPlanner { let cols = expr.to_columns()?; // Collect left & right field indices, the field indices are sorted in ascending order - let left_field_indices = cols.iter() + let left_field_indices = cols + .iter() .filter_map(|c| match left_df_schema.index_of_column(c) { Ok(idx) => Some(idx), _ => None, - }).sorted() + }) + .sorted() .collect::>(); - let right_field_indices = cols.iter() - .filter_map(|c| match right_df_schema.index_of_column(c) { - Ok(idx) => Some(idx), - _ => None, - }).sorted() + let right_field_indices = cols + .iter() + .filter_map(|c| { + match right_df_schema.index_of_column(c) { + Ok(idx) => Some(idx), + _ => None, + } + }) + .sorted() .collect::>(); // Collect DFFields and Fields required for intermediate schemas - let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = left_field_indices.clone() + let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = + left_field_indices + .clone() + .into_iter() + .map(|i| { + ( + left_df_schema.qualified_field(i), + physical_left.schema().field(i).clone(), + ) + }) + .chain(right_field_indices.clone().into_iter().map( + |i| { + ( + right_df_schema.qualified_field(i), + physical_right.schema().field(i).clone(), + ) + }, + )) + .unzip(); + let filter_df_fields = filter_df_fields .into_iter() - .map(|i| ( - left_df_schema.qualified_field(i), - physical_left.schema().field(i).clone(), - )) - .chain( - 
right_field_indices.clone() - .into_iter() - .map(|i| ( - right_df_schema.qualified_field(i), - physical_right.schema().field(i).clone(), - )) - ) - .unzip(); - let filter_df_fields = filter_df_fields.into_iter().map(|(qualifier, field)| (qualifier.cloned(), Arc::new(field.clone()))).collect(); + .map(|(qualifier, field)| { + (qualifier.cloned(), Arc::new(field.clone())) + }) + .collect(); // Construct intermediate schemas used for filtering data and // convert logical expression to physical according to filter schema - let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields, HashMap::new())?; - let filter_schema = Schema::new_with_metadata(filter_fields, HashMap::new()); + let filter_df_schema = DFSchema::new_with_metadata( + filter_df_fields, + HashMap::new(), + )?; + let filter_schema = + Schema::new_with_metadata(filter_fields, HashMap::new()); let filter_expr = create_physical_expr( expr, &filter_df_schema, session_state.execution_props(), )?; - let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices); + let column_indices = + join_utils::JoinFilter::build_column_indices( + left_field_indices, + right_field_indices, + ); Some(join_utils::JoinFilter::new( filter_expr, @@ -1065,20 +1181,21 @@ impl DefaultPhysicalPlanner { filter_schema, )) } - _ => None + _ => None, }; - let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join; + let prefer_hash_join = + session_state.config_options().optimizer.prefer_hash_join; - if join_on.is_empty() { + let join: Arc = if join_on.is_empty() { // there is no equal join condition, use the nested loop join // TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins` - Ok(Arc::new(NestedLoopJoinExec::try_new( + Arc::new(NestedLoopJoinExec::try_new( physical_left, physical_right, join_filter, join_type, - )?)) + )?) } else if session_state.config().target_partitions() > 1 && session_state.config().repartition_joins() && !prefer_hash_join @@ -1087,7 +1204,7 @@ impl DefaultPhysicalPlanner { // Sort-Merge join support currently is experimental let join_on_len = join_on.len(); - Ok(Arc::new(SortMergeJoinExec::try_new( + Arc::new(SortMergeJoinExec::try_new( physical_left, physical_right, join_on, @@ -1095,134 +1212,77 @@ impl DefaultPhysicalPlanner { *join_type, vec![SortOptions::default(); join_on_len], null_equals_null, - )?)) + )?) } else if session_state.config().target_partitions() > 1 && session_state.config().repartition_joins() - && prefer_hash_join { - let partition_mode = { + && prefer_hash_join + { + let partition_mode = { if session_state.config().collect_statistics() { PartitionMode::Auto } else { PartitionMode::Partitioned } - }; - Ok(Arc::new(HashJoinExec::try_new( + }; + Arc::new(HashJoinExec::try_new( physical_left, physical_right, join_on, join_filter, join_type, - None, + None, partition_mode, null_equals_null, - )?)) + )?) } else { - Ok(Arc::new(HashJoinExec::try_new( + Arc::new(HashJoinExec::try_new( physical_left, physical_right, join_on, join_filter, join_type, - None, + None, PartitionMode::CollectLeft, null_equals_null, - )?)) - } - } - LogicalPlan::CrossJoin(CrossJoin { left, right, .. 
}) => { - let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?; - let [left, right]: [Arc; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?; - Ok(Arc::new(CrossJoinExec::new(left, right))) - } - LogicalPlan::Subquery(_) => todo!(), - LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema, - }) => Ok(Arc::new(EmptyExec::new( - SchemaRef::new(schema.as_ref().to_owned().into()), - ))), - LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: true, - schema, - }) => Ok(Arc::new(PlaceholderRowExec::new( - SchemaRef::new(schema.as_ref().to_owned().into()), - ))), - LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => { - self.create_initial_plan(input, session_state).await - } - LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => { - let input = self.create_initial_plan(input, session_state).await?; - - // GlobalLimitExec requires a single partition for input - let input = if input.output_partitioning().partition_count() == 1 { - input - } else { - // Apply a LocalLimitExec to each partition. The optimizer will also insert - // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec - if let Some(fetch) = fetch { - Arc::new(LocalLimitExec::new(input, *fetch + skip)) - } else { - input - } + )?) }; - Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch))) - } - LogicalPlan::Unnest(Unnest { input, column, schema, options }) => { - let input = self.create_initial_plan(input, session_state).await?; - let column_exec = schema.index_of_column(column) - .map(|idx| Column::new(&column.name, idx))?; - let schema = SchemaRef::new(schema.as_ref().to_owned().into()); - Ok(Arc::new(UnnestExec::new(input, column_exec, schema, options.clone()))) - } - LogicalPlan::Ddl(ddl) => { - // There is no default plan for DDl statements -- - // it must be handled at a higher level (so that - // the appropriate table can be registered with - // the context) - let name = ddl.name(); - not_impl_err!( - "Unsupported logical plan: {name}" - ) - } - LogicalPlan::Prepare(_) => { - // There is no default plan for "PREPARE" -- it must be - // handled at a higher level (so that the appropriate - // statement can be prepared) - not_impl_err!( - "Unsupported logical plan: Prepare" - ) - } - LogicalPlan::Dml(dml) => { - // DataFusion is a read-only query engine, but also a library, so consumers may implement this - not_impl_err!( - "Unsupported logical plan: Dml({0})", dml.op - ) + // If plan was mutated previously then need to create the ExecutionPlan + // for the new Projection that was applied on top. + if let Some((input, expr)) = new_project { + self.create_project_physical_exec( + session_state, + join, + input, + expr, + )? + } else { + join + } } - LogicalPlan::Statement(statement) => { - // DataFusion is a read-only query engine, but also a library, so consumers may implement this - let name = statement.name(); - not_impl_err!( - "Unsupported logical plan: Statement({name})" - ) + LogicalPlan::CrossJoin(_) => { + let (left, right) = children.pop_2()?; + Arc::new(CrossJoinExec::new(left, right)) } - LogicalPlan::DescribeTable(DescribeTable { schema, output_schema}) => { - let output_schema: Schema = output_schema.as_ref().into(); - self.plan_describe(schema.clone(), Arc::new(output_schema)) + LogicalPlan::RecursiveQuery(RecursiveQuery { + name, is_distinct, .. 
+ }) => { + let (static_term, recursive_term) = children.pop_2()?; + Arc::new(RecursiveQueryExec::try_new( + name.clone(), + static_term, + recursive_term, + *is_distinct, + )?) } - LogicalPlan::Explain(_) => internal_err!( - "Unsupported logical plan: Explain must be root of the plan" - ), - LogicalPlan::Distinct(_) => { - internal_err!( - "Unsupported logical plan: Distinct should be replaced to Aggregate" - ) + + // N Children + LogicalPlan::Union(Union { inputs, .. }) => { + let physical_plans = children.pop_n(inputs.len())?; + Arc::new(UnionExec::new(physical_plans)) } - LogicalPlan::Analyze(_) => internal_err!( - "Unsupported logical plan: Analyze must be root of the plan" - ), - LogicalPlan::Extension(e) => { - let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state).await?; + LogicalPlan::Extension(Extension { node }) => { + let physical_inputs = children.pop_n(node.inputs().len())?; let mut maybe_plan = None; for planner in &self.extension_planners { @@ -1230,42 +1290,82 @@ impl DefaultPhysicalPlanner { break; } - let logical_input = e.node.inputs(); - maybe_plan = planner.plan_extension( - self, - e.node.as_ref(), - &logical_input, - &physical_inputs, - session_state, - ).await?; + let logical_input = node.inputs(); + maybe_plan = planner + .plan_extension( + self, + node.as_ref(), + &logical_input, + &physical_inputs, + session_state, + ) + .await?; } let plan = match maybe_plan { Some(v) => Ok(v), - _ => plan_err!("No installed planner was able to convert the custom node to an execution plan: {:?}", e.node) + _ => plan_err!("No installed planner was able to convert the custom node to an execution plan: {:?}", node) }?; // Ensure the ExecutionPlan's schema matches the // declared logical schema to catch and warn about // logic errors when creating user defined plans. - if !e.node.schema().matches_arrow_schema(&plan.schema()) { - plan_err!( + if !node.schema().matches_arrow_schema(&plan.schema()) { + return plan_err!( "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \ LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}", - e.node, e.node.schema(), plan.schema() - ) + node, node.schema(), plan.schema() + ); } else { - Ok(plan) + plan } } - LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct,.. 
}) => {
-                    let static_term = self.create_initial_plan(static_term, session_state).await?;
-                    let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;
-
-                    Ok(Arc::new(RecursiveQueryExec::try_new(name.clone(), static_term, recursive_term, *is_distinct)?))
+
+                // Other
+                LogicalPlan::Statement(statement) => {
+                    // DataFusion is a read-only query engine, but also a library, so consumers may implement this
+                    let name = statement.name();
+                    return not_impl_err!("Unsupported logical plan: Statement({name})");
+                }
+                LogicalPlan::Prepare(_) => {
+                    // There is no default plan for "PREPARE" -- it must be
+                    // handled at a higher level (so that the appropriate
+                    // statement can be prepared)
+                    return not_impl_err!("Unsupported logical plan: Prepare");
+                }
+                LogicalPlan::Dml(dml) => {
+                    // DataFusion is a read-only query engine, but also a library, so consumers may implement this
+                    return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
+                }
+                LogicalPlan::Ddl(ddl) => {
+                    // There is no default plan for DDL statements --
+                    // it must be handled at a higher level (so that
+                    // the appropriate table can be registered with
+                    // the context)
+                    let name = ddl.name();
+                    return not_impl_err!("Unsupported logical plan: {name}");
+                }
+                LogicalPlan::Explain(_) => {
+                    return internal_err!(
+                        "Unsupported logical plan: Explain must be root of the plan"
+                    )
+                }
+                LogicalPlan::Distinct(_) => {
+                    return internal_err!(
+                        "Unsupported logical plan: Distinct should be replaced to Aggregate"
+                    )
+                }
+                LogicalPlan::Analyze(_) => {
+                    return internal_err!(
+                        "Unsupported logical plan: Analyze must be root of the plan"
+                    )
+                }
             };
-            exec_plan
-        }.boxed()
+            children.push(exec_node);
+        }
+        // Should be no dangling children
+        debug_assert_eq!(children.0.len(), 1);
+        children.pop()
     }
 
     fn create_grouping_physical_expr(
@@ -1320,6 +1420,45 @@ impl DefaultPhysicalPlanner {
     }
 }
 
+/// Thin wrapper to make stack operations easier (e.g. popping with
+/// an error instead of unwrap()).
+struct ChildrenStack(Vec<Arc<dyn ExecutionPlan>>);
+
+impl ChildrenStack {
+    fn new() -> Self {
+        Self(vec![])
+    }
+
+    fn push(&mut self, node: Arc<dyn ExecutionPlan>) {
+        self.0.push(node);
+    }
+
+    fn pop(&mut self) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(node) = self.0.pop() {
+            Ok(node)
+        } else {
+            // Ideally this can never occur for a valid LogicalPlan
+            internal_err!("Invalid state when creating physical plan from logical plan")
+        }
+    }
+
+    fn pop_2(&mut self) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
+        let right = self.pop()?;
+        let left = self.pop()?;
+        Ok((left, right))
+    }
+
+    fn pop_n(&mut self, n: usize) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
+        if n > self.0.len() {
+            // Ideally this can never occur for a valid LogicalPlan
+            internal_err!("Invalid state when creating physical plan from logical plan")
+        } else {
+            let at = self.0.len() - n;
+            Ok(self.0.split_off(at))
+        }
+    }
+}
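+
+// A minimal sanity-check sketch of `ChildrenStack`'s error handling (an
+// illustrative addition, with a hypothetical module name): popping more
+// children than were pushed surfaces an internal error rather than
+// panicking, and `pop_n(0)` is a benign no-op.
+#[cfg(test)]
+mod children_stack_tests {
+    use super::*;
+
+    #[test]
+    fn popping_beyond_pushed_children_errors() {
+        let mut stack = ChildrenStack::new();
+        // Nothing was pushed, so every pop variant must error, not panic
+        assert!(stack.pop().is_err());
+        assert!(stack.pop_2().is_err());
+        assert!(stack.pop_n(1).is_err());
+        // Requesting zero children is valid and yields an empty Vec
+        assert!(stack.pop_n(0).unwrap().is_empty());
+    }
+}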
+
 /// Expand and align a GROUPING SET expression.
 /// (see )
 ///
@@ -1941,6 +2080,59 @@ impl DefaultPhysicalPlanner {
         let mem_exec = MemoryExec::try_new(&partitions, schema, projection)?;
         Ok(Arc::new(mem_exec))
     }
+
+    fn create_project_physical_exec(
+        &self,
+        session_state: &SessionState,
+        input_exec: Arc<dyn ExecutionPlan>,
+        input: &Arc<LogicalPlan>,
+        expr: &[Expr],
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let input_schema = input.as_ref().schema();
+
+        let physical_exprs = expr
+            .iter()
+            .map(|e| {
+                // For projections, SQL planner and logical plan builder may convert user
+                // provided expressions into logical Column expressions if their results
+                // are already provided from the input plans. Because we work with
+                // qualified columns in the logical plan, derived columns involving
+                // operators or functions will contain qualifiers as well. This will
+                // result in logical columns with names like `SUM(t1.c1)`,
+                // `t1.c1 + t1.c2`, etc.
+                //
+                // If we run these logical columns through the physical_name function,
+                // we will get physical names with column qualifiers, which violates
+                // DataFusion's field name semantics. To account for this, we need to
+                // derive the physical name from the physical input instead.
+                //
+                // This depends on the invariant that the logical schema field index
+                // MUST match the physical schema field index.
+                let physical_name = if let Expr::Column(col) = e {
+                    match input_schema.index_of_column(col) {
+                        Ok(idx) => {
+                            // index physical field using logical field index
+                            Ok(input_exec.schema().field(idx).name().to_string())
+                        }
+                        // logical column is not a derived column, safe to pass along to
+                        // physical_name
+                        Err(_) => physical_name(e),
+                    }
+                } else {
+                    physical_name(e)
+                };
+
+                tuple_err((
+                    self.create_physical_expr(e, input_schema, session_state),
+                    physical_name,
+                ))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Arc::new(ProjectionExec::try_new(
+            physical_exprs,
+            input_exec,
+        )?))
+    }
 }
 
 fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
@@ -1952,54 +2144,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
     }
 }
 
-/// Add a physical projection to the join if it has expression equijoin keys.
-fn project_expr_join_keys(
-    keys: &[(Expr, Expr)],
-    left: &Arc<LogicalPlan>,
-    right: &Arc<LogicalPlan>,
-    logical_plan: &LogicalPlan,
-    join_schema: &Arc<DFSchema>,
-) -> Result<LogicalPlan> {
-    let left_keys = keys.iter().map(|(l, _r)| l).cloned().collect::<Vec<_>>();
-    let right_keys = keys.iter().map(|(_l, r)| r).cloned().collect::<Vec<_>>();
-    let (left, right, column_on, added_project) = {
-        let (left, left_col_keys, left_projected) =
-            wrap_projection_for_join_if_necessary(
-                left_keys.as_slice(),
-                left.as_ref().clone(),
-            )?;
-        let (right, right_col_keys, right_projected) =
-            wrap_projection_for_join_if_necessary(&right_keys, right.as_ref().clone())?;
-        (
-            left,
-            right,
-            (left_col_keys, right_col_keys),
-            left_projected || right_projected,
-        )
-    };
-
-    let join_plan = LogicalPlan::Join(Join::try_new_with_project_input(
-        logical_plan,
-        Arc::new(left),
-        Arc::new(right),
-        column_on,
-    )?);
-
-    // Remove temporary projected columns
-    if added_project {
-        let final_join_result = join_schema
-            .iter()
-            .map(|(qualifier, field)| {
-                Expr::Column(datafusion_common::Column::from((qualifier, field.as_ref())))
-            })
-            .collect::<Vec<_>>();
-        let projection = Projection::try_new(final_join_result, Arc::new(join_plan))?;
-        Ok(LogicalPlan::Projection(projection))
-    } else {
-        Ok(join_plan)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::any::Any;

From c9148f7ae2773f517fa9ac8da096ad5f6b3eac68 Mon Sep 17 00:00:00 2001
From: Jefffrey
Date: Tue, 9 Apr 2024 20:00:49 +1000
Subject: [PATCH 02/12] Refactor node mapping into separate function

---
 datafusion/core/src/physical_planner.rs | 1560 +++++++++++------------
 1 file changed, 763 insertions(+), 797 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index abb11c5cf010..3de98045cc2f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -39,7 +39,7 @@ use crate::logical_expr::{
     Aggregate, EmptyRelation, Join, Projection, Sort, TableScan, Unnest, Window,
 };
 use crate::logical_expr::{
-    Expr, LogicalPlan,
Partitioning as LogicalPartitioning, PlanType, Repartition, UserDefinedLogicalNode, }; use crate::logical_expr::{Limit, Values}; @@ -515,857 +515,829 @@ impl DefaultPhysicalPlanner { let mut children = ChildrenStack::new(); while let Some(node) = flat_tree.pop() { - let exec_node: Arc = match node { - // Leaves (no children) - LogicalPlan::TableScan(TableScan { - source, - projection, - filters, - fetch, - .. - }) => { - let source = source_as_provider(source)?; - // Remove all qualifiers from the scan as the provider - // doesn't know (nor should care) how the relation was - // referred to in the query - let filters = unnormalize_cols(filters.iter().cloned()); - source - .scan(session_state, projection.as_ref(), &filters, *fetch) + let children_nodes = children.pop_n(node.inputs().len())?; + let exec_node = self + .map_logical_node_to_physical(node, session_state, children_nodes) + .await?; + children.push(exec_node); + } + debug_assert_eq!( + children.0.len(), + 1, + "Should be no dangling children after physical planning" + ); + children.pop() + } + + async fn map_logical_node_to_physical( + &self, + node: &LogicalPlan, + session_state: &SessionState, + mut children: Vec>, + ) -> Result> { + let exec_node: Arc = match node { + // Leaves (no children) + LogicalPlan::TableScan(TableScan { + source, + projection, + filters, + fetch, + .. + }) => { + let source = source_as_provider(source)?; + // Remove all qualifiers from the scan as the provider + // doesn't know (nor should care) how the relation was + // referred to in the query + let filters = unnormalize_cols(filters.iter().cloned()); + source + .scan(session_state, projection.as_ref(), &filters, *fetch) + .await? + } + LogicalPlan::Values(Values { values, schema }) => { + let exec_schema = schema.as_ref().to_owned().into(); + let exprs = values + .iter() + .map(|row| { + row.iter() + .map(|expr| { + self.create_physical_expr(expr, schema, session_state) + }) + .collect::>>>() + }) + .collect::>>()?; + let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?; + Arc::new(value_exec) + } + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema, + }) => Arc::new(EmptyExec::new(SchemaRef::new( + schema.as_ref().to_owned().into(), + ))), + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: true, + schema, + }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new( + schema.as_ref().to_owned().into(), + ))), + LogicalPlan::DescribeTable(DescribeTable { + schema, + output_schema, + }) => { + let output_schema: Schema = output_schema.as_ref().into(); + self.plan_describe(schema.clone(), Arc::new(output_schema))? + } + + // 1 Child + LogicalPlan::Copy(CopyTo { + input, + output_url, + format_options, + partition_by, + options: source_option_tuples, + }) => { + let input_exec = children.pop().unwrap(); + let parsed_url = ListingTableUrl::parse(output_url)?; + let object_store_url = parsed_url.object_store(); + + let schema: Schema = (**input.schema()).clone().into(); + + // Note: the DataType passed here is ignored for the purposes of writing and inferred instead + // from the schema of the RecordBatch being written. This allows COPY statements to specify only + // the column name rather than column name + explicit data type. 
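                // For instance, a statement along the lines of
                //   COPY (SELECT ...) TO 'out/' PARTITIONED BY (year)
                // names only the `year` partition column; its concrete type is
                // resolved from the input batches at write time, which is why a
                // `DataType::Null` placeholder is sufficient below.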
+ let table_partition_cols = partition_by + .iter() + .map(|s| (s.to_string(), arrow_schema::DataType::Null)) + .collect::>(); + + // Set file sink related options + let config = FileSinkConfig { + object_store_url, + table_paths: vec![parsed_url], + file_groups: vec![], + output_schema: Arc::new(schema), + table_partition_cols, + overwrite: false, + }; + let mut table_options = session_state.default_table_options(); + let sink_format: Arc = match format_options { + FormatOptions::CSV(options) => { + table_options.csv = options.clone(); + table_options.set_file_format(FileType::CSV); + table_options.alter_with_string_hash_map(source_option_tuples)?; + Arc::new(CsvFormat::default().with_options(table_options.csv)) + } + FormatOptions::JSON(options) => { + table_options.json = options.clone(); + table_options.set_file_format(FileType::JSON); + table_options.alter_with_string_hash_map(source_option_tuples)?; + Arc::new(JsonFormat::default().with_options(table_options.json)) + } + #[cfg(feature = "parquet")] + FormatOptions::PARQUET(options) => { + table_options.parquet = options.clone(); + table_options.set_file_format(FileType::PARQUET); + table_options.alter_with_string_hash_map(source_option_tuples)?; + Arc::new( + ParquetFormat::default().with_options(table_options.parquet), + ) + } + FormatOptions::AVRO => Arc::new(AvroFormat {}), + FormatOptions::ARROW => Arc::new(ArrowFormat {}), + }; + + sink_format + .create_writer_physical_plan(input_exec, session_state, config, None) + .await? + } + LogicalPlan::Dml(DmlStatement { + table_name, + op: WriteOp::InsertInto, + .. + }) => { + let name = table_name.table(); + let schema = session_state.schema_for_ref(table_name.clone())?; + if let Some(provider) = schema.table(name).await? { + let input_exec = children.pop().unwrap(); + provider + .insert_into(session_state, input_exec, false) .await? + } else { + return exec_err!("Table '{table_name}' does not exist"); } - LogicalPlan::Values(Values { values, schema }) => { - let exec_schema = schema.as_ref().to_owned().into(); - let exprs = values - .iter() - .map(|row| { - row.iter() - .map(|expr| { - self.create_physical_expr(expr, schema, session_state) - }) - .collect::>>>() - }) - .collect::>>()?; - let value_exec = - ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?; - Arc::new(value_exec) + } + LogicalPlan::Dml(DmlStatement { + table_name, + op: WriteOp::InsertOverwrite, + .. + }) => { + let name = table_name.table(); + let schema = session_state.schema_for_ref(table_name.clone())?; + if let Some(provider) = schema.table(name).await? { + let input_exec = children.pop().unwrap(); + provider + .insert_into(session_state, input_exec, true) + .await? + } else { + return exec_err!("Table '{table_name}' does not exist"); } - LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema, - }) => Arc::new(EmptyExec::new(SchemaRef::new( - schema.as_ref().to_owned().into(), - ))), - LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: true, - schema, - }) => Arc::new(PlaceholderRowExec::new(SchemaRef::new( - schema.as_ref().to_owned().into(), - ))), - LogicalPlan::DescribeTable(DescribeTable { - schema, - output_schema, - }) => { - let output_schema: Schema = output_schema.as_ref().into(); - self.plan_describe(schema.clone(), Arc::new(output_schema))? + } + LogicalPlan::Window(Window { + input, window_expr, .. 
+ }) => { + if window_expr.is_empty() { + return internal_err!("Impossibly got empty window expression"); } - // 1 Child - LogicalPlan::Copy(CopyTo { - input, - output_url, - format_options, - partition_by, - options: source_option_tuples, - }) => { - let input_exec = children.pop()?; - let parsed_url = ListingTableUrl::parse(output_url)?; - let object_store_url = parsed_url.object_store(); - - let schema: Schema = (**input.schema()).clone().into(); - - // Note: the DataType passed here is ignored for the purposes of writing and inferred instead - // from the schema of the RecordBatch being written. This allows COPY statements to specify only - // the column name rather than column name + explicit data type. - let table_partition_cols = partition_by + let input_exec = children.pop().unwrap(); + + // at this moment we are guaranteed by the logical planner + // to have all the window_expr to have equal sort key + let partition_keys = window_expr_common_partition_keys(window_expr)?; + + let can_repartition = !partition_keys.is_empty() + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_window_functions(); + + let physical_partition_keys = if can_repartition { + partition_keys .iter() - .map(|s| (s.to_string(), arrow_schema::DataType::Null)) - .collect::>(); - - // Set file sink related options - let config = FileSinkConfig { - object_store_url, - table_paths: vec![parsed_url], - file_groups: vec![], - output_schema: Arc::new(schema), - table_partition_cols, - overwrite: false, - }; - let mut table_options = session_state.default_table_options(); - let sink_format: Arc = match format_options { - FormatOptions::CSV(options) => { - table_options.csv = options.clone(); - table_options.set_file_format(FileType::CSV); - table_options - .alter_with_string_hash_map(source_option_tuples)?; - Arc::new(CsvFormat::default().with_options(table_options.csv)) - } - FormatOptions::JSON(options) => { - table_options.json = options.clone(); - table_options.set_file_format(FileType::JSON); - table_options - .alter_with_string_hash_map(source_option_tuples)?; - Arc::new( - JsonFormat::default().with_options(table_options.json), - ) - } - #[cfg(feature = "parquet")] - FormatOptions::PARQUET(options) => { - table_options.parquet = options.clone(); - table_options.set_file_format(FileType::PARQUET); - table_options - .alter_with_string_hash_map(source_option_tuples)?; - Arc::new( - ParquetFormat::default() - .with_options(table_options.parquet), - ) - } - FormatOptions::AVRO => Arc::new(AvroFormat {}), - FormatOptions::ARROW => Arc::new(ArrowFormat {}), - }; + .map(|e| { + self.create_physical_expr(e, input.schema(), session_state) + }) + .collect::>>>()? + } else { + vec![] + }; - sink_format - .create_writer_physical_plan( - input_exec, - session_state, - config, - None, - ) - .await? - } - LogicalPlan::Dml(DmlStatement { - table_name, - op: WriteOp::InsertInto, - .. - }) => { - let name = table_name.table(); - let schema = session_state.schema_for_ref(table_name.clone())?; - if let Some(provider) = schema.table(name).await? { - let input_exec = children.pop()?; - provider - .insert_into(session_state, input_exec, false) - .await? - } else { - return exec_err!("Table '{table_name}' does not exist"); - } - } - LogicalPlan::Dml(DmlStatement { - table_name, - op: WriteOp::InsertOverwrite, - .. - }) => { - let name = table_name.table(); - let schema = session_state.schema_for_ref(table_name.clone())?; - if let Some(provider) = schema.table(name).await? 
{ - let input_exec = children.pop()?; - provider - .insert_into(session_state, input_exec, true) - .await? - } else { - return exec_err!("Table '{table_name}' does not exist"); + let get_sort_keys = |expr: &Expr| match expr { + Expr::WindowFunction(WindowFunction { + ref partition_by, + ref order_by, + .. + }) => generate_sort_key(partition_by, order_by), + Expr::Alias(Alias { expr, .. }) => { + // Convert &Box to &T + match &**expr { + Expr::WindowFunction(WindowFunction { + ref partition_by, + ref order_by, + .. + }) => generate_sort_key(partition_by, order_by), + _ => unreachable!(), + } } + _ => unreachable!(), + }; + let sort_keys = get_sort_keys(&window_expr[0])?; + if window_expr.len() > 1 { + debug_assert!( + window_expr[1..] + .iter() + .all(|expr| get_sort_keys(expr).unwrap() == sort_keys), + "all window expressions shall have the same sort keys, as guaranteed by logical planning" + ); } - LogicalPlan::Window(Window { - input, window_expr, .. - }) => { - if window_expr.is_empty() { - return internal_err!("Impossibly got empty window expression"); - } - let input_exec = children.pop()?; + let logical_schema = node.schema(); + let window_expr = window_expr + .iter() + .map(|e| { + create_window_expr( + e, + logical_schema, + session_state.execution_props(), + ) + }) + .collect::>>()?; + + let uses_bounded_memory = + window_expr.iter().all(|e| e.uses_bounded_memory()); + // If all window expressions can run with bounded memory, + // choose the bounded window variant: + if uses_bounded_memory { + Arc::new(BoundedWindowAggExec::try_new( + window_expr, + input_exec, + physical_partition_keys, + InputOrderMode::Sorted, + )?) + } else { + Arc::new(WindowAggExec::try_new( + window_expr, + input_exec, + physical_partition_keys, + )?) + } + } + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + .. + }) => { + // Initially need to perform the aggregate and then merge the partitions + let input_exec = children.pop().unwrap(); + let physical_input_schema = input_exec.schema(); + let logical_input_schema = input.as_ref().schema(); + + let groups = self.create_grouping_physical_expr( + group_expr, + logical_input_schema, + &physical_input_schema, + session_state, + )?; - // at this moment we are guaranteed by the logical planner - // to have all the window_expr to have equal sort key - let partition_keys = window_expr_common_partition_keys(window_expr)?; + let agg_filter = aggr_expr + .iter() + .map(|e| { + create_aggregate_expr_and_maybe_filter( + e, + logical_input_schema, + &physical_input_schema, + session_state.execution_props(), + ) + }) + .collect::>>()?; - let can_repartition = !partition_keys.is_empty() - && session_state.config().target_partitions() > 1 - && session_state.config().repartition_window_functions(); + let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) = + multiunzip(agg_filter); + + let initial_aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + groups.clone(), + aggregates.clone(), + filters.clone(), + input_exec, + physical_input_schema.clone(), + )?); + + // update group column indices based on partial aggregate plan evaluation + let final_group: Vec> = + initial_aggr.output_group_expr(); + + let can_repartition = !groups.is_empty() + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_aggregations(); + + // Some aggregators may be modified during initialization for + // optimization purposes. 
For example, a FIRST_VALUE may turn + // into a LAST_VALUE with the reverse ordering requirement. + // To reflect such changes to subsequent stages, use the updated + // `AggregateExpr`/`PhysicalSortExpr` objects. + let updated_aggregates = initial_aggr.aggr_expr().to_vec(); + + let next_partition_mode = if can_repartition { + // construct a second aggregation with 'AggregateMode::FinalPartitioned' + AggregateMode::FinalPartitioned + } else { + // construct a second aggregation, keeping the final column name equal to the + // first aggregation and the expressions corresponding to the respective aggregate + AggregateMode::Final + }; - let physical_partition_keys = if can_repartition { - partition_keys + let final_grouping_set = PhysicalGroupBy::new_single( + final_group + .iter() + .enumerate() + .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone())) + .collect(), + ); + + Arc::new(AggregateExec::try_new( + next_partition_mode, + final_grouping_set, + updated_aggregates, + filters, + initial_aggr, + physical_input_schema.clone(), + )?) + } + LogicalPlan::Projection(Projection { input, expr, .. }) => self + .create_project_physical_exec( + session_state, + children.pop().unwrap(), + input, + expr, + )?, + LogicalPlan::Filter(Filter { + predicate, input, .. + }) => { + let physical_input = children.pop().unwrap(); + let input_dfschema = input.schema(); + + let runtime_expr = + self.create_physical_expr(predicate, input_dfschema, session_state)?; + let selectivity = session_state + .config() + .options() + .optimizer + .default_filter_selectivity; + let filter = FilterExec::try_new(runtime_expr, physical_input)?; + Arc::new(filter.with_default_selectivity(selectivity)?) + } + LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + }) => { + let physical_input = children.pop().unwrap(); + let input_dfschema = input.as_ref().schema(); + let physical_partitioning = match partitioning_scheme { + LogicalPartitioning::RoundRobinBatch(n) => { + Partitioning::RoundRobinBatch(*n) + } + LogicalPartitioning::Hash(expr, n) => { + let runtime_expr = expr .iter() .map(|e| { self.create_physical_expr( e, - input.schema(), + input_dfschema, session_state, ) }) - .collect::>>>()? - } else { - vec![] - }; - - let get_sort_keys = |expr: &Expr| match expr { - Expr::WindowFunction(WindowFunction { - ref partition_by, - ref order_by, - .. - }) => generate_sort_key(partition_by, order_by), - Expr::Alias(Alias { expr, .. }) => { - // Convert &Box to &T - match &**expr { - Expr::WindowFunction(WindowFunction { - ref partition_by, - ref order_by, - .. - }) => generate_sort_key(partition_by, order_by), - _ => unreachable!(), - } - } - _ => unreachable!(), - }; - let sort_keys = get_sort_keys(&window_expr[0])?; - if window_expr.len() > 1 { - debug_assert!( - window_expr[1..] 
- .iter() - .all(|expr| get_sort_keys(expr).unwrap() == sort_keys), - "all window expressions shall have the same sort keys, as guaranteed by logical planning" + .collect::>>()?; + Partitioning::Hash(runtime_expr, *n) + } + LogicalPartitioning::DistributeBy(_) => { + return not_impl_err!( + "Physical plan does not support DistributeBy partitioning" ); } - - let logical_schema = node.schema(); - let window_expr = window_expr - .iter() - .map(|e| { - create_window_expr( - e, - logical_schema, - session_state.execution_props(), - ) - }) - .collect::>>()?; - - let uses_bounded_memory = - window_expr.iter().all(|e| e.uses_bounded_memory()); - // If all window expressions can run with bounded memory, - // choose the bounded window variant: - if uses_bounded_memory { - Arc::new(BoundedWindowAggExec::try_new( - window_expr, - input_exec, - physical_partition_keys, - InputOrderMode::Sorted, - )?) + }; + Arc::new(RepartitionExec::try_new( + physical_input, + physical_partitioning, + )?) + } + LogicalPlan::Sort(Sort { + expr, input, fetch, .. + }) => { + let physical_input = children.pop().unwrap(); + let input_dfschema = input.as_ref().schema(); + let sort_expr = create_physical_sort_exprs( + expr, + input_dfschema, + session_state.execution_props(), + )?; + let new_sort = + SortExec::new(sort_expr, physical_input).with_fetch(*fetch); + Arc::new(new_sort) + } + LogicalPlan::Subquery(_) => todo!(), + LogicalPlan::SubqueryAlias(_) => children.pop().unwrap(), + LogicalPlan::Limit(Limit { skip, fetch, .. }) => { + let input = children.pop().unwrap(); + + // GlobalLimitExec requires a single partition for input + let input = if input.output_partitioning().partition_count() == 1 { + input + } else { + // Apply a LocalLimitExec to each partition. The optimizer will also insert + // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec + if let Some(fetch) = fetch { + Arc::new(LocalLimitExec::new(input, *fetch + skip)) } else { - Arc::new(WindowAggExec::try_new( - window_expr, - input_exec, - physical_partition_keys, - )?) + input } - } - LogicalPlan::Aggregate(Aggregate { - input, - group_expr, - aggr_expr, - .. - }) => { - // Initially need to perform the aggregate and then merge the partitions - let input_exec = children.pop()?; - let physical_input_schema = input_exec.schema(); - let logical_input_schema = input.as_ref().schema(); - - let groups = self.create_grouping_physical_expr( - group_expr, - logical_input_schema, - &physical_input_schema, - session_state, - )?; - - let agg_filter = aggr_expr - .iter() - .map(|e| { - create_aggregate_expr_and_maybe_filter( - e, - logical_input_schema, - &physical_input_schema, - session_state.execution_props(), - ) - }) - .collect::>>()?; + }; - let (aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) = - multiunzip(agg_filter); + Arc::new(GlobalLimitExec::new(input, *skip, *fetch)) + } + LogicalPlan::Unnest(Unnest { + column, + schema, + options, + .. 
+ }) => { + let input = children.pop().unwrap(); + let column_exec = schema + .index_of_column(column) + .map(|idx| Column::new(&column.name, idx))?; + let schema = SchemaRef::new(schema.as_ref().to_owned().into()); + Arc::new(UnnestExec::new(input, column_exec, schema, options.clone())) + } - let initial_aggr = Arc::new(AggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates.clone(), - filters.clone(), - input_exec, - physical_input_schema.clone(), + // 2 Children + LogicalPlan::Join(Join { + left, + right, + on: keys, + filter, + join_type, + null_equals_null, + schema: join_schema, + .. + }) => { + let null_equals_null = *null_equals_null; + + let physical_right = children.pop().unwrap(); + let physical_left = children.pop().unwrap(); + + // If join has expression equijoin keys, add physical projection. + let has_expr_join_key = keys.iter().any(|(l, r)| { + !(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_))) + }); + let (new_logical, physical_left, physical_right) = if has_expr_join_key { + // TODO: Can we extract this transformation to somewhere before physical plan + // creation? + let (left_keys, right_keys): (Vec<_>, Vec<_>) = + keys.iter().cloned().unzip(); + + let (left, left_col_keys, left_projected) = + wrap_projection_for_join_if_necessary( + &left_keys, + left.as_ref().clone(), + )?; + let (right, right_col_keys, right_projected) = + wrap_projection_for_join_if_necessary( + &right_keys, + right.as_ref().clone(), + )?; + let column_on = (left_col_keys, right_col_keys); + + let left = Arc::new(left); + let right = Arc::new(right); + let new_join = LogicalPlan::Join(Join::try_new_with_project_input( + node, + left.clone(), + right.clone(), + column_on, )?); - // update group column indices based on partial aggregate plan evaluation - let final_group: Vec> = - initial_aggr.output_group_expr(); - - let can_repartition = !groups.is_empty() - && session_state.config().target_partitions() > 1 - && session_state.config().repartition_aggregations(); - - // Some aggregators may be modified during initialization for - // optimization purposes. For example, a FIRST_VALUE may turn - // into a LAST_VALUE with the reverse ordering requirement. - // To reflect such changes to subsequent stages, use the updated - // `AggregateExpr`/`PhysicalSortExpr` objects. - let updated_aggregates = initial_aggr.aggr_expr().to_vec(); - - let next_partition_mode = if can_repartition { - // construct a second aggregation with 'AggregateMode::FinalPartitioned' - AggregateMode::FinalPartitioned - } else { - // construct a second aggregation, keeping the final column name equal to the - // first aggregation and the expressions corresponding to the respective aggregate - AggregateMode::Final + // If inputs were projected then create ExecutionPlan for these new + // LogicalPlan nodes. + let physical_left = match (left_projected, left.as_ref()) { + // If left_projected is true we are guaranteed that left is a Projection + ( + true, + LogicalPlan::Projection(Projection { input, expr, .. }), + ) => self.create_project_physical_exec( + session_state, + physical_left, + input, + expr, + )?, + _ => physical_left, }; - - let final_grouping_set = PhysicalGroupBy::new_single( - final_group - .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone())) - .collect(), - ); - - Arc::new(AggregateExec::try_new( - next_partition_mode, - final_grouping_set, - updated_aggregates, - filters, - initial_aggr, - physical_input_schema.clone(), - )?) 
- } - LogicalPlan::Projection(Projection { input, expr, .. }) => self - .create_project_physical_exec( - session_state, - children.pop()?, - input, - expr, - )?, - LogicalPlan::Filter(Filter { - predicate, input, .. - }) => { - let physical_input = children.pop()?; - let input_dfschema = input.schema(); - - let runtime_expr = self.create_physical_expr( - predicate, - input_dfschema, - session_state, - )?; - let selectivity = session_state - .config() - .options() - .optimizer - .default_filter_selectivity; - let filter = FilterExec::try_new(runtime_expr, physical_input)?; - Arc::new(filter.with_default_selectivity(selectivity)?) - } - LogicalPlan::Repartition(Repartition { - input, - partitioning_scheme, - }) => { - let physical_input = children.pop()?; - let input_dfschema = input.as_ref().schema(); - let physical_partitioning = match partitioning_scheme { - LogicalPartitioning::RoundRobinBatch(n) => { - Partitioning::RoundRobinBatch(*n) - } - LogicalPartitioning::Hash(expr, n) => { - let runtime_expr = expr - .iter() - .map(|e| { - self.create_physical_expr( - e, - input_dfschema, - session_state, - ) - }) - .collect::>>()?; - Partitioning::Hash(runtime_expr, *n) - } - LogicalPartitioning::DistributeBy(_) => { - return not_impl_err!("Physical plan does not support DistributeBy partitioning"); - } + let physical_right = match (right_projected, right.as_ref()) { + // If right_projected is true we are guaranteed that right is a Projection + ( + true, + LogicalPlan::Projection(Projection { input, expr, .. }), + ) => self.create_project_physical_exec( + session_state, + physical_right, + input, + expr, + )?, + _ => physical_right, }; - Arc::new(RepartitionExec::try_new( - physical_input, - physical_partitioning, - )?) - } - LogicalPlan::Sort(Sort { - expr, input, fetch, .. - }) => { - let physical_input = children.pop()?; - let input_dfschema = input.as_ref().schema(); - let sort_expr = create_physical_sort_exprs( - expr, - input_dfschema, - session_state.execution_props(), - )?; - let new_sort = - SortExec::new(sort_expr, physical_input).with_fetch(*fetch); - Arc::new(new_sort) - } - LogicalPlan::Subquery(_) => todo!(), - LogicalPlan::SubqueryAlias(_) => children.pop()?, - LogicalPlan::Limit(Limit { skip, fetch, .. }) => { - let input = children.pop()?; - // GlobalLimitExec requires a single partition for input - let input = if input.output_partitioning().partition_count() == 1 { - input + // Remove temporary projected columns + if left_projected || right_projected { + let final_join_result = join_schema + .iter() + .map(|(qualifier, field)| { + Expr::Column(datafusion_common::Column::from(( + qualifier, + field.as_ref(), + ))) + }) + .collect::>(); + let projection = LogicalPlan::Projection(Projection::try_new( + final_join_result, + Arc::new(new_join), + )?); + // LogicalPlan mutated + (Cow::Owned(projection), physical_left, physical_right) } else { - // Apply a LocalLimitExec to each partition. The optimizer will also insert - // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec - if let Some(fetch) = fetch { - Arc::new(LocalLimitExec::new(input, *fetch + skip)) - } else { - input - } - }; - - Arc::new(GlobalLimitExec::new(input, *skip, *fetch)) - } - LogicalPlan::Unnest(Unnest { - column, - schema, - options, - .. 
- }) => { - let input = children.pop()?; - let column_exec = schema - .index_of_column(column) - .map(|idx| Column::new(&column.name, idx))?; - let schema = SchemaRef::new(schema.as_ref().to_owned().into()); - Arc::new(UnnestExec::new(input, column_exec, schema, options.clone())) - } + // LogicalPlan mutated + (Cow::Owned(new_join), physical_left, physical_right) + } + } else { + // LogicalPlan unchanged + (Cow::Borrowed(node), physical_left, physical_right) + }; - // 2 Children - LogicalPlan::Join(Join { - left, - right, - on: keys, - filter, - join_type, - null_equals_null, - schema: join_schema, - .. - }) => { - let null_equals_null = *null_equals_null; - - let (physical_left, physical_right) = children.pop_2()?; - - // If join has expression equijoin keys, add physical projection. - let has_expr_join_key = keys.iter().any(|(l, r)| { - !(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_))) - }); - let (new_logical, physical_left, physical_right) = - if has_expr_join_key { - // TODO: Can we extract this transformation to somewhere before physical plan - // creation? - let (left_keys, right_keys): (Vec<_>, Vec<_>) = - keys.iter().cloned().unzip(); - - let (left, left_col_keys, left_projected) = - wrap_projection_for_join_if_necessary( - &left_keys, - left.as_ref().clone(), - )?; - let (right, right_col_keys, right_projected) = - wrap_projection_for_join_if_necessary( - &right_keys, - right.as_ref().clone(), - )?; - let column_on = (left_col_keys, right_col_keys); - - let left = Arc::new(left); - let right = Arc::new(right); - let new_join = - LogicalPlan::Join(Join::try_new_with_project_input( - node, - left.clone(), - right.clone(), - column_on, - )?); - - // If inputs were projected then create ExecutionPlan for these new - // LogicalPlan nodes. - let physical_left = match (left_projected, left.as_ref()) { - // If left_projected is true we are guaranteed that left is a Projection - ( - true, - LogicalPlan::Projection(Projection { - input, - expr, - .. - }), - ) => self.create_project_physical_exec( - session_state, - physical_left, - input, - expr, - )?, - _ => physical_left, - }; - let physical_right = match (right_projected, right.as_ref()) { - // If right_projected is true we are guaranteed that right is a Projection - ( - true, - LogicalPlan::Projection(Projection { - input, - expr, - .. - }), - ) => self.create_project_physical_exec( - session_state, - physical_right, - input, - expr, - )?, - _ => physical_right, - }; - - // Remove temporary projected columns - if left_projected || right_projected { - let final_join_result = join_schema - .iter() - .map(|(qualifier, field)| { - Expr::Column(datafusion_common::Column::from(( - qualifier, - field.as_ref(), - ))) - }) - .collect::>(); - let projection = - LogicalPlan::Projection(Projection::try_new( - final_join_result, - Arc::new(new_join), - )?); - // LogicalPlan mutated - (Cow::Owned(projection), physical_left, physical_right) - } else { - // LogicalPlan mutated - (Cow::Owned(new_join), physical_left, physical_right) - } + // Retrieving new left/right and join keys (in case plan was mutated above) + let (left, right, keys, new_project) = match new_logical.as_ref() { + LogicalPlan::Projection(Projection { input, expr, .. }) => { + if let LogicalPlan::Join(Join { + left, right, on, .. 
+ }) = input.as_ref() + { + (left, right, on, Some((input, expr))) } else { - // LogicalPlan unchanged - (Cow::Borrowed(node), physical_left, physical_right) - }; - - // Retrieving new left/right and join keys (in case plan was mutated above) - let (left, right, keys, new_project) = match new_logical.as_ref() { - LogicalPlan::Projection(Projection { input, expr, .. }) => { - if let LogicalPlan::Join(Join { - left, right, on, .. - }) = input.as_ref() - { - (left, right, on, Some((input, expr))) - } else { - unreachable!() - } + unreachable!() } - LogicalPlan::Join(Join { - left, right, on, .. - }) => (left, right, on, None), - // Should either be the original Join, or Join with a Projection on top - _ => unreachable!(), - }; + } + LogicalPlan::Join(Join { + left, right, on, .. + }) => (left, right, on, None), + // Should either be the original Join, or Join with a Projection on top + _ => unreachable!(), + }; - // All equi-join keys are columns now, create physical join plan - let left_df_schema = left.schema(); - let right_df_schema = right.schema(); - let execution_props = session_state.execution_props(); - let join_on = keys - .iter() - .map(|(l, r)| { - let l = - create_physical_expr(l, left_df_schema, execution_props)?; - let r = create_physical_expr( - r, - right_df_schema, - execution_props, - )?; - Ok((l, r)) - }) - .collect::>()?; + // All equi-join keys are columns now, create physical join plan + let left_df_schema = left.schema(); + let right_df_schema = right.schema(); + let execution_props = session_state.execution_props(); + let join_on = keys + .iter() + .map(|(l, r)| { + let l = create_physical_expr(l, left_df_schema, execution_props)?; + let r = + create_physical_expr(r, right_df_schema, execution_props)?; + Ok((l, r)) + }) + .collect::>()?; - let join_filter = match filter { - Some(expr) => { - // Extract columns from filter expression and saved in a HashSet - let cols = expr.to_columns()?; + let join_filter = match filter { + Some(expr) => { + // Extract columns from filter expression and saved in a HashSet + let cols = expr.to_columns()?; - // Collect left & right field indices, the field indices are sorted in ascending order - let left_field_indices = cols - .iter() - .filter_map(|c| match left_df_schema.index_of_column(c) { - Ok(idx) => Some(idx), - _ => None, - }) - .sorted() - .collect::>(); - let right_field_indices = cols - .iter() - .filter_map(|c| { - match right_df_schema.index_of_column(c) { - Ok(idx) => Some(idx), - _ => None, - } - }) - .sorted() - .collect::>(); - - // Collect DFFields and Fields required for intermediate schemas - let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = - left_field_indices - .clone() - .into_iter() - .map(|i| { - ( - left_df_schema.qualified_field(i), - physical_left.schema().field(i).clone(), - ) - }) - .chain(right_field_indices.clone().into_iter().map( - |i| { - ( - right_df_schema.qualified_field(i), - physical_right.schema().field(i).clone(), - ) - }, - )) - .unzip(); - let filter_df_fields = filter_df_fields + // Collect left & right field indices, the field indices are sorted in ascending order + let left_field_indices = cols + .iter() + .filter_map(|c| match left_df_schema.index_of_column(c) { + Ok(idx) => Some(idx), + _ => None, + }) + .sorted() + .collect::>(); + let right_field_indices = cols + .iter() + .filter_map(|c| match right_df_schema.index_of_column(c) { + Ok(idx) => Some(idx), + _ => None, + }) + .sorted() + .collect::>(); + + // Collect DFFields and Fields required for intermediate schemas + let 
(filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = + left_field_indices + .clone() .into_iter() - .map(|(qualifier, field)| { - (qualifier.cloned(), Arc::new(field.clone())) + .map(|i| { + ( + left_df_schema.qualified_field(i), + physical_left.schema().field(i).clone(), + ) }) - .collect(); - - // Construct intermediate schemas used for filtering data and - // convert logical expression to physical according to filter schema - let filter_df_schema = DFSchema::new_with_metadata( - filter_df_fields, - HashMap::new(), - )?; - let filter_schema = - Schema::new_with_metadata(filter_fields, HashMap::new()); - let filter_expr = create_physical_expr( - expr, - &filter_df_schema, - session_state.execution_props(), - )?; - let column_indices = - join_utils::JoinFilter::build_column_indices( - left_field_indices, - right_field_indices, - ); - - Some(join_utils::JoinFilter::new( - filter_expr, - column_indices, - filter_schema, - )) - } - _ => None, - }; + .chain(right_field_indices.clone().into_iter().map(|i| { + ( + right_df_schema.qualified_field(i), + physical_right.schema().field(i).clone(), + ) + })) + .unzip(); + let filter_df_fields = filter_df_fields + .into_iter() + .map(|(qualifier, field)| { + (qualifier.cloned(), Arc::new(field.clone())) + }) + .collect(); + + // Construct intermediate schemas used for filtering data and + // convert logical expression to physical according to filter schema + let filter_df_schema = DFSchema::new_with_metadata( + filter_df_fields, + HashMap::new(), + )?; + let filter_schema = + Schema::new_with_metadata(filter_fields, HashMap::new()); + let filter_expr = create_physical_expr( + expr, + &filter_df_schema, + session_state.execution_props(), + )?; + let column_indices = join_utils::JoinFilter::build_column_indices( + left_field_indices, + right_field_indices, + ); - let prefer_hash_join = - session_state.config_options().optimizer.prefer_hash_join; + Some(join_utils::JoinFilter::new( + filter_expr, + column_indices, + filter_schema, + )) + } + _ => None, + }; - let join: Arc = if join_on.is_empty() { - // there is no equal join condition, use the nested loop join - // TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins` - Arc::new(NestedLoopJoinExec::try_new( - physical_left, - physical_right, - join_filter, - join_type, - )?) - } else if session_state.config().target_partitions() > 1 - && session_state.config().repartition_joins() - && !prefer_hash_join - { - // Use SortMergeJoin if hash join is not preferred - // Sort-Merge join support currently is experimental - - let join_on_len = join_on.len(); - Arc::new(SortMergeJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_filter, - *join_type, - vec![SortOptions::default(); join_on_len], - null_equals_null, - )?) - } else if session_state.config().target_partitions() > 1 - && session_state.config().repartition_joins() - && prefer_hash_join - { - let partition_mode = { - if session_state.config().collect_statistics() { - PartitionMode::Auto - } else { - PartitionMode::Partitioned - } - }; - Arc::new(HashJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_filter, - join_type, - None, - partition_mode, - null_equals_null, - )?) - } else { - Arc::new(HashJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_filter, - join_type, - None, - PartitionMode::CollectLeft, - null_equals_null, - )?) 
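The join operator selection carried over in the new code below follows a small decision tree. Here is a simplified, self-contained sketch of that logic; `JoinStrategy`, `PlannerConfig` and `choose_join_strategy` are hypothetical stand-ins for illustration, not DataFusion types:

#[derive(Debug, PartialEq)]
enum JoinStrategy {
    NestedLoop,      // no column equijoin keys at all
    SortMerge,       // multi-partition, and hash join is not preferred (experimental)
    PartitionedHash, // multi-partition; the real code uses PartitionMode::Auto
                     // instead of Partitioned when statistics collection is enabled
    CollectLeftHash, // single-partition fallback
}

struct PlannerConfig {
    target_partitions: usize,
    repartition_joins: bool,
    prefer_hash_join: bool,
}

fn choose_join_strategy(num_equijoin_keys: usize, cfg: &PlannerConfig) -> JoinStrategy {
    let multi_partition = cfg.target_partitions > 1 && cfg.repartition_joins;
    if num_equijoin_keys == 0 {
        // No equality condition to hash or sort on.
        JoinStrategy::NestedLoop
    } else if multi_partition && !cfg.prefer_hash_join {
        JoinStrategy::SortMerge
    } else if multi_partition {
        JoinStrategy::PartitionedHash
    } else {
        JoinStrategy::CollectLeftHash
    }
}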
+ let prefer_hash_join = + session_state.config_options().optimizer.prefer_hash_join; + + let join: Arc = if join_on.is_empty() { + // there is no equal join condition, use the nested loop join + // TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins` + Arc::new(NestedLoopJoinExec::try_new( + physical_left, + physical_right, + join_filter, + join_type, + )?) + } else if session_state.config().target_partitions() > 1 + && session_state.config().repartition_joins() + && !prefer_hash_join + { + // Use SortMergeJoin if hash join is not preferred + // Sort-Merge join support currently is experimental + + let join_on_len = join_on.len(); + Arc::new(SortMergeJoinExec::try_new( + physical_left, + physical_right, + join_on, + join_filter, + *join_type, + vec![SortOptions::default(); join_on_len], + null_equals_null, + )?) + } else if session_state.config().target_partitions() > 1 + && session_state.config().repartition_joins() + && prefer_hash_join + { + let partition_mode = { + if session_state.config().collect_statistics() { + PartitionMode::Auto + } else { + PartitionMode::Partitioned + } }; - - // If plan was mutated previously then need to create the ExecutionPlan - // for the new Projection that was applied on top. - if let Some((input, expr)) = new_project { - self.create_project_physical_exec( - session_state, - join, - input, - expr, - )? - } else { - join - } - } - LogicalPlan::CrossJoin(_) => { - let (left, right) = children.pop_2()?; - Arc::new(CrossJoinExec::new(left, right)) - } - LogicalPlan::RecursiveQuery(RecursiveQuery { - name, is_distinct, .. - }) => { - let (static_term, recursive_term) = children.pop_2()?; - Arc::new(RecursiveQueryExec::try_new( - name.clone(), - static_term, - recursive_term, - *is_distinct, + Arc::new(HashJoinExec::try_new( + physical_left, + physical_right, + join_on, + join_filter, + join_type, + None, + partition_mode, + null_equals_null, )?) - } + } else { + Arc::new(HashJoinExec::try_new( + physical_left, + physical_right, + join_on, + join_filter, + join_type, + None, + PartitionMode::CollectLeft, + null_equals_null, + )?) + }; - // N Children - LogicalPlan::Union(Union { inputs, .. }) => { - let physical_plans = children.pop_n(inputs.len())?; - Arc::new(UnionExec::new(physical_plans)) + // If plan was mutated previously then need to create the ExecutionPlan + // for the new Projection that was applied on top. + if let Some((input, expr)) = new_project { + self.create_project_physical_exec(session_state, join, input, expr)? + } else { + join } - LogicalPlan::Extension(Extension { node }) => { - let physical_inputs = children.pop_n(node.inputs().len())?; - - let mut maybe_plan = None; - for planner in &self.extension_planners { - if maybe_plan.is_some() { - break; - } + } + LogicalPlan::CrossJoin(_) => { + let right = children.pop().unwrap(); + let left = children.pop().unwrap(); + Arc::new(CrossJoinExec::new(left, right)) + } + LogicalPlan::RecursiveQuery(RecursiveQuery { + name, is_distinct, .. + }) => { + let recursive_term = children.pop().unwrap(); + let static_term = children.pop().unwrap(); + Arc::new(RecursiveQueryExec::try_new( + name.clone(), + static_term, + recursive_term, + *is_distinct, + )?) 
+ } - let logical_input = node.inputs(); - maybe_plan = planner - .plan_extension( - self, - node.as_ref(), - &logical_input, - &physical_inputs, - session_state, - ) - .await?; + // N Children + LogicalPlan::Union(_) => Arc::new(UnionExec::new(children)), + LogicalPlan::Extension(Extension { node }) => { + let mut maybe_plan = None; + for planner in &self.extension_planners { + if maybe_plan.is_some() { + break; } - let plan = match maybe_plan { + let logical_input = node.inputs(); + maybe_plan = planner + .plan_extension( + self, + node.as_ref(), + &logical_input, + &children, + session_state, + ) + .await?; + } + + let plan = match maybe_plan { Some(v) => Ok(v), _ => plan_err!("No installed planner was able to convert the custom node to an execution plan: {:?}", node) }?; - // Ensure the ExecutionPlan's schema matches the - // declared logical schema to catch and warn about - // logic errors when creating user defined plans. - if !node.schema().matches_arrow_schema(&plan.schema()) { - return plan_err!( + // Ensure the ExecutionPlan's schema matches the + // declared logical schema to catch and warn about + // logic errors when creating user defined plans. + if !node.schema().matches_arrow_schema(&plan.schema()) { + return plan_err!( "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \ LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}", node, node.schema(), plan.schema() ); - } else { - plan - } + } else { + plan } + } - // Other - LogicalPlan::Statement(statement) => { - // DataFusion is a read-only query engine, but also a library, so consumers may implement this - let name = statement.name(); - return not_impl_err!("Unsupported logical plan: Statement({name})"); - } - LogicalPlan::Prepare(_) => { - // There is no default plan for "PREPARE" -- it must be - // handled at a higher level (so that the appropriate - // statement can be prepared) - return not_impl_err!("Unsupported logical plan: Prepare"); - } - LogicalPlan::Dml(dml) => { - // DataFusion is a read-only query engine, but also a library, so consumers may implement this - return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op); - } - LogicalPlan::Ddl(ddl) => { - // There is no default plan for DDl statements -- - // it must be handled at a higher level (so that - // the appropriate table can be registered with - // the context) - let name = ddl.name(); - return not_impl_err!("Unsupported logical plan: {name}"); - } - LogicalPlan::Explain(_) => { - return internal_err!( - "Unsupported logical plan: Explain must be root of the plan" - ) - } - LogicalPlan::Distinct(_) => { - return internal_err!( + // Other + LogicalPlan::Statement(statement) => { + // DataFusion is a read-only query engine, but also a library, so consumers may implement this + let name = statement.name(); + return not_impl_err!("Unsupported logical plan: Statement({name})"); + } + LogicalPlan::Prepare(_) => { + // There is no default plan for "PREPARE" -- it must be + // handled at a higher level (so that the appropriate + // statement can be prepared) + return not_impl_err!("Unsupported logical plan: Prepare"); + } + LogicalPlan::Dml(dml) => { + // DataFusion is a read-only query engine, but also a library, so consumers may implement this + return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op); + } + LogicalPlan::Ddl(ddl) => { + // There is no default plan for DDl statements -- + // it must be handled at a higher level (so that + // the appropriate table can be registered with + // the context) + let name = 
ddl.name(); + return not_impl_err!("Unsupported logical plan: {name}"); + } + LogicalPlan::Explain(_) => { + return internal_err!( + "Unsupported logical plan: Explain must be root of the plan" + ) + } + LogicalPlan::Distinct(_) => { + return internal_err!( "Unsupported logical plan: Distinct should be replaced to Aggregate" ) - } - LogicalPlan::Analyze(_) => { - return internal_err!( - "Unsupported logical plan: Analyze must be root of the plan" - ) - } - }; - children.push(exec_node); - } - // Should be no dangling children - debug_assert_eq!(children.0.len(), 1); - children.pop() + } + LogicalPlan::Analyze(_) => { + return internal_err!( + "Unsupported logical plan: Analyze must be root of the plan" + ) + } + }; + Ok(exec_node) } fn create_grouping_physical_expr( @@ -1442,12 +1414,6 @@ impl ChildrenStack { } } - fn pop_2(&mut self) -> Result<(Arc, Arc)> { - let right = self.pop()?; - let left = self.pop()?; - Ok((left, right)) - } - fn pop_n(&mut self, n: usize) -> Result>> { if n > self.0.len() { // Ideally this can never occur for a valid LogicalPlan From fce069fe43076ccc4d68b68390ec884cc4c8723c Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Wed, 10 Apr 2024 07:57:44 +1000 Subject: [PATCH 03/12] Experiment with concurrent bottom up physical planning --- datafusion/core/src/physical_planner.rs | 201 +++++++++++++++++------- 1 file changed, 147 insertions(+), 54 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3de98045cc2f..07ab7f63d276 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -20,7 +20,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Write; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::datasource::file_format::arrow::ArrowFormat; use crate::datasource::file_format::avro::AvroFormat; @@ -97,6 +97,7 @@ use datafusion_sql::utils::window_expr_common_partition_keys; use async_trait::async_trait; use datafusion_common::config::FormatOptions; use datafusion_physical_expr::LexOrdering; +use futures::{StreamExt, TryStreamExt}; use itertools::{multiunzip, Itertools}; use log::{debug, trace}; use sqlparser::ast::NullTreatment; @@ -483,6 +484,20 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { } } +#[derive(Debug)] +enum NodeStatus { + Ready, + PendingChildren(Mutex)>>), +} + +#[derive(Debug)] +struct LogicalNode<'a> { + node: &'a LogicalPlan, + // None if root + parent_index: Option, + status: NodeStatus, +} + impl DefaultPhysicalPlanner { /// Create a physical planner that uses `extension_planners` to /// plan user-defined logical nodes [`LogicalPlan::Extension`]. @@ -507,26 +522,137 @@ impl DefaultPhysicalPlanner { // representation (since parents need to know their children at // construction time). 
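The flatten-then-rebuild idea described in the comment above can be illustrated in miniature. The sketch below uses a toy `Node` type (illustrative only, not the planner's types) and mirrors the single-task flavour of this refactor: DFS-flatten the tree, then pop nodes off the back of the flat list so that every parent is built strictly after its children:

struct Node {
    label: &'static str,
    children: Vec<Node>,
}

fn build_bottom_up(root: &Node) -> String {
    // DFS-flatten: parents always land before their children in `flat`.
    let mut flat = vec![];
    let mut stack = vec![root];
    while let Some(node) = stack.pop() {
        stack.extend(node.children.iter());
        flat.push(node);
    }
    // Popping from the back yields children before parents, so each node can
    // be "built" (here: rendered to a string) from already-built children.
    let mut built: Vec<String> = vec![];
    while let Some(node) = flat.pop() {
        let at = built.len() - node.children.len();
        let kids = built.split_off(at);
        built.push(format!("{}({})", node.label, kids.join(", ")));
    }
    built.pop().unwrap()
}

For a tree like A(B, C(D)) this returns "A(B(), C(D()))", with children kept in their original order.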
let mut flat_tree = vec![]; - let mut dfs_visit_stack = vec![logical_plan]; - while let Some(visit) = dfs_visit_stack.pop() { - dfs_visit_stack.extend(visit.inputs()); - flat_tree.push(visit); - } - - let mut children = ChildrenStack::new(); - while let Some(node) = flat_tree.pop() { - let children_nodes = children.pop_n(node.inputs().len())?; - let exec_node = self - .map_logical_node_to_physical(node, session_state, children_nodes) - .await?; - children.push(exec_node); - } - debug_assert_eq!( - children.0.len(), - 1, - "Should be no dangling children after physical planning" - ); - children.pop() + let mut dfs_visit_stack = vec![(None, logical_plan)]; + // Use this to be able to find the leaves to start construction bottom + // up concurrently. + let mut flat_tree_leaf_indices = vec![]; + while let Some((parent_index, visit)) = dfs_visit_stack.pop() { + let current_index = flat_tree.len(); + dfs_visit_stack.extend( + visit + .inputs() + .iter() + .map(|&n| (Some(current_index), n)) + .rev(), + ); + let status = match visit.inputs().len() { + 0 => { + flat_tree_leaf_indices.push(current_index); + NodeStatus::Ready + } + 1 => NodeStatus::Ready, + _ => NodeStatus::PendingChildren(Mutex::new(vec![])), + }; + let node = LogicalNode { + node: visit, + parent_index, + status, + }; + flat_tree.push(node); + } + + let planning_concurrency = session_state + .config_options() + .execution + .planning_concurrency; + // Can never spawn more tasks than leaves in the tree. + // As these tasks must all converge down to the root node. + // Which can only be processed by one task. + let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len()); + + // We need a work queue to spawn tasks from + // We have up to n tasks + // A task needs to build up a single lineage + // A task terminates when it has to build a parent with an unresolved child + // Need a way for tasks to know when a parent is now available (all children ready) + // Need a way for tasks to associate parents to child (and also child to parent) + // + // When building up, reach a join point + // Either: + // - not ready (pending children) = finish task -> where to store results? + // - when finish task, look for another leaf. + // if no more leaves, then can't spawn anymore tasks + // because we can't parallelize any further + // since max parallelism = number of leaves + // - ready (children done) = proceed -> where to retrieve children in right order? 
+ // + // So children need to know where their parents are + // So when task finishes its lineage, it will check parent + // See parent ready via a mutex of a vec of tuple (index, processed child) + + // TODO: cleanup comments + let flat_tree = Arc::new(flat_tree); + + let leaf_tasks = flat_tree_leaf_indices + .into_iter() + .map(|index| self.task_helper(index, flat_tree.clone(), session_state)) + .collect::>(); + + let outputs = futures::stream::iter(leaf_tasks) + .buffer_unordered(max_concurrency) + .try_collect::>() + .await?; + let mut outputs = outputs.into_iter().flatten().collect::>(); + assert!(outputs.len() == 1); + let plan = outputs.pop().unwrap(); + Ok(plan) + } + + async fn task_helper<'a>( + &'a self, + leaf_starter_index: usize, + flat_tree: Arc>>, + session_state: &'a SessionState, + ) -> Result>> { + // We always start with a leaf, so can ignore status and pass empty children + let mut l_node = &flat_tree[leaf_starter_index]; + let mut plan = self + .map_logical_node_to_physical(l_node.node, session_state, vec![]) + .await?; + let mut current_index = leaf_starter_index; + while let Some(parent_index) = l_node.parent_index { + l_node = &flat_tree[parent_index]; + match &l_node.status { + NodeStatus::Ready => { + plan = self + .map_logical_node_to_physical( + l_node.node, + session_state, + vec![plan], + ) + .await?; + } + NodeStatus::PendingChildren(children) => { + let mut children = { + let mut children = children.lock().unwrap(); + // Add our contribution to this parent node. + children.push((current_index, plan)); + if children.len() < l_node.node.inputs().len() { + // This node is not ready yet, still pending more children. + // This task is finished forever. + return Ok(None); + } else { + // With our contribution we have enough children. + // We are the only ones building this node now. + children.clone() + } + }; + // Unstable as the indices are guaranteed to be unique. + children.sort_unstable_by_key(|(index, _)| *index); + let children = children.into_iter().map(|(_, plan)| plan).collect(); + + plan = self + .map_logical_node_to_physical( + l_node.node, + session_state, + children, + ) + .await?; + } + } + current_index = parent_index; + } + Ok(Some(plan)) } async fn map_logical_node_to_physical( @@ -1392,39 +1518,6 @@ impl DefaultPhysicalPlanner { } } -/// Thin wrapper to make stack operations easier (e.g. popping with -/// error instead of unwrap()). -struct ChildrenStack(Vec>); - -impl ChildrenStack { - fn new() -> Self { - Self(vec![]) - } - - fn push(&mut self, node: Arc) { - self.0.push(node); - } - - fn pop(&mut self) -> Result> { - if let Some(node) = self.0.pop() { - Ok(node) - } else { - // Ideally this can never occur for a valid LogicalPlan - internal_err!("Invalid state when creating physical plan from logical plan") - } - } - - fn pop_n(&mut self, n: usize) -> Result>> { - if n > self.0.len() { - // Ideally this can never occur for a valid LogicalPlan - internal_err!("Invalid state when creating physical plan from logical plan") - } else { - let at = self.0.len() - n; - Ok(self.0.split_off(at)) - } - } -} - /// Expand and align a GROUPING SET expression. 
/// (see ) /// From cf594d6e8b9376ab3f9faab955300d19a1e9a89e Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Wed, 10 Apr 2024 19:27:20 +1000 Subject: [PATCH 04/12] Refactoring and comments --- datafusion/core/src/physical_planner.rs | 140 ++++++++++++------------ 1 file changed, 71 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 07ab7f63d276..a071854450ee 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -485,9 +485,15 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { } #[derive(Debug)] -enum NodeStatus { - Ready, - PendingChildren(Mutex)>>), +enum NodeState { + ZeroOrOneChild, + /// If a node has multiple children, we lock it's ready children behind a Mutex + /// such that concurrent tasks are able to append safely, and ultimately + /// a single task will take all the ready children to build the plan. + /// + /// Vec element is (index, plan) where index is needed to order + /// children to ensure consistent order in plan for parent nodes. + TwoOrMoreChildren(Mutex)>>), } #[derive(Debug)] @@ -495,7 +501,7 @@ struct LogicalNode<'a> { node: &'a LogicalPlan, // None if root parent_index: Option, - status: NodeStatus, + state: NodeState, } impl DefaultPhysicalPlanner { @@ -526,78 +532,65 @@ impl DefaultPhysicalPlanner { // Use this to be able to find the leaves to start construction bottom // up concurrently. let mut flat_tree_leaf_indices = vec![]; - while let Some((parent_index, visit)) = dfs_visit_stack.pop() { + while let Some((parent_index, node)) = dfs_visit_stack.pop() { let current_index = flat_tree.len(); - dfs_visit_stack.extend( - visit - .inputs() - .iter() - .map(|&n| (Some(current_index), n)) - .rev(), - ); - let status = match visit.inputs().len() { + // Because of how we extend the visit stack here, we visit the children + // in reverse order of how they appeart, so later we need to reverse + // the order of children when building the nodes. + dfs_visit_stack + .extend(node.inputs().iter().map(|&n| (Some(current_index), n))); + let state = match node.inputs().len() { 0 => { flat_tree_leaf_indices.push(current_index); - NodeStatus::Ready + NodeState::ZeroOrOneChild } - 1 => NodeStatus::Ready, - _ => NodeStatus::PendingChildren(Mutex::new(vec![])), + 1 => NodeState::ZeroOrOneChild, + _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])), }; let node = LogicalNode { - node: visit, + node, parent_index, - status, + state, }; flat_tree.push(node); } + let flat_tree = Arc::new(flat_tree); let planning_concurrency = session_state .config_options() .execution .planning_concurrency; - // Can never spawn more tasks than leaves in the tree. - // As these tasks must all converge down to the root node. - // Which can only be processed by one task. + // Can never spawn more tasks than leaves in the tree, as these tasks must + // all converge down to the root node, which can only be processed by a + // single task. 
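The concurrency cap computed below can be exercised in isolation with nothing but the `futures` crate. In this self-contained sketch the `u32` results stand in for built plans; at most `max_concurrency` of the fallible futures are polled at once, and the first error aborts the batch, which is the same `buffer_unordered`/`try_collect` combination the planner uses:

use futures::{stream, StreamExt, TryStreamExt};

async fn run_bounded(max_concurrency: usize) -> Result<Vec<u32>, String> {
    // Stand-ins for the per-leaf planning tasks.
    let tasks = (0..8u32).map(|i| async move { Ok::<u32, String>(i * 2) });
    stream::iter(tasks)
        .buffer_unordered(max_concurrency) // at most this many in flight
        .try_collect() // gathers in completion order; fails fast on first error
        .await
}

Note that `buffer_unordered` yields results in completion order, which is one reason the planner tags each finished child plan with its flat-tree index rather than relying on arrival order.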
        let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len());
 
-        // We need a work queue to spawn tasks from
-        // We have up to n tasks
-        // A task needs to build up a single lineage
-        // A task terminates when it has to build a parent with an unresolved child
-        // Need a way for tasks to know when a parent is now available (all children ready)
-        // Need a way for tasks to associate parents to child (and also child to parent)
-        //
-        // When building up, reach a join point
-        // Either:
-        // - not ready (pending children) = finish task -> where to store results?
-        //   - when finish task, look for another leaf.
-        //     if no more leaves, then can't spawn anymore tasks
-        //     because we can't parallelize any further
-        //     since max parallelism = number of leaves
-        //   - ready (children done) = proceed -> where to retrieve children in right order?
-        //
-        // So children need to know where their parents are
-        // So when task finishes its lineage, it will check parent
-        // See parent ready via a mutex of a vec of tuple (index, processed child)
-
-        // TODO: cleanup comments
-        let flat_tree = Arc::new(flat_tree);
-
-        let leaf_tasks = flat_tree_leaf_indices
+        // Spawning tasks which will traverse leaf up to the root.
+        let tasks = flat_tree_leaf_indices
             .into_iter()
             .map(|index| self.task_helper(index, flat_tree.clone(), session_state))
             .collect::>();
-
-        let outputs = futures::stream::iter(leaf_tasks)
+        let mut outputs = futures::stream::iter(tasks)
             .buffer_unordered(max_concurrency)
             .try_collect::>()
-            .await?;
-        let mut outputs = outputs.into_iter().flatten().collect::>();
-        assert!(outputs.len() == 1);
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::>();
+        // Ideally this never happens if we have a valid LogicalPlan tree
+        assert!(
+            outputs.len() == 1,
+            "Invalid physical plan created from logical plan"
+        );
         let plan = outputs.pop().unwrap();
         Ok(plan)
     }
 
+    /// These tasks start at a leaf and traverse up the tree towards the root, building
+    /// an ExecutionPlan as they go. When they reach a node with two or more children,
+    /// they append their current result (a child of the parent node) to the children
+    /// vector; if this is sufficient to create the parent, the task continues traversing
+    /// the tree to create nodes. Otherwise, the task terminates.
     async fn task_helper<'a>(
         &'a self,
         leaf_starter_index: usize,
@@ -605,60 +598,69 @@ impl DefaultPhysicalPlanner {
         session_state: &'a SessionState,
     ) -> Result>> {
         // We always start with a leaf, so can ignore status and pass empty children
-        let mut l_node = &flat_tree[leaf_starter_index];
+        let mut node = &flat_tree[leaf_starter_index];
         let mut plan = self
-            .map_logical_node_to_physical(l_node.node, session_state, vec![])
+            .map_logical_node_to_physical(node.node, session_state, vec![])
             .await?;
         let mut current_index = leaf_starter_index;
-        while let Some(parent_index) = l_node.parent_index {
-            l_node = &flat_tree[parent_index];
-            match &l_node.status {
-                NodeStatus::Ready => {
+        // parent_index is None only for root
+        while let Some(parent_index) = node.parent_index {
+            node = &flat_tree[parent_index];
+            match &node.state {
+                NodeState::ZeroOrOneChild => {
                     plan = self
                         .map_logical_node_to_physical(
-                            l_node.node,
+                            node.node,
                             session_state,
                             vec![plan],
                         )
                         .await?;
                }
-                NodeStatus::PendingChildren(children) => {
+                // See if we have all children to build the node.
+                NodeState::TwoOrMoreChildren(children) => {
                    let mut children = {
                        let mut children = children.lock().unwrap();
                        // Add our contribution to this parent node.
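Stripped of the planner specifics, the rendezvous implemented in the surrounding lines can be distilled into a small self-contained type. This is toy code (`String` stands in for a built plan), and the `Option` wrapping anticipates the clone-removal refinement a later patch below makes:

use std::sync::Mutex;

struct MeetingPoint {
    expected: usize,
    // `Option` so the final task can move the Vec out without cloning.
    ready: Mutex<Option<Vec<(usize, String)>>>,
}

impl MeetingPoint {
    fn new(expected: usize) -> Self {
        Self {
            expected,
            ready: Mutex::new(Some(Vec::with_capacity(expected))),
        }
    }

    /// Every arriving task appends its contribution; exactly one caller (the
    /// one completing the set) gets `Some(children)` back and carries on.
    fn arrive(&self, index: usize, plan: String) -> Option<Vec<(usize, String)>> {
        let mut guard = self.ready.lock().unwrap();
        let ready = guard.as_mut()?;
        ready.push((index, plan));
        if ready.len() < self.expected {
            return None; // not the last task to arrive: stop here
        }
        let mut children = guard.take()?;
        // The planner sorts by `Reverse(index)` to undo its DFS visit order;
        // plain ascending order suffices for this toy.
        children.sort_unstable_by_key(|(index, _)| *index);
        Some(children)
    }
}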
                        children.push((current_index, plan));
-                        if children.len() < l_node.node.inputs().len() {
+                        if children.len() < node.node.inputs().len() {
                             // This node is not ready yet, still pending more children.
                             // This task is finished forever.
                             return Ok(None);
                         } else {
-                            // With our contribution we have enough children.
-                            // We are the only ones building this node now.
+                            // With this task's contribution we have enough children.
+                            // This task is the only one building this node now, and thus
+                            // no other task will need the Mutex for this node.
+
+                            // TODO: How to do this without a clone? Take from the inner Mutex?
                             children.clone()
                         }
                     };
-                    // Unstable as the indices are guaranteed to be unique.
-                    children.sort_unstable_by_key(|(index, _)| *index);
-                    let children = children.into_iter().map(|(_, plan)| plan).collect();
+
+                    // Indices refer to position in flat tree Vec, which means they are
+                    // guaranteed to be unique, hence unstable sort used.
+                    // We reverse sort because of how we visited the node in the initial
+                    // DFS traversal (see above).
+                    children.sort_unstable_by_key(|(index, _)| std::cmp::Reverse(*index));
+                    let children =
+                        children.iter().map(|(_, plan)| plan.clone()).collect();
 
                     plan = self
-                        .map_logical_node_to_physical(
-                            l_node.node,
-                            session_state,
-                            children,
-                        )
+                        .map_logical_node_to_physical(node.node, session_state, children)
                         .await?;
                 }
             }
             current_index = parent_index;
         }
+        // Only one task should ever reach this point for a valid LogicalPlan tree.
         Ok(Some(plan))
     }
 
+    /// Given a single LogicalPlan node, map it to its physical ExecutionPlan counterpart.
     async fn map_logical_node_to_physical(
         &self,
         node: &LogicalPlan,
         session_state: &SessionState,
+        // TODO: refactor to not use Vec? Wasted for leaves/1 child
         mut children: Vec>,
     ) -> Result> {
         let exec_node: Arc = match node {

From aeb6b49bf16ae87cfee9dfb95dd2b536a5094b15 Mon Sep 17 00:00:00 2001
From: Jefffrey 
Date: Wed, 10 Apr 2024 20:00:58 +1000
Subject: [PATCH 05/12] Remove unnecessary collect

---
 datafusion/core/src/physical_planner.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index a071854450ee..58c83c13b4d2 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -568,8 +568,7 @@ impl DefaultPhysicalPlanner {
         // Spawning tasks which will traverse leaf up to the root.
let tasks = flat_tree_leaf_indices .into_iter() - .map(|index| self.task_helper(index, flat_tree.clone(), session_state)) - .collect::>(); + .map(|index| self.task_helper(index, flat_tree.clone(), session_state)); let mut outputs = futures::stream::iter(tasks) .buffer_unordered(max_concurrency) .try_collect::>() From daa45f81b8fa9a90b61173495c6cda0c32876536 Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Wed, 10 Apr 2024 21:06:51 +1000 Subject: [PATCH 06/12] Preallocate vector capacity --- datafusion/core/src/physical_planner.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 58c83c13b4d2..209e19d84e28 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -545,7 +545,11 @@ impl DefaultPhysicalPlanner { NodeState::ZeroOrOneChild } 1 => NodeState::ZeroOrOneChild, - _ => NodeState::TwoOrMoreChildren(Mutex::new(vec![])), + _ => { + let ready_children = Vec::with_capacity(node.inputs().len()); + let ready_children = Mutex::new(ready_children); + NodeState::TwoOrMoreChildren(ready_children) + } }; let node = LogicalNode { node, From 3e0a9aaa91a9f65ed0f391610963fdd589156cfa Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Wed, 10 Apr 2024 21:52:25 +1000 Subject: [PATCH 07/12] Remove children.clone() --- datafusion/core/src/physical_planner.rs | 33 +++++++++++++++---------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 209e19d84e28..af202b744d8f 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -484,16 +484,19 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { } } +/// (index, plan) since index is needed to order children to ensure consistent +/// order in parent node plans. +type ExecutionPlansWithIndex = Vec<(usize, Arc)>; + #[derive(Debug)] enum NodeState { ZeroOrOneChild, - /// If a node has multiple children, we lock it's ready children behind a Mutex + /// If a node has multiple children, we lock the ready children behind a Mutex /// such that concurrent tasks are able to append safely, and ultimately /// a single task will take all the ready children to build the plan. /// - /// Vec element is (index, plan) where index is needed to order - /// children to ensure consistent order in plan for parent nodes. - TwoOrMoreChildren(Mutex)>>), + /// Wrapped in an Option to make it easier to take the Vec at the end. + TwoOrMoreChildren(Mutex>), } #[derive(Debug)] @@ -547,7 +550,7 @@ impl DefaultPhysicalPlanner { 1 => NodeState::ZeroOrOneChild, _ => { let ready_children = Vec::with_capacity(node.inputs().len()); - let ready_children = Mutex::new(ready_children); + let ready_children = Mutex::new(Some(ready_children)); NodeState::TwoOrMoreChildren(ready_children) } }; @@ -622,21 +625,25 @@ impl DefaultPhysicalPlanner { // See if we have all children to build the node. NodeState::TwoOrMoreChildren(children) => { let mut children = { - let mut children = children.lock().unwrap(); + let mut guard = children.lock().unwrap(); + // Safe unwrap on option as only the last task reaching this + // node will take the contents (which happens after this line). + let children = guard.as_mut().unwrap(); // Add our contribution to this parent node. children.push((current_index, plan)); if children.len() < node.node.inputs().len() { // This node is not ready yet, still pending more children. 
// This task is finished forever. return Ok(None); - } else { - // With this task's contribution we have enough children. - // This task is the only one building this node now, and thus - // no other task will need the Mutex for this node. - - // TODO: How to do this without a clone? Take from the inner Mutex? - children.clone() } + + // With this task's contribution we have enough children. + // This task is the only one building this node now, and thus + // no other task will need the Mutex for this node, so take + // all children. + // + // This take is the only place the Option becomes None. + guard.take().unwrap() }; // Indices refer to position in flat tree Vec, which means they are From 53ba4b7afe4352a001bbf1181b1914f5a1b9b48b Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Wed, 10 Apr 2024 22:18:07 +1000 Subject: [PATCH 08/12] Introduce ChildrenContainer enum --- datafusion/core/src/physical_planner.rs | 80 +++++++++++++++++-------- 1 file changed, 56 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index af202b744d8f..a6194005d857 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -499,6 +499,37 @@ enum NodeState { TwoOrMoreChildren(Mutex>), } +/// To avoid needing to pass single child wrapped in a Vec for nodes +/// with only one child. +enum ChildrenContainer { + None, + One(Arc), + Multiple(Vec>), +} + +impl ChildrenContainer { + fn one(self) -> Arc { + match self { + Self::One(p) => p, + _ => unreachable!(), + } + } + + fn two(self) -> [Arc; 2] { + match self { + Self::Multiple(v) => v.try_into().unwrap(), + _ => unreachable!(), + } + } + + fn vec(self) -> Vec> { + match self { + Self::Multiple(v) => v, + _ => unreachable!(), + } + } +} + #[derive(Debug)] struct LogicalNode<'a> { node: &'a LogicalPlan, @@ -606,7 +637,11 @@ impl DefaultPhysicalPlanner { // We always start with a leaf, so can ignore status and pass empty children let mut node = &flat_tree[leaf_starter_index]; let mut plan = self - .map_logical_node_to_physical(node.node, session_state, vec![]) + .map_logical_node_to_physical( + node.node, + session_state, + ChildrenContainer::None, + ) .await?; let mut current_index = leaf_starter_index; // parent_index is None only for root @@ -618,7 +653,7 @@ impl DefaultPhysicalPlanner { .map_logical_node_to_physical( node.node, session_state, - vec![plan], + ChildrenContainer::One(plan), ) .await?; } @@ -653,7 +688,7 @@ impl DefaultPhysicalPlanner { children.sort_unstable_by_key(|(index, _)| std::cmp::Reverse(*index)); let children = children.iter().map(|(_, plan)| plan.clone()).collect(); - + let children = ChildrenContainer::Multiple(children); plan = self .map_logical_node_to_physical(node.node, session_state, children) .await?; @@ -670,8 +705,7 @@ impl DefaultPhysicalPlanner { &self, node: &LogicalPlan, session_state: &SessionState, - // TODO: refactor to not use Vec? 
Wasted for leaves/1 child - mut children: Vec>, + children: ChildrenContainer, ) -> Result> { let exec_node: Arc = match node { // Leaves (no children) @@ -734,7 +768,7 @@ impl DefaultPhysicalPlanner { partition_by, options: source_option_tuples, }) => { - let input_exec = children.pop().unwrap(); + let input_exec = children.one(); let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); @@ -796,7 +830,7 @@ impl DefaultPhysicalPlanner { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name.clone())?; if let Some(provider) = schema.table(name).await? { - let input_exec = children.pop().unwrap(); + let input_exec = children.one(); provider .insert_into(session_state, input_exec, false) .await? @@ -812,7 +846,7 @@ impl DefaultPhysicalPlanner { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name.clone())?; if let Some(provider) = schema.table(name).await? { - let input_exec = children.pop().unwrap(); + let input_exec = children.one(); provider .insert_into(session_state, input_exec, true) .await? @@ -827,7 +861,7 @@ impl DefaultPhysicalPlanner { return internal_err!("Impossibly got empty window expression"); } - let input_exec = children.pop().unwrap(); + let input_exec = children.one(); // at this moment we are guaranteed by the logical planner // to have all the window_expr to have equal sort key @@ -915,7 +949,7 @@ impl DefaultPhysicalPlanner { .. }) => { // Initially need to perform the aggregate and then merge the partitions - let input_exec = children.pop().unwrap(); + let input_exec = children.one(); let physical_input_schema = input_exec.schema(); let logical_input_schema = input.as_ref().schema(); @@ -994,14 +1028,14 @@ impl DefaultPhysicalPlanner { LogicalPlan::Projection(Projection { input, expr, .. }) => self .create_project_physical_exec( session_state, - children.pop().unwrap(), + children.one(), input, expr, )?, LogicalPlan::Filter(Filter { predicate, input, .. }) => { - let physical_input = children.pop().unwrap(); + let physical_input = children.one(); let input_dfschema = input.schema(); let runtime_expr = @@ -1018,7 +1052,7 @@ impl DefaultPhysicalPlanner { input, partitioning_scheme, }) => { - let physical_input = children.pop().unwrap(); + let physical_input = children.one(); let input_dfschema = input.as_ref().schema(); let physical_partitioning = match partitioning_scheme { LogicalPartitioning::RoundRobinBatch(n) => { @@ -1051,7 +1085,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => { - let physical_input = children.pop().unwrap(); + let physical_input = children.one(); let input_dfschema = input.as_ref().schema(); let sort_expr = create_physical_sort_exprs( expr, @@ -1063,9 +1097,9 @@ impl DefaultPhysicalPlanner { Arc::new(new_sort) } LogicalPlan::Subquery(_) => todo!(), - LogicalPlan::SubqueryAlias(_) => children.pop().unwrap(), + LogicalPlan::SubqueryAlias(_) => children.one(), LogicalPlan::Limit(Limit { skip, fetch, .. }) => { - let input = children.pop().unwrap(); + let input = children.one(); // GlobalLimitExec requires a single partition for input let input = if input.output_partitioning().partition_count() == 1 { @@ -1088,7 +1122,7 @@ impl DefaultPhysicalPlanner { options, .. 
}) => { - let input = children.pop().unwrap(); + let input = children.one(); let column_exec = schema .index_of_column(column) .map(|idx| Column::new(&column.name, idx))?; @@ -1109,8 +1143,7 @@ impl DefaultPhysicalPlanner { }) => { let null_equals_null = *null_equals_null; - let physical_right = children.pop().unwrap(); - let physical_left = children.pop().unwrap(); + let [physical_left, physical_right] = children.two(); // If join has expression equijoin keys, add physical projection. let has_expr_join_key = keys.iter().any(|(l, r)| { @@ -1378,15 +1411,13 @@ impl DefaultPhysicalPlanner { } } LogicalPlan::CrossJoin(_) => { - let right = children.pop().unwrap(); - let left = children.pop().unwrap(); + let [left, right] = children.two(); Arc::new(CrossJoinExec::new(left, right)) } LogicalPlan::RecursiveQuery(RecursiveQuery { name, is_distinct, .. }) => { - let recursive_term = children.pop().unwrap(); - let static_term = children.pop().unwrap(); + let [static_term, recursive_term] = children.two(); Arc::new(RecursiveQueryExec::try_new( name.clone(), static_term, @@ -1396,9 +1427,10 @@ impl DefaultPhysicalPlanner { } // N Children - LogicalPlan::Union(_) => Arc::new(UnionExec::new(children)), + LogicalPlan::Union(_) => Arc::new(UnionExec::new(children.vec())), LogicalPlan::Extension(Extension { node }) => { let mut maybe_plan = None; + let children = children.vec(); for planner in &self.extension_planners { if maybe_plan.is_some() { break; From e1b41a959092962d64d1976a78b2e7fee320ee12 Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Wed, 10 Apr 2024 22:20:24 +1000 Subject: [PATCH 09/12] Formatting --- datafusion/core/src/physical_planner.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a6194005d857..ed431f9e1ba2 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -683,6 +683,7 @@ impl DefaultPhysicalPlanner { // Indices refer to position in flat tree Vec, which means they are // guaranteed to be unique, hence unstable sort used. + // // We reverse sort because of how we visited the node in the initial // DFS traversal (see above). 
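A quick worked example of the reverse sort on the next line: for a parent `A` with inputs `[B, C]`, the DFS pushes `B` then `C` onto the visit stack, pops `C` first, and so assigns flat-tree indices `A = 0`, `C = 1`, `B = 2`. Sorting the gathered children by `Reverse(index)` therefore restores the original input order (hypothetical index/label pairs for illustration):

let mut children = vec![(1, "C"), (2, "B")];
children.sort_unstable_by_key(|(index, _)| std::cmp::Reverse(*index));
assert_eq!(children, vec![(2, "B"), (1, "C")]);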
                    children.sort_unstable_by_key(|(index, _)| std::cmp::Reverse(*index));

From 69997f0904ae80bc1e042c2e7781e71042ba4ea1 Mon Sep 17 00:00:00 2001
From: Jefffrey 
Date: Wed, 10 Apr 2024 23:19:33 +1000
Subject: [PATCH 10/12] Fix case where extension may have 0 or 1 children

---
 datafusion/core/src/physical_planner.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index ed431f9e1ba2..0e33d8370897 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -524,8 +524,9 @@ impl ChildrenContainer {
 
     fn vec(self) -> Vec> {
         match self {
+            Self::None => vec![],
+            Self::One(p) => vec![p],
             Self::Multiple(v) => v,
-            _ => unreachable!(),
         }
     }
 }

From c40e2d9e165b570d028ff63cd749b54acbd4f8ff Mon Sep 17 00:00:00 2001
From: Jefffrey 
Date: Mon, 15 Apr 2024 21:38:10 +1000
Subject: [PATCH 11/12] Documentation and cleanup unwraps

---
 datafusion/core/src/physical_planner.rs | 127 ++++++++++++++++--------
 datafusion/core/tests/tpcds_planning.rs |   1 -
 2 files changed, 84 insertions(+), 44 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 0e33d8370897..4b0cb6002b6d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -76,7 +76,8 @@ use arrow_array::builder::StringBuilder;
 use arrow_array::RecordBatch;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
-    exec_err, internal_err, not_impl_err, plan_err, DFSchema, FileType, ScalarValue,
+    exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
+    FileType, ScalarValue,
 };
 use datafusion_expr::dml::CopyTo;
 use datafusion_expr::expr::{
@@ -444,6 +445,22 @@ pub trait ExtensionPlanner {
 
 /// Default single node physical query planner that converts a
 /// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
+///
+/// This planner will first flatten the `LogicalPlan` tree via a
+/// depth-first approach, which allows it to identify the leaves
+/// of the tree.
+///
+/// Tasks are spawned from these leaves and traverse back up the
+/// tree towards the root, converting each `LogicalPlan` node they
+/// reach into its equivalent `ExecutionPlan` node. When multiple
+/// tasks reach a common node, all but the last to arrive will
+/// terminate; the last task then continues building up the
+/// tree.
+///
+/// Up to [`planning_concurrency`] tasks are buffered at once to
+/// execute concurrently.
+///
+/// [`planning_concurrency`]: crate::config::ExecutionOptions::planning_concurrency
 #[derive(Default)]
 pub struct DefaultPhysicalPlanner {
     extension_planners: Vec>,
@@ -484,19 +501,23 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
     }
 }
 
-/// (index, plan) since index is needed to order children to ensure consistent
-/// order in parent node plans.
-type ExecutionPlansWithIndex = Vec<(usize, Arc)>;
+#[derive(Debug)]
+struct ExecutionPlanChild {
+    /// Index needed to order children of parent to ensure consistency with original
+    /// `LogicalPlan`
+    index: usize,
+    plan: Arc,
+}
 
 #[derive(Debug)]
 enum NodeState {
     ZeroOrOneChild,
-    /// If a node has multiple children, we lock the ready children behind a Mutex
-    /// such that concurrent tasks are able to append safely, and ultimately
-    /// a single task will take all the ready children to build the plan.
+    /// Nodes with multiple children will have multiple tasks accessing them,
+    /// and each task will append its contribution until the last task takes
+    /// all the children to build the parent node.
     ///
     /// Wrapped in an Option to make it easier to take the Vec at the end.
-    TwoOrMoreChildren(Mutex<Option<ExecutionPlansWithIndex>>),
+    TwoOrMoreChildren(Mutex<Option<Vec<ExecutionPlanChild>>>),
 }
 
 /// To avoid needing to pass single child wrapped in a Vec for nodes
@@ -508,17 +529,17 @@ enum ChildrenContainer {
 }
 
 impl ChildrenContainer {
-    fn one(self) -> Arc<dyn ExecutionPlan> {
+    fn one(self) -> Result<Arc<dyn ExecutionPlan>> {
         match self {
-            Self::One(p) => p,
-            _ => unreachable!(),
+            Self::One(p) => Ok(p),
+            _ => internal_err!("ChildrenContainer doesn't contain exactly one child"),
         }
     }
 
-    fn two(self) -> [Arc<dyn ExecutionPlan>; 2] {
+    fn two(self) -> Result<[Arc<dyn ExecutionPlan>; 2]> {
         match self {
-            Self::Multiple(v) => v.try_into().unwrap(),
-            _ => unreachable!(),
+            Self::Multiple(v) if v.len() == 2 => Ok(v.try_into().unwrap()),
+            _ => internal_err!("ChildrenContainer doesn't contain exactly 2 children"),
         }
     }
 
@@ -570,7 +591,7 @@ impl DefaultPhysicalPlanner {
         while let Some((parent_index, node)) = dfs_visit_stack.pop() {
             let current_index = flat_tree.len();
             // Because of how we extend the visit stack here, we visit the children
-            // in reverse order of how they appeart, so later we need to reverse
+            // in reverse order of how they appear, so later we need to reverse
             // the order of children when building the nodes.
             dfs_visit_stack
                 .extend(node.inputs().iter().map(|&n| (Some(current_index), n)));
@@ -616,10 +637,11 @@ impl DefaultPhysicalPlanner {
             .flatten()
             .collect::<Vec<_>>();
         // Ideally this never happens if we have a valid LogicalPlan tree
-        assert!(
-            outputs.len() == 1,
-            "Invalid physical plan created from logical plan"
-        );
+        if outputs.len() != 1 {
+            return internal_err!(
+                "Failed to convert LogicalPlan to ExecutionPlan: More than one root detected"
+            );
+        }
         let plan = outputs.pop().unwrap();
         Ok(plan)
     }
@@ -636,7 +658,11 @@ impl DefaultPhysicalPlanner {
         session_state: &'a SessionState,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         // We always start with a leaf, so can ignore status and pass empty children
-        let mut node = &flat_tree[leaf_starter_index];
+        let mut node = flat_tree.get(leaf_starter_index).ok_or_else(|| {
+            internal_datafusion_err!(
+                "Invalid index whilst creating initial physical plan"
+            )
+        })?;
         let mut plan = self
             .map_logical_node_to_physical(
                 node.node,
@@ -647,7 +673,11 @@ impl DefaultPhysicalPlanner {
         let mut current_index = leaf_starter_index;
         // parent_index is None only for root
         while let Some(parent_index) = node.parent_index {
-            node = &flat_tree[parent_index];
+            node = flat_tree.get(parent_index).ok_or_else(|| {
+                internal_datafusion_err!(
+                    "Invalid index whilst creating initial physical plan"
+                )
+            })?;
             match &node.state {
                 NodeState::ZeroOrOneChild => {
                     plan = self
@@ -661,12 +691,22 @@ impl DefaultPhysicalPlanner {
                 // See if we have all children to build the node.
                 NodeState::TwoOrMoreChildren(children) => {
                     let mut children = {
-                        let mut guard = children.lock().unwrap();
+                        let mut guard = children.lock().map_err(|_| {
+                            internal_datafusion_err!(
+                                "Poisoned mutex protecting children vec"
+                            )
+                        })?;
                         // Safe unwrap on option as only the last task reaching this
                         // node will take the contents (which happens after this line).
-                        let children = guard.as_mut().unwrap();
+                        let children = guard.as_mut().ok_or_else(|| {
+                            internal_datafusion_err!("Children vec is already taken")
+                        })?;
                         // Add our contribution to this parent node.
-                        children.push((current_index, plan));
+                        // Vec is pre-allocated so no allocation should occur here.
+ children.push(ExecutionPlanChild { + index: current_index, + plan, + }); if children.len() < node.node.inputs().len() { // This node is not ready yet, still pending more children. // This task is finished forever. @@ -679,7 +719,9 @@ impl DefaultPhysicalPlanner { // all children. // // This take is the only place the Option becomes None. - guard.take().unwrap() + guard.take().ok_or_else(|| { + internal_datafusion_err!("Failed to take children vec") + })? }; // Indices refer to position in flat tree Vec, which means they are @@ -687,9 +729,8 @@ impl DefaultPhysicalPlanner { // // We reverse sort because of how we visited the node in the initial // DFS traversal (see above). - children.sort_unstable_by_key(|(index, _)| std::cmp::Reverse(*index)); - let children = - children.iter().map(|(_, plan)| plan.clone()).collect(); + children.sort_unstable_by_key(|epc| std::cmp::Reverse(epc.index)); + let children = children.into_iter().map(|epc| epc.plan).collect(); let children = ChildrenContainer::Multiple(children); plan = self .map_logical_node_to_physical(node.node, session_state, children) @@ -770,7 +811,7 @@ impl DefaultPhysicalPlanner { partition_by, options: source_option_tuples, }) => { - let input_exec = children.one(); + let input_exec = children.one()?; let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); @@ -832,7 +873,7 @@ impl DefaultPhysicalPlanner { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name.clone())?; if let Some(provider) = schema.table(name).await? { - let input_exec = children.one(); + let input_exec = children.one()?; provider .insert_into(session_state, input_exec, false) .await? @@ -848,7 +889,7 @@ impl DefaultPhysicalPlanner { let name = table_name.table(); let schema = session_state.schema_for_ref(table_name.clone())?; if let Some(provider) = schema.table(name).await? { - let input_exec = children.one(); + let input_exec = children.one()?; provider .insert_into(session_state, input_exec, true) .await? @@ -863,7 +904,7 @@ impl DefaultPhysicalPlanner { return internal_err!("Impossibly got empty window expression"); } - let input_exec = children.one(); + let input_exec = children.one()?; // at this moment we are guaranteed by the logical planner // to have all the window_expr to have equal sort key @@ -951,7 +992,7 @@ impl DefaultPhysicalPlanner { .. }) => { // Initially need to perform the aggregate and then merge the partitions - let input_exec = children.one(); + let input_exec = children.one()?; let physical_input_schema = input_exec.schema(); let logical_input_schema = input.as_ref().schema(); @@ -1030,14 +1071,14 @@ impl DefaultPhysicalPlanner { LogicalPlan::Projection(Projection { input, expr, .. }) => self .create_project_physical_exec( session_state, - children.one(), + children.one()?, input, expr, )?, LogicalPlan::Filter(Filter { predicate, input, .. }) => { - let physical_input = children.one(); + let physical_input = children.one()?; let input_dfschema = input.schema(); let runtime_expr = @@ -1054,7 +1095,7 @@ impl DefaultPhysicalPlanner { input, partitioning_scheme, }) => { - let physical_input = children.one(); + let physical_input = children.one()?; let input_dfschema = input.as_ref().schema(); let physical_partitioning = match partitioning_scheme { LogicalPartitioning::RoundRobinBatch(n) => { @@ -1087,7 +1128,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::Sort(Sort { expr, input, fetch, .. 
}) => { - let physical_input = children.one(); + let physical_input = children.one()?; let input_dfschema = input.as_ref().schema(); let sort_expr = create_physical_sort_exprs( expr, @@ -1099,9 +1140,9 @@ impl DefaultPhysicalPlanner { Arc::new(new_sort) } LogicalPlan::Subquery(_) => todo!(), - LogicalPlan::SubqueryAlias(_) => children.one(), + LogicalPlan::SubqueryAlias(_) => children.one()?, LogicalPlan::Limit(Limit { skip, fetch, .. }) => { - let input = children.one(); + let input = children.one()?; // GlobalLimitExec requires a single partition for input let input = if input.output_partitioning().partition_count() == 1 { @@ -1124,7 +1165,7 @@ impl DefaultPhysicalPlanner { options, .. }) => { - let input = children.one(); + let input = children.one()?; let column_exec = schema .index_of_column(column) .map(|idx| Column::new(&column.name, idx))?; @@ -1145,7 +1186,7 @@ impl DefaultPhysicalPlanner { }) => { let null_equals_null = *null_equals_null; - let [physical_left, physical_right] = children.two(); + let [physical_left, physical_right] = children.two()?; // If join has expression equijoin keys, add physical projection. let has_expr_join_key = keys.iter().any(|(l, r)| { @@ -1413,13 +1454,13 @@ impl DefaultPhysicalPlanner { } } LogicalPlan::CrossJoin(_) => { - let [left, right] = children.two(); + let [left, right] = children.two()?; Arc::new(CrossJoinExec::new(left, right)) } LogicalPlan::RecursiveQuery(RecursiveQuery { name, is_distinct, .. }) => { - let [static_term, recursive_term] = children.two(); + let [static_term, recursive_term] = children.two()?; Arc::new(RecursiveQueryExec::try_new( name.clone(), static_term, diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index a4a85c6bd1f2..237771248f53 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -846,7 +846,6 @@ async fn tpcds_physical_q63() -> Result<()> { create_physical_plan(63).await } -#[ignore] // thread 'q64' has overflowed its stack #[tokio::test] async fn tpcds_physical_q64() -> Result<()> { create_physical_plan(64).await From c6eaf744e2e43a969efadb3c7f67fe83cd218b92 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 16 Apr 2024 10:57:01 +0300 Subject: [PATCH 12/12] Minor changes - Use tokio::Mutex in async environment - Remove Option from enum, since it is only used for taking. 
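For context on both points: a `std::sync::MutexGuard` is not `Send`, so a future
that holds one across an `.await` cannot be spawned onto a multi-threaded runtime,
and a blocking `lock()` would stall the executor thread; `tokio::sync::Mutex::lock`
is itself async and its guard may safely live across `.await` points. And since
`Vec` implements `Default`, `std::mem::take` can hand the collected children to
the last task with no `Option` wrapper. A minimal standalone sketch of the
resulting pattern (assuming the `tokio` crate with its full feature set; `Child`
and `NUM_CHILDREN` are hypothetical stand-ins, not planner types):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Hypothetical stand-in for ExecutionPlanChild: the index records the
    // child's position relative to the original plan's children.
    #[derive(Debug)]
    struct Child {
        index: usize,
        plan: String,
    }

    const NUM_CHILDREN: usize = 2;

    #[tokio::main]
    async fn main() {
        // No Option wrapper: Vec implements Default, so the last task can
        // std::mem::take the contents, leaving an empty Vec behind.
        let children = Arc::new(Mutex::new(Vec::<Child>::with_capacity(NUM_CHILDREN)));

        let handles: Vec<_> = (0..NUM_CHILDREN)
            .map(|index| {
                let children = Arc::clone(&children);
                tokio::spawn(async move {
                    // lock() is async: waiting yields to the runtime instead
                    // of blocking the worker thread.
                    let mut guard = children.lock().await;
                    guard.push(Child {
                        index,
                        plan: format!("child plan {index}"),
                    });
                    if guard.len() < NUM_CHILDREN {
                        None // not the last task to arrive; finished forever
                    } else {
                        Some(std::mem::take(&mut *guard)) // last one takes all
                    }
                })
            })
            .collect();

        for handle in handles {
            if let Some(mut taken) = handle.await.unwrap() {
                // Reverse index sort, as the planner does after taking.
                taken.sort_unstable_by_key(|c| std::cmp::Reverse(c.index));
                println!("building parent from {taken:?}");
            }
        }
    }

Only the last arrival gets `Some(..)` back; every earlier task returns `None` and
finishes, which mirrors the planner's last-task-continues behaviour. (The patch
spells the take as `guard.as_mut()`; `&mut *guard` is the unambiguous equivalent.)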
---
 datafusion/core/src/physical_planner.rs | 36 ++++++++-----------------
 1 file changed, 11 insertions(+), 25 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index b7094038d0fb..301f68c0f24b 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -20,7 +20,7 @@
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::fmt::Write;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use crate::datasource::file_format::arrow::ArrowFormat;
 use crate::datasource::file_format::avro::AvroFormat;
@@ -74,6 +74,7 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow_array::builder::StringBuilder;
 use arrow_array::RecordBatch;
+use datafusion_common::config::FormatOptions;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
     exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
@@ -92,16 +93,16 @@ use datafusion_expr::{
     ScalarFunctionDefinition, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
 };
 use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_sql::utils::window_expr_common_partition_keys;
 
 use async_trait::async_trait;
-use datafusion_common::config::FormatOptions;
-use datafusion_physical_expr::LexOrdering;
 use futures::{StreamExt, TryStreamExt};
 use itertools::{multiunzip, Itertools};
 use log::{debug, trace};
 use sqlparser::ast::NullTreatment;
+use tokio::sync::Mutex;
 
 fn create_function_physical_name(
     fun: &str,
@@ -515,9 +516,7 @@ enum NodeState {
     /// Nodes with multiple children will have multiple tasks accessing them,
     /// and each task will append its contribution until the last task takes
     /// all the children to build the parent node.
-    ///
-    /// Wrapped in an Option to make it easier to take the Vec at the end.
-    TwoOrMoreChildren(Mutex<Option<Vec<ExecutionPlanChild>>>),
+    TwoOrMoreChildren(Mutex<Vec<ExecutionPlanChild>>),
 }
 
 /// To avoid needing to pass single child wrapped in a Vec for nodes
@@ -603,7 +602,7 @@ impl DefaultPhysicalPlanner {
                     1 => NodeState::ZeroOrOneChild,
                     _ => {
                         let ready_children = Vec::with_capacity(node.inputs().len());
-                        let ready_children = Mutex::new(Some(ready_children));
+                        let ready_children = Mutex::new(ready_children);
                         NodeState::TwoOrMoreChildren(ready_children)
                     }
                 };
@@ -690,24 +689,15 @@ impl DefaultPhysicalPlanner {
                 }
                 // See if we have all children to build the node.
                 NodeState::TwoOrMoreChildren(children) => {
-                    let mut children = {
-                        let mut guard = children.lock().map_err(|_| {
-                            internal_datafusion_err!(
-                                "Poisoned mutex protecting children vec"
-                            )
-                        })?;
-                        // Safe unwrap on option as only the last task reaching this
-                        // node will take the contents (which happens after this line).
-                        let children = guard.as_mut().ok_or_else(|| {
-                            internal_datafusion_err!("Children vec is already taken")
-                        })?;
+                    let mut children: Vec<ExecutionPlanChild> = {
+                        let mut guard = children.lock().await;
                         // Add our contribution to this parent node.
                         // Vec is pre-allocated so no allocation should occur here.
-                        children.push(ExecutionPlanChild {
+                        guard.push(ExecutionPlanChild {
                             index: current_index,
                             plan,
                         });
-                        if children.len() < node.node.inputs().len() {
+                        if guard.len() < node.node.inputs().len() {
                             // This node is not ready yet, still pending more children.
                             // This task is finished forever.
return Ok(None); @@ -717,11 +707,7 @@ impl DefaultPhysicalPlanner { // This task is the only one building this node now, and thus // no other task will need the Mutex for this node, so take // all children. - // - // This take is the only place the Option becomes None. - guard.take().ok_or_else(|| { - internal_datafusion_err!("Failed to take children vec") - })? + std::mem::take(guard.as_mut()) }; // Indices refer to position in flat tree Vec, which means they are
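Taken together, the series lands on a flatten-then-build scheme that can be
sketched standalone. In the toy below, a tiny expression tree stands in for
`LogicalPlan` and plain evaluation stands in for `ExecutionPlan` construction;
`FlatNode` and `pending` are illustrative names only, and the real planner walks
up from each leaf in its own task rather than scanning the flat `Vec`:

    // Toy stand-in for LogicalPlan; Sub makes child order observable.
    enum Node {
        Leaf(i64),
        Sub(Box<Node>, Box<Node>),
    }

    impl Node {
        fn inputs(&self) -> Vec<&Node> {
            match self {
                Node::Leaf(_) => vec![],
                Node::Sub(l, r) => vec![l, r],
            }
        }
    }

    // Mirrors the planner's flat tree entry: a node plus its parent's
    // position in the flat Vec.
    struct FlatNode<'a> {
        parent_index: Option<usize>,
        node: &'a Node,
    }

    fn main() {
        // 10 - (4 - 1) = 7
        let tree = Node::Sub(
            Box::new(Node::Leaf(10)),
            Box::new(Node::Sub(Box::new(Node::Leaf(4)), Box::new(Node::Leaf(1)))),
        );

        // Iterative DFS flattening: children always land at higher indices
        // than their parents, and are visited in reverse order of appearance.
        let mut flat: Vec<FlatNode> = vec![];
        let mut stack: Vec<(Option<usize>, &Node)> = vec![(None, &tree)];
        while let Some((parent_index, node)) = stack.pop() {
            let current = flat.len();
            stack.extend(node.inputs().into_iter().map(|n| (Some(current), n)));
            flat.push(FlatNode { parent_index, node });
        }

        // Build bottom up: scanning the flat Vec from the back evaluates every
        // child before its parent (the planner runs one task per leaf instead).
        let mut pending: Vec<Vec<(usize, i64)>> = vec![vec![]; flat.len()];
        let mut root_value = None;
        for i in (0..flat.len()).rev() {
            let mut kids = std::mem::take(&mut pending[i]);
            // Reverse index sort restores source order, as in the planner.
            kids.sort_unstable_by_key(|(idx, _)| std::cmp::Reverse(*idx));
            let value = match flat[i].node {
                Node::Leaf(v) => *v,
                Node::Sub(..) => kids[0].1 - kids[1].1,
            };
            match flat[i].parent_index {
                Some(p) => pending[p].push((i, value)),
                None => root_value = Some(value),
            }
        }
        assert_eq!(root_value, Some(7));
        println!("evaluated: {root_value:?}");
    }

Subtraction makes the ordering concern concrete: without the `Reverse(index)`
sort restoring source order, `10 - (4 - 1)` could silently evaluate as
`(4 - 1) - 10`, which is exactly the child-order bug the index bookkeeping
guards against.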