Skip to content

Commit

Permalink
Minor changes
Browse files Browse the repository at this point in the history
- Use tokio::Mutex in async environment
- Remove Option from enum, since it is only used for taking.
  • Loading branch information
metesynnada committed Apr 16, 2024
1 parent e0cdaf7 commit c6eaf74
Showing 1 changed file with 11 additions and 25 deletions.
36 changes: 11 additions & 25 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::avro::AvroFormat;
Expand Down Expand Up @@ -74,6 +74,7 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::RecordBatch;
use datafusion_common::config::FormatOptions;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
Expand All @@ -92,16 +93,16 @@ use datafusion_expr::{
ScalarFunctionDefinition, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_sql::utils::window_expr_common_partition_keys;

use async_trait::async_trait;
use datafusion_common::config::FormatOptions;
use datafusion_physical_expr::LexOrdering;
use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use sqlparser::ast::NullTreatment;
use tokio::sync::Mutex;

fn create_function_physical_name(
fun: &str,
Expand Down Expand Up @@ -515,9 +516,7 @@ enum NodeState {
/// Nodes with multiple children will have multiple tasks accessing it,
/// and each task will append their contribution until the last task takes
/// all the children to build the parent node.
///
/// Wrapped in an Option to make it easier to take the Vec at the end.
TwoOrMoreChildren(Mutex<Option<Vec<ExecutionPlanChild>>>),
TwoOrMoreChildren(Mutex<Vec<ExecutionPlanChild>>),
}

/// To avoid needing to pass single child wrapped in a Vec for nodes
Expand Down Expand Up @@ -603,7 +602,7 @@ impl DefaultPhysicalPlanner {
1 => NodeState::ZeroOrOneChild,
_ => {
let ready_children = Vec::with_capacity(node.inputs().len());
let ready_children = Mutex::new(Some(ready_children));
let ready_children = Mutex::new(ready_children);
NodeState::TwoOrMoreChildren(ready_children)
}
};
Expand Down Expand Up @@ -690,24 +689,15 @@ impl DefaultPhysicalPlanner {
}
// See if we have all children to build the node.
NodeState::TwoOrMoreChildren(children) => {
let mut children = {
let mut guard = children.lock().map_err(|_| {
internal_datafusion_err!(
"Poisoned mutex protecitng children vec"
)
})?;
// Safe unwrap on option as only the last task reaching this
// node will take the contents (which happens after this line).
let children = guard.as_mut().ok_or_else(|| {
internal_datafusion_err!("Children vec is already taken")
})?;
let mut children: Vec<ExecutionPlanChild> = {
let mut guard = children.lock().await;
// Add our contribution to this parent node.
// Vec is pre-allocated so no allocation should occur here.
children.push(ExecutionPlanChild {
guard.push(ExecutionPlanChild {
index: current_index,
plan,
});
if children.len() < node.node.inputs().len() {
if guard.len() < node.node.inputs().len() {
// This node is not ready yet, still pending more children.
// This task is finished forever.
return Ok(None);
Expand All @@ -717,11 +707,7 @@ impl DefaultPhysicalPlanner {
// This task is the only one building this node now, and thus
// no other task will need the Mutex for this node, so take
// all children.
//
// This take is the only place the Option becomes None.
guard.take().ok_or_else(|| {
internal_datafusion_err!("Failed to take children vec")
})?
std::mem::take(guard.as_mut())
};

// Indices refer to position in flat tree Vec, which means they are
Expand Down

0 comments on commit c6eaf74

Please sign in to comment.