diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 1835eebc57..6ce3f1e4ac 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -239,6 +239,7 @@ impl DeltaTableState { } } +// TODO: Collapse with operations/transaction/state.rs method of same name fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option { let field = table .get_schema() @@ -262,7 +263,9 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option Some(v) => serde_json::Value::String(v.to_string()), None => serde_json::Value::Null, }; - to_correct_scalar_value(&value, &data_type).unwrap_or(ScalarValue::Null) + to_correct_scalar_value(&value, &data_type).unwrap_or( + get_null_of_arrow_type(&data_type).expect("Could not determine null type"), + ) } else if let Ok(Some(statistics)) = add.get_stats() { let values = if get_max { statistics.max_values @@ -273,9 +276,12 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option values .get(&column.name) .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type)) - .unwrap_or(ScalarValue::Null) + .unwrap_or( + get_null_of_arrow_type(&data_type).expect("Could not determine null type"), + ) } else { - ScalarValue::Null + // No statistics available + get_null_of_arrow_type(&data_type).expect("Could not determine null type") } }); ScalarValue::iter_to_array(values).ok() @@ -547,7 +553,7 @@ impl ExecutionPlan for DeltaScan { } } -fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult { +pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult { match t { ArrowDataType::Null => Ok(ScalarValue::Null), ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)), @@ -584,11 +590,14 @@ fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult { TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz), }) } + ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary( + k.clone(), + Box::new(get_null_of_arrow_type(v).unwrap()), + )), //Unsupported types... ArrowDataType::Float16 | ArrowDataType::Decimal256(_, _) | ArrowDataType::Union(_, _) - | ArrowDataType::Dictionary(_, _) | ArrowDataType::LargeList(_) | ArrowDataType::Struct(_) | ArrowDataType::List(_) diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs index bae30c748c..397601c547 100644 --- a/rust/src/operations/transaction/state.rs +++ b/rust/src/operations/transaction/state.rs @@ -20,7 +20,9 @@ use sqlparser::parser::Parser; use sqlparser::tokenizer::Tokenizer; use crate::action::Add; -use crate::delta_datafusion::{logical_expr_to_physical_expr, to_correct_scalar_value}; +use crate::delta_datafusion::{ + get_null_of_arrow_type, logical_expr_to_physical_expr, to_correct_scalar_value, +}; use crate::table_state::DeltaTableState; use crate::DeltaResult; use crate::DeltaTableError; @@ -190,6 +192,8 @@ impl<'a> AddContainer<'a> { return None; } + let data_type = field.data_type(); + let values = self.inner.iter().map(|add| { if self.partition_columns.contains(&column.name) { let value = add.partition_values.get(&column.name).unwrap(); @@ -197,7 +201,9 @@ impl<'a> AddContainer<'a> { Some(v) => serde_json::Value::String(v.to_string()), None => serde_json::Value::Null, }; - to_correct_scalar_value(&value, field.data_type()).unwrap_or(ScalarValue::Null) + to_correct_scalar_value(&value, data_type).unwrap_or( + get_null_of_arrow_type(data_type).expect("Could not determine null type"), + ) } else if let Ok(Some(statistics)) = add.get_stats() { let values = if get_max { statistics.max_values @@ -207,10 +213,12 @@ impl<'a> AddContainer<'a> { values .get(&column.name) - .and_then(|f| to_correct_scalar_value(f.as_value()?, field.data_type())) - .unwrap_or(ScalarValue::Null) + .and_then(|f| to_correct_scalar_value(f.as_value()?, data_type)) + .unwrap_or( + get_null_of_arrow_type(data_type).expect("Could not determine null type"), + ) } else { - ScalarValue::Null + get_null_of_arrow_type(data_type).expect("Could not determine null type") } }); ScalarValue::iter_to_array(values).ok() diff --git a/rust/tests/data/issue_1374/_delta_log/00000000000000000000.json b/rust/tests/data/issue_1374/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..493d40ebfb --- /dev/null +++ b/rust/tests/data/issue_1374/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":1}} +{"metaData":{"id":"d5ad9276-c21f-474e-bfa8-996099dce265","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"temperature\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"createdTime":1684886484991,"configuration":{}}} +{"commitInfo":{"timestamp":1684886484992,"operation":"CREATE TABLE","operationParameters":{"mode":"ErrorIfExists","metadata":"{\"configuration\":{},\"created_time\":1684886484991,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"d5ad9276-c21f-474e-bfa8-996099dce265\",\"name\":null,\"partition_columns\":[\"date\"],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"timestamp\",\"nullable\":true,\"type\":\"timestamp\"},{\"metadata\":{},\"name\":\"temperature\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"date\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}}","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":1}","location":"file:///Users/cole/github.com/cmackenzie1/delta-rs/rust/tests/data/issue_1374"},"clientVersion":"delta-rs.0.11.0"}} \ No newline at end of file diff --git a/rust/tests/data/issue_1374/_delta_log/00000000000000000001.checkpoint.parquet b/rust/tests/data/issue_1374/_delta_log/00000000000000000001.checkpoint.parquet new file mode 100644 index 0000000000..ea7b775bb5 Binary files /dev/null and b/rust/tests/data/issue_1374/_delta_log/00000000000000000001.checkpoint.parquet differ diff --git a/rust/tests/data/issue_1374/_delta_log/00000000000000000001.json b/rust/tests/data/issue_1374/_delta_log/00000000000000000001.json new file mode 100644 index 0000000000..74ce0bf390 --- /dev/null +++ b/rust/tests/data/issue_1374/_delta_log/00000000000000000001.json @@ -0,0 +1,3 @@ +{"add":{"path":"date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd49c-c000.snappy.parquet","size":1021,"partitionValues":{"date":"2023-05-24"},"modificationTime":1684886485017,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"timestamp\":null,\"temperature\":8},\"maxValues\":{\"timestamp\":\"2023-05-24T00:01:25.014Z\",\"temperature\":90},\"nullCount\":{\"temperature\":0,\"timestamp\":0}}","tags":null}} +{"add":{"path":"date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd47d-c000.snappy.parquet","size":1021,"partitionValues":{"date":"2023-05-24"},"modificationTime":1684886485017,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"timestamp\":\"2023-05-24T00:01:25.014Z\",\"temperature\":8},\"maxValues\":{\"timestamp\":\"2023-05-24T00:01:25.014Z\",\"temperature\":90},\"nullCount\":{\"temperature\":0,\"timestamp\":0}}","tags":null}} +{"commitInfo":{"timestamp":1685483647338,"clientVersion":"delta-rs.0.11.0"}} \ No newline at end of file diff --git a/rust/tests/data/issue_1374/_delta_log/_last_checkpoint b/rust/tests/data/issue_1374/_delta_log/_last_checkpoint new file mode 100644 index 0000000000..1c0d1f36c5 --- /dev/null +++ b/rust/tests/data/issue_1374/_delta_log/_last_checkpoint @@ -0,0 +1 @@ +{"parts":null,"size":20622,"version":1} \ No newline at end of file diff --git a/rust/tests/data/issue_1374/date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd47d-c000.snappy.parquet b/rust/tests/data/issue_1374/date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd47d-c000.snappy.parquet new file mode 100644 index 0000000000..cc6a8f345f Binary files /dev/null and b/rust/tests/data/issue_1374/date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd47d-c000.snappy.parquet differ diff --git a/rust/tests/data/issue_1374/date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd49c-c000.snappy.parquet b/rust/tests/data/issue_1374/date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd49c-c000.snappy.parquet new file mode 100644 index 0000000000..cc6a8f345f Binary files /dev/null and b/rust/tests/data/issue_1374/date=2023-05-24/part-00000-e2b01fc6-a906-4008-82df-e98efdcdd49c-c000.snappy.parquet differ diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index 19bc0b6e90..5d3e07f003 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -873,3 +873,40 @@ async fn test_issue_1291_datafusion_sql_partitioned_data() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_issue_1374() -> Result<()> { + let ctx = SessionContext::new(); + let table = deltalake::open_table("./tests/data/issue_1374") + .await + .unwrap(); + ctx.register_table("t", Arc::new(table))?; + + let batches = ctx + .sql( + r#"SELECT * + FROM t + WHERE timestamp BETWEEN '2023-05-24T00:00:00.000Z' AND '2023-05-25T00:00:00.000Z' + LIMIT 5 + "#, + ) + .await? + .collect() + .await?; + + let expected = vec![ + "+----------------------------+-------------+------------+", + "| timestamp | temperature | date |", + "+----------------------------+-------------+------------+", + "| 2023-05-24T00:01:25.010301 | 8 | 2023-05-24 |", + "| 2023-05-24T00:01:25.013902 | 21 | 2023-05-24 |", + "| 2023-05-24T00:01:25.013972 | 58 | 2023-05-24 |", + "| 2023-05-24T00:01:25.014025 | 24 | 2023-05-24 |", + "| 2023-05-24T00:01:25.014072 | 90 | 2023-05-24 |", + "+----------------------------+-------------+------------+", + ]; + + assert_batches_sorted_eq!(&expected, &batches); + + Ok(()) +}