diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java index 3f341b63f18..9c8d01cb327 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.core.type.TypeReference; +import lombok.NonNull; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Type; @@ -162,6 +163,18 @@ public Type getType() { }); } + /** + * Construct an option with multiple options and only one of them can be selected + */ + public SingleChoiceOptionBuilder singleChoice(@NonNull Class optionType, @NonNull List optionValues) { + return new SingleChoiceOptionBuilder(key, new TypeReference() { + @Override + public Type getType() { + return optionType; + } + }, optionValues); + } + /** * The value of the definition option should be represented as T. * @@ -205,4 +218,35 @@ public Option noDefaultValue() { return new Option<>(key, typeReference, null); } } + + public static class SingleChoiceOptionBuilder { + private final List optionValues; + private final String key; + private final TypeReference typeReference; + + SingleChoiceOptionBuilder(String key, TypeReference typeReference, List optionValues) { + this.optionValues = optionValues; + this.key = key; + this.typeReference = typeReference; + } + + /** + * Creates a Option with the given default value. + * + * @param value The default value for the config option + * @return The config option with the default value. + */ + public Option defaultValue(T value) { + return new SingleChoiceOption(key, typeReference, optionValues, value); + } + + /** + * Creates a Option without a default value. + * + * @return The config option without a default value. + */ + public Option noDefaultValue() { + return new SingleChoiceOption(key, typeReference, optionValues, null); + } + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java new file mode 100644 index 00000000000..a5130865641 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.configuration; + +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.Getter; + +import java.util.List; + +public class SingleChoiceOption extends Option{ + + @Getter + private final List optionValues; + + public SingleChoiceOption(String key, + TypeReference typeReference, + List optionValues, + T defaultValue) { + super(key, typeReference, defaultValue); + this.optionValues = optionValues; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java index d6074173b94..8573abee928 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigValidator.java @@ -21,8 +21,12 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.SingleChoiceOption; + +import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -41,6 +45,38 @@ public void validate(OptionRule rule) { List requiredOptions = rule.getRequiredOptions(); for (RequiredOption requiredOption : requiredOptions) { validate(requiredOption); + requiredOption.getOptions().forEach(option -> { + if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) { + validateSingleChoice(option); + } + }); + } + + for (Option option : rule.getOptionalOptions()) { + if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) { + validateSingleChoice(option); + } + } + } + + void validateSingleChoice(Option option) { + SingleChoiceOption singleChoiceOption = (SingleChoiceOption) option; + List optionValues = singleChoiceOption.getOptionValues(); + if (CollectionUtils.isEmpty(optionValues)) { + throw new OptionValidationException("These options(%s) are SingleChoiceOption, the optionValues must not be null.", getOptionKeys( + Arrays.asList(singleChoiceOption))); + } + + Object o = singleChoiceOption.defaultValue(); + if (o != null && !optionValues.contains(o)) { + throw new OptionValidationException("These options(%s) are SingleChoiceOption, the defaultValue(%s) must be one of the optionValues.", getOptionKeys( + Arrays.asList(singleChoiceOption)), o); + } + + Object value = config.get(option); + if (value != null && !optionValues.contains(value)) { + throw new OptionValidationException("These options(%s) are SingleChoiceOption, the value(%s) must be one of the optionValues.", getOptionKeys( + Arrays.asList(singleChoiceOption)), value); } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java new file mode 100644 index 00000000000..8284d341dc0 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.sink; + +/** + * The SaveMode for the Sink connectors that use table or other table structures to organize data + */ +public enum DataSaveMode { + // Will drop table in MySQL, Will drop path for File Connector. + DROP_SCHEMA, + + // Only drop the data in MySQL, Only drop the files in the path for File Connector. + KEEP_SCHEMA_DROP_DATA, + + // Keep the table and data and continue to write data to the existing table for MySQL. Keep the path and files in the path, create new files in the path. + KEEP_SCHEMA_AND_DATA, + + // Throw error when table is exists for MySQL. Throw error when path is exists. + ERROR_WHEN_EXISTS +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java index 0bc266688ad..cdab3566e9a 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java @@ -22,6 +22,8 @@ public class SinkCommonOptions { + public static final String DATA_SAVE_MODE = "save_mode"; + public static final Option SOURCE_TABLE_NAME = Options.key("source_table_name") .stringType() diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java new file mode 100644 index 00000000000..6b46b0fa014 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.sink; + +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.List; +import java.util.Locale; + +/** + * The Sink Connectors which support data SaveMode should implement this interface + */ +public interface SupportDataSaveMode { + + /** + * We hope every sink connector use the same option name to config SaveMode, So I add checkOptions method to this interface. + * checkOptions method have a default implement to check whether `save_mode` parameter is in config. + * + * @param config config of sink Connector + */ + default void checkOptions(Config config) { + if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) { + String tableSaveMode = config.getString(SinkCommonOptions.DATA_SAVE_MODE); + DataSaveMode dataSaveMode = DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT)); + if (!supportedDataSaveModeValues().contains(dataSaveMode)) { + throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + "This connector don't support save mode: " + dataSaveMode); + } + } + } + + /** + * Get the {@link DataSaveMode} that the user configured + * @return DataSaveMode + */ + DataSaveMode getDataSaveMode(); + + /** + * Return the {@link DataSaveMode} list supported by this connector + * @return the list of supported data save modes + */ + List supportedDataSaveModeValues(); + + /** + * The implementation of specific logic according to different {@link DataSaveMode} + * @param saveMode data save mode + */ + void handleSaveMode(DataSaveMode saveMode); +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index 489091f400d..0f9d1cdff55 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -17,14 +17,20 @@ package org.apache.seatunnel.api.table.factory; +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceCommonOptions; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.connector.TableSource; import lombok.NonNull; @@ -162,7 +168,7 @@ public static List discoverFactories(ClassLoader classLoader) { /** * This method is called by SeaTunnel Web to get the full option rule of a source. - * @return + * @return Option rule */ public static OptionRule sourceFullOptionRule(@NonNull TableSourceFactory factory) { OptionRule sourceOptionRule = factory.optionRule(); @@ -179,4 +185,34 @@ public static OptionRule sourceFullOptionRule(@NonNull TableSourceFactory factor return sourceOptionRule; } + + /** + * This method is called by SeaTunnel Web to get the full option rule of a sink. + * @return Option rule + */ + public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) { + OptionRule sinkOptionRule = factory.optionRule(); + if (sinkOptionRule == null) { + throw new FactoryException("sinkOptionRule can not be null"); + } + + try { + TableSink sink = factory.createSink(null); + if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) { + SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink; + Option saveMode = + Options.key(SinkCommonOptions.DATA_SAVE_MODE) + .singleChoice(DataSaveMode.class, supportDataSaveModeSink.supportedDataSaveModeValues()) + .noDefaultValue() + .withDescription("data save mode"); + OptionRule sinkCommonOptionRule = + OptionRule.builder().required(saveMode).build(); + sinkOptionRule.getOptionalOptions().addAll(sinkCommonOptionRule.getOptionalOptions()); + } + } catch (UnsupportedOperationException e) { + LOG.warn("Add save mode option need sink connector support create sink by TableSinkFactory"); + } + + return sinkOptionRule; + } } diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java index ba9acec79b3..c993081ca6a 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/ConfigValidatorTest.java @@ -63,6 +63,18 @@ public class ConfigValidatorTest { .noDefaultValue() .withDescription("base64 encoded kerberos ticket of the Neo4j. for Auth."); + public static final Option SINGLE_CHOICE_TEST = + Options.key("single_choice_test") + .singleChoice(String.class, Arrays.asList("A", "B", "C")) + .defaultValue("M") + .withDescription("test single choice error"); + + public static final Option SINGLE_CHOICE_VALUE_TEST = + Options.key("single_choice_test") + .singleChoice(String.class, Arrays.asList("A", "B", "C")) + .defaultValue("A") + .withDescription("test single choice value"); + void validate(Map config, OptionRule rule) { ConfigValidator.of(ReadonlyConfig.fromMap(config)).validate(rule); } @@ -245,4 +257,28 @@ public void testComplexConditionalRequiredOptions() { config.put(KEY_USERNAME.key(), "asuka111"); Assertions.assertDoesNotThrow(executable); } + + @Test + public void testSingleChoiceOptionDefaultValueValidator() { + OptionRule optionRule = OptionRule.builder().required(SINGLE_CHOICE_TEST).build(); + Map config = new HashMap<>(); + config.put(SINGLE_CHOICE_TEST.key(), "A"); + Executable executable = () -> validate(config, optionRule); + assertEquals("ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the defaultValue(M) must be one of the optionValues.", + assertThrows(OptionValidationException.class, executable).getMessage()); + } + + @Test + public void testSingleChoiceOptionValueValidator() { + OptionRule optionRule = OptionRule.builder().required(SINGLE_CHOICE_VALUE_TEST).build(); + Map config = new HashMap<>(); + config.put(SINGLE_CHOICE_VALUE_TEST.key(), "A"); + Executable executable = () -> validate(config, optionRule); + Assertions.assertDoesNotThrow(executable); + + config.put(SINGLE_CHOICE_VALUE_TEST.key(), "N"); + executable = () -> validate(config, optionRule); + assertEquals("ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - These options('single_choice_test') are SingleChoiceOption, the value(N) must be one of the optionValues.", + assertThrows(OptionValidationException.class, executable).getMessage()); + } } diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/SingleChoiceOptionTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/SingleChoiceOptionTest.java new file mode 100644 index 00000000000..821e5faafcc --- /dev/null +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/configuration/util/SingleChoiceOptionTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.configuration.util; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.SingleChoiceOption; +import org.apache.seatunnel.api.sink.DataSaveMode; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +public class SingleChoiceOptionTest { + + @Test + public void test() { + Option stringOption = + Options.key("test_single_choice").singleChoice(String.class, Arrays.asList("A", "B", "C")) + .defaultValue("A"); + + Option saveModeOption = + Options.key("save_mode") + .singleChoice(DataSaveMode.class, Arrays.asList(DataSaveMode.DROP_SCHEMA, DataSaveMode.KEEP_SCHEMA_DROP_DATA)) + .defaultValue(DataSaveMode.DROP_SCHEMA) + .withDescription("save mode test"); + + OptionRule build = OptionRule.builder().optional(stringOption, saveModeOption).build(); + List> optionalOptions = build.getOptionalOptions(); + Option option = optionalOptions.get(0); + Assertions.assertTrue(SingleChoiceOption.class.isAssignableFrom(option.getClass())); + SingleChoiceOption singleChoiceOption = (SingleChoiceOption) option; + Assertions.assertEquals(3, singleChoiceOption.getOptionValues().size()); + Assertions.assertEquals("A", singleChoiceOption.defaultValue()); + + option = optionalOptions.get(1); + singleChoiceOption = (SingleChoiceOption) option; + Assertions.assertEquals(2, singleChoiceOption.getOptionValues().size()); + Assertions.assertEquals(DataSaveMode.DROP_SCHEMA, singleChoiceOption.defaultValue()); + } +} diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index f9eb1df42cc..a6e5cccfdf8 100644 --- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -18,7 +18,9 @@ package org.apache.seatunnel.core.starter.flink.execution; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.api.source.SourceCommonOptions; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -63,6 +65,10 @@ protected List> execute(List> upstreamDataStreams) SeaTunnelSink seaTunnelSink = plugins.get(i); DataStream stream = fromSourceTable(sinkConfig).orElse(input); seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType())); + if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { + SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; + DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode(); + saveModeSink.handleSaveMode(dataSaveMode); + } DataStreamSink dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName()); if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) { int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key()); diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 44707668821..9693c998858 100644 --- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -19,8 +19,10 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; @@ -54,16 +56,20 @@ protected SinkExecuteProcessor(SparkRuntimeEnvironment sparkRuntimeEnvironment, SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); List pluginJars = new ArrayList<>(); List> sinks = pluginConfigs.stream() - .map(sinkConfig -> { - PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME)); - pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - SeaTunnelSink seaTunnelSink = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); - seaTunnelSink.prepare(sinkConfig); - seaTunnelSink.setJobContext(jobContext); - return seaTunnelSink; - }) - .distinct() - .collect(Collectors.toList()); + .map(sinkConfig -> { + PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME)); + pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); + SeaTunnelSink seaTunnelSink = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); + seaTunnelSink.prepare(sinkConfig); + seaTunnelSink.setJobContext(jobContext); + if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { + SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; + saveModeSink.checkOptions(sinkConfig); + } + return seaTunnelSink; + }) + .distinct() + .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); return sinks; } @@ -80,14 +86,20 @@ public List> execute(List> upstreamDataStreams) throws parallelism = sinkConfig.getInt(SinkCommonOptions.PARALLELISM.key()); } else { parallelism = sparkRuntimeEnvironment.getSparkConf() - .getInt(EnvCommonOptions.PARALLELISM.key(), EnvCommonOptions.PARALLELISM.defaultValue()); + .getInt(EnvCommonOptions.PARALLELISM.key(), EnvCommonOptions.PARALLELISM.defaultValue()); } dataset.sparkSession().read().option(SinkCommonOptions.PARALLELISM.key(), parallelism); // TODO modify checkpoint location seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema())); + if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { + SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; + DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode(); + saveModeSink.handleSaveMode(dataSaveMode); + } SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save(); } // the sink is the last stream return null; } + } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java index a7162815794..6086670bf05 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.transform.SeaTunnelTransform; @@ -79,6 +80,11 @@ public static ImmutablePair(seaTunnelSink, new HashSet<>(pluginJarPaths)); } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java index 322636ef694..5cd880b9a8f 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java @@ -18,8 +18,10 @@ package org.apache.seatunnel.engine.core.parse; import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceCommonOptions; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -200,7 +202,13 @@ private void complexAnalyze(List sourceConfigs, } else { dataType = transformAnalyze(sourceTableName, sinkAction); } - sinkListImmutablePair.getLeft().setTypeInfo((SeaTunnelRowType) dataType); + SeaTunnelSink seaTunnelSink = sinkListImmutablePair.getLeft(); + seaTunnelSink.setTypeInfo((SeaTunnelRowType) dataType); + if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { + SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; + DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode(); + saveModeSink.handleSaveMode(dataSaveMode); + } } } @@ -353,8 +361,14 @@ private void sampleAnalyze(List sourceConfigs, sinkListImmutablePair.getLeft(), sinkListImmutablePair.getRight() ); - sinkAction.getSink().setTypeInfo((SeaTunnelRowType) dataType); + SeaTunnelSink seaTunnelSink = sinkAction.getSink(); + seaTunnelSink.setTypeInfo((SeaTunnelRowType) dataType); sinkAction.setParallelism(sinkUpstreamAction.getParallelism()); + if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { + SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; + DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode(); + saveModeSink.handleSaveMode(dataSaveMode); + } actions.add(sinkAction); } diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index e632fead5c0..c10569a8104 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -236,7 +236,7 @@ public Map> getAllPlugin PluginType.SINK.getType(), plugin.factoryIdentifier() ), - plugin.optionRule()); + FactoryUtil.sinkFullOptionRule((TableSinkFactory) plugin)); return; }