Skip to content

Commit

Permalink
[FLINK-37267][table] Add support for UNNEST WITH ORDINALITY
Browse files Browse the repository at this point in the history
This closes apache#26113.
  • Loading branch information
gustavodemorais authored Feb 21, 2025
1 parent c92f208 commit 4091e85
Show file tree
Hide file tree
Showing 13 changed files with 2,179 additions and 314 deletions.
90 changes: 86 additions & 4 deletions docs/content/docs/dev/table/sql/queries/joins.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,96 @@ FROM Orders AS o

In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the subsequent processing time attribute ensures that each row of the `Orders` table is joined with those Customers rows that match the join predicate at the point in time when the `Orders` row is processed by the join operator. It also prevents that the join result is updated when a joined `Customer` row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above `o.customer_id = c.id`.

Array Expansion
Array, Multiset and Map Expansion
--------------

Returns a new row for each element in the given array. Unnesting `WITH ORDINALITY` is not yet supported.
Unnest returns a new row for each element in the given array, multiset or map. Supports both `CROSS JOIN` and `LEFT JOIN`.
```sql
-- Returns a new row for each element in a constant array
SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat'])

id product_name
======= ============
order_1 shirt
order_1 pants
order_1 hat

-- Returns a new row for each element in the array
-- assuming a Orders table with an array column `product_names`
SELECT order_id, product_name
FROM Orders
CROSS JOIN UNNEST(product_names) AS t(product_name)
```

Unnesting `WITH ORDINALITY` is also supported. Currently, `WITH ORDINALITY` only supports `CROSS JOIN` but not `LEFT JOIN`.


```sql
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
-- Returns a new row for each element in a constant array and its position in the array
SELECT *
FROM (VALUES ('order_1'), ('order_2'))
CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat'])
WITH ORDINALITY AS t(product_name, index)

id product_name index
======= ============ =====
order_1 shirt 1
order_1 pants 2
order_1 hat 3
order_2 shirt 1
order_2 pants 2
order_2 hat 3

-- Returns a new row for each element and its position in the array
-- assuming a Orders table with an array column `product_names`
SELECT order_id, product_name, product_index
FROM Orders
CROSS JOIN UNNEST(product_names)
WITH ORDINALITY AS t(product_name, product_index)
```

An unnest with ordinality will return each element and the position of the element in the data structure, 1-indexed.
The order of the elements for arrays is guaranteed. Since maps and multisets are unordered, the order of the elements is not guaranteed.

```sql
-- Returns a new row each key/value pair in the map.
SELECT *
FROM
(VALUES('order_1'))
CROSS JOIN UNNEST(MAP['shirt', 2, 'pants', 1, 'hat', 1]) WITH ORDINALITY

id product_name amount index
======= ============ ===== =====
order_1 shirt 2 1
order_1 pants 1 2
order_1 hat 1 3

-- Returns a new row for each instance of a element in a multiset
-- If an element has been seen twice (multiplicity is 2), it will be returned twice
WITH ProductMultiset AS
(SELECT COLLECT(product_name) AS product_multiset
FROM (
VALUES ('shirt'), ('pants'), ('hat'), ('shirt'), ('hat')
) AS t(product_name)) -- produces { 'shirt': 2, 'pants': 1, 'hat': 2 }
SELECT id, product_name, ordinality
FROM
(VALUES ('order_1'), ('order_2')) AS t(id),
ProductMultiset
CROSS JOIN UNNEST(product_multiset) WITH
ORDINALITY AS u(product_name, ordinality);

id product_name index
======= ============ =====
order_1 shirt 1
order_1 shirt 2
order_1 pants 3
order_1 hat 4
order_1 hat 5
order_2 shirt 1
order_2 shirt 2
order_2 pants 3
order_2 hat 4
order_1 hat 5
```

Table Function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,17 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.internal()
.build();

public static final BuiltInFunctionDefinition INTERNAL_UNNEST_ROWS_WITH_ORDINALITY =
BuiltInFunctionDefinition.newBuilder()
.name("$UNNEST_ROWS_WITH_ORDINALITY$1")
.kind(TABLE)
.inputTypeStrategy(sequence(ANY))
.outputTypeStrategy(SpecificTypeStrategies.UNUSED)
.runtimeClass(
"org.apache.flink.table.runtime.functions.table.UnnestRowsWithOrdinalityFunction")
.internal()
.build();

public static final BuiltInFunctionDefinition INTERNAL_HASHCODE =
BuiltInFunctionDefinition.newBuilder()
.name("$HASHCODE$1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunction;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunctionBase;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -68,26 +68,18 @@ public boolean matches(RelOptRuleCall call) {
LogicalFilter logicalFilter = (LogicalFilter) right;
RelNode relNode = getRel(logicalFilter.getInput());
if (relNode instanceof Uncollect) {
return !((Uncollect) relNode).withOrdinality;
return true;
} else if (relNode instanceof LogicalProject) {
LogicalProject logicalProject = (LogicalProject) relNode;
relNode = getRel(logicalProject.getInput());
if (relNode instanceof Uncollect) {
return !((Uncollect) relNode).withOrdinality;
}
return false;
return relNode instanceof Uncollect;
}
} else if (right instanceof LogicalProject) {
LogicalProject logicalProject = (LogicalProject) right;
RelNode relNode = getRel(logicalProject.getInput());
if (relNode instanceof Uncollect) {
Uncollect uncollect = (Uncollect) relNode;
return !uncollect.withOrdinality;
}
return false;
} else if (right instanceof Uncollect) {
Uncollect uncollect = (Uncollect) right;
return !uncollect.withOrdinality;
return relNode instanceof Uncollect;
} else {
return right instanceof Uncollect;
}
return false;
}
Expand Down Expand Up @@ -131,16 +123,22 @@ private RelNode convert(RelNode relNode, LogicalCorrelate correlate) {
((Map.Entry) uncollect.getInput().getRowType().getFieldList().get(0))
.getValue();
LogicalType logicalType = FlinkTypeFactory.toLogicalType(relDataType);

BridgingSqlFunction sqlFunction =
BridgingSqlFunction.of(
cluster, BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
cluster,
uncollect.withOrdinality
? BuiltInFunctionDefinitions
.INTERNAL_UNNEST_ROWS_WITH_ORDINALITY
: BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
RexNode rexCall =
cluster.getRexBuilder()
.makeCall(
typeFactory.createFieldTypeFromLogicalType(
toRowType(
UnnestRowsFunction.getUnnestedType(
logicalType))),
UnnestRowsFunctionBase.getUnnestedType(
logicalType,
uncollect.withOrdinality))),
sqlFunction,
((LogicalProject) getRel(uncollect.getInput())).getProjects());
return new LogicalTableFunctionScan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunction;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunctionBase;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.calcite.plan.RelOptCluster;
Expand All @@ -37,7 +37,6 @@
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.immutables.value.Value;

import java.util.Collections;
Expand Down Expand Up @@ -91,14 +90,20 @@ private RelNode convertUncollect(Uncollect uc) {
RelDataType relDataType = uc.getInput().getRowType().getFieldList().get(0).getValue();
LogicalType logicalType = FlinkTypeFactory.toLogicalType(relDataType);

SqlFunction sqlFunction =
BridgingSqlFunction.of(cluster, BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
BridgingSqlFunction sqlFunction =
BridgingSqlFunction.of(
cluster,
uc.withOrdinality
? BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS_WITH_ORDINALITY
: BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);

RexNode rexCall =
cluster.getRexBuilder()
.makeCall(
typeFactory.createFieldTypeFromLogicalType(
toRowType(UnnestRowsFunction.getUnnestedType(logicalType))),
toRowType(
UnnestRowsFunctionBase.getUnnestedType(
logicalType, uc.withOrdinality))),
sqlFunction,
((LogicalProject) getRel(uc.getInput())).getProjects());
return new LogicalTableFunctionScan(
Expand Down
Loading

0 comments on commit 4091e85

Please sign in to comment.