Skip to content

Commit

Permalink
[Feature][starter][flink] Support transform-v2 for flink (apache#3396)
Browse files Browse the repository at this point in the history
* [Feature][starter][flink] Support transform-v2 for flink
  • Loading branch information
john8628 authored and lhyundeadsoul committed Jan 3, 2023
1 parent 8dc576a commit fe2bca9
Show file tree
Hide file tree
Showing 16 changed files with 84 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,30 @@
package org.apache.seatunnel.core.starter.flink.execution;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<FlinkStreamTransform> {
public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelTransform> {

private static final String PLUGIN_TYPE = "transform";

Expand All @@ -43,17 +50,19 @@ protected TransformExecuteProcessor(List<URL> jarPaths, List<? extends Config> p
}

@Override
protected List<FlinkStreamTransform> initializePlugins(List<URL> jarPaths, List<? extends Config> pluginConfigs) {
protected List<SeaTunnelTransform> initializePlugins(List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
List<FlinkStreamTransform> transforms = pluginConfigs.stream()
List<SeaTunnelTransform> transforms = pluginConfigs.stream()
.map(transformConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.createPluginInstance(pluginIdentifier);
pluginInstance.setConfig(transformConfig);
pluginInstance.prepare(flinkEnvironment);
return pluginInstance;
List<URL> pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelTransform<?> seaTunnelTransform =
transformPluginDiscovery.createPluginInstance(pluginIdentifier);
jarPaths.addAll(pluginJarPaths);
seaTunnelTransform.prepare(transformConfig);
seaTunnelTransform.setJobContext(jobContext);
return seaTunnelTransform;
}).distinct().collect(Collectors.toList());
jarPaths.addAll(pluginJars);
return transforms;
Expand All @@ -68,12 +77,11 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
List<DataStream<Row>> result = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
try {
FlinkStreamTransform transform = plugins.get(i);
SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
input = transform.processStream(flinkEnvironment, stream);
input = flinkTransform(transform, stream);
registerResultTable(pluginConfig, input);
transform.registerFunction(flinkEnvironment);
result.add(input);
} catch (Exception e) {
throw new TaskExecuteException(
Expand All @@ -82,4 +90,25 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
}
return result;
}

/**
 * Bridges a SeaTunnel transform-v2 plugin into a Flink streaming pipeline.
 *
 * <p>Derives the SeaTunnel row type from the upstream Flink stream, feeds each
 * Flink {@code Row} through the transform's {@code map}, and re-emits the result
 * as a Flink {@code Row} typed with the transform's produced schema.
 *
 * @param transform the SeaTunnel transform-v2 plugin to apply (raw type as used elsewhere in this class)
 * @param stream    the upstream Flink data stream feeding this transform
 * @return a new stream carrying the transform's output rows
 */
protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {
// Translate the upstream Flink type information into a SeaTunnel data type
// and hand it to the transform so it knows its input schema.
SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType());
transform.setTypeInfo(seaTunnelDataType);
// Convert the transform's produced (output) type back into Flink type
// information so the resulting stream advertises the post-transform schema.
TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
// One converter per side of the transform: input uses the upstream schema,
// output uses the transform's produced schema.
FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType);
FlinkRowConverter transformOutputRowConverter = new FlinkRowConverter(transform.getProducedType());
// NOTE(review): converters are captured by this anonymous FlatMapFunction, so
// they must be serializable — see the Serializable change to RowConverter in this same commit.
DataStream<Row> output = stream.flatMap(new FlatMapFunction<Row, Row>() {
@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
// Flink Row -> SeaTunnelRow ("reconvert" appears to be the inverse of
// convert — confirm against FlinkRowConverter).
SeaTunnelRow seaTunnelRow = transformInputRowConverter.reconvert(value);
SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
// A null result means the transform dropped this record (e.g. a filter):
// emit nothing rather than forwarding a null row.
if (dataRow != null) {
Row copy = transformOutputRowConverter.convert(dataRow);
out.collect(copy);
}
}
},
rowTypeInfo);
return output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@ source {
}

transform {
sql {
sql = "select name,age from fake"
}
Filter {
source_table_name = "fake"
result_table_name = "fake1"
fields = ["name", "age"]
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
Assert {
source_table_name = "fake1"
rules =
{
row_rules = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ source {
}

transform {
sql {
sql = "SELECT 'root.sink_group.device_a' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_a' UNION SELECT 'root.sink_group.device_b' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_b'"
Replace {
source_table_name = "fake"
result_table_name = "fake1"
replace_field = "device_name"
pattern = "root.source_group"
replacement = "root.sink_group"
is_regex = false
}
}

sink {
IoTDB {
source_table_name = "fake1"
node_urls = ["flink_e2e_iotdb_sink:6667"]
username = "root"
password = "root"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
TestCaseInvocationContextProvider.class
})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DisabledOnContainer(value = {}, type = EngineType.SEATUNNEL, disabledReason = "TODO: SeaTunnel engine e2e test isn't completed")
@DisabledOnContainer(value = {}, type = {EngineType.SEATUNNEL, EngineType.SPARK}, disabledReason = "TODO: SeaTunnel engine e2e test isn't completed")
public abstract class TestSuiteBase {

protected static final Network NETWORK = TestContainer.NETWORK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ source {
}

transform {
sql {
sql = "select name,age from fake"
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ source {
}

transform {
sql {
sql = "select name from fake"
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ source{
}

transform {
sql {
sql = "select name,age from jdbc"
Filter {
source_table_name = "jdbc"
result_table_name = "jdbc1"
fields = ["name", "age"]
}

}

sink {
jdbc {

source_table_name = "jdbc1"
url = "jdbc:mysql://mysql:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ source{
}

transform {
sql {
sql = "select name,age from jdbc"
Filter {
source_table_name = "jdbc"
result_table_name = "jdbc1"
fields = ["name", "age"]
}
}

sink {
jdbc {

source_table_name = "jdbc1"
url = "jdbc:mysql://mysql:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ source{
}

transform {
sql {
sql = "select name,age from jdbc"
Filter {
source_table_name = "jdbc"
result_table_name = "jdbc1"
fields = ["name", "age"]
}
}

sink {
jdbc {

source_table_name = "jdbc1"
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "root"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ source{
}

transform {
sql {
sql = "select name,age from jdbc"
Filter {
source_table_name = "jdbc"
result_table_name = "jdbc1"
fields = ["name", "age"]
}
}

sink {
jdbc {
source_table_name = "jdbc1"
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
TestCaseInvocationContextProvider.class
})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DisabledOnContainer(value = {}, type = {EngineType.FLINK, EngineType.SPARK}, disabledReason = "TODO: Transform v2 translation to spark/flink isn't completed")
@DisabledOnContainer(value = {}, type = {EngineType.SPARK}, disabledReason = "TODO: Transform v2 translation to spark isn't completed")
public abstract class TestSuiteBase {

protected static final Network NETWORK = TestContainer.NETWORK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
<version>${flink.1.13.6.version}</version>
<scope>${flink.scope}</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ source {
}

transform {
sql {
sql = "select name,age from fake"
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@

package org.apache.seatunnel.plugin.discovery.seatunnel;

import org.apache.seatunnel.flink.BaseFlinkTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;

/**
* Discovery for the SeaTunnel Flink transform.
*/
public class SeaTunnelFlinkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseFlinkTransform> {

public class SeaTunnelFlinkTransformPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelTransform> {
public SeaTunnelFlinkTransformPluginDiscovery() {
super("seatunnel");
}

@Override
protected Class<BaseFlinkTransform> getPluginBaseClass() {
return BaseFlinkTransform.class;
protected Class<SeaTunnelTransform> getPluginBaseClass() {
return SeaTunnelTransform.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.table.type.SqlType;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -33,7 +34,7 @@
*
* @param <T> engine row
*/
public abstract class RowConverter<T> {
public abstract class RowConverter<T> implements Serializable {
protected final SeaTunnelDataType<?> dataType;

public RowConverter(SeaTunnelDataType<?> dataType) {
Expand Down

0 comments on commit fe2bca9

Please sign in to comment.