From fe2bca91ef3f6a3206b787401333133370c15e8c Mon Sep 17 00:00:00 2001 From: john Date: Tue, 3 Jan 2023 10:15:44 +0800 Subject: [PATCH] [Feature][starter][flink] Support transform-v2 for flink (#3396) * [Feature][starter][flink] Support transform-v2 for flink --- .../execution/TransformExecuteProcessor.java | 53 ++++++++++++++----- .../assertion/fakesource_to_assert.conf | 9 ++-- .../resources/iotdb/iotdb_source_to_sink.conf | 10 +++- .../seatunnel/e2e/common/TestSuiteBase.java | 2 +- .../datahub/fakesource_to_datahub.conf | 4 -- .../resources/jdbc/fakesource_to_jdbc.conf | 3 -- .../jdbc_mysql_source_and_sink_parallel.conf | 9 ++-- ..._source_and_sink_parallel_upper_lower.conf | 8 +-- ...dbc_postgres_source_and_sink_parallel.conf | 8 +-- ..._source_and_sink_parallel_upper_lower.conf | 7 ++- .../e2e/transform/TestSuiteBase.java | 2 +- .../pom.xml | 1 - .../resources/examples/fake_to_console.conf | 3 -- .../flink/FlinkTransformPluginDiscovery.java | 42 --------------- ...eaTunnelFlinkTransformPluginDiscovery.java | 12 ++--- .../serialization/RowConverter.java | 3 +- 16 files changed, 84 insertions(+), 92 deletions(-) delete mode 100644 seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index 4c56b3cdf192..39171f17201d 100644 --- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -18,23 +18,30 @@ package org.apache.seatunnel.core.starter.flink.execution; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.flink.stream.FlinkStreamTransform; import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery; +import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter; +import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; import org.apache.seatunnel.shade.com.typesafe.config.Config; import com.google.common.collect.Lists; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor { +public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor { private static final String PLUGIN_TYPE = "transform"; @@ -43,17 +50,19 @@ protected TransformExecuteProcessor(List jarPaths, List p } @Override - protected List initializePlugins(List jarPaths, List pluginConfigs) { + protected List initializePlugins(List jarPaths, List pluginConfigs) { SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery(); List pluginJars = new ArrayList<>(); - List transforms = pluginConfigs.stream() + List transforms = pluginConfigs.stream() .map(transformConfig -> { PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME)); - pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - FlinkStreamTransform pluginInstance = (FlinkStreamTransform) transformPluginDiscovery.createPluginInstance(pluginIdentifier); - pluginInstance.setConfig(transformConfig); - pluginInstance.prepare(flinkEnvironment); - return pluginInstance; + List pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)); + SeaTunnelTransform seaTunnelTransform = + transformPluginDiscovery.createPluginInstance(pluginIdentifier); + jarPaths.addAll(pluginJarPaths); + seaTunnelTransform.prepare(transformConfig); + seaTunnelTransform.setJobContext(jobContext); + return seaTunnelTransform; }).distinct().collect(Collectors.toList()); jarPaths.addAll(pluginJars); return transforms; @@ -68,12 +77,11 @@ public List> execute(List> upstreamDataStreams) List> result = new ArrayList<>(); for (int i = 0; i < plugins.size(); i++) { try { - FlinkStreamTransform transform = plugins.get(i); + SeaTunnelTransform transform = plugins.get(i); Config pluginConfig = pluginConfigs.get(i); DataStream stream = fromSourceTable(pluginConfig).orElse(input); - input = transform.processStream(flinkEnvironment, stream); + input = flinkTransform(transform, stream); registerResultTable(pluginConfig, input); - transform.registerFunction(flinkEnvironment); result.add(input); } catch (Exception e) { throw new TaskExecuteException( @@ -82,4 +90,25 @@ public List> execute(List> upstreamDataStreams) } return result; } + + protected DataStream flinkTransform(SeaTunnelTransform transform, DataStream stream) { + SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType()); + transform.setTypeInfo(seaTunnelDataType); + TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType()); + FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType); + FlinkRowConverter transformOutputRowConverter = new FlinkRowConverter(transform.getProducedType()); + DataStream output = stream.flatMap(new FlatMapFunction() { + @Override + public void flatMap(Row value, Collector out) throws Exception { + SeaTunnelRow seaTunnelRow = transformInputRowConverter.reconvert(value); + SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow); + if (dataRow != null) { + Row copy = transformOutputRowConverter.convert(dataRow); + out.collect(copy); + } + } + }, + rowTypeInfo); + return output; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf index a98c4f042efd..4e512683f0b3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf @@ -41,9 +41,11 @@ source { } transform { - sql { - sql = "select name,age from fake" - } + Filter { + source_table_name = "fake" + result_table_name = "fake1" + fields = ["name", "age"] + } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql @@ -51,6 +53,7 @@ transform { sink { Assert { + source_table_name = "fake1" rules = { row_rules = [ diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf index 61fa0357d3db..8c7ddb5d507b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/resources/iotdb/iotdb_source_to_sink.conf @@ -54,13 +54,19 @@ source { } transform { - sql { - sql = "SELECT 'root.sink_group.device_a' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_a' UNION SELECT 'root.sink_group.device_b' AS device_name, ts, c_string, c_boolean, c_tinyint, c_smallint, c_int, c_bigint, c_float, c_double FROM fake WHERE device_name = 'root.source_group.device_b'" + Replace { + source_table_name = "fake" + result_table_name = "fake1" + replace_field = "device_name" + pattern = "root.source_group" + replacement = "root.sink_group" + is_regex = false } } sink { IoTDB { + source_table_name = "fake1" node_urls = ["flink_e2e_iotdb_sink:6667"] username = "root" password = "root" diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/TestSuiteBase.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/TestSuiteBase.java index b098a2058497..71158dc362d1 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/TestSuiteBase.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/TestSuiteBase.java @@ -37,7 +37,7 @@ TestCaseInvocationContextProvider.class }) @TestInstance(TestInstance.Lifecycle.PER_CLASS) -@DisabledOnContainer(value = {}, type = EngineType.SEATUNNEL, disabledReason = "TODO: SeaTunnel engine e2e test isn't completed") +@DisabledOnContainer(value = {}, type = {EngineType.SEATUNNEL, EngineType.SPARK}, disabledReason = "TODO: SeaTunnel engine e2e test isn't completed") public abstract class TestSuiteBase { protected static final Network NETWORK = TestContainer.NETWORK; diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-datahub-flink-e2e/src/test/resources/datahub/fakesource_to_datahub.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-datahub-flink-e2e/src/test/resources/datahub/fakesource_to_datahub.conf index 87eb52fdfab2..d183a6a8fba3 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-datahub-flink-e2e/src/test/resources/datahub/fakesource_to_datahub.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-datahub-flink-e2e/src/test/resources/datahub/fakesource_to_datahub.conf @@ -42,10 +42,6 @@ source { } transform { - sql { - sql = "select name,age from fake" - } - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/transform/sql } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf index 0f732b63b949..1c3bc120bf74 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf @@ -42,9 +42,6 @@ source { } transform { - sql { - sql = "select name from fake" - } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/transform/sql diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf index 9173abcec807..2ac53beaf855 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf @@ -35,14 +35,17 @@ source{ } transform { - sql { - sql = "select name,age from jdbc" + Filter { + source_table_name = "jdbc" + result_table_name = "jdbc1" + fields = ["name", "age"] } + } sink { jdbc { - + source_table_name = "jdbc1" url = "jdbc:mysql://mysql:3306/test" driver = "com.mysql.cj.jdbc.Driver" user = "root" diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf index b4e39e0e9571..373cbc887d1f 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf @@ -37,14 +37,16 @@ source{ } transform { - sql { - sql = "select name,age from jdbc" + Filter { + source_table_name = "jdbc" + result_table_name = "jdbc1" + fields = ["name", "age"] } } sink { jdbc { - + source_table_name = "jdbc1" url = "jdbc:mysql://mysql:3306/test" driver = "com.mysql.cj.jdbc.Driver" user = "root" diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf index 887066eff071..036670835bc0 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf @@ -34,14 +34,16 @@ source{ } transform { - sql { - sql = "select name,age from jdbc" + Filter { + source_table_name = "jdbc" + result_table_name = "jdbc1" + fields = ["name", "age"] } } sink { jdbc { - + source_table_name = "jdbc1" url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" user = "root" diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf index 38d9ac960053..e224e786c2b3 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf @@ -37,13 +37,16 @@ source{ } transform { - sql { - sql = "select name,age from jdbc" + Filter { + source_table_name = "jdbc" + result_table_name = "jdbc1" + fields = ["name", "age"] } } sink { jdbc { + source_table_name = "jdbc1" url = "jdbc:postgresql://postgresql:5432/test" driver = "org.postgresql.Driver" diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java index e33c34b9d5a7..2672fd466141 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java @@ -37,7 +37,7 @@ TestCaseInvocationContextProvider.class }) @TestInstance(TestInstance.Lifecycle.PER_CLASS) -@DisabledOnContainer(value = {}, type = {EngineType.FLINK, EngineType.SPARK}, disabledReason = "TODO: Transform v2 translation to spark/flink isn't completed") +@DisabledOnContainer(value = {}, type = {EngineType.SPARK}, disabledReason = "TODO: Transform v2 translation to spark isn't completed") public abstract class TestSuiteBase { protected static final Network NETWORK = TestContainer.NETWORK; diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml index 8cadae625f79..98219890ab5a 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml @@ -95,7 +95,6 @@ ${flink.1.13.6.version} ${flink.scope} - com.squareup.okhttp3 mockwebserver diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf index aea7d4c8c50b..0f927351fb63 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf @@ -45,9 +45,6 @@ source { } transform { - sql { - sql = "select name,age from fake" - } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/category/transform diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java deleted file mode 100644 index 12a91d0881f0..000000000000 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.plugin.discovery.flink; - -import org.apache.seatunnel.flink.BaseFlinkTransform; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; - -import java.net.URL; -import java.util.ArrayList; -import java.util.List; - -public class FlinkTransformPluginDiscovery extends FlinkAbstractPluginDiscovery { - - public FlinkTransformPluginDiscovery() { - super("flink"); - } - - @Override - public List getPluginJarPaths(List pluginIdentifiers) { - return new ArrayList<>(); - } - - @Override - protected Class getPluginBaseClass() { - return BaseFlinkTransform.class; - } -} diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java index 2b5620f4a563..1525f110467b 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFlinkTransformPluginDiscovery.java @@ -17,20 +17,16 @@ package org.apache.seatunnel.plugin.discovery.seatunnel; -import org.apache.seatunnel.flink.BaseFlinkTransform; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery; -/** - * Discovery for the SeaTunnel Flink transform. - */ -public class SeaTunnelFlinkTransformPluginDiscovery extends AbstractPluginDiscovery { - +public class SeaTunnelFlinkTransformPluginDiscovery extends AbstractPluginDiscovery { public SeaTunnelFlinkTransformPluginDiscovery() { super("seatunnel"); } @Override - protected Class getPluginBaseClass() { - return BaseFlinkTransform.class; + protected Class getPluginBaseClass() { + return SeaTunnelTransform.class; } } diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java index 95a1b96b6cda..5884b7c9c3e5 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SqlType; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -33,7 +34,7 @@ * * @param engine row */ -public abstract class RowConverter { +public abstract class RowConverter implements Serializable { protected final SeaTunnelDataType dataType; public RowConverter(SeaTunnelDataType dataType) {