[Feature][transform] transform support explode #7928

Merged: 31 commits merged on Nov 9, 2024. Changes shown from 27 commits.

Commits (31)
9b09ad8
[Feature][transform] transform support explode
CosmosNi Oct 28, 2024
d83c2aa
[Feature][transform] transform support explode
CosmosNi Oct 28, 2024
6f7f52d
[Feature][transform] transform support explode
CosmosNi Oct 29, 2024
6b80660
[Feature][transform] transform support explode
CosmosNi Oct 29, 2024
d30be82
[Feature][transform] explode transform support spilt list
CosmosNi Oct 30, 2024
48adff1
[Feature][transform] explode transform support spilt list
CosmosNi Oct 30, 2024
3a55991
[Feature][transform] explode transform support spilt list
CosmosNi Oct 30, 2024
87f3bef
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 1, 2024
ecb676c
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Nov 1, 2024
445ccc1
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 1, 2024
83f2ca7
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 1, 2024
4c23649
Merge remote-tracking branch 'refs/remotes/upstream/dev' into feature…
CosmosNi Nov 5, 2024
5a1d277
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 5, 2024
db7b403
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 5, 2024
caea970
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 5, 2024
cb0f56d
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 5, 2024
4bfa386
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 5, 2024
46125fe
[Feature][transform] fix cr
CosmosNi Nov 5, 2024
2c440d5
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 6, 2024
88d49ca
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 6, 2024
873a951
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 6, 2024
c5c0399
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 6, 2024
97ea914
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 6, 2024
27f320d
[Feature][transform] sql transform support lateral view explode
CosmosNi Nov 6, 2024
e1c60e3
[Feature][transform] handle null
CosmosNi Nov 6, 2024
5f80930
[Feature][transform] fix array type
CosmosNi Nov 6, 2024
fd715b9
[Feature][transform] fix error word
CosmosNi Nov 8, 2024
013c284
[Feature][transform] transform support explode
CosmosNi Nov 8, 2024
619cd2f
[Feature][RestAPI] Support submit job with seatunnel style hocon form…
CosmosNi Nov 8, 2024
46f0857
[Feature][transform] transform support explode
CosmosNi Nov 8, 2024
b337b50
[Feature][transform] transform support explode
CosmosNi Nov 8, 2024
30 changes: 30 additions & 0 deletions docs/en/transform-v2/sql-functions.md
@@ -984,3 +984,33 @@ Example:

select UUID() as seatunnel_uuid

### ARRAY

Generate an array.

Example:

select ARRAY('test1','test2','test3') as arrays

### SPLIT

Split a string into an array.

Example:

select SPLIT(test,';') as arrays

### LATERAL VIEW
#### EXPLODE

Explode an array column into multiple rows.
OUTER EXPLODE returns NULL when the array is NULL or empty.
EXPLODE(SPLIT(FIELD_NAME, separator)) is used to split a string column; the first argument of SPLIT is the field name and the second is the separator.
EXPLODE(ARRAY(value1, value2, ...)) is used to explode a custom array.
For example, EXPLODE(SPLIT(NAME, ',')) turns a row whose NAME is 'a,b' into two rows, one with NAME = 'a' and the other with NAME = 'b'.
```
SELECT * FROM fake
LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME
LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id
LATERAL VIEW OUTER EXPLODE ( age ) AS age
LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
```
31 changes: 31 additions & 0 deletions docs/zh/transform-v2/sql-functions.md
@@ -975,3 +975,34 @@ case when c_string in ('c_string') then 1 else 0 end

select UUID() as seatunnel_uuid


### ARRAY

Generate an array.

Example:

select ARRAY('test1','test2','test3') as arrays

### SPLIT

Split a string into an array.

Example:

select SPLIT(test,';') as arrays

### LATERAL VIEW
#### EXPLODE

Explode an array column into multiple rows.
OUTER EXPLODE returns NULL when the array is NULL or empty.
EXPLODE(SPLIT(FIELD_NAME, separator)) splits a string column; the first argument of SPLIT is the field name and the second is the separator.
EXPLODE(ARRAY(value1, value2, ...)) explodes a custom array, producing a new field on top of the existing columns.
```
SELECT * FROM fake
LATERAL VIEW EXPLODE ( SPLIT ( NAME, ',' ) ) AS NAME
LATERAL VIEW EXPLODE ( SPLIT ( pk_id, ';' ) ) AS pk_id
LATERAL VIEW OUTER EXPLODE ( age ) AS age
LATERAL VIEW OUTER EXPLODE ( ARRAY(1,1) ) AS num
```
2 changes: 1 addition & 1 deletion pom.xml
@@ -135,7 +135,7 @@
<maven-scm-provider-jgit.version>2.0.0</maven-scm-provider-jgit.version>
<testcontainer.version>1.17.6</testcontainer.version>
<spotless.version>2.29.0</spotless.version>
<jsqlparser.version>4.5</jsqlparser.version>
<jsqlparser.version>4.9</jsqlparser.version>
<json-path.version>2.7.0</json-path.version>
<groovy.version>4.0.16</groovy.version>
<jetty.version>9.4.56.v20240826</jetty.version>
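
The jsqlparser bump from 4.5 to 4.9 above is what drives the parser-facing rewrites further down in this PR: SelectExpressionItem and SelectBody are gone, and SelectItem is now generic and exposes its expression directly. Below is a minimal sketch of the post-4.9 access pattern; it uses only calls that appear in the diffs in this PR, while the query string and class name are illustrative.

```
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectItem;

import java.util.List;

public class JsqlParser49Sketch {

    public static void main(String[] args) throws JSQLParserException {
        Statement statement = CCJSqlParserUtil.parse("SELECT name FROM fake");
        Select select = (Select) statement;
        // 4.9: getSelectBody() now returns Select; PlainSelect is one of its subclasses.
        PlainSelect plainSelect = (PlainSelect) select.getSelectBody();

        // 4.9: SelectItem is generic and carries the expression itself,
        // replacing the SelectExpressionItem wrapper that 4.5 required.
        List<SelectItem<?>> selectItems = plainSelect.getSelectItems();
        for (SelectItem<?> selectItem : selectItems) {
            Expression expression = selectItem.getExpression();
            if (expression instanceof Column) {
                System.out.println(((Column) expression).getColumnName());
            }
        }
    }
}
```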
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.api.transform;

import java.util.List;

public interface SeaTunnelMultiRowTransform<T> extends SeaTunnelTransform<T> {

/**
 * Transform one input row into zero or more output rows whose type matches
 * {@code getProducedCatalogTable().getSeaTunnelRowType()}.
 *
 * @param row the row to be transformed
 * @return the transformed rows
 */
List<T> flatMap(T row);

default T map(T row) {
throw new UnsupportedOperationException("Single-row map is not supported by this multi-row transform");
}
}
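
For orientation, here is a minimal sketch of how a transform could implement this interface to expand one delimited string column into a row per value. It is illustrative only: the class name, constructor, and split logic are assumptions, not code from this PR, and it relies on SeaTunnelRow's existing getField/setField/copy accessors. The remaining SeaTunnelTransform methods (such as getProducedCatalogTable) are left abstract and omitted for brevity.

```
// Illustrative sketch only, not part of this PR.
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;

import java.util.ArrayList;
import java.util.List;

public abstract class SplitToRowsTransform implements SeaTunnelMultiRowTransform<SeaTunnelRow> {

    private final int fieldIndex;   // position of the delimited string column
    private final String separator; // e.g. "," or ";"

    protected SplitToRowsTransform(int fieldIndex, String separator) {
        this.fieldIndex = fieldIndex;
        this.separator = separator;
    }

    @Override
    public List<SeaTunnelRow> flatMap(SeaTunnelRow row) {
        List<SeaTunnelRow> result = new ArrayList<>();
        Object value = row.getField(fieldIndex);
        if (value == null) {
            // Returning an empty list drops the row; the engine-side flatMap
            // wrappers in this PR simply skip empty results.
            return result;
        }
        for (String part : value.toString().split(separator)) {
            SeaTunnelRow copy = row.copy();  // keep all other columns
            copy.setField(fieldIndex, part); // replace the exploded column
            result.add(copy);
        }
        return result;
    }
}
```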
@@ -43,7 +43,6 @@
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;

import java.nio.file.Files;
@@ -360,12 +359,12 @@ private static void parseInsertSql(
String sourceTableName;
String resultTableName;
if (plainSelect.getFromItem() == null) {
List<SelectItem> selectItems = plainSelect.getSelectItems();
List<SelectItem<?>> selectItems = plainSelect.getSelectItems();
if (selectItems.size() != 1) {
throw new ParserException(
"Source table must be specified in SQL: " + insertSql);
}
SelectExpressionItem selectItem = (SelectExpressionItem) selectItems.get(0);
SelectItem<?> selectItem = selectItems.get(0);
Column column = (Column) selectItem.getExpression();
sourceTableName = column.getColumnName();
resultTableName = sourceTableName;
@@ -82,8 +82,8 @@ protected Collection<JdbcSourceSplit> createSplits(
partitionEnd = range.getRight();
}
if (partitionStart == null || partitionEnd == null) {
JdbcSourceSplit spilt = createSingleSplit(table);
return Collections.singletonList(spilt);
JdbcSourceSplit split = createSingleSplit(table);
return Collections.singletonList(split);
}

return createNumberColumnSplits(
@@ -54,8 +54,6 @@
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectBody;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;

import java.math.BigDecimal;
@@ -83,7 +81,7 @@ public static PlainSelect convertToPlainSelect(String query) {
throw new IllegalArgumentException("Only SELECT statements are supported.");
}
Select select = (Select) statement;
SelectBody selectBody = select.getSelectBody();
Select selectBody = select.getSelectBody();
if (!(selectBody instanceof PlainSelect)) {
throw new IllegalArgumentException("Only simple SELECT statements are supported.");
}
@@ -101,18 +99,15 @@ public static int[] convertSqlSelectToPaimonProjectionIndex(
public static int[] convertSqlSelectToPaimonProjectionIndex(
String[] fieldNames, PlainSelect plainSelect) {
int[] projectionIndex = null;
List<SelectItem> selectItems = plainSelect.getSelectItems();
List<SelectItem<?>> selectItems = plainSelect.getSelectItems();

List<String> columnNames = new ArrayList<>();
for (SelectItem selectItem : selectItems) {
if (selectItem instanceof AllColumns) {
if (selectItem.getExpression() instanceof AllColumns) {
return null;
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem selectExpressionItem = (SelectExpressionItem) selectItem;
String columnName = selectExpressionItem.getExpression().toString();
columnNames.add(columnName);
} else {
throw new IllegalArgumentException("Error encountered parsing query fields.");
String columnName = ((Column) selectItem.getExpression()).getColumnName();
columnNames.add(columnName);
Review comment on lines +102 to +110: cc @dailai and @TaoZex

}
}

@@ -25,15 +25,19 @@
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.ArrayList;
@@ -140,6 +144,11 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS

protected DataStream<SeaTunnelRow> flinkTransform(
SeaTunnelTransform transform, DataStream<SeaTunnelRow> stream) {
if (transform instanceof SeaTunnelMultiRowTransform) {
return stream.flatMap(
new ArrayFlatMap(transform), TypeInformation.of(SeaTunnelRow.class));
}

return stream.transform(
String.format("%s-Transform", transform.getPluginName()),
TypeInformation.of(SeaTunnelRow.class),
@@ -151,4 +160,24 @@ protected DataStream<SeaTunnelRow> flinkTransform(
((SeaTunnelTransform<SeaTunnelRow>) transform)
.map(row))));
}

public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, SeaTunnelRow> {

private SeaTunnelTransform transform;

public ArrayFlatMap(SeaTunnelTransform transform) {
this.transform = transform;
}

@Override
public void flatMap(SeaTunnelRow row, Collector<SeaTunnelRow> collector) {
List<SeaTunnelRow> rows =
((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform).flatMap(row);
if (CollectionUtils.isNotEmpty(rows)) {
for (SeaTunnelRow rowResult : rows) {
collector.collect(rowResult);
}
}
}
}
}
@@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelMultiRowTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -35,7 +36,8 @@
import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.commons.collections.CollectionUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
@@ -45,7 +47,6 @@

import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@@ -164,57 +165,49 @@ private Dataset<Row> sparkTransform(SeaTunnelTransform transform, DatasetTableIn
SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(inputDataType);
SeaTunnelRowConverter outputRowConverter = new SeaTunnelRowConverter(outputDataTYpe);
ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
return stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) ->
new TransformIterator(
rowIterator,
transform,
outputSchema,
inputRowConverter,
outputRowConverter),

return stream.flatMap(
new TransformMapPartitionsFunction(
transform, inputRowConverter, outputRowConverter),
encoder)
.filter(Objects::nonNull);
}

private static class TransformIterator implements Iterator<Row>, Serializable {
private Iterator<Row> sourceIterator;
private static class TransformMapPartitionsFunction implements FlatMapFunction<Row, Row> {
private SeaTunnelTransform<SeaTunnelRow> transform;
private StructType structType;
private SeaTunnelRowConverter inputRowConverter;
private SeaTunnelRowConverter outputRowConverter;

public TransformIterator(
Iterator<Row> sourceIterator,
public TransformMapPartitionsFunction(
SeaTunnelTransform<SeaTunnelRow> transform,
StructType structType,
SeaTunnelRowConverter inputRowConverter,
SeaTunnelRowConverter outputRowConverter) {
this.sourceIterator = sourceIterator;
this.transform = transform;
this.structType = structType;
this.inputRowConverter = inputRowConverter;
this.outputRowConverter = outputRowConverter;
}

@Override
public boolean hasNext() {
return sourceIterator.hasNext();
}

@Override
public Row next() {
try {
Row row = sourceIterator.next();
SeaTunnelRow seaTunnelRow = inputRowConverter.unpack((GenericRowWithSchema) row);
seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
if (seaTunnelRow == null) {
return null;
public Iterator<Row> call(Row row) throws Exception {
List<Row> rows = new ArrayList<>();

SeaTunnelRow seaTunnelRow = inputRowConverter.unpack((GenericRowWithSchema) row);
if (transform instanceof SeaTunnelMultiRowTransform) {
List<SeaTunnelRow> seaTunnelRows =
((SeaTunnelMultiRowTransform<SeaTunnelRow>) transform)
.flatMap(seaTunnelRow);
if (CollectionUtils.isNotEmpty(seaTunnelRows)) {
for (SeaTunnelRow seaTunnelRowTransform : seaTunnelRows) {
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
}
}
} else {
SeaTunnelRow seaTunnelRowTransform = transform.map(seaTunnelRow);
if (seaTunnelRowTransform != null) {
rows.add(outputRowConverter.parcel(seaTunnelRowTransform));
}
return outputRowConverter.parcel(seaTunnelRow);
} catch (Exception e) {
throw new TaskExecuteException("Row convert failed, caused: " + e.getMessage(), e);
}
return rows.iterator();
}
}
}
@@ -59,6 +59,10 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
Assertions.assertEquals(0, sqlAllColumns.getExitCode());
Container.ExecResult caseWhenSql = container.executeJob("/sql_transform/case_when.conf");
Assertions.assertEquals(0, caseWhenSql.getExitCode());

Container.ExecResult execResultBySql =
container.executeJob("/sql_transform/explode_transform.conf");
Assertions.assertEquals(0, execResultBySql.getExitCode());
}

@TestTemplate