Skip to content

Commit

Permalink
Linearize binary expressions to reduce proto tree complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Nov 7, 2022
1 parent 238e179 commit a1d6b02
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 46 deletions.
6 changes: 4 additions & 2 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,10 @@ message AliasNode {
}

message BinaryExprNode {
LogicalExprNode l = 1;
LogicalExprNode r = 2;
// The operands, ordered from the left innermost expression to the
// right outermost expression. Consecutive operands are chained
// together with the operator 'op'.
repeated LogicalExprNode operands = 1;
string op = 3;
}

Expand Down
64 changes: 63 additions & 1 deletion datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,67 @@ mod test {
Expr::from_bytes(&bytes).unwrap();
}

/// Encodes `expr` to bytes and immediately decodes it back, returning the
/// decoded expression. Panics if either step fails.
fn roundtrip_expr(expr: &Expr) -> Expr {
    let encoded = expr.to_bytes().unwrap();
    Expr::from_bytes(&encoded).unwrap()
}

/// A chain of identical operators must survive a round trip with its exact
/// operand order and nesting; differently ordered or differently nested
/// chains must never compare equal to it after decoding.
#[test]
fn exact_roundtrip_linearized_binary_expr() {
    // Reference expression: (((A AND B) AND C) AND D)
    let expr_ordered = col("A").and(col("B")).and(col("C")).and(col("D"));
    assert_eq!(expr_ordered, roundtrip_expr(&expr_ordered));

    // Variations that must stay distinct from the reference expression.
    let other_variants = vec![
        // (((B AND A) AND C) AND D)
        col("B").and(col("A")).and(col("C")).and(col("D")),
        // (((A AND C) AND B) AND D)
        col("A").and(col("C")).and(col("B")).and(col("D")),
        // (((A AND B) AND D) AND C)
        col("A").and(col("B")).and(col("D")).and(col("C")),
        // (A AND (B AND (C AND D)))
        col("A").and(col("B").and(col("C").and(col("D")))),
    ];

    for variant in &other_variants {
        // Every variant round-trips to itself ...
        assert_eq!(variant, &roundtrip_expr(variant));

        // ... but none of them is equal to the reference expression, whether
        // the reference is compared directly or after its own round trip.
        assert_ne!(&expr_ordered, &roundtrip_expr(variant));
        assert_ne!(&roundtrip_expr(&expr_ordered), &roundtrip_expr(variant));
    }
}

/// Round-trips a large expression built from long OR chains that are in turn
/// joined by a long AND chain, and checks the decoded value equals the input.
#[test]
fn roundtrip_deeply_nested_binary_expr() {
    // Run on a dedicated thread with a bigger stack so that dev builds
    // do not overflow.
    let worker = std::thread::Builder::new()
        .stack_size(10_000_000)
        .spawn(|| {
            let n = 100;

            // a < 5
            let basic_expr = col("a").lt(lit(5i32));

            // (a < 5) OR (a < 5) OR (a < 5) OR ...
            let mut or_chain = basic_expr.clone();
            for _ in 0..n {
                or_chain = or_chain.or(basic_expr.clone());
            }

            // (<or chain>) AND (<or chain>) AND (<or chain>) AND ...
            let mut expr = or_chain.clone();
            for _ in 0..n {
                expr = expr.and(or_chain.clone());
            }

            // Encoding is expected to succeed.
            let bytes = expr.to_bytes().unwrap();

            let decoded_expr = Expr::from_bytes(&bytes)
                .expect("serialization worked, so deserialization should work as well");
            assert_eq!(decoded_expr, expr);
        })
        .expect("spawning thread");

    worker.join().expect("joining thread");
}

#[test]
fn roundtrip_deeply_nested() {
// we need more stack space so this doesn't overflow in dev builds
Expand All @@ -332,7 +393,8 @@ mod test {
println!("testing: {n}");

let expr_base = col("a").lt(lit(5i32));
let expr = (0..n).fold(expr_base.clone(), |expr, _| expr.and(expr_base.clone()));
// Generate a tree of AND and OR expressions (no subsequent ANDs or ORs).
let expr = (0..n).fold(expr_base.clone(), |expr, n| if n % 2 == 0 { expr.and(expr_base.clone()) } else { expr.or(expr_base.clone()) });

// Convert it to an opaque form
let bytes = match expr.to_bytes() {
Expand Down
33 changes: 28 additions & 5 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,11 +690,34 @@ pub fn parse_expr(
.ok_or_else(|| Error::required("expr_type"))?;

match expr_type {
ExprType::BinaryExpr(binary_expr) => Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(parse_required_expr(&binary_expr.l, registry, "l")?),
from_proto_binary_op(&binary_expr.op)?,
Box::new(parse_required_expr(&binary_expr.r, registry, "r")?),
))),
ExprType::BinaryExpr(binary_expr) => {
let op = from_proto_binary_op(&binary_expr.op)?;
let operands = binary_expr
.operands
.iter()
.map(|expr| parse_expr(expr, registry))
.collect::<Result<Vec<_>, _>>()?;

if operands.len() < 2 {
return Err(proto_error(
"A binary expression must always have at least 2 operands",
));
}

// Reduce the linearized operands (ordered by left innermost to right
// outermost) into a single expression tree.
Ok(operands
.into_iter()
.reduce(|left, right| {
Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
})
.unwrap_or_else(|| {
// As long as we have the bounds check above, this should never happen.
panic!(
"Binary expression could not be reduced to a single expression."
)
}))
}
ExprType::GetIndexedField(field) => {
let key = field
.key
Expand Down
41 changes: 12 additions & 29 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 29 additions & 4 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,36 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
}
}
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let binary_expr = Box::new(protobuf::BinaryExprNode {
l: Some(Box::new(left.as_ref().try_into()?)),
r: Some(Box::new(right.as_ref().try_into()?)),
// Try to linearize a nested binary expression tree of the same operator
// into a flat vector of expressions.
let mut exprs = vec![right.as_ref()];
let mut current_expr = left.as_ref();
while let Expr::BinaryExpr(BinaryExpr {
left,
op: current_op,
right,
}) = current_expr
{
if current_op == op {
exprs.push(right.as_ref());
current_expr = left.as_ref();
} else {
break;
}
}
exprs.push(current_expr);

let binary_expr = protobuf::BinaryExprNode {
// We need to reverse exprs since operands are expected to be
// linearized from left innermost to right outermost (but while
// traversing the chain we do the exact opposite).
operands: exprs
.into_iter()
.rev()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, Error>>()?,
op: format!("{:?}", op),
});
};
Self {
expr_type: Some(ExprType::BinaryExpr(binary_expr)),
}
Expand Down

0 comments on commit a1d6b02

Please sign in to comment.