Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cross join support to ballista #891

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,10 @@ message LogicalPlanNode {
RepartitionNode repartition = 9;
EmptyRelationNode empty_relation = 10;
CreateExternalTableNode create_external_table = 11;
AnalyzeNode analyze = 14;
ExplainNode explain = 12;
WindowNode window = 13;
AnalyzeNode analyze = 14;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb I reordered the node down here so it's easier to do the counting. I am guessing you put it next to ExplainNode because that's how they should be logically grouped? If so, maybe we could keep it next to ExplainNode and reorder the subsequent fields.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong preference. Rust will tell us when the cases are missing, so the order in which they are defined is not all that critical in my opinion.

CrossJoinNode cross_join = 15;
}
}

Expand Down Expand Up @@ -399,6 +400,11 @@ message JoinNode {
repeated Column right_join_column = 6;
}

// Serialized form of a logical cross join between two logical plans.
// The message carries only the two inputs; it has no join-type or
// join-column fields (contrast with JoinNode).
message CrossJoinNode {
// Left input plan of the cross join.
LogicalPlanNode left = 1;
// Right input plan of the cross join.
LogicalPlanNode right = 2;
}

message LimitNode {
LogicalPlanNode input = 1;
uint32 limit = 2;
Expand Down Expand Up @@ -432,6 +438,7 @@ message PhysicalPlanNode {
RepartitionExecNode repartition = 16;
WindowAggExecNode window = 17;
ShuffleWriterExecNode shuffle_writer = 18;
CrossJoinExecNode cross_join = 19;
}
}

Expand Down Expand Up @@ -593,6 +600,11 @@ message HashJoinExecNode {
PartitionMode partition_mode = 6;
}

// Serialized form of a physical cross join between two physical plans.
// The message carries only the two inputs; it has no join-column or
// partition-mode fields (contrast with HashJoinExecNode).
message CrossJoinExecNode {
// Left input plan of the cross join.
PhysicalPlanNode left = 1;
// Right input plan of the cross join.
PhysicalPlanNode right = 2;
}

message PhysicalColumn {
string name = 1;
uint32 index = 2;
Expand Down
11 changes: 9 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ use std::{
unimplemented,
};

// use uuid::Uuid;

impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
type Error = BallistaError;

Expand Down Expand Up @@ -290,6 +288,15 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {

builder.build().map_err(|e| e.into())
}
LogicalPlanType::CrossJoin(crossjoin) => {
let left = convert_box_required!(crossjoin.left)?;
let right = convert_box_required!(crossjoin.right)?;

LogicalPlanBuilder::from(left)
.cross_join(&right)?
.build()
.map_err(|e| e.into())
}
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,18 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
}
LogicalPlan::Extension { .. } => unimplemented!(),
LogicalPlan::Union { .. } => unimplemented!(),
LogicalPlan::CrossJoin { .. } => unimplemented!(),
LogicalPlan::CrossJoin { left, right, .. } => {
let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?;
let right: protobuf::LogicalPlanNode = right.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CrossJoin(Box::new(
protobuf::CrossJoinNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
},
))),
})
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion::physical_plan::window_functions::{
use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
use datafusion::physical_plan::{
coalesce_batches::CoalesceBatchesExec,
cross_join::CrossJoinExec,
csv::CsvExec,
empty::EmptyExec,
expressions::{
Expand Down Expand Up @@ -372,6 +373,12 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
partition_mode,
)?))
}
PhysicalPlanType::CrossJoin(crossjoin) => {
let left: Arc<dyn ExecutionPlan> = convert_box_required!(crossjoin.left)?;
let right: Arc<dyn ExecutionPlan> =
convert_box_required!(crossjoin.right)?;
Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
}
PhysicalPlanType::ShuffleWriter(shuffle_writer) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(shuffle_writer.input)?;
Expand Down
12 changes: 12 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::{

use datafusion::logical_plan::JoinType;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::expressions::{
CaseExpr, InListExpr, IsNotNullExpr, IsNullExpr, NegativeExpr, NotExpr,
Expand Down Expand Up @@ -155,6 +156,17 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
let left: protobuf::PhysicalPlanNode = exec.left().to_owned().try_into()?;
let right: protobuf::PhysicalPlanNode = exec.right().to_owned().try_into()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
protobuf::CrossJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<HashAggregateExec>() {
let groups = exec
.group_expr()
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

use super::analyze::AnalyzeExec;
use super::{
aggregates, cross_join::CrossJoinExec, empty::EmptyExec, expressions::binary,
functions, hash_join::PartitionMode, udaf, union::UnionExec, windows,
aggregates, empty::EmptyExec, expressions::binary, functions,
hash_join::PartitionMode, udaf, union::UnionExec, windows,
};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
Expand All @@ -29,6 +29,7 @@ use crate::logical_plan::{
UserDefinedLogicalNode,
};
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions;
use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};
Expand Down