Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linearize binary expressions to reduce proto tree complexity #4115

Merged
merged 1 commit into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,10 @@ message AliasNode {
}

message BinaryExprNode {
LogicalExprNode l = 1;
LogicalExprNode r = 2;
// Represents the operands from the left inner most expression
// to the right outer most expression where each of them are chained
// with the operator 'op'.
repeated LogicalExprNode operands = 1;
string op = 3;
}

Expand Down
95 changes: 94 additions & 1 deletion datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,98 @@ mod test {
Expr::from_bytes(&bytes).unwrap();
}

fn roundtrip_expr(expr: &Expr) -> Expr {
let bytes = expr.to_bytes().unwrap();
Expr::from_bytes(&bytes).unwrap()
}

#[test]
fn exact_roundtrip_linearized_binary_expr() {
// (((A AND B) AND C) AND D)
let expr_ordered = col("A").and(col("B")).and(col("C")).and(col("D"));
assert_eq!(expr_ordered, roundtrip_expr(&expr_ordered));

// Ensure that no other variation becomes equal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

let other_variants = vec![
// (((B AND A) AND C) AND D)
col("B").and(col("A")).and(col("C")).and(col("D")),
// (((A AND C) AND B) AND D)
col("A").and(col("C")).and(col("B")).and(col("D")),
// (((A AND B) AND D) AND C)
col("A").and(col("B")).and(col("D")).and(col("C")),
// A AND (B AND (C AND D)))
col("A").and(col("B").and(col("C").and(col("D")))),
];
for case in other_variants {
// Each variant is still equal to itself
assert_eq!(case, roundtrip_expr(&case));

// But non of them is equal to the original
assert_ne!(expr_ordered, roundtrip_expr(&case));
assert_ne!(roundtrip_expr(&expr_ordered), roundtrip_expr(&case));
}
}

#[test]
fn roundtrip_deeply_nested_binary_expr() {
// We need more stack space so this doesn't overflow in dev builds
std::thread::Builder::new()
.stack_size(10_000_000)
.spawn(|| {
let n = 100;
// a < 5
let basic_expr = col("a").lt(lit(5i32));
// (a < 5) OR (a < 5) OR (a < 5) OR ...
let or_chain = (0..n)
.fold(basic_expr.clone(), |expr, _| expr.or(basic_expr.clone()));
// (a < 5) OR (a < 5) AND (a < 5) OR (a < 5) AND (a < 5) AND (a < 5) OR ...
let expr =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

(0..n).fold(or_chain.clone(), |expr, _| expr.and(or_chain.clone()));

// Should work fine.
let bytes = expr.to_bytes().unwrap();

let decoded_expr = Expr::from_bytes(&bytes).expect(
"serialization worked, so deserialization should work as well",
);
assert_eq!(decoded_expr, expr);
})
.expect("spawning thread")
.join()
.expect("joining thread");
}

#[test]
fn roundtrip_deeply_nested_binary_expr_reverse_order() {
// We need more stack space so this doesn't overflow in dev builds
std::thread::Builder::new()
.stack_size(10_000_000)
.spawn(|| {
let n = 100;

// a < 5
let expr_base = col("a").lt(lit(5i32));

// ((a < 5 AND a < 5) AND a < 5) AND ...
let and_chain =
(0..n).fold(expr_base.clone(), |expr, _| expr.and(expr_base.clone()));

// a < 5 AND (a < 5 AND (a < 5 AND ...))
let expr = expr_base.and(and_chain);

// Should work fine.
let bytes = expr.to_bytes().unwrap();

let decoded_expr = Expr::from_bytes(&bytes).expect(
"serialization worked, so deserialization should work as well",
);
assert_eq!(decoded_expr, expr);
})
.expect("spawning thread")
.join()
.expect("joining thread");
}

#[test]
fn roundtrip_deeply_nested() {
// we need more stack space so this doesn't overflow in dev builds
Expand All @@ -332,7 +424,8 @@ mod test {
println!("testing: {n}");

let expr_base = col("a").lt(lit(5i32));
let expr = (0..n).fold(expr_base.clone(), |expr, _| expr.and(expr_base.clone()));
// Generate a tree of AND and OR expressions (no subsequent ANDs or ORs).
let expr = (0..n).fold(expr_base.clone(), |expr, n| if n % 2 == 0 { expr.and(expr_base.clone()) } else { expr.or(expr_base.clone()) });

// Convert it to an opaque form
let bytes = match expr.to_bytes() {
Expand Down
28 changes: 23 additions & 5 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,11 +690,29 @@ pub fn parse_expr(
.ok_or_else(|| Error::required("expr_type"))?;

match expr_type {
ExprType::BinaryExpr(binary_expr) => Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(parse_required_expr(&binary_expr.l, registry, "l")?),
from_proto_binary_op(&binary_expr.op)?,
Box::new(parse_required_expr(&binary_expr.r, registry, "r")?),
))),
ExprType::BinaryExpr(binary_expr) => {
let op = from_proto_binary_op(&binary_expr.op)?;
let operands = binary_expr
.operands
.iter()
.map(|expr| parse_expr(expr, registry))
.collect::<Result<Vec<_>, _>>()?;

if operands.len() < 2 {
return Err(proto_error(
"A binary expression must always have at least 2 operands",
));
}

// Reduce the linearized operands (ordered by left innermost to right
// outermost) into a single expression tree.
Ok(operands
.into_iter()
.reduce(|left, right| {
Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
})
.expect("Binary expression could not be reduced to a single expression."))
}
ExprType::GetIndexedField(field) => {
let key = field
.key
Expand Down
41 changes: 12 additions & 29 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 29 additions & 4 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,36 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
}
}
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let binary_expr = Box::new(protobuf::BinaryExprNode {
l: Some(Box::new(left.as_ref().try_into()?)),
r: Some(Box::new(right.as_ref().try_into()?)),
// Try to linerize a nested binary expression tree of the same operator
// into a flat vector of expressions.
let mut exprs = vec![right.as_ref()];
let mut current_expr = left.as_ref();
while let Expr::BinaryExpr(BinaryExpr {
left,
op: current_op,
right,
}) = current_expr
{
if current_op == op {
exprs.push(right.as_ref());
current_expr = left.as_ref();
} else {
break;
}
}
exprs.push(current_expr);

let binary_expr = protobuf::BinaryExprNode {
// We need to reverse exprs since operands are expected to be
// linearized from left innermost to right outermost (but while
// traversing the chain we do the exact opposite).
operands: exprs
.into_iter()
.rev()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, Error>>()?,
op: format!("{:?}", op),
});
};
Self {
expr_type: Some(ExprType::BinaryExpr(binary_expr)),
}
Expand Down