Skip to content

Commit

Permalink
feat: support array_insert (#1073)
Browse files Browse the repository at this point in the history
* Part of the implementation of array_insert

* Missing methods

* Working version

* Reformat code

* Fix code-style

* Add comments about spark's implementation.

* Implement negative indices

+ fix tests for spark < 3.4

* Fix code-style

* Fix scalastyle

* Fix tests for spark < 3.4

* Fixes & tests

- added test for the negative index
- added test for the legacy spark mode

* Use assume(isSpark34Plus) in tests

* Test else-branch & improve coverage

* Update native/spark-expr/src/list.rs

Co-authored-by: Andy Grove <[email protected]>

* Fix fallback test

In one case there is a zero in index and test fails due to spark error

* Adjust the behaviour for the NULL case to Spark

* Move the logic of type checking to the method

* Fix code-style

---------

Co-authored-by: Andy Grove <[email protected]>
  • Loading branch information
SemyonSinchenko and andygrove authored Nov 22, 2024
1 parent e602305 commit 9990b34
Show file tree
Hide file tree
Showing 6 changed files with 537 additions and 12 deletions.
20 changes: 18 additions & 2 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
Cast, CreateNamedStruct, DateTruncExpr, GetArrayStructFields, GetStructField, HourExpr, IfExpr,
ListExtract, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr, ToJson,
ArrayInsert, Cast, CreateNamedStruct, DateTruncExpr, GetArrayStructFields, GetStructField,
HourExpr, IfExpr, ListExtract, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr, ToJson,
};
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::{
Expand Down Expand Up @@ -720,6 +720,22 @@ impl PhysicalPlanner {
)?;
Ok(Arc::new(case_expr))
}
ExprStruct::ArrayInsert(expr) => {
let src_array_expr = self.create_expr(
expr.src_array_expr.as_ref().unwrap(),
Arc::clone(&input_schema),
)?;
let pos_expr =
self.create_expr(expr.pos_expr.as_ref().unwrap(), Arc::clone(&input_schema))?;
let item_expr =
self.create_expr(expr.item_expr.as_ref().unwrap(), Arc::clone(&input_schema))?;
Ok(Arc::new(ArrayInsert::new(
src_array_expr,
pos_expr,
item_expr,
expr.legacy_negative_index,
)))
}
expr => Err(ExecutionError::GeneralError(format!(
"Not implemented: {:?}",
expr
Expand Down
9 changes: 9 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ message Expr {
ListExtract list_extract = 56;
GetArrayStructFields get_array_struct_fields = 57;
BinaryExpr array_append = 58;
ArrayInsert array_insert = 59;
}
}

Expand Down Expand Up @@ -403,6 +404,14 @@ enum NullOrdering {
NullsLast = 1;
}

// Array functions
message ArrayInsert {
Expr src_array_expr = 1;
Expr pos_expr = 2;
Expr item_expr = 3;
bool legacy_negative_index = 4;
}

message DataType {
enum DataTypeId {
BOOL = 0;
Expand Down
2 changes: 1 addition & 1 deletion native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub mod utils;
pub use cast::{spark_cast, Cast};
pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
pub use list::{GetArrayStructFields, ListExtract};
pub use list::{ArrayInsert, GetArrayStructFields, ListExtract};
pub use regexp::RLike;
pub use structs::{CreateNamedStruct, GetStructField};
pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};
Expand Down
Loading

0 comments on commit 9990b34

Please sign in to comment.