Skip to content

Commit

Permalink
feat: Array element extraction (#899)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored Sep 5, 2024
1 parent 60cad71 commit 0f80df2
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 9 deletions.
12 changes: 7 additions & 5 deletions docs/source/user-guide/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ The following Spark expressions are currently available. Any known compatibility

## Complex Types

| Expression | Notes |
| ----------------- | ----- |
| CreateNamedStruct | |
| GetElementAt | |
| StructsToJson | |
| Expression | Notes |
| ----------------- | ----------- |
| CreateNamedStruct | |
| ElementAt | Arrays only |
| GetArrayItem | |
| GetStructField | |
| StructsToJson | |

## Other

Expand Down
5 changes: 3 additions & 2 deletions docs/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
- [x] variance

### array_funcs
- [ ] array
- [x] array
- [ ] array_append
- [ ] array_compact
- [ ] array_contains
Expand All @@ -97,8 +97,9 @@
- [ ] array_union
- [ ] arrays_overlap
- [ ] arrays_zip
- [x] element_at
- [ ] flatten
- [ ] get
- [x] get
- [ ] sequence
- [ ] shuffle
- [ ] slice
Expand Down
22 changes: 20 additions & 2 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
Cast, CreateNamedStruct, DateTruncExpr, GetStructField, HourExpr, IfExpr, MinuteExpr, RLike,
SecondExpr, TimestampTruncExpr, ToJson,
Cast, CreateNamedStruct, DateTruncExpr, GetStructField, HourExpr, IfExpr, ListExtract,
MinuteExpr, RLike, SecondExpr, TimestampTruncExpr, ToJson,
};
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::{
Expand Down Expand Up @@ -660,6 +660,24 @@ impl PhysicalPlanner {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
Ok(Arc::new(ToJson::new(child, &expr.timezone)))
}
// Plans a `ListExtract` proto node into a native `ListExtract` physical expression.
// Every sub-expression is planned against the same input schema, so the schema
// handle is cloned with `Arc::clone` for each `create_expr` call.
ExprStruct::ListExtract(expr) => {
// `child` and `ordinal` are unwrapped unconditionally: a message missing either
// field panics here rather than returning an error.
let child =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let ordinal =
self.create_expr(expr.ordinal.as_ref().unwrap(), Arc::clone(&input_schema))?;
// `default_value` is optional: it is planned only when present, and `transpose`
// turns the `Option<Result<_>>` into `Result<Option<_>>` so `?` propagates a
// planning failure.
let default_value = expr
.default_value
.as_ref()
.map(|e| self.create_expr(e, Arc::clone(&input_schema)))
.transpose()?;
// The two boolean flags are forwarded from the proto message unchanged.
Ok(Arc::new(ListExtract::new(
child,
ordinal,
default_value,
expr.one_based,
expr.fail_on_error,
)))
}
expr => Err(ExecutionError::GeneralError(format!(
"Not implemented: {:?}",
expr
Expand Down
9 changes: 9 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ message Expr {
CreateNamedStruct create_named_struct = 53;
GetStructField get_struct_field = 54;
ToJson to_json = 55;
ListExtract list_extract = 56;
}
}

Expand Down Expand Up @@ -508,6 +509,14 @@ message GetStructField {
int32 ordinal = 2;
}

// Array element extraction expression. The native planner maps this message to
// its `ListExtract` physical expression, passing the fields below through in order.
message ListExtract {
// Expression producing the value to extract from (the planner requires it to be set).
Expr child = 1;
// Expression producing the position to extract (the planner requires it to be set).
Expr ordinal = 2;
// Optional expression; the planner only builds it when present.
// NOTE(review): presumably the value returned for an out-of-range ordinal - confirm.
Expr default_value = 3;
// NOTE(review): presumably true for 1-based ordinals (Spark `element_at`) and
// false for 0-based ordinals (`GetArrayItem`) - confirm against the serde code.
bool one_based = 4;
// NOTE(review): presumably makes an invalid ordinal an error instead of
// yielding null/default (ANSI mode) - confirm against the native implementation.
bool fail_on_error = 5;
}

enum SortDirection {
Ascending = 0;
Descending = 1;
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod error;
mod if_expr;

mod kernels;
mod list;
mod regexp;
pub mod scalar_funcs;
pub mod spark_hash;
Expand All @@ -37,6 +38,7 @@ mod xxhash64;
pub use cast::{spark_cast, Cast};
pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
pub use list::ListExtract;
pub use regexp::RLike;
pub use structs::{CreateNamedStruct, GetStructField};
pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};
Expand Down
Loading

0 comments on commit 0f80df2

Please sign in to comment.