Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leverage more engine capabilities in data skipping 2/n #83

Merged
merged 15 commits into from
Feb 15, 2024
1 change: 1 addition & 0 deletions kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes = "1.4"
chrono = { version = "0.4" }
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
indexmap = "2.2.1"
roeap marked this conversation as resolved.
Show resolved Hide resolved
itertools = "0.12"
lazy_static = "1.4"
regex = "1.8"
Expand Down
2 changes: 1 addition & 1 deletion kernel/examples/dump-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fn main() {
let scan = ScanBuilder::new(snapshot).build();

let schema = scan.schema();
let header_names = schema.fields.iter().map(|field| {
let header_names = schema.fields().map(|field| {
let cell = Cell::new(field.name());
if cli.ascii {
cell
Expand Down
18 changes: 5 additions & 13 deletions kernel/src/client/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ impl TryFrom<&StructType> for ArrowSchema {
fn try_from(s: &StructType) -> Result<Self, ArrowError> {
let fields = s
.fields()
.iter()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.map(TryInto::try_into)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?;
roeap marked this conversation as resolved.
Show resolved Hide resolved

Ok(ArrowSchema::new(fields))
Expand Down Expand Up @@ -105,11 +104,10 @@ impl TryFrom<&DataType> for ArrowDataType {
PrimitiveType::Decimal(precision, scale) => {
if precision <= &38 {
Ok(ArrowDataType::Decimal128(*precision, *scale))
} else if precision <= &76 {
Ok(ArrowDataType::Decimal256(*precision, *scale))
} else {
// NOTE: since we are converting from delta, we should never get here.
Err(ArrowError::SchemaError(format!(
"Precision too large to be represented in Arrow: {}",
"Precision too large to be represented as Delta type: {} > 38",
roeap marked this conversation as resolved.
Show resolved Hide resolved
precision
)))
}
Expand All @@ -127,8 +125,7 @@ impl TryFrom<&DataType> for ArrowDataType {
}
DataType::Struct(s) => Ok(ArrowDataType::Struct(
s.fields()
.iter()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.map(TryInto::try_into)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?
.into(),
)),
Expand Down Expand Up @@ -216,12 +213,7 @@ impl TryFrom<&ArrowDataType> for DataType {
ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)),
ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)),
ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)),
ArrowDataType::Decimal128(p, s) => {
Ok(DataType::Primitive(PrimitiveType::Decimal(*p, *s)))
}
ArrowDataType::Decimal256(p, s) => {
Ok(DataType::Primitive(PrimitiveType::Decimal(*p, *s)))
}
ArrowDataType::Decimal128(p, s) => Ok(DataType::decimal(*p, *s)),
ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)),
ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
Expand Down
55 changes: 34 additions & 21 deletions kernel/src/client/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use arrow_array::{
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray,
StructArray, TimestampMicrosecondArray,
};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_ord::cmp::{distinct, eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use arrow_select::nullif::nullif;

use crate::error::{DeltaResult, Error};
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
Expand Down Expand Up @@ -65,8 +64,7 @@ impl Scalar {
PrimitiveType::Binary => Arc::new(BinaryArray::new_null(num_rows)),
PrimitiveType::Decimal(precision, scale) => Arc::new(
Decimal128Array::new_null(num_rows)
.with_precision_and_scale(*precision, *scale)
.unwrap(),
.with_precision_and_scale(*precision, *scale)?,
),
},
DataType::Array(_) => unimplemented!(),
Expand Down Expand Up @@ -199,34 +197,40 @@ fn evaluate_expression(

eval(&left_arr, &right_arr).map_err(Error::generic_err)
}
(VariadicOperation { op, exprs }, _) => {
(VariadicOperation { op, exprs }, Some(&DataType::BOOLEAN)) => {
type Operation = fn(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>;
let (reducer, default): (Operation, _) = match op {
VariadicOperator::And => (and, true),
VariadicOperator::Or => (or, false),
};
exprs
.iter()
.map(|expr| evaluate_expression(expr, batch, Some(&DataType::BOOLEAN)))
.map(|expr| evaluate_expression(expr, batch, result_type))
.reduce(|l, r| {
Ok(reducer(downcast_to_bool(&l?)?, downcast_to_bool(&r?)?)
.map(wrap_comparison_result)?)
})
.unwrap_or_else(|| {
evaluate_expression(
&Expression::literal(default),
batch,
Some(&DataType::BOOLEAN),
)
evaluate_expression(&Expression::literal(default), batch, result_type)
})
}
(NullIf { expr, if_expr }, _) => {
let expr_arr = evaluate_expression(expr.as_ref(), batch, None)?;
let if_expr_arr =
evaluate_expression(if_expr.as_ref(), batch, Some(&DataType::BOOLEAN))?;
let if_expr_arr = downcast_to_bool(&if_expr_arr)?;
Ok(nullif(&expr_arr, if_expr_arr)?)
(VariadicOperation { .. }, _) => {
// NOTE: If we get here, it would be a bug in our code. However it does swallow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use | at L200, to make sure we only match known operations? But I think the sub-match at L203 already gives that compile-time safety?

Or, perhaps it's enough to do this?

Suggested change
(VariadicOperation { .. }, _) => {
// NOTE: If we get here, it would be a bug in our code. However it does swallow
(VariadicOperation { .. }, Some(_)) => {
// NOTE: If we get here, it would be a bug in our code. However it does swallow

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, would something like this work?

(VariadicOperation { op, exprs }, None | Some(&DataType::BOOLEAN)) =>  {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, it does! powerful pattern matching must be one of the features I appreciate most in programming languages :)

// the error message from the compiler if we add variants to the enum and forget to add them here.
Err(Error::Generic(format!(
"Current variadic expressions are expected to return boolean results, got {:?}",
ryan-johnson-databricks marked this conversation as resolved.
Show resolved Hide resolved
result_type
)))
}
(Distinct { lhs, rhs }, Some(&DataType::BOOLEAN)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a new top-level expression type? Can Distinct to be a new BinaryOperator instead?

        (BinaryOperation { op, left, right }, _) => {
              ...
            let eval: Operation = match op {
                  ... 
                Equal => |l, r| eq(l, r).map(wrap_comparison_result),
                NotEqual => |l, r| neq(l, r).map(wrap_comparison_result),
+               Distinct => |l, r| distinct(l, r).map(wrap_comparison_result),
            };

(bonus: whatever type checking we eventually add would then benefit all comparison operators)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: In a future PR that adds better type checking, should we introduce a ComparisonOperator sub-enum, for things that map (T, T) -> bool? And if we did that, should we also add an AlgebraicOperator (**) sub-enum, for things that map (T, T) -> T? That would capture the vast majority of binary operations in a structured way, while still allowing to add arbitrary other binary operators if needed (***)?

Edit: In retrospect, this seems very related to your question #83 (comment)

(**) According to Wikipedia,

An algebraic operation may also be defined simply as a function from a Cartesian power of a set to the same set.

(***) Perhaps ironically, arrow's nullif function is one such operator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let lhs_arr = evaluate_expression(lhs.as_ref(), batch, None)?;
let rhs_arr = evaluate_expression(rhs.as_ref(), batch, None)?;
Ok(distinct(&lhs_arr, &rhs_arr).map(wrap_comparison_result)?)
}
(Distinct { .. }, _) => Err(Error::Generic(format!(
"Distinct will always return boolean result, got {:?}",
result_type
))),
}
}

Expand Down Expand Up @@ -272,6 +276,7 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {

#[cfg(test)]
mod tests {

use super::*;
use arrow_array::Int32Array;
use arrow_schema::{DataType, Field, Fields, Schema};
Expand Down Expand Up @@ -426,7 +431,9 @@ mod tests {
let column_b = Expression::column("b");

let expression = Box::new(column_a.clone().and(column_b.clone()));
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![false, false]));
assert_eq!(results.as_ref(), expected.as_ref());

Expand All @@ -435,12 +442,16 @@ mod tests {
.clone()
.and(Expression::literal(Scalar::Boolean(true))),
ryan-johnson-databricks marked this conversation as resolved.
Show resolved Hide resolved
);
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![true, false]));
assert_eq!(results.as_ref(), expected.as_ref());

let expression = Box::new(column_a.clone().or(column_b));
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![true, true]));
assert_eq!(results.as_ref(), expected.as_ref());

Expand All @@ -449,7 +460,9 @@ mod tests {
.clone()
.or(Expression::literal(Scalar::Boolean(false))),
);
let results = evaluate_expression(&expression, &batch, None).unwrap();
let results =
evaluate_expression(&expression, &batch, Some(&crate::schema::DataType::BOOLEAN))
.unwrap();
let expected = Arc::new(BooleanArray::from(vec![true, false]));
assert_eq!(results.as_ref(), expected.as_ref());
}
Expand Down
31 changes: 14 additions & 17 deletions kernel/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ pub enum VariadicOperator {
impl Display for BinaryOperator {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
// Self::And => write!(f, "AND"),
// Self::Or => write!(f, "OR"),
Self::Plus => write!(f, "+"),
Self::Minus => write!(f, "-"),
Self::Multiply => write!(f, "*"),
Expand Down Expand Up @@ -101,12 +99,11 @@ pub enum Expression {
/// The expressions.
exprs: Vec<Expression>,
},
// TODO how to model required functions?
NullIf {
/// The expression to evaluate.
expr: Box<Expression>,
/// The expression to compare against.
if_expr: Box<Expression>,
Distinct {
/// left hand side of the distinct
lhs: Box<Expression>,
/// right hand side of the distinct
rhs: Box<Expression>,
},
// TODO: support more expressions, such as IS IN, LIKE, etc.
}
Expand Down Expand Up @@ -148,7 +145,7 @@ impl Display for Expression {
)
}
},
Self::NullIf { expr, if_expr } => write!(f, "NULLIF({}, {})", expr, if_expr),
Self::Distinct { lhs, rhs } => write!(f, "DISTINCT({}, {})", lhs, rhs),
}
}
}
Expand Down Expand Up @@ -274,11 +271,11 @@ impl Expression {
Self::or_from([self, other])
}

/// Create a new expression `NULLIF(self, other)`
pub fn null_if(self, other: Self) -> Self {
Self::NullIf {
expr: Box::new(self),
if_expr: Box::new(other),
/// Create a new expression `DISTINCT(self, other)`
pub fn distinct(self, other: Self) -> Self {
Self::Distinct {
lhs: Box::new(self),
rhs: Box::new(other),
}
}

Expand All @@ -302,9 +299,9 @@ impl Expression {
Self::VariadicOperation { exprs, .. } => {
stack.extend(exprs.iter());
}
Self::NullIf { expr, if_expr } => {
stack.push(expr);
stack.push(if_expr);
Self::Distinct { lhs, rhs } => {
stack.push(lhs);
stack.push(rhs);
}
}
Some(expr)
Expand Down
8 changes: 2 additions & 6 deletions kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ impl DataSkippingFilter {
static ref PREDICATE_SCHEMA: DataType = StructType::new(vec![
StructField::new("predicate", DataType::BOOLEAN, true),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: too bad rust doesn't support named args for passing magic constants...

                StructField::new("predicate", DataType::BOOLEAN, nullable = true),

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, you can always put it in a comment ... DataType::BOOLEAN, /* nullable */ true), but it's not nearly as nice

]).into();
static ref FILTER_EXPR: Expr = Expr::is_null(Expr::null_if(
Expr::column("predicate"),
Expr::column("predicate"),
));
static ref FILTER_EXPR: Expr = Expr::column("predicate").distinct(Expr::literal(false));
static ref STATS_EXPR: Expr = Expr::column("add.stats");
);

Expand All @@ -133,8 +130,7 @@ impl DataSkippingFilter {
// Build the stats read schema by extracting the column names referenced by the predicate,
// extracting the corresponding field from the table schema, and inserting that field.
let data_fields: Vec<_> = table_schema
.fields
.iter()
.fields()
.filter(|field| field_names.contains(&field.name.as_str()))
.cloned()
.collect();
Expand Down
13 changes: 5 additions & 8 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ impl Scan {
let read_schema = Arc::new(StructType::new(
self.schema()
.fields()
.into_iter()
.filter(|f| {
!self
.snapshot
Expand All @@ -173,7 +172,6 @@ impl Scan {

let select_fields = read_schema
.fields()
.iter()
.map(|f| Expression::Column(f.name().to_string()))
.collect_vec();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL about collect_vec! But why use it only here? Wouldn't it work just as well for L158, etc?

Similarly, it seems like we could use try_collect at e.g. L171 above and L190 below?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be honest, i am not consistent in that. its only available if Itertools is in scope, so if that is availabe i start using it, and may just have realized too late.


Expand Down Expand Up @@ -202,13 +200,12 @@ impl Scan {
let mut fields =
Vec::with_capacity(partition_fields.len() + batch.num_columns());
for field in &partition_fields {
let value_expression =
if let Some(Some(value)) = add.partition_values.get(field.name()) {
let value_expression = match add.partition_values.get(field.name()) {
Some(Some(value)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between None and Some(None) here?
(trying to understand why we need nested options like that?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None means no value is present in the map, Some(None) means there is a value and its None.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Worth a quick code comment, perhaps?

Expression::Literal(get_partition_value(value, field.data_type())?)
} else {
// TODO: is it allowed to assume null for missing partition values?
Expression::Literal(Scalar::Null(field.data_type().clone()))
};
}
_ => Expression::Literal(Scalar::Null(field.data_type().clone())),
};
fields.push(value_expression);
}
fields.extend(select_fields.clone());
Expand Down
Loading
Loading