Skip to content

Commit

Permalink
fix: sql planner creates cross join instead of inner join from select…
Browse files Browse the repository at this point in the history
… predicates
  • Loading branch information
xudong963 committed Jan 17, 2022
1 parent f027e5f commit fd62c56
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 27 deletions.
52 changes: 32 additions & 20 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub enum JoinType {
}

/// Join constraint
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinConstraint {
/// Join ON
On,
Expand All @@ -59,7 +59,7 @@ pub enum JoinConstraint {

/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Projection {
/// The list of expressions
pub expr: Vec<Expr>,
Expand All @@ -79,7 +79,7 @@ pub struct Projection {
/// If the value of `<predicate>` is true, the input row is passed to
/// the output. If the value of `<predicate>` is false, the row is
/// discarded.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Filter {
/// The predicate expression, which must have Boolean type.
pub predicate: Expr,
Expand All @@ -88,7 +88,7 @@ pub struct Filter {
}

/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Window {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
Expand All @@ -115,8 +115,14 @@ pub struct TableScan {
pub limit: Option<usize>,
}

impl PartialEq for TableScan {
fn eq(&self, other: &Self) -> bool {
self.table_name == other.table_name
}
}

/// Apply Cross Join to two logical plans
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct CrossJoin {
/// Left input
pub left: Arc<LogicalPlan>,
Expand All @@ -127,7 +133,7 @@ pub struct CrossJoin {
}

/// Repartition the plan based on a partitioning scheme.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Repartition {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
Expand All @@ -136,7 +142,7 @@ pub struct Repartition {
}

/// Union multiple inputs
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Union {
/// Inputs to merge
pub inputs: Vec<LogicalPlan>,
Expand All @@ -147,7 +153,7 @@ pub struct Union {
}

/// Creates an in memory table.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct CreateMemoryTable {
/// The table name
pub name: String,
Expand All @@ -156,7 +162,7 @@ pub struct CreateMemoryTable {
}

/// Creates an external table.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct CreateExternalTable {
/// The table schema
pub schema: DFSchemaRef,
Expand All @@ -171,7 +177,7 @@ pub struct CreateExternalTable {
}

/// Drops a table.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct DropTable {
/// The table name
pub name: String,
Expand All @@ -183,7 +189,7 @@ pub struct DropTable {

/// Produces a relation with string representations of
/// various parts of the plan
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Explain {
/// Should extra (detailed, intermediate plans) be included?
pub verbose: bool,
Expand All @@ -197,7 +203,7 @@ pub struct Explain {

/// Runs the actual plan, and then prints the physical plan with
/// with execution metrics.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Analyze {
/// Should extra detail be included?
pub verbose: bool,
Expand All @@ -214,8 +220,14 @@ pub struct Extension {
pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
}

impl PartialEq for Extension {
fn eq(&self, _other: &Self) -> bool {
todo!()
}
}

/// Produces no rows: An empty relation with an empty schema
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct EmptyRelation {
/// Whether to produce a placeholder row
pub produce_one_row: bool,
Expand All @@ -224,7 +236,7 @@ pub struct EmptyRelation {
}

/// Produces the first `n` tuples from its input and discards the rest.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Limit {
/// The limit
pub n: usize,
Expand All @@ -235,7 +247,7 @@ pub struct Limit {
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Values {
/// The table schema
pub schema: DFSchemaRef,
Expand All @@ -245,7 +257,7 @@ pub struct Values {

/// Aggregates its input based on a set of grouping and aggregate
/// expressions (e.g. SUM).
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Aggregate {
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
Expand All @@ -258,7 +270,7 @@ pub struct Aggregate {
}

/// Sorts its input according to a list of sort expressions.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Sort {
/// The sort expressions
pub expr: Vec<Expr>,
Expand All @@ -267,7 +279,7 @@ pub struct Sort {
}

/// Join two logical plans on one or more join columns
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub struct Join {
/// Left input
pub left: Arc<LogicalPlan>,
Expand All @@ -292,7 +304,7 @@ pub struct Join {
/// an output relation (table) with a (potentially) different
/// schema. A plan represents a dataflow tree where data flows
/// from leaves up to the root to produce the query result.
#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub enum LogicalPlan {
/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
Expand Down Expand Up @@ -547,7 +559,7 @@ impl LogicalPlan {
}

/// Logical partitioning schemes supported by the repartition operator.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
RoundRobinBatch(usize),
Expand Down
27 changes: 20 additions & 7 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias: Option<String>,
) -> Result<LogicalPlan> {
let plans = self.plan_from_tables(&select.from, ctes)?;

let plan = match &select.selection {
Some(predicate_expr) => {
// build join schema
Expand All @@ -714,10 +713,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
extract_possible_join_keys(&filter_expr, &mut possible_join_keys)?;

let mut all_join_keys = HashSet::new();
let mut left = plans[0].clone();
for right in plans.iter().skip(1) {
let mut mut_plans = plans.clone();
let mut left = mut_plans[0].clone();
let mut idx = 1;
while idx < mut_plans.len() {
let left_schema = left.schema();
let right_schema = right.schema();
let right_schema = mut_plans[idx].schema();
let mut join_keys = vec![];
for (l, r) in &possible_join_keys {
if left_schema.field_from_column(l).is_ok()
Expand All @@ -731,20 +732,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
if join_keys.is_empty() {
left =
LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
if mut_plans.contains(&mut_plans[idx])
|| idx == mut_plans.len() - 1
{
left = LogicalPlanBuilder::from(left)
.cross_join(&mut_plans[idx])?
.build()?;
} else {
mut_plans.push(mut_plans[idx].clone());
}
} else {
let left_keys: Vec<Column> =
join_keys.iter().map(|(l, _)| l.clone()).collect();
let right_keys: Vec<Column> =
join_keys.iter().map(|(_, r)| r.clone()).collect();
let builder = LogicalPlanBuilder::from(left);
left = builder
.join(right, JoinType::Inner, (left_keys, right_keys))?
.join(
&mut_plans[idx],
JoinType::Inner,
(left_keys, right_keys),
)?
.build()?;
}

all_join_keys.extend(join_keys);
idx += 1;
}

// remove join expressions from filter
Expand Down

0 comments on commit fd62c56

Please sign in to comment.