From bd43315251100a296ede59c9f18f555ab03c71f7 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 20:12:46 +0800 Subject: [PATCH 1/4] [Improve][Core][API] Remove old version apis --- seatunnel-apis/pom.xml | 36 -- seatunnel-apis/seatunnel-api-base/pom.xml | 40 -- .../seatunnel/apis/base/api/BaseSink.java | 27 -- .../seatunnel/apis/base/api/BaseSource.java | 28 -- .../apis/base/api/BaseTransform.java | 28 -- .../seatunnel/apis/base/command/Command.java | 35 -- .../apis/base/command/CommandArgs.java | 24 -- .../apis/base/command/CommandBuilder.java | 25 -- .../seatunnel/apis/base/env/Execution.java | 46 --- .../seatunnel/apis/base/env/RuntimeEnv.java | 47 --- .../seatunnel/apis/base/plugin/Plugin.java | 83 ----- .../base/plugin/PluginClosedException.java | 43 --- seatunnel-apis/seatunnel-api-flink/pom.xml | 129 ------- .../apache/seatunnel/flink/BaseFlinkSink.java | 27 -- .../seatunnel/flink/BaseFlinkSource.java | 28 -- .../seatunnel/flink/BaseFlinkTransform.java | 31 -- .../seatunnel/flink/FlinkEnvironment.java | 344 ------------------ .../flink/batch/FlinkBatchExecution.java | 121 ------ .../seatunnel/flink/batch/FlinkBatchSink.java | 32 -- .../flink/batch/FlinkBatchSource.java | 32 -- .../flink/batch/FlinkBatchTransform.java | 33 -- .../seatunnel/flink/enums/FormatType.java | 51 --- .../flink/stream/FlinkStreamExecution.java | 104 ------ .../flink/stream/FlinkStreamSink.java | 33 -- .../flink/stream/FlinkStreamSource.java | 32 -- .../flink/stream/FlinkStreamTransform.java | 32 -- .../seatunnel/flink/util/ConfigKeyName.java | 48 --- .../seatunnel/flink/util/EnvironmentUtil.java | 102 ------ .../seatunnel/flink/util/SchemaUtil.java | 238 ------------ .../seatunnel/flink/util/TableUtil.java | 64 ---- seatunnel-apis/seatunnel-api-spark/pom.xml | 74 ---- .../apache/seatunnel/spark/BaseSparkSink.java | 46 --- .../seatunnel/spark/BaseSparkSource.java | 43 --- .../seatunnel/spark/BaseSparkTransform.java | 46 --- .../seatunnel/spark/SparkEnvironment.java | 191 ---------- .../spark/batch/SparkBatchExecution.java | 72 ---- .../seatunnel/spark/batch/SparkBatchSink.java | 30 -- .../spark/batch/SparkBatchSource.java | 31 -- .../StructuredStreamingExecution.java | 74 ---- .../StructuredStreamingSink.java | 31 -- .../StructuredStreamingSource.java | 31 -- .../spark/utils/SparkStructTypeUtil.java | 109 ------ .../stream/SparkStreamingExecution.scala | 79 ---- .../spark/stream/SparkStreamingSink.scala | 25 -- .../spark/stream/SparkStreamingSource.scala | 43 --- 45 files changed, 2868 deletions(-) delete mode 100644 seatunnel-apis/pom.xml delete mode 100644 seatunnel-apis/seatunnel-api-base/pom.xml delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSink.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSource.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseTransform.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/Command.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandArgs.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandBuilder.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/Execution.java delete mode 100644 
seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java delete mode 100644 seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/PluginClosedException.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/pom.xml delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/enums/FormatType.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java delete mode 100644 seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/pom.xml delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java delete mode 100644 
seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSink.scala delete mode 100644 seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSource.scala diff --git a/seatunnel-apis/pom.xml b/seatunnel-apis/pom.xml deleted file mode 100644 index c12ffda4aba..00000000000 --- a/seatunnel-apis/pom.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel - ${revision} - - 4.0.0 - seatunnel-apis - pom - - seatunnel-api-base - seatunnel-api-flink - seatunnel-api-spark - - diff --git a/seatunnel-apis/seatunnel-api-base/pom.xml b/seatunnel-apis/seatunnel-api-base/pom.xml deleted file mode 100644 index da26ee48172..00000000000 --- a/seatunnel-apis/seatunnel-api-base/pom.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel-apis - ${revision} - ../pom.xml - - 4.0.0 - seatunnel-api-base - - - - org.apache.seatunnel - seatunnel-common - ${project.version} - - - - diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSink.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSink.java deleted file mode 100644 index 89f3283ee7a..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSink.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.api; - -import org.apache.seatunnel.apis.base.env.RuntimeEnv; -import org.apache.seatunnel.apis.base.plugin.Plugin; - -/** - * a base interface indicates a sink plugin which will write data to other system. 
- */ -public interface BaseSink extends Plugin { -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSource.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSource.java deleted file mode 100644 index 18e22e5a1a1..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSource.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.api; - -import org.apache.seatunnel.apis.base.env.RuntimeEnv; -import org.apache.seatunnel.apis.base.plugin.Plugin; - -/** - * a base interface indicates a source plugin which will read data from other system. - */ -public interface BaseSource extends Plugin { - -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseTransform.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseTransform.java deleted file mode 100644 index 14d273f3d85..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseTransform.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.api; - -import org.apache.seatunnel.apis.base.env.RuntimeEnv; -import org.apache.seatunnel.apis.base.plugin.Plugin; - -/** - * a base interface indicates a transform plugin which will do transformations on data. 
- */ -public interface BaseTransform extends Plugin { - -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/Command.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/Command.java deleted file mode 100644 index 0d4b2d1642c..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/Command.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.command; - -/** - * Command interface. - * - * @param args type - */ -@FunctionalInterface -public interface Command { - - /** - * Execute command - * - * @param commandArgs args - */ - void execute(T commandArgs); - -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandArgs.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandArgs.java deleted file mode 100644 index 27118c43041..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandArgs.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.command; - -/** - * Used to create command. - */ -public interface CommandArgs { -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandBuilder.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandBuilder.java deleted file mode 100644 index 4eed944bdb8..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandBuilder.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.command; - -@FunctionalInterface -public interface CommandBuilder { - - Command buildCommand(T commandArgs); - -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/Execution.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/Execution.java deleted file mode 100644 index 73040459158..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/Execution.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.env; - -import org.apache.seatunnel.apis.base.api.BaseSink; -import org.apache.seatunnel.apis.base.api.BaseSource; -import org.apache.seatunnel.apis.base.api.BaseTransform; -import org.apache.seatunnel.apis.base.plugin.Plugin; - -import java.util.List; - -/** - * the SeaTunnel job's execution context - */ -public interface Execution< - SR extends BaseSource, - TF extends BaseTransform, - SK extends BaseSink, - RE extends RuntimeEnv> extends Plugin { - - /** - * start to execute the SeaTunnel job - * - * @param sources source plugin list - * @param transforms transform plugin list - * @param sinks sink plugin list - */ - // todo: change the method to receive a ExecutionContext - void start(List sources, List transforms, List sinks) throws Exception; - -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java deleted file mode 100644 index e8532a20872..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.env; - -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.JobMode; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import java.net.URL; -import java.util.List; - -/** - * engine related runtime environment - */ -public interface RuntimeEnv { - - RuntimeEnv setConfig(Config config); - - Config getConfig(); - - CheckResult checkConfig(); - - RuntimeEnv prepare(); - - RuntimeEnv setJobMode(JobMode mode); - - JobMode getJobMode(); - - void registerPlugin(List pluginPaths); - -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java deleted file mode 100644 index b1562792686..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.plugin; - -import org.apache.seatunnel.apis.base.env.RuntimeEnv; -import org.apache.seatunnel.common.config.CheckResult; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import java.io.Serializable; - -/** - * A base interface indicates belonging to SeaTunnel. - * Plugin will be used as follows: - *
<pre>{@code
- *      try(Plugin plugin = new PluginA<>()) {
- *          plugin.setConfig(Config);
- *          CheckResult checkResult = plugin.checkConfig();
- *          if (checkResult.getSuccess()) {
- *              plugin.prepare();
- *              // plugin execute code
- *          }
- *      }
- *
- * }
- * </pre>
- */ - -public interface Plugin extends Serializable, AutoCloseable { - String RESULT_TABLE_NAME = "result_table_name"; - String SOURCE_TABLE_NAME = "source_table_name"; - - void setConfig(Config config); - - Config getConfig(); - - default CheckResult checkConfig() { - return CheckResult.success(); - } - - /** - * This is a lifecycle method, this method will be executed after Plugin created. - * - * @param env environment - */ - default void prepare(T env) { - - } - - /** - * This is a lifecycle method, this method will be executed before Plugin destroy. - * It's used to release some resource. - * - * @throws Exception when close failed. - */ - @Override - default void close() throws Exception { - - } - - /** - * Return the plugin name, this is used in seatunnel conf DSL. - * - * @return plugin name. - */ - default String getPluginName() { - return this.getClass().getSimpleName(); - } -} diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/PluginClosedException.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/PluginClosedException.java deleted file mode 100644 index 1733a0a4c89..00000000000 --- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/PluginClosedException.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.apis.base.plugin; - -/** - * an Exception used for the scenes when plugin closed error. 
- */ -public class PluginClosedException extends RuntimeException { - - public PluginClosedException() { - } - - public PluginClosedException(String message) { - super(message); - } - - public PluginClosedException(String message, Throwable cause) { - super(message, cause); - } - - public PluginClosedException(Throwable cause) { - super(cause); - } - - public PluginClosedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} diff --git a/seatunnel-apis/seatunnel-api-flink/pom.xml b/seatunnel-apis/seatunnel-api-flink/pom.xml deleted file mode 100644 index 2e0d90a74f2..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/pom.xml +++ /dev/null @@ -1,129 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel-apis - ${revision} - ../pom.xml - - 4.0.0 - - seatunnel-api-flink - - - 1.8.2 - - - - - org.apache.seatunnel - seatunnel-api-base - ${project.version} - - - - org.apache.seatunnel - seatunnel-common - ${project.version} - - - - org.apache.flink - flink-java - ${flink.1.13.6.version} - provided - - - org.apache.flink - flink-table-planner_${scala.binary.version} - ${flink.1.13.6.version} - provided - - - org.apache.flink - flink-table-planner-blink_${scala.binary.version} - ${flink.1.13.6.version} - provided - - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.1.13.6.version} - provided - - - org.apache.flink - flink-optimizer_${scala.binary.version} - ${flink.1.13.6.version} - provided - - - - - org.apache.flink - flink-statebackend-rocksdb_${scala.binary.version} - ${flink.1.13.6.version} - provided - - - - - org.apache.flink - flink-csv - ${flink.1.13.6.version} - - - org.apache.flink - flink-orc_${scala.binary.version} - ${flink.1.13.6.version} - - - org.apache.flink - flink-parquet_${scala.binary.version} - ${flink.1.13.6.version} - - - org.apache.flink - flink-json - ${flink.1.13.6.version} - - - org.apache.flink - flink-avro - ${flink.1.13.6.version} - - - avro - org.apache.avro - - - - - org.apache.avro - avro - ${avro.version} - - - - - diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java deleted file mode 100644 index 67b1c1715e0..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink; - -import org.apache.seatunnel.apis.base.api.BaseSink; - -/** - * a base interface indicates a sink plugin running on Flink. 
- */ -public interface BaseFlinkSink extends BaseSink { - -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java deleted file mode 100644 index 20bc2bae440..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink; - -import org.apache.seatunnel.apis.base.api.BaseSource; - -/** - * a base interface indicates a source plugin running on Flink. - */ -public interface BaseFlinkSource extends BaseSource { - - OUT getData(FlinkEnvironment env); -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java deleted file mode 100644 index e467916fde1..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink; - -import org.apache.seatunnel.apis.base.api.BaseTransform; - -/** - * a base interface indicates a transform plugin running on Flink. 
- */ -public interface BaseFlinkTransform extends BaseTransform { - - default void registerFunction(FlinkEnvironment flinkEnvironment) { - - } - -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java deleted file mode 100644 index 491865b3d8d..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink; - -import org.apache.seatunnel.apis.base.env.RuntimeEnv; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.common.utils.ReflectionUtils; -import org.apache.seatunnel.flink.util.ConfigKeyName; -import org.apache.seatunnel.flink.util.EnvironmentUtil; -import org.apache.seatunnel.flink.util.TableUtil; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; -import org.apache.flink.util.TernaryBoolean; - -import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -@Slf4j -public class FlinkEnvironment implements RuntimeEnv { - - private Config config; - - private StreamExecutionEnvironment environment; - - private StreamTableEnvironment tableEnvironment; - - private ExecutionEnvironment batchEnvironment; - - private BatchTableEnvironment batchTableEnvironment; - - private JobMode jobMode; - - private String jobName = "seatunnel"; - - private static final 
String RESULT_TABLE_NAME = "result_table_name"; - - @Override - public FlinkEnvironment setConfig(Config config) { - this.config = config; - return this; - } - - @Override - public Config getConfig() { - return config; - } - - @Override - public CheckResult checkConfig() { - return EnvironmentUtil.checkRestartStrategy(config); - } - - @Override - public FlinkEnvironment prepare() { - // Batch/Streaming both use data stream api in SeaTunnel New API - createStreamEnvironment(); - createStreamTableEnvironment(); - if (!isStreaming()) { - createExecutionEnvironment(); - createBatchTableEnvironment(); - } - if (config.hasPath("job.name")) { - jobName = config.getString("job.name"); - } - return this; - } - - public String getJobName() { - return jobName; - } - - public boolean isStreaming() { - return JobMode.STREAMING.equals(jobMode); - } - - @Override - public FlinkEnvironment setJobMode(JobMode jobMode) { - this.jobMode = jobMode; - return this; - } - - @Override - public JobMode getJobMode() { - return jobMode; - } - - @Override - public void registerPlugin(List pluginPaths) { - pluginPaths.forEach(url -> log.info("register plugins : {}", url)); - List configurations = new ArrayList<>(); - try { - configurations.add((Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class, - "getConfiguration")).orElseThrow(() -> new RuntimeException("can't find " + - "method: getConfiguration")).invoke(this.environment)); - if (!isStreaming()) { - configurations.add(batchEnvironment.getConfiguration()); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - configurations.forEach(configuration -> { - List jars = configuration.get(PipelineOptions.JARS); - if (jars == null) { - jars = new ArrayList<>(); - } - jars.addAll(pluginPaths.stream().map(URL::toString).collect(Collectors.toList())); - configuration.set(PipelineOptions.JARS, jars.stream().distinct().collect(Collectors.toList())); - List classpath = configuration.get(PipelineOptions.CLASSPATHS); - if (classpath == null) { - classpath = new ArrayList<>(); - } - classpath.addAll(pluginPaths.stream().map(URL::toString).collect(Collectors.toList())); - configuration.set(PipelineOptions.CLASSPATHS, classpath.stream().distinct().collect(Collectors.toList())); - }); - } - - public StreamExecutionEnvironment getStreamExecutionEnvironment() { - return environment; - } - - public StreamTableEnvironment getStreamTableEnvironment() { - return tableEnvironment; - } - - private void createStreamTableEnvironment() { - // use blink and streammode - EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance() - .inStreamingMode(); - if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink" - .equals(this.config.getString(ConfigKeyName.PLANNER))) { - envBuilder.useBlinkPlanner(); - } else { - envBuilder.useOldPlanner(); - } - EnvironmentSettings environmentSettings = envBuilder.build(); - - tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings); - TableConfig config = tableEnvironment.getConfig(); - if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config - .hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) { - long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME); - long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME); - config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max)); - } - } - - private void createStreamEnvironment() { - Configuration configuration = new 
Configuration(); - EnvironmentUtil.initConfiguration(config, configuration); - environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration); - setTimeCharacteristic(); - - setCheckpoint(); - - EnvironmentUtil.setRestartStrategy(config, environment.getConfig()); - - if (config.hasPath(ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) { - long timeout = config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS); - environment.setBufferTimeout(timeout); - } - - if (config.hasPath(ConfigKeyName.PARALLELISM)) { - int parallelism = config.getInt(ConfigKeyName.PARALLELISM); - environment.setParallelism(parallelism); - } - - if (config.hasPath(ConfigKeyName.MAX_PARALLELISM)) { - int max = config.getInt(ConfigKeyName.MAX_PARALLELISM); - environment.setMaxParallelism(max); - } - - if (this.jobMode.equals(JobMode.BATCH)) { - environment.setRuntimeMode(RuntimeExecutionMode.BATCH); - } - } - - public ExecutionEnvironment getBatchEnvironment() { - return batchEnvironment; - } - - public BatchTableEnvironment getBatchTableEnvironment() { - return batchTableEnvironment; - } - - private void createExecutionEnvironment() { - batchEnvironment = ExecutionEnvironment.getExecutionEnvironment(); - if (config.hasPath(ConfigKeyName.PARALLELISM)) { - int parallelism = config.getInt(ConfigKeyName.PARALLELISM); - batchEnvironment.setParallelism(parallelism); - } - EnvironmentUtil.setRestartStrategy(config, batchEnvironment.getConfig()); - } - - private void createBatchTableEnvironment() { - batchTableEnvironment = BatchTableEnvironment.create(batchEnvironment); - } - - private void setTimeCharacteristic() { - if (config.hasPath(ConfigKeyName.TIME_CHARACTERISTIC)) { - String timeType = config.getString(ConfigKeyName.TIME_CHARACTERISTIC); - switch (timeType.toLowerCase()) { - case "event-time": - environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - break; - case "ingestion-time": - environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - break; - case "processing-time": - environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - break; - default: - log.warn( - "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time", - timeType); - break; - } - } - } - - private void setCheckpoint() { - if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) { - CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); - long interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL); - environment.enableCheckpointing(interval); - - if (config.hasPath(ConfigKeyName.CHECKPOINT_MODE)) { - String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE); - switch (mode.toLowerCase()) { - case "exactly-once": - checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - break; - case "at-least-once": - checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); - break; - default: - log.warn( - "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once", - mode); - break; - } - } - - if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { - long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT); - checkpointConfig.setCheckpointTimeout(timeout); - } - - if (config.hasPath(ConfigKeyName.CHECKPOINT_DATA_URI)) { - String uri = config.getString(ConfigKeyName.CHECKPOINT_DATA_URI); - StateBackend fsStateBackend = new FsStateBackend(uri); - if (config.hasPath(ConfigKeyName.STATE_BACKEND)) { - String stateBackend = 
config.getString(ConfigKeyName.STATE_BACKEND); - if ("rocksdb".equalsIgnoreCase(stateBackend)) { - StateBackend rocksDBStateBackend = new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE); - environment.setStateBackend(rocksDBStateBackend); - } - } else { - environment.setStateBackend(fsStateBackend); - } - } - - if (config.hasPath(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) { - int max = config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS); - checkpointConfig.setMaxConcurrentCheckpoints(max); - } - - if (config.hasPath(ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) { - boolean cleanup = config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE); - if (cleanup) { - checkpointConfig.enableExternalizedCheckpoints( - CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); - } else { - checkpointConfig.enableExternalizedCheckpoints( - CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); - - } - } - - if (config.hasPath(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) { - long minPause = config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS); - checkpointConfig.setMinPauseBetweenCheckpoints(minPause); - } - - if (config.hasPath(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) { - int failNum = config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS); - checkpointConfig.setTolerableCheckpointFailureNumber(failNum); - } - } - } - - public void registerResultTable(Config config, DataStream dataStream) { - if (config.hasPath(RESULT_TABLE_NAME)) { - String name = config.getString(RESULT_TABLE_NAME); - StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment(); - if (!TableUtil.tableExists(tableEnvironment, name)) { - if (config.hasPath("field_name")) { - String fieldName = config.getString("field_name"); - tableEnvironment.registerDataStream(name, dataStream, fieldName); - } else { - tableEnvironment.registerDataStream(name, dataStream); - } - } - } - } -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java deleted file mode 100644 index bb8060e38ef..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.flink.batch; - -import org.apache.seatunnel.apis.base.env.Execution; -import org.apache.seatunnel.flink.FlinkEnvironment; -import org.apache.seatunnel.flink.util.TableUtil; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; -import org.apache.flink.types.Row; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -@Slf4j -public class FlinkBatchExecution implements Execution { - - private Config config; - - private final FlinkEnvironment flinkEnvironment; - - public FlinkBatchExecution(FlinkEnvironment flinkEnvironment) { - this.flinkEnvironment = flinkEnvironment; - } - - @Override - public void start(List sources, List transforms, List sinks) throws Exception { - List> data = new ArrayList<>(); - - for (FlinkBatchSource source : sources) { - DataSet dataSet = source.getData(flinkEnvironment); - data.add(dataSet); - registerResultTable(source.getConfig(), dataSet); - } - - DataSet input = data.get(0); - - for (FlinkBatchTransform transform : transforms) { - DataSet dataSet = fromSourceTable(transform.getConfig()).orElse(input); - input = transform.processBatch(flinkEnvironment, dataSet); - registerResultTable(transform.getConfig(), input); - transform.registerFunction(flinkEnvironment); - } - - for (FlinkBatchSink sink : sinks) { - DataSet dataSet = fromSourceTable(sink.getConfig()).orElse(input); - sink.outputBatch(flinkEnvironment, dataSet); - } - - if (whetherExecute(sinks)) { - try { - log.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan()); - JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName()); - log.info(execute.toString()); - } catch (Exception e) { - log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName()); - throw e; - } - } - } - - private void registerResultTable(Config pluginConfig, DataSet dataSet) { - if (pluginConfig.hasPath(RESULT_TABLE_NAME)) { - String name = pluginConfig.getString(RESULT_TABLE_NAME); - BatchTableEnvironment tableEnvironment = flinkEnvironment.getBatchTableEnvironment(); - if (!TableUtil.tableExists(tableEnvironment, name)) { - if (pluginConfig.hasPath("field_name")) { - String fieldName = pluginConfig.getString("field_name"); - tableEnvironment.registerDataSet(name, dataSet, fieldName); - } else { - tableEnvironment.registerDataSet(name, dataSet); - } - } - } - } - - private Optional> fromSourceTable(Config pluginConfig) { - if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { - BatchTableEnvironment tableEnvironment = flinkEnvironment.getBatchTableEnvironment(); - Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME)); - return Optional.ofNullable(TableUtil.tableToDataSet(tableEnvironment, table)); - } - return Optional.empty(); - } - - @Override - public void setConfig(Config config) { - this.config = config; - } - - @Override - public Config getConfig() { - return config; - } - - private boolean whetherExecute(List sinks) { - return sinks.stream().noneMatch(s -> "ConsoleSink".equals(s.getPluginName()) || "AssertSink".equals(s.getPluginName())); - } -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java 
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java deleted file mode 100644 index a85db77da18..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSink.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink.batch; - -import org.apache.seatunnel.flink.BaseFlinkSink; -import org.apache.seatunnel.flink.FlinkEnvironment; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.types.Row; - -/** - * a FlinkBatchSink plugin will write data to other system using Flink DataSet API. - */ -public interface FlinkBatchSink extends BaseFlinkSink { - - void outputBatch(FlinkEnvironment env, DataSet inDataSet); -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java deleted file mode 100644 index 60472360d69..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchSource.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink.batch; - -import org.apache.seatunnel.flink.BaseFlinkSource; -import org.apache.seatunnel.flink.FlinkEnvironment; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.types.Row; - -/** - * a FlinkBatchSource plugin will read data from other system using Flink DataSet API. 
- */ -public interface FlinkBatchSource extends BaseFlinkSource> { - - DataSet getData(FlinkEnvironment env); -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java deleted file mode 100644 index dd9d1dbfb88..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchTransform.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink.batch; - -import org.apache.seatunnel.flink.BaseFlinkTransform; -import org.apache.seatunnel.flink.FlinkEnvironment; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.types.Row; - -/** - * a FlinkBatchTransform plugin will do transformations to Flink DataSet. - */ -public interface FlinkBatchTransform extends BaseFlinkTransform { - - DataSet processBatch(FlinkEnvironment env, DataSet data) throws Exception; - -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/enums/FormatType.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/enums/FormatType.java deleted file mode 100644 index 7b25c1721bd..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/enums/FormatType.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.flink.enums; - -import java.util.Arrays; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -public enum FormatType { - - JSON("json"), - CSV("csv"), - ORC("orc"), - AVRO("avro"), - PARQUET("parquet"), - TEXT("text"), - ; - - private final String name; - - private static final Map NAME_MAP = Arrays.stream(FormatType.values()) - .collect(Collectors.toMap(FormatType::getName, Function.identity())); - - FormatType(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - public static FormatType from(String name) { - return NAME_MAP.get(name); - } -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java deleted file mode 100644 index 8672171cd59..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.flink.stream; - -import org.apache.seatunnel.apis.base.env.Execution; -import org.apache.seatunnel.apis.base.plugin.Plugin; -import org.apache.seatunnel.flink.FlinkEnvironment; -import org.apache.seatunnel.flink.util.TableUtil; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -@Slf4j -public class FlinkStreamExecution implements Execution { - - private Config config; - - private final FlinkEnvironment flinkEnvironment; - - public FlinkStreamExecution(FlinkEnvironment streamEnvironment) { - this.flinkEnvironment = streamEnvironment; - } - - @Override - public void start(List sources, List transforms, List sinks) throws Exception { - List> data = new ArrayList<>(); - - for (FlinkStreamSource source : sources) { - DataStream dataStream = source.getData(flinkEnvironment); - data.add(dataStream); - registerResultTable(source, dataStream); - } - - DataStream input = data.get(0); - - for (FlinkStreamTransform transform : transforms) { - DataStream stream = fromSourceTable(transform.getConfig()).orElse(input); - input = transform.processStream(flinkEnvironment, stream); - registerResultTable(transform, input); - transform.registerFunction(flinkEnvironment); - } - - for (FlinkStreamSink sink : sinks) { - DataStream stream = fromSourceTable(sink.getConfig()).orElse(input); - sink.outputStream(flinkEnvironment, stream); - } - try { - log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan()); - flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName()); - } catch (Exception e) { - log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName()); - throw e; - } - } - - private void registerResultTable(Plugin plugin, DataStream dataStream) { - Config config = plugin.getConfig(); - flinkEnvironment.registerResultTable(config, dataStream); - } - - private Optional> fromSourceTable(Config pluginConfig) { - if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { - StreamTableEnvironment tableEnvironment = flinkEnvironment.getStreamTableEnvironment(); - Table table = tableEnvironment.scan(pluginConfig.getString(SOURCE_TABLE_NAME)); - return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true)); - } - return Optional.empty(); - } - - @Override - public void setConfig(Config config) { - this.config = config; - } - - @Override - public Config getConfig() { - return config; - } - -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java deleted file mode 100644 index 07c6350e394..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSink.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.flink.stream;
-
-import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-
-/**
- * a FlinkStreamSink plugin will write data to other system using Flink DataStream API.
- */
-public interface FlinkStreamSink extends BaseFlinkSink {
-
-    void outputStream(FlinkEnvironment env, DataStream<Row> dataStream);
-
-}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
deleted file mode 100644
index 096b665384c..00000000000
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamSource.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.flink.stream;
-
-import org.apache.seatunnel.flink.BaseFlinkSource;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-
-/**
- * a FlinkStreamSource plugin will read data from other system using Flink DataStream API.
- */
-public interface FlinkStreamSource extends BaseFlinkSource<DataStream<Row>> {
-
-    DataStream<Row> getData(FlinkEnvironment env);
-}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
deleted file mode 100644
index 3678e17593c..00000000000
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamTransform.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink.stream; - -import org.apache.seatunnel.flink.BaseFlinkTransform; -import org.apache.seatunnel.flink.FlinkEnvironment; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.types.Row; - -/** - * a FlinkBatchTransform plugin will do transformations to Flink DataStream. - */ -public interface FlinkStreamTransform extends BaseFlinkTransform { - - DataStream processStream(FlinkEnvironment env, DataStream dataStream) throws Exception; -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java deleted file mode 100644 index 88b76751760..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.flink.util; - -public class ConfigKeyName { - - private ConfigKeyName() { - throw new IllegalStateException("Utility class"); - } - - public static final String TIME_CHARACTERISTIC = "execution.time-characteristic"; - public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; - public static final String PARALLELISM = "execution.parallelism"; - public static final String MAX_PARALLELISM = "execution.max-parallelism"; - public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval"; - public static final String CHECKPOINT_MODE = "execution.checkpoint.mode"; - public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout"; - public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri"; - public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints"; - public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode"; - public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause"; - public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error"; - public static final String RESTART_STRATEGY = "execution.restart.strategy"; - public static final String RESTART_ATTEMPTS = "execution.restart.attempts"; - public static final String RESTART_DELAY_BETWEEN_ATTEMPTS = "execution.restart.delayBetweenAttempts"; - public static final String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval"; - public static final String RESTART_FAILURE_RATE = "execution.restart.failureRate"; - public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval"; - public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention"; - public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention"; - public static final String STATE_BACKEND = "execution.state.backend"; - public static final String PLANNER = "execution.planner"; -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java deleted file mode 100644 index aa52a121f44..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.flink.util; - -import org.apache.seatunnel.common.config.CheckResult; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; - -import java.util.concurrent.TimeUnit; - -@Slf4j -public final class EnvironmentUtil { - - private EnvironmentUtil() { - } - - public static void setRestartStrategy(Config config, ExecutionConfig executionConfig) { - try { - if (config.hasPath(ConfigKeyName.RESTART_STRATEGY)) { - String restartStrategy = config.getString(ConfigKeyName.RESTART_STRATEGY); - switch (restartStrategy.toLowerCase()) { - case "no": - executionConfig.setRestartStrategy(RestartStrategies.noRestart()); - break; - case "fixed-delay": - int attempts = config.getInt(ConfigKeyName.RESTART_ATTEMPTS); - long delay = config.getLong(ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS); - executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(attempts, delay)); - break; - case "failure-rate": - long failureInterval = config.getLong(ConfigKeyName.RESTART_FAILURE_INTERVAL); - int rate = config.getInt(ConfigKeyName.RESTART_FAILURE_RATE); - long delayInterval = config.getLong(ConfigKeyName.RESTART_DELAY_INTERVAL); - executionConfig.setRestartStrategy(RestartStrategies.failureRateRestart(rate, - Time.of(failureInterval, TimeUnit.MILLISECONDS), - Time.of(delayInterval, TimeUnit.MILLISECONDS))); - break; - default: - log.warn("set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate", restartStrategy); - } - } - } catch (Exception e) { - log.warn("set restart.strategy in config '{}' exception", config, e); - } - } - - public static CheckResult checkRestartStrategy(Config config) { - if (config.hasPath(ConfigKeyName.RESTART_STRATEGY)) { - String restartStrategy = config.getString(ConfigKeyName.RESTART_STRATEGY); - switch (restartStrategy.toLowerCase()) { - case "fixed-delay": - if (!(config.hasPath(ConfigKeyName.RESTART_ATTEMPTS) && config.hasPath(ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS))) { - return CheckResult.error(String.format("fixed-delay restart strategy must set [%s],[%s]", ConfigKeyName.RESTART_ATTEMPTS, ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS)); - } - break; - case "failure-rate": - if (!(config.hasPath(ConfigKeyName.RESTART_FAILURE_INTERVAL) && config.hasPath(ConfigKeyName.RESTART_FAILURE_RATE) && config.hasPath(ConfigKeyName.RESTART_DELAY_INTERVAL))) { - return CheckResult.error(String.format("failure-rate restart strategy must set [%s],[%s],[%s]", ConfigKeyName.RESTART_FAILURE_INTERVAL, ConfigKeyName.RESTART_FAILURE_RATE, ConfigKeyName.RESTART_DELAY_INTERVAL)); - } - break; - default: - return CheckResult.success(); - } - } - return CheckResult.success(); - } - - public static void initConfiguration(Config config, Configuration configuration) { - if (config.hasPath("pipeline")) { - Config pipeline = config.getConfig("pipeline"); - if (pipeline.hasPath("jars")) { - configuration.setString(PipelineOptions.JARS.key(), pipeline.getString("jars")); - } - if (pipeline.hasPath("classpaths")) { - configuration.setString(PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths")); - } - } - - } -} diff --git 
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java deleted file mode 100644 index 7a393cca86c..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.flink.util; - -import org.apache.seatunnel.flink.enums.FormatType; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.DecimalNode; -import com.fasterxml.jackson.databind.node.DoubleNode; -import com.fasterxml.jackson.databind.node.FloatNode; -import com.fasterxml.jackson.databind.node.IntNode; -import com.fasterxml.jackson.databind.node.LongNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.scala.typeutils.Types; -import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; -import org.apache.flink.table.descriptors.Avro; -import org.apache.flink.table.descriptors.Csv; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.FormatDescriptor; -import org.apache.flink.table.descriptors.Json; -import org.apache.flink.table.descriptors.Schema; -import org.apache.flink.table.utils.TypeStringUtils; -import org.apache.flink.types.Row; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.regex.Pattern; - -public final class SchemaUtil { - - private static final Pattern DASH_COMPILE = Pattern.compile("-"); - - private SchemaUtil() { - } - - public static void setSchema(Schema schema, Object info, FormatType format) { - - switch (format) { - case JSON: - getJsonSchema(schema, (ObjectNode) info); - break; - case CSV: - getCsvSchema(schema, (ArrayNode) info); - break; - case ORC: - getOrcSchema(schema, (ObjectNode) info); - break; - case AVRO: - getAvroSchema(schema, (ObjectNode) info); - break; - case PARQUET: - getParquetSchema(schema, (ObjectNode) info); - break; - default: - } - } - - public static FormatDescriptor setFormat(FormatType format, Config 
config) throws Exception { - FormatDescriptor formatDescriptor = null; - switch (format) { - case JSON: - formatDescriptor = new Json().failOnMissingField(false).deriveSchema(); - break; - case CSV: - Csv csv = new Csv().deriveSchema(); - Field interPro = csv.getClass().getDeclaredField("internalProperties"); - interPro.setAccessible(true); - Object desc = interPro.get(csv); - Class descCls = DescriptorProperties.class; - Method putMethod = descCls.getDeclaredMethod("put", String.class, String.class); - putMethod.setAccessible(true); - for (Map.Entry entry : config.entrySet()) { - String key = entry.getKey(); - if (key.startsWith("format.") && !StringUtils.equals(key, "format.type")) { - String value = config.getString(key); - putMethod.invoke(desc, key, value); - } - } - formatDescriptor = csv; - break; - case AVRO: - formatDescriptor = new Avro().avroSchema(config.getString("schema")); - break; - case ORC: - case PARQUET: - default: - break; - } - return formatDescriptor; - } - - private static void getJsonSchema(Schema schema, ObjectNode json) { - Iterator> nodeIterator = json.fields(); - while (nodeIterator.hasNext()) { - Map.Entry entry = nodeIterator.next(); - String key = entry.getKey(); - Object value = entry.getValue(); - if (value instanceof TextNode) { - schema.field(key, Types.STRING()); - } else if (value instanceof IntNode) { - schema.field(key, Types.INT()); - } else if (value instanceof LongNode) { - schema.field(key, Types.LONG()); - } else if (value instanceof DecimalNode) { - schema.field(key, Types.JAVA_BIG_DEC()); - } else if (value instanceof FloatNode) { - schema.field(key, Types.FLOAT()); - } else if (value instanceof DoubleNode) { - schema.field(key, Types.DOUBLE()); - } else if (value instanceof ObjectNode) { - schema.field(key, getTypeInformation((ObjectNode) value)); - } else if (value instanceof ArrayNode) { - Object obj = ((ArrayNode) value).get(0); - if (obj instanceof ObjectNode) { - schema.field(key, ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation((ObjectNode) obj))); - } else { - schema.field(key, ObjectArrayTypeInfo.getInfoFor(Object[].class, TypeInformation.of(Object.class))); - } - } - } - } - - private static void getCsvSchema(Schema schema, ArrayNode schemaList) { - Iterator iterator = schemaList.elements(); - - while (iterator.hasNext()) { - JsonNode jsonNode = iterator.next(); - String field = jsonNode.get("field").textValue(); - String type = jsonNode.get("type").textValue().toUpperCase(); - - schema.field(field, type); - } - } - - public static TypeInformation[] getCsvType(List> schemaList) { - TypeInformation[] typeInformation = new TypeInformation[schemaList.size()]; - int i = 0; - for (Map map : schemaList) { - String type = map.get("type").toUpperCase(); - typeInformation[i++] = TypeStringUtils.readTypeInfo(type); - } - return typeInformation; - } - - /** - * todo - * - * @param schema schema - * @param json json - */ - private static void getOrcSchema(Schema schema, ObjectNode json) { - - } - - /** - * todo - * - * @param schema schema - * @param json json - */ - private static void getParquetSchema(Schema schema, ObjectNode json) { - - } - - private static void getAvroSchema(Schema schema, ObjectNode json) { - RowTypeInfo typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(json.toString()); - String[] fieldNames = typeInfo.getFieldNames(); - for (String name : fieldNames) { - schema.field(name, typeInfo.getTypeAt(name)); - } - } - - public static RowTypeInfo getTypeInformation(ObjectNode json) { - int size = 
json.size(); - String[] fields = new String[size]; - TypeInformation[] informations = new TypeInformation[size]; - int i = 0; - Iterator> nodeIterator = json.fields(); - while (nodeIterator.hasNext()) { - Map.Entry entry = nodeIterator.next(); - String key = entry.getKey(); - Object value = entry.getValue(); - fields[i] = key; - if (value instanceof TextNode) { - informations[i] = Types.STRING(); - } else if (value instanceof IntNode) { - informations[i] = Types.INT(); - } else if (value instanceof LongNode) { - informations[i] = Types.LONG(); - } else if (value instanceof DecimalNode) { - informations[i] = Types.JAVA_BIG_DEC(); - } else if (value instanceof FloatNode) { - informations[i] = Types.FLOAT(); - } else if (value instanceof DoubleNode) { - informations[i] = Types.DOUBLE(); - } else if (value instanceof ObjectNode) { - informations[i] = getTypeInformation((ObjectNode) value); - } else if (value instanceof ArrayNode) { - ObjectNode demo = (ObjectNode) ((ArrayNode) value).get(0); - informations[i] = ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation(demo)); - } - i++; - } - return new RowTypeInfo(informations, fields); - } - - public static String getUniqueTableName() { - return DASH_COMPILE.matcher(UUID.randomUUID().toString()).replaceAll("_"); - } -} diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java deleted file mode 100644 index e1877effb5e..00000000000 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/TableUtil.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.flink.util; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; - -import java.util.Arrays; - -public final class TableUtil { - - private TableUtil() { - } - - public static DataStream tableToDataStream(StreamTableEnvironment tableEnvironment, Table table, boolean isAppend) { - - TypeInformation typeInfo = table.getSchema().toRowType(); - if (isAppend) { - return tableEnvironment.toAppendStream(table, typeInfo); - } - return tableEnvironment - .toRetractStream(table, typeInfo) - .filter(row -> row.f0) - .map(row -> row.f1) - .returns(typeInfo); - } - - public static DataSet tableToDataSet(BatchTableEnvironment tableEnvironment, Table table) { - return tableEnvironment.toDataSet(table, table.getSchema().toRowType()); - } - - public static void dataStreamToTable(StreamTableEnvironment tableEnvironment, String tableName, DataStream dataStream) { - tableEnvironment.registerDataStream(tableName, dataStream); - } - - public static void dataSetToTable(BatchTableEnvironment tableEnvironment, String tableName, DataSet dataSet) { - tableEnvironment.registerDataSet(tableName, dataSet); - } - - public static boolean tableExists(TableEnvironment tableEnvironment, String name) { - return Arrays.asList(tableEnvironment.listTables()).contains(name); - } -} diff --git a/seatunnel-apis/seatunnel-api-spark/pom.xml b/seatunnel-apis/seatunnel-api-spark/pom.xml deleted file mode 100644 index 020635abaab..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/pom.xml +++ /dev/null @@ -1,74 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel-apis - ${revision} - ../pom.xml - - 4.0.0 - - seatunnel-api-spark - - - 1.3.0 - - - - - org.apache.seatunnel - seatunnel-api-base - ${project.version} - - - org.apache.seatunnel - seatunnel-common - ${project.version} - - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${spark.2.4.0.version} - ${spark.scope} - - - org.apache.spark - spark-core_${scala.binary.version} - ${spark.2.4.0.version} - ${spark.scope} - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.2.4.0.version} - ${spark.scope} - - - net.jpountz.lz4 - lz4 - ${lz4.version} - - - - diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java deleted file mode 100644 index 3c7dea7ce07..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.spark; - -import org.apache.seatunnel.apis.base.api.BaseSink; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; - -/** - * a base interface indicates a sink plugin running on Spark. - */ -public abstract class BaseSparkSink implements BaseSink { - - protected Config config = ConfigFactory.empty(); - - @Override - public void setConfig(Config config) { - this.config = config; - } - - @Override - public Config getConfig() { - return config; - } - - public abstract OUT output(Dataset data, SparkEnvironment env); -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java deleted file mode 100644 index 0e1a24fbfb6..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.spark; - -import org.apache.seatunnel.apis.base.api.BaseSource; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -/** - * a base interface indicates a source plugin running on Spark. - */ -public abstract class BaseSparkSource implements BaseSource { - - protected Config config = ConfigFactory.empty(); - - @Override - public void setConfig(Config config) { - this.config = config; - } - - @Override - public Config getConfig() { - return this.config; - } - - public abstract OUT getData(SparkEnvironment env); -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java deleted file mode 100644 index b395ae09810..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.spark; - -import org.apache.seatunnel.apis.base.api.BaseTransform; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; - -/** - * a base interface indicates a transform plugin running on Spark. - */ -public abstract class BaseSparkTransform implements BaseTransform { - - protected Config config = ConfigFactory.empty(); - - @Override - public void setConfig(Config config) { - this.config = config; - } - - @Override - public Config getConfig() { - return this.config; - } - - public abstract Dataset process(Dataset data, SparkEnvironment env); -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java deleted file mode 100644 index 37af7cf34d3..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.spark; - -import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME; -import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME; - -import org.apache.seatunnel.apis.base.env.RuntimeEnv; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.config.ConfigRuntimeException; -import org.apache.seatunnel.common.constants.JobMode; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -import lombok.extern.slf4j.Slf4j; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.streaming.Seconds; -import org.apache.spark.streaming.StreamingContext; - -import java.net.URL; -import java.util.List; - -@Slf4j -public class SparkEnvironment implements RuntimeEnv { - - private static final long DEFAULT_SPARK_STREAMING_DURATION = 5; - - private SparkConf sparkConf; - - private SparkSession sparkSession; - - private StreamingContext streamingContext; - - private Config config = ConfigFactory.empty(); - - private boolean enableHive = false; - - private JobMode jobMode; - - public SparkEnvironment setEnableHive(boolean enableHive) { - this.enableHive = enableHive; - return this; - } - - @Override - public SparkEnvironment setConfig(Config config) { - this.config = config; - return this; - } - - @Override - public RuntimeEnv setJobMode(JobMode mode) { - this.jobMode = mode; - return this; - } - - @Override - public JobMode getJobMode() { - return jobMode; - } - - @Override - public Config getConfig() { - return this.config; - } - - @Override - public CheckResult checkConfig() { - return CheckResult.success(); - } - - @Override - public void registerPlugin(List pluginPaths) { - log.info("register plugins :" + pluginPaths); - // TODO we use --jar parameter to support submit multi-jar in spark cluster at now. Refactor it to - // support submit multi-jar in code or remove this logic. 
- // this.sparkSession.conf().set("spark.jars",pluginPaths.stream().map(URL::getPath).collect(Collectors.joining(","))); - } - - @Override - public SparkEnvironment prepare() { - sparkConf = createSparkConf(); - SparkSession.Builder builder = SparkSession.builder().config(sparkConf); - if (enableHive) { - builder.enableHiveSupport(); - } - this.sparkSession = builder.getOrCreate(); - createStreamingContext(); - return this; - } - - public SparkSession getSparkSession() { - return this.sparkSession; - } - - public StreamingContext getStreamingContext() { - return this.streamingContext; - } - - public SparkConf getSparkConf() { - return this.sparkConf; - } - - private SparkConf createSparkConf() { - SparkConf sparkConf = new SparkConf(); - this.config.entrySet().forEach(entry -> sparkConf.set(entry.getKey(), String.valueOf(entry.getValue().unwrapped()))); - if (config.hasPath("job.name")) { - sparkConf.setAppName(config.getString("job.name")); - } - return sparkConf; - } - - private void createStreamingContext() { - SparkConf conf = this.sparkSession.sparkContext().getConf(); - long duration = conf.getLong("spark.stream.batchDuration", DEFAULT_SPARK_STREAMING_DURATION); - if (this.streamingContext == null) { - this.streamingContext = new StreamingContext(sparkSession.sparkContext(), Seconds.apply(duration)); - } - } - - public static void registerTempView(String tableName, Dataset ds) { - ds.createOrReplaceTempView(tableName); - } - - public static Dataset registerInputTempView(BaseSparkSource> source, SparkEnvironment environment) { - Config config = source.getConfig(); - if (config.hasPath(RESULT_TABLE_NAME)) { - String tableName = config.getString(RESULT_TABLE_NAME); - Dataset data = source.getData(environment); - registerTempView(tableName, data); - return data; - } else { - throw new ConfigRuntimeException("Plugin[" + source.getClass().getName() + "] " + - "must be registered as dataset/table, please set \"" + RESULT_TABLE_NAME + "\" config"); - } - } - - public static Dataset transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset ds) { - Dataset fromDs; - Config config = transform.getConfig(); - if (config.hasPath(SOURCE_TABLE_NAME)) { - String sourceTableName = config.getString(SOURCE_TABLE_NAME); - fromDs = environment.getSparkSession().read().table(sourceTableName); - } else { - fromDs = ds; - } - return transform.process(fromDs, environment); - } - - public static void registerTransformTempView(BaseSparkTransform transform, Dataset ds) { - Config config = transform.getConfig(); - if (config.hasPath(RESULT_TABLE_NAME)) { - String resultTableName = config.getString(RESULT_TABLE_NAME); - registerTempView(resultTableName, ds); - } - } - - public static T sinkProcess(SparkEnvironment environment, BaseSparkSink sink, Dataset ds) { - Dataset fromDs; - Config config = sink.getConfig(); - if (config.hasPath(SOURCE_TABLE_NAME)) { - String sourceTableName = config.getString(SOURCE_TABLE_NAME); - fromDs = environment.getSparkSession().read().table(sourceTableName); - } else { - fromDs = ds; - } - return sink.output(fromDs, environment); - } -} - - - diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java deleted file mode 100644 index cfabb85b8e7..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java +++ /dev/null @@ -1,72 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.spark.batch; - -import org.apache.seatunnel.apis.base.env.Execution; -import org.apache.seatunnel.spark.BaseSparkTransform; -import org.apache.seatunnel.spark.SparkEnvironment; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; - -import java.util.List; -import java.util.stream.Collectors; - -public class SparkBatchExecution implements Execution { - - private final SparkEnvironment environment; - - private Config config = ConfigFactory.empty(); - - public SparkBatchExecution(SparkEnvironment environment) { - this.environment = environment; - } - - @Override - public void start(List sources, List transforms, List sinks) { - - List> sourceDatasets = sources.stream().map(source -> SparkEnvironment.registerInputTempView(source, environment)) - .collect(Collectors.toList()); - - if (!sources.isEmpty()) { - Dataset ds = sourceDatasets.get(0); - for (BaseSparkTransform transform : transforms) { - ds = SparkEnvironment.transformProcess(environment, transform, ds); - SparkEnvironment.registerTransformTempView(transform, ds); - } - for (SparkBatchSink sink : sinks) { - SparkEnvironment.sinkProcess(environment, sink, ds); - } - } - } - - @Override - public void setConfig(Config config) { - this.config = config; - } - - @Override - public Config getConfig() { - return this.config; - } - -} - diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java deleted file mode 100644 index 38a1a7cd930..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.seatunnel.spark.batch;
-
-import org.apache.seatunnel.spark.BaseSparkSink;
-
-import scala.Unit;
-
-/**
- * a SparkBatchSink plugin will write data to other system
- * using Spark DataSet API.
- */
-@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
-public abstract class SparkBatchSink extends BaseSparkSink<Unit> {
-}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
deleted file mode 100644
index 515c19f8085..00000000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSource.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.spark.batch;
-
-import org.apache.seatunnel.spark.BaseSparkSource;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-/**
- * a SparkBatchSource plugin will read data from other system
- * using Spark DataSet API.
- */
-@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
-public abstract class SparkBatchSource extends BaseSparkSource<Dataset<Row>> {
-}
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
deleted file mode 100644
index d6a9570e571..00000000000
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.seatunnel.spark.structuredstream; - -import org.apache.seatunnel.apis.base.env.Execution; -import org.apache.seatunnel.spark.BaseSparkTransform; -import org.apache.seatunnel.spark.SparkEnvironment; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; - -import java.util.List; -import java.util.stream.Collectors; - -public class StructuredStreamingExecution implements Execution { - - private final SparkEnvironment sparkEnvironment; - - private Config config = ConfigFactory.empty(); - - public StructuredStreamingExecution(SparkEnvironment sparkEnvironment) { - this.sparkEnvironment = sparkEnvironment; - } - - @Override - public void start(List sources, List transforms, - List sinks) throws Exception { - - List> datasetList = sources.stream().map(s -> - SparkEnvironment.registerInputTempView(s, sparkEnvironment) - ).collect(Collectors.toList()); - if (datasetList.size() > 0) { - Dataset ds = datasetList.get(0); - for (BaseSparkTransform tf : transforms) { - ds = SparkEnvironment.transformProcess(sparkEnvironment, tf, ds); - SparkEnvironment.registerTransformTempView(tf, ds); - } - - for (StructuredStreamingSink sink : sinks) { - SparkEnvironment.sinkProcess(sparkEnvironment, sink, ds).start(); - } - sparkEnvironment.getSparkSession().streams().awaitAnyTermination(); - } - } - - @Override - public void setConfig(Config config) { - this.config = config; - } - - @Override - public Config getConfig() { - return this.config; - } - -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java deleted file mode 100644 index ece636b80f7..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSink.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.spark.structuredstream; - -import org.apache.seatunnel.spark.BaseSparkSink; - -import org.apache.spark.sql.Row; -import org.apache.spark.sql.streaming.DataStreamWriter; - -/** - * a StructuredStreamingSink plugin will write data to other system - * using Spark Structured streaming API. 
- */ -@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") -public abstract class StructuredStreamingSink extends BaseSparkSink> { -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java deleted file mode 100644 index d72b3db260f..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingSource.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.spark.structuredstream; - -import org.apache.seatunnel.spark.BaseSparkSource; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; - -/** - * a StructuredStreamingSource plugin will read data from other system - * using Spark Structured streaming API. - */ -@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") -public abstract class StructuredStreamingSource extends BaseSparkSource> { -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java deleted file mode 100644 index 27833f6d3b4..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.spark.utils; - -import org.apache.seatunnel.common.config.ConfigRuntimeException; -import org.apache.seatunnel.common.utils.JsonUtils; - -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructType; - -import java.util.List; -import java.util.Map; - -public final class SparkStructTypeUtil { - - private SparkStructTypeUtil() { - } - - public static StructType getStructType(StructType schema, ObjectNode json) { - StructType newSchema = schema.copy(schema.fields()); - Map jsonMap = JsonUtils.toMap(json); - for (Map.Entry entry : jsonMap.entrySet()) { - String field = entry.getKey(); - Object type = entry.getValue(); - if (type instanceof ObjectNode) { - StructType st = getStructType(new StructType(), (ObjectNode) type); - newSchema = newSchema.add(field, st); - } else if (type instanceof List) { - List list = (List) type; - - if (list.size() == 0) { - newSchema = newSchema.add(field, DataTypes.createArrayType(null, true)); - } else { - Object o = list.get(0); - if (o instanceof ObjectNode) { - StructType st = getStructType(new StructType(), (ObjectNode) o); - newSchema = newSchema.add(field, DataTypes.createArrayType(st, true)); - } else { - DataType st = getType(o.toString()); - newSchema = newSchema.add(field, DataTypes.createArrayType(st, true)); - } - } - - } else { - newSchema = newSchema.add(field, getType(type.toString())); - } - } - return newSchema; - } - - private static DataType getType(String type) { - DataType dataType; - switch (type.toLowerCase()) { - case "string": - dataType = DataTypes.StringType; - break; - case "integer": - dataType = DataTypes.IntegerType; - break; - case "long": - dataType = DataTypes.LongType; - break; - case "double": - dataType = DataTypes.DoubleType; - break; - case "float": - dataType = DataTypes.FloatType; - break; - case "short": - dataType = DataTypes.ShortType; - break; - case "date": - dataType = DataTypes.DateType; - break; - case "timestamp": - dataType = DataTypes.TimestampType; - break; - case "boolean": - dataType = DataTypes.BooleanType; - break; - case "binary": - dataType = DataTypes.BinaryType; - break; - case "byte": - dataType = DataTypes.ByteType; - break; - default: - throw new ConfigRuntimeException("Throw data type exception, unknown type: " + type); - } - return dataType; - } -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala deleted file mode 100644 index afb62667fdd..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seatunnel.spark.stream - -import org.apache.seatunnel.apis.base.env.Execution -import org.apache.seatunnel.apis.base.plugin.Plugin -import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory} -import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment} -import org.apache.spark.sql.{Dataset, Row} - -import java.util.{List => JList} -import scala.collection.JavaConversions._ - -class SparkStreamingExecution(sparkEnvironment: SparkEnvironment) - extends Execution[BaseSparkSource[_], BaseSparkTransform, BaseSparkSink[_], SparkEnvironment] { - - private var config = ConfigFactory.empty() - - override def start(sources: JList[BaseSparkSource[_]], transforms: JList[BaseSparkTransform], sinks: JList[BaseSparkSink[_]]): Unit = { - val source = sources.get(0).asInstanceOf[SparkStreamingSource[_]] - - sources.subList(1, sources.size()).foreach(s => { - SparkEnvironment.registerInputTempView( - s.asInstanceOf[BaseSparkSource[Dataset[Row]]], - sparkEnvironment) - }) - source.start( - sparkEnvironment, - dataset => { - val conf = source.getConfig - if (conf.hasPath(Plugin.RESULT_TABLE_NAME)) { - SparkEnvironment.registerTempView( - conf.getString(Plugin.RESULT_TABLE_NAME), - dataset) - } - var ds = dataset - - if (ds.take(1).length > 0) { - for (tf <- transforms) { - ds = SparkEnvironment.transformProcess(sparkEnvironment, tf, ds) - SparkEnvironment.registerTransformTempView(tf, ds) - } - } - - source.beforeOutput() - - if (ds.take(1).length > 0) { - sinks.foreach(sink => { - SparkEnvironment.sinkProcess(sparkEnvironment, sink, ds) - }) - } - - source.afterOutput() - }) - - val streamingContext = sparkEnvironment.getStreamingContext - streamingContext.start() - streamingContext.awaitTermination() - } - - override def setConfig(config: Config): Unit = this.config = config - - override def getConfig: Config = config - -} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSink.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSink.scala deleted file mode 100644 index 2b04d72997b..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSink.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seatunnel.spark.stream - -import org.apache.seatunnel.spark.BaseSparkSink - -/** - * a SparkStreamingSink plugin will write data to other system - * using Spark Streaming API. - */ -trait SparkStreamingSink extends BaseSparkSink[Unit] {} diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSource.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSource.scala deleted file mode 100644 index 7c949fa4484..00000000000 --- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingSource.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seatunnel.spark.stream - -import org.apache.seatunnel.spark.{BaseSparkSource, SparkEnvironment} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Row, SparkSession} -import org.apache.spark.streaming.dstream.DStream - -/** - * a SparkStreamingSource plugin will read data from other system - * using Spark Streaming API. 
- */ -trait SparkStreamingSource[T] extends BaseSparkSource[DStream[T]] { - - def beforeOutput(): Unit = {} - - def afterOutput(): Unit = {} - - def rdd2dataset(sparkSession: SparkSession, rdd: RDD[T]): Dataset[Row] - - def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = { - getData(env).foreachRDD(rdd => { - val dataset = rdd2dataset(env.getSparkSession, rdd) - handler(dataset) - }) - } - -} From b111208948936b9e3f714d391ffdc943ad36e720 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 20:15:46 +0800 Subject: [PATCH 2/4] [Improve][Core][API] Update main pom --- pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/pom.xml b/pom.xml index c567aa76623..7b03c043f96 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,6 @@ seatunnel-config --> seatunnel-common - seatunnel-apis seatunnel-core seatunnel-transforms-v2 seatunnel-connectors-v2 From ed5ef7de5315c9612ee77b9198456c329adc28e4 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 20:48:20 +0800 Subject: [PATCH 3/4] [Improve][Core][API] Fix dependencies --- tools/dependencies/known-dependencies.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index eca7004a1cb..21260623513 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -33,7 +33,6 @@ log4j-api-2.17.1.jar log4j-core-2.17.1.jar log4j-slf4j-impl-2.17.1.jar log4j-1.2-api-2.17.1.jar -lz4-1.3.0.jar orc-core-1.5.6.jar orc-shims-1.5.6.jar paranamer-2.7.jar From d8f9b9ebef7c7008a6fe1a30932a74d07324d7dc Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 11 Jan 2023 20:48:55 +0800 Subject: [PATCH 4/4] [Improve][Core][API] Fix CI --- .github/workflows/backend.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 8866d27f55c..d1025b0f41b 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -116,7 +116,6 @@ jobs: - "seatunnel-e2e/seatunnel-connector-v2-e2e/**" api: - "seatunnel-api/**" - - "seatunnel-apis/**" - "seatunnel-common/**" - "seatunnel-config/**" - "seatunnel-connectors/**"
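For reference, the removed SparkStructTypeUtil was the helper that mapped a JSON-style schema definition onto a Spark StructType (see the deleted getStructType/getType above). Below is a minimal sketch of how such a schema could be built with it before this change; the class name SchemaExample and the literal schema string are illustrative assumptions, not taken from any real connector.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.seatunnel.spark.utils.SparkStructTypeUtil;
import org.apache.spark.sql.types.StructType;

public class SchemaExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical schema definition: field name -> type string accepted by the removed getType(...)
        ObjectNode schemaJson = (ObjectNode) new ObjectMapper()
                .readTree("{\"name\":\"string\",\"age\":\"integer\",\"scores\":[\"double\"]}");

        // getStructType starts from the given StructType and adds one field per JSON entry,
        // recursing into nested objects and wrapping JSON arrays in ArrayType.
        StructType schema = SparkStructTypeUtil.getStructType(new StructType(), schemaJson);

        // Roughly:
        // root
        //  |-- name: string (nullable = true)
        //  |-- age: integer (nullable = true)
        //  |-- scores: array (nullable = true)
        //  |    |-- element: double (containsNull = true)
        schema.printTreeString();
    }
}

Any connector that still relied on this helper, or on the other classes deleted in this patch series, will no longer compile once the seatunnel-apis module is removed.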