Skip to content

Commit

Permalink
fix(executor, frontend): Lookup join should not have unaligned eq keys (
Browse files Browse the repository at this point in the history
#8402)

Co-authored-by: jon-chuang <[email protected]>
  • Loading branch information
2 people authored and stdrc committed Mar 8, 2023
1 parent 589d1ee commit 6bdbe40
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 20 deletions.
15 changes: 5 additions & 10 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ use std::mem::swap;
use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId, TableOption};
use risingwave_common::error::Result;
use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::expr::{build_from_prost, new_unary_expr, BoxedExpression, LiteralExpression};
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::expr::expr_node::Type;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{Distribution, TableIter};
Expand Down Expand Up @@ -370,13 +369,9 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {
let datum = if inner_type == outer_type {
datum
} else {
let cast_expr = new_unary_expr(
Type::Cast,
inner_type.clone(),
Box::new(LiteralExpression::new(outer_type.clone(), datum.clone())),
)?;

cast_expr.eval_row(&OwnedRow::empty())?
return Err(internal_error(format!(
"Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}"
)));
};

scan_range.eq_conds.push(datum);
Expand Down
14 changes: 4 additions & 10 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::{
ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode,
};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_expr::expr::{build_from_prost, new_unary_expr, BoxedExpression, LiteralExpression};
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand All @@ -37,7 +36,6 @@ use risingwave_pb::batch_plan::{
PlanFragment, PlanNode, RowSeqScanNode, TaskId as ProstTaskId, TaskOutputId,
};
use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::plan_common::StorageTableDesc;
use uuid::Uuid;

Expand Down Expand Up @@ -188,13 +186,9 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
let datum = if inner_type == outer_type {
datum
} else {
let cast_expr = new_unary_expr(
Type::Cast,
inner_type.clone(),
Box::new(LiteralExpression::new(outer_type.clone(), datum.clone())),
)?;

cast_expr.eval_row(&OwnedRow::empty())?
return Err(internal_error(format!(
"Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}"
)));
};

scan_range.eq_conds.push(datum);
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl BatchLookupJoin {
// We cannot create a `BatchLookupJoin` without any eq keys. We require eq keys to do the
// lookup.
assert!(eq_join_predicate.has_eq());
assert!(eq_join_predicate.eq_keys_are_type_aligned());
let ctx = logical.base.ctx.clone();
let dist = Self::derive_dist(logical.left().distribution(), &logical);
let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any());
Expand Down

0 comments on commit 6bdbe40

Please sign in to comment.