Move the to_timestamp* functions to datafusion-functions #9388

Merged
merged 26 commits into from
Mar 1, 2024
Changes from 10 commits
Commits
6ffb364
WIP
Omega359 Feb 21, 2024
aa56090
Merge remote-tracking branch 'upstream/main' into feature/9291
Omega359 Feb 21, 2024
b1868ef
WIP
Omega359 Feb 21, 2024
aa1c5a6
Merge remote-tracking branch 'upstream/main' into feature/9291
Omega359 Feb 28, 2024
d8e6016
cargo fmt updates.
Omega359 Feb 28, 2024
78baa96
Migrate to_timestamp* functions to new functions crate.
Omega359 Feb 28, 2024
45bea0f
Merge remote-tracking branch 'upstream/main' into feature/9291
Omega359 Feb 28, 2024
a02ecae
update to allow wasm run to complete.
Omega359 Feb 28, 2024
87a9797
Fix expr_api example
Omega359 Feb 28, 2024
54d5ee6
cargo fmt.
Omega359 Feb 28, 2024
00fa3cb
Update datafusion/core/tests/simplification.rs
Omega359 Feb 28, 2024
320f867
Revert changes to sql in tests to restore to_timestamp(..) calls.
Omega359 Feb 28, 2024
152fc83
Merge remote-tracking branch 'upstream/main' into feature/9291
Omega359 Feb 29, 2024
4fd9ec9
Rust fmt apparently can't make up it's mind.
Omega359 Feb 29, 2024
d83cef5
Updates for merge and moving some expression simplifier tests from op…
Omega359 Feb 29, 2024
dd9baa0
Revert test changes
alamb Feb 29, 2024
10b0a8a
Update datafusion/core/tests/simplification.rs
alamb Feb 29, 2024
c172c4c
Move cast_column to `ColumnarValue::cast_to`
alamb Feb 29, 2024
d5f9615
Remove datafusion-physical-expr dependency
alamb Feb 29, 2024
e29e62d
Merge pull request #1 from alamb/alamb/move_cast
Omega359 Mar 1, 2024
0767a07
Merge remote-tracking branch 'upstream/main' into feature/9291
Omega359 Mar 1, 2024
f59254e
Move dependencies to dev
alamb Mar 1, 2024
7a1cc02
Fix merge conflict by migrating to_date to new functions module along…
Omega359 Mar 1, 2024
c3a8d0e
Merge remote-tracking branch 'origin/feature/9291' into feature/9291
Omega359 Mar 1, 2024
b1fd2bf
Cargo lock update from merge.
Omega359 Mar 1, 2024
200e2f8
Add missing licenses.
Omega359 Mar 1, 2024
3 changes: 3 additions & 0 deletions .github/workflows/rust.yml
@@ -88,6 +88,9 @@ jobs:
- name: Check function packages (array_expressions)
run: cargo check --no-default-features --features=array_expressions -p datafusion

- name: Check function packages (datetime_expressions)
run: cargo check --no-default-features --features=datetime_expressions -p datafusion

- name: Check Cargo.lock for datafusion-cli
run: |
# If this test fails, try running `cargo update` in the `datafusion-cli` directory
1 change: 1 addition & 0 deletions README.md
@@ -78,6 +78,7 @@ Default features:
- `array_expressions`: functions for working with arrays such as `array_to_string`
- `compression`: reading files compressed with `xz2`, `bzip2`, `flate2`, and `zstd`
- `crypto_expressions`: cryptographic functions such as `md5` and `sha256`
- `datetime_expressions`: date and time functions such as `to_timestamp`
- `encoding_expressions`: `encode` and `decode` functions
- `parquet`: support for reading the [Apache Parquet] format
- `regex_expressions`: regular expression functions, such as `regexp_match`
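
A note on what a feature flag like `datetime_expressions` typically controls. The sketch below is a generic illustration of Cargo feature gating with `#[cfg(feature = ...)]`; it is an assumption about the general pattern, not a copy of how the `datafusion-functions` crate wires things up. The function names `datetime_function_names` and `is_registered` are hypothetical.

```rust
/// Hypothetical list of date/time functions a crate might register.
/// This version is compiled only when the `datetime_expressions`
/// Cargo feature is enabled.
#[cfg(feature = "datetime_expressions")]
fn datetime_function_names() -> Vec<&'static str> {
    vec![
        "to_timestamp",
        "to_timestamp_millis",
        "to_timestamp_micros",
        "to_timestamp_seconds",
    ]
}

/// Fallback compiled when the feature is disabled: nothing is registered.
#[cfg(not(feature = "datetime_expressions"))]
fn datetime_function_names() -> Vec<&'static str> {
    Vec::new()
}

/// Returns true if `name` is available in this build.
fn is_registered(name: &str) -> bool {
    datetime_function_names().iter().any(|n| *n == name)
}
```

Building with `--no-default-features --features=datetime_expressions`, as the CI step in this PR does, is a way to confirm that a gated module compiles on its own.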
46 changes: 44 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
@@ -38,6 +38,7 @@ clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "36.0.0", features = [
"avro",
"crypto_expressions",
"datetime_expressions",
"encoding_expressions",
"parquet",
"regex_expressions",
11 changes: 5 additions & 6 deletions datafusion-examples/examples/expr_api.rs
@@ -15,8 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{BooleanArray, Int32Array};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::{DFField, DFSchema};
use datafusion::error::Result;
@@ -30,8 +34,6 @@ use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
use std::collections::HashMap;
use std::sync::Arc;

/// This example demonstrates the DataFusion [`Expr`] API.
///
@@ -113,10 +115,7 @@ fn evaluate_demo() -> Result<()> {
fn simplify_demo() -> Result<()> {
// For example, let's say you have created an expression such as
// ts = to_timestamp("2020-09-08T12:00:00+00:00")
let expr = col("ts").eq(call_fn(
"to_timestamp",
vec![lit("2020-09-08T12:00:00+00:00")],
)?);
let expr = col("ts").eq(to_timestamp(vec![lit("2020-09-08T12:00:00+00:00")]));
Review comment (Contributor): 👍


// Naively evaluating such an expression against a large number of
// rows would involve re-converting "2020-09-08T12:00:00+00:00" to a
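
To illustrate the idea behind this part of `simplify_demo`, here is a minimal, std-only sketch of constant folding: a function call whose argument is a literal is evaluated once and replaced by its result, so it is not re-evaluated per row. The `Expr` enum, `parse_ts`, and `fold` are hypothetical stand-ins written for this note; they are not DataFusion's `Expr` or `ExprSimplifier`, and `parse_ts` recognizes only the one string used in the example.

```rust
/// A tiny, hypothetical expression tree (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    StrLit(String),
    TsLit(i64), // nanoseconds since the Unix epoch
    ToTimestamp(Box<Expr>),
    Eq(Box<Expr>, Box<Expr>),
}

/// Hypothetical stand-in for real timestamp parsing: it only knows
/// the single string used in the example.
fn parse_ts(s: &str) -> Option<i64> {
    match s {
        "2020-09-08T12:00:00+00:00" => Some(1_599_566_400_000_000_000),
        _ => None,
    }
}

/// Recursively replaces `to_timestamp(<string literal>)` with a
/// timestamp literal; everything else is left unchanged.
fn fold(expr: Expr) -> Expr {
    match expr {
        Expr::ToTimestamp(arg) => match fold(*arg) {
            Expr::StrLit(s) => match parse_ts(&s) {
                Some(nanos) => Expr::TsLit(nanos),
                None => Expr::ToTimestamp(Box::new(Expr::StrLit(s))),
            },
            other => Expr::ToTimestamp(Box::new(other)),
        },
        Expr::Eq(l, r) => Expr::Eq(Box::new(fold(*l)), Box::new(fold(*r))),
        other => other,
    }
}
```

After folding, `ts = to_timestamp("2020-09-08T12:00:00+00:00")` becomes a comparison of the column against a single timestamp literal.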
2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
@@ -43,9 +43,11 @@ avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
backtrace = ["datafusion-common/backtrace"]
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression", "tokio-util"]
crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"]
datetime_expressions = ["datafusion-functions/datetime_expressions"]
default = [
"array_expressions",
"crypto_expressions",
"datetime_expressions",
"encoding_expressions",
"regex_expressions",
"unicode_expressions",
10 changes: 5 additions & 5 deletions datafusion/core/tests/parquet/page_pruning.rs
@@ -267,7 +267,7 @@ async fn test_prune(
async fn prune_timestamps_nanos() {
test_prune(
Scenario::Timestamps,
"SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')",
"SELECT * FROM t where nanos < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Nanosecond, None)')",
Some(0),
Some(5),
10,
@@ -284,7 +284,7 @@ async fn prune_timestamps_nanos() {
async fn prune_timestamps_micros() {
test_prune(
Scenario::Timestamps,
"SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 01:01:11Z')",
"SELECT * FROM t where micros < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Microsecond, None)')",
Some(0),
Some(5),
10,
@@ -301,7 +301,7 @@ async fn prune_timestamps_micros() {
async fn prune_timestamps_millis() {
test_prune(
Scenario::Timestamps,
"SELECT * FROM t where millis < to_timestamp_millis('2020-01-02 01:01:11Z')",
"SELECT * FROM t where millis < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Millisecond, None)')",
Some(0),
Some(5),
10,
@@ -319,7 +319,7 @@ async fn prune_timestamps_millis() {
async fn prune_timestamps_seconds() {
test_prune(
Scenario::Timestamps,
"SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 01:01:11Z')",
"SELECT * FROM t where seconds < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Second, None)')",
Some(0),
Some(5),
10,
@@ -728,7 +728,7 @@ async fn without_pushdown_filter() {
let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;

let output2 = context
.query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
.query("SELECT * FROM t where nanos < arrow_cast('2023-01-02T01:01:11', 'Timestamp(Nanosecond, None)')")
.await;

let bytes_scanned_without_filter = cast_count_metric(
12 changes: 6 additions & 6 deletions datafusion/core/tests/parquet/row_group_pruning.rs
@@ -112,7 +112,7 @@ impl RowGroupPruningTest {
async fn prune_timestamps_nanos() {
RowGroupPruningTest::new()
.with_scenario(Scenario::Timestamps)
.with_query("SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')")
.with_query("SELECT * FROM t where nanos < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Nanosecond, None)')")
.with_expected_errors(Some(0))
.with_pruned_by_stats(Some(1))
.with_pruned_by_bloom_filter(Some(0))
@@ -126,7 +126,7 @@ async fn prune_timestamps_micros() {
RowGroupPruningTest::new()
.with_scenario(Scenario::Timestamps)
.with_query(
"SELECT * FROM t where micros < to_timestamp_micros('2020-01-02 01:01:11Z')",
"SELECT * FROM t where micros < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Microsecond, None)')",
)
.with_expected_errors(Some(0))
.with_pruned_by_stats(Some(1))
@@ -141,7 +141,7 @@ async fn prune_timestamps_millis() {
RowGroupPruningTest::new()
.with_scenario(Scenario::Timestamps)
.with_query(
"SELECT * FROM t where micros < to_timestamp_millis('2020-01-02 01:01:11Z')",
"SELECT * FROM t where millis < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Millisecond, None)')",
)
.with_expected_errors(Some(0))
.with_pruned_by_stats(Some(1))
@@ -156,7 +156,7 @@ async fn prune_timestamps_seconds() {
RowGroupPruningTest::new()
.with_scenario(Scenario::Timestamps)
.with_query(
"SELECT * FROM t where seconds < to_timestamp_seconds('2020-01-02 01:01:11Z')",
"SELECT * FROM t where seconds < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Second, None)')",
)
.with_expected_errors(Some(0))
.with_pruned_by_stats(Some(1))
@@ -209,7 +209,7 @@ async fn prune_date64() {
async fn prune_disabled() {
RowGroupPruningTest::new()
.with_scenario(Scenario::Timestamps)
.with_query("SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')")
.with_query("SELECT * FROM t where nanos < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Nanosecond, None)')")
.with_expected_errors(Some(0))
.with_pruned_by_stats(Some(1))
.with_pruned_by_bloom_filter(Some(0))
@@ -218,7 +218,7 @@ async fn prune_disabled() {
.await;

// test without pruning
let query = "SELECT * FROM t where nanos < to_timestamp('2020-01-02 01:01:11Z')";
let query = "SELECT * FROM t where nanos < arrow_cast('2020-01-02T01:01:11', 'Timestamp(Nanosecond, None)')";
let expected_rows = 10;
let config = SessionConfig::new().with_parquet_pruning(false);
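
For context on what these tests count with `with_pruned_by_stats`: statistics-based pruning decides from per-row-group min/max values whether a predicate such as `nanos < <literal>` can match anything in the group. The sketch below is a simplified, std-only illustration under that assumption; `RowGroupStats`, `can_prune_less_than`, `all_rows_match_less_than`, and `pruned_count` are hypothetical names and do not reflect DataFusion's pruning implementation.

```rust
/// Min/max statistics for one row group of a timestamp column
/// (hypothetical type, values in nanoseconds since the epoch).
#[derive(Debug, Clone, Copy)]
struct RowGroupStats {
    min: i64,
    max: i64,
}

/// For a predicate `col < bound`, a row group can be skipped when even
/// its smallest value fails the predicate.
fn can_prune_less_than(stats: RowGroupStats, bound: i64) -> bool {
    stats.min >= bound
}

/// Conversely, every row matches when the largest value passes.
fn all_rows_match_less_than(stats: RowGroupStats, bound: i64) -> bool {
    stats.max < bound
}

/// Counts how many row groups would be skipped for `col < bound`.
fn pruned_count(groups: &[RowGroupStats], bound: i64) -> usize {
    groups
        .iter()
        .filter(|g| can_prune_less_than(**g, bound))
        .count()
}
```

With pruning disabled, as in the second half of `prune_disabled`, no row group is skipped and every row has to be read and filtered.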

118 changes: 116 additions & 2 deletions datafusion/core/tests/simplification.rs
@@ -18,10 +18,17 @@
//! This program demonstrates the DataFusion expression simplification API.

use arrow::datatypes::{DataType, Field, Schema};
use chrono::{DateTime, TimeZone, Utc};
use datafusion::common::DFSchema;
use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*};
use datafusion_expr::{Expr, ExprSchemable};
use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyInfo};
use datafusion_common::ScalarValue;
use datafusion_expr::{
table_scan, Cast, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder,
};
use datafusion_optimizer::simplify_expressions::{
ExprSimplifier, SimplifyExpressions, SimplifyInfo,
};
use datafusion_optimizer::{OptimizerContext, OptimizerRule};

/// In order to simplify expressions, DataFusion must have information
/// about the expressions.
@@ -79,6 +86,43 @@ fn schema() -> DFSchema {
.unwrap()
}

fn test_table_scan() -> LogicalPlan {
Review comment (Contributor Author): These tests were migrated from the optimizer/simplify_expressions file

let schema = Schema::new(vec![
Field::new("a", DataType::Boolean, false),
Field::new("b", DataType::Boolean, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::UInt32, false),
Field::new("e", DataType::UInt32, true),
]);
table_scan(Some("test"), &schema, None)
.expect("creating scan")
.build()
.expect("building plan")
}

fn get_optimized_plan_formatted(plan: &LogicalPlan, date_time: &DateTime<Utc>) -> String {
let config = OptimizerContext::new().with_query_execution_start_time(*date_time);
let rule = SimplifyExpressions::new();

let optimized_plan = rule
.try_optimize(plan, &config)
.unwrap()
.expect("failed to optimize plan");
format!("{optimized_plan:?}")
}

fn now_expr() -> Expr {
call_fn("now", vec![]).unwrap()
}

fn cast_to_int64_expr(expr: Expr) -> Expr {
Expr::Cast(Cast::new(expr.into(), DataType::Int64))
}

fn to_timestamp_expr(arg: impl Into<String>) -> Expr {
to_timestamp(vec![lit(arg.into())])
}

#[test]
fn basic() {
let info: MyInfo = schema().into();
@@ -108,3 +152,73 @@ fn fold_and_simplify() {
let simplified = simplifier.simplify(expr).unwrap();
assert_eq!(simplified, lit(true))
}

#[test]
fn to_timestamp_expr_folded() -> Result<()> {
let table_scan = test_table_scan();
let proj = vec![to_timestamp_expr("2020-09-08T12:00:00+00:00")];

let plan = LogicalPlanBuilder::from(table_scan)
.project(proj)?
.build()?;

let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS to_timestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
\n TableScan: test"
.to_string();
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}
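
The expected constant `1599566400000000000` in `to_timestamp_expr_folded` can be checked by hand. The std-only sketch below converts a UTC calendar date and time to nanoseconds since the Unix epoch. The helper names `days_from_civil` and `utc_nanos` are illustrative and not part of DataFusion, and the date arithmetic is Howard Hinnant's civil-date algorithm, not necessarily what `to_timestamp` uses internally.

```rust
/// Days since 1970-01-01 for a proleptic Gregorian date
/// (Howard Hinnant's `days_from_civil` algorithm).
fn days_from_civil(year: i64, month: u32, day: u32) -> i64 {
    // Shift the year so that it starts in March; the leap day then
    // falls at the end of the shifted year.
    let y = if month <= 2 { year - 1 } else { year };
    let era = y.div_euclid(400);
    let year_of_era = y.rem_euclid(400); // [0, 399]
    let m = month as i64;
    let shifted_month = if m > 2 { m - 3 } else { m + 9 }; // March = 0
    let day_of_year = (153 * shifted_month + 2) / 5 + day as i64 - 1; // [0, 365]
    let day_of_era =
        year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    era * 146_097 + day_of_era - 719_468
}

/// Nanoseconds since the Unix epoch for a UTC date and time of day.
fn utc_nanos(year: i64, month: u32, day: u32, hour: i64, minute: i64, second: i64) -> i64 {
    let seconds = days_from_civil(year, month, day) * 86_400
        + hour * 3_600
        + minute * 60
        + second;
    seconds * 1_000_000_000
}
```

For 2020-09-08 this gives day 18513, and `utc_nanos(2020, 9, 8, 12, 0, 0)` evaluates to `1_599_566_400_000_000_000`, matching the `TimestampNanosecond` literal in the expected plan.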

#[test]
fn now_less_than_timestamp() -> Result<()> {
let table_scan = test_table_scan();

let ts_string = "2020-09-08T12:05:00+00:00";
let time = Utc.timestamp_nanos(1599566400000000000i64);

// cast(now() as int) < cast(to_timestamp(...) as int) + 50000_i64
let plan = LogicalPlanBuilder::from(table_scan)
.filter(
cast_to_int64_expr(now_expr())
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000_i64)),
)?
.build()?;

// Note that constant folder runs and folds the entire
// expression down to a single constant (true)
let expected = "Filter: Boolean(true)\
\n TableScan: test";
let actual = get_optimized_plan_formatted(&plan, &time);

assert_eq!(expected, actual);
Ok(())
}
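
Why the filter in `now_less_than_timestamp` folds to `true`: the test pins the query start time to `1599566400000000000` ns (2020-09-08T12:00:00Z), and the string literal is five minutes later. The sketch below redoes that comparison on raw integers. It assumes that casting a nanosecond timestamp to `Int64` yields the raw nanosecond count; `folded_predicate` is a hypothetical helper written for this note.

```rust
const NANOS_PER_SECOND: i64 = 1_000_000_000;

/// Query start time used by the test: 2020-09-08T12:00:00Z in nanoseconds.
const NOW_NANOS: i64 = 1_599_566_400_000_000_000;

/// The literal "2020-09-08T12:05:00+00:00" is 300 seconds after `NOW_NANOS`.
const TS_NANOS: i64 = NOW_NANOS + 300 * NANOS_PER_SECOND;

/// Mirrors `cast(now() as int) < cast(to_timestamp(...) as int) + offset`
/// on raw nanosecond values.
fn folded_predicate(now_nanos: i64, ts_nanos: i64, offset: i64) -> bool {
    now_nanos < ts_nanos + offset
}
```

Since `NOW_NANOS` is already smaller than `TS_NANOS`, adding `50000` keeps the comparison true, which is the `Boolean(true)` in the expected plan.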

#[test]
fn select_date_plus_interval() -> Result<()> {
let table_scan = test_table_scan();

let ts_string = "2020-09-08T12:05:00+00:00";
let time = Utc.timestamp_nanos(1599566400000000000i64);

// cast(to_timestamp(...) as date) + interval of 123 days
let schema = table_scan.schema();

let date_plus_interval_expr = to_timestamp_expr(ts_string)
.cast_to(&DataType::Date32, schema)?
+ Expr::Literal(ScalarValue::IntervalDayTime(Some(123i64 << 32)));

let plan = LogicalPlanBuilder::from(table_scan.clone())
.project(vec![date_plus_interval_expr])?
.build()?;

// Note that the constant folder runs and folds the entire
// expression down to a single constant (a Date32 value)
let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
TableScan: test"#;
let actual = get_optimized_plan_formatted(&plan, &time);

assert_eq!(expected, actual);
Ok(())
}
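
The two numbers in the expected plan of `select_date_plus_interval` can also be verified by hand. The sketch below assumes the `i64` encoding of `IntervalDayTime` that the test itself relies on (`123i64 << 32`): days in the upper 32 bits, milliseconds in the lower 32 bits. `pack_day_time` and `unpack_day_time` are hypothetical helpers for this note, not Arrow APIs.

```rust
/// Packs days and milliseconds into one i64: days in the upper 32 bits,
/// milliseconds in the lower 32 bits.
fn pack_day_time(days: i32, millis: i32) -> i64 {
    let d = (days as u32 as u64) << 32;
    let m = millis as u32 as u64;
    (d | m) as i64
}

/// Splits a packed value back into (days, milliseconds).
fn unpack_day_time(value: i64) -> (i32, i32) {
    ((value >> 32) as i32, value as i32)
}

/// 2020-09-08 expressed as days since 1970-01-01, i.e. as a Date32 value.
const DATE32_2020_09_08: i32 = 18_513;
```

`pack_day_time(123, 0)` equals `528280977408`, the `IntervalDayTime` literal in the plan, and `18513 + 123 = 18636` is the folded `Date32` value.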