From 0c5224aa860fc03e67029b9ebc9f2295bcb6bc29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sun, 15 Sep 2024 10:33:04 +0800 Subject: [PATCH 1/5] [feature] sftp support multiple and save_mode --- docs/en/connector-v2/sink/SftpFile.md | 44 ++++++++ .../file/sftp/catalog/SftpFileCatalog.java | 29 +++++ .../sftp/catalog/SftpFileCatalogFactory.java | 53 +++++++++ .../seatunnel/file/sftp/config/SftpConf.java | 19 ++-- .../file/sftp/sink/SftpFileSink.java | 41 ++----- .../file/sftp/sink/SftpFileSinkFactory.java | 24 +++- .../file/sftp/source/SftpFileSource.java | 3 +- .../e2e/connector/file/fstp/SftpFileIT.java | 78 +++++++++++++ .../text/multiple_fake_to_sftp_file_text.conf | 103 ++++++++++++++++++ .../multiple_fake_to_sftp_file_text_2.conf | 102 +++++++++++++++++ 10 files changed, 447 insertions(+), 49 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalog.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalogFactory.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_2.conf diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md index a383cc72da5..4509baa7cff 100644 --- a/docs/en/connector-v2/sink/SftpFile.md +++ b/docs/en/connector-v2/sink/SftpFile.md @@ -63,6 +63,8 @@ By default, we use 2PC commit to ensure `exactly-once` | parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | | parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | | encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. | +| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method | +| data_save_mode | string | no | APPEND_DATA | Existing data processing method | ### host [string] @@ -220,6 +222,19 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files Only used when file_format_type is json,text,csv,xml. The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`. +### schema_save_mode [string] +Existing dir processing method. +- RECREATE_SCHEMA: will create when the dir does not exist, delete and recreate when the dir is exist +- CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, skipped when the dir is exist +- ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not exist +- IGNORE :Ignore the treatment of the table + +### data_save_mode [string] +Existing data processing method. +- DROP_DATA: preserve dir and delete data files +- APPEND_DATA: preserve dir, preserve data files +- ERROR_WHEN_DATA_EXISTS: when there is data files, an error is reported + ## Example For text file format with `have_partition` and `custom_filename` and `sink_columns` @@ -247,6 +262,35 @@ SftpFile { is_enable_transaction = true } +``` + +When our source end is multiple tables, and wants different expressions to different directory, we can configure this way + +```hocon +SftpFile { + host = "xxx.xxx.xxx.xxx" + port = 22 + user = "username" + password = "password" + path = "/data/sftp/seatunnel/job1/${table_name}" + tmp_path = "/data/sftp/seatunnel/tmp" + file_format_type = "text" + field_delimiter = "\t" + row_delimiter = "\n" + have_partition = true + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + custom_filename = true + file_name_expression = "${transactionId}_${now}" + filename_time_format = "yyyy.MM.dd" + sink_columns = ["name","age"] + is_enable_transaction = true + schema_save_mode=RECREATE_SCHEMA + data_save_mode=DROP_DATA +} + + ``` ## Changelog diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalog.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalog.java new file mode 100644 index 00000000000..d6c1d4d11b5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalog.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog; + +import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; + +public class SftpFileCatalog extends AbstractFileCatalog { + + public SftpFileCatalog( + HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) { + super(hadoopFileSystemProxy, filePath, catalogName); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalogFactory.java new file mode 100644 index 00000000000..283169c2506 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/catalog/SftpFileCatalogFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; +import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class SftpFileCatalogFactory implements CatalogFactory { + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + HadoopFileSystemProxy fileSystemUtils = + new HadoopFileSystemProxy(SftpConf.buildWithConfig(options)); + return new SftpFileCatalog( + fileSystemUtils, + options.get(BaseSourceConfigOptions.FILE_PATH), + factoryIdentifier()); + } + + @Override + public String factoryIdentifier() { + return FileSystemType.SFTP.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java index 78b4e1ef71b..5353fecae53 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java @@ -17,8 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.sftp.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import java.util.HashMap; @@ -42,20 +41,16 @@ public String getSchema() { return SCHEMA; } - public static HadoopConf buildWithConfig(Config config) { - String host = config.getString(SftpConfigOptions.SFTP_HOST.key()); - int port = config.getInt(SftpConfigOptions.SFTP_PORT.key()); + public static HadoopConf buildWithConfig(ReadonlyConfig config) { + String host = config.get(SftpConfigOptions.SFTP_HOST); + int port = config.get(SftpConfigOptions.SFTP_PORT); String defaultFS = String.format("sftp://%s:%s", host, port); HadoopConf hadoopConf = new SftpConf(defaultFS); HashMap sftpOptions = new HashMap<>(); + sftpOptions.put("fs.sftp.user." + host, config.get(SftpConfigOptions.SFTP_USER)); sftpOptions.put( - "fs.sftp.user." + host, config.getString(SftpConfigOptions.SFTP_USER.key())); - sftpOptions.put( - "fs.sftp.password." - + host - + "." - + config.getString(SftpConfigOptions.SFTP_USER.key()), - config.getString(SftpConfigOptions.SFTP_PASSWORD.key())); + "fs.sftp.password." + host + "." + config.get(SftpConfigOptions.SFTP_USER), + config.get(SftpConfigOptions.SFTP_PASSWORD)); hadoopConf.setExtraOptions(sftpOptions); return hadoopConf; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java index 56a8d879918..dd3f1080b88 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java @@ -17,46 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf; -import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions; -import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink; -import com.google.auto.service.AutoService; +public class SftpFileSink extends BaseMultipleTableFileSink { + public SftpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + super(SftpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable); + } -@AutoService(SeaTunnelSink.class) -public class SftpFileSink extends BaseFileSink { @Override public String getPluginName() { return FileSystemType.SFTP.getFileSystemPluginName(); } - - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - SftpConfigOptions.SFTP_HOST.key(), - SftpConfigOptions.SFTP_PORT.key(), - SftpConfigOptions.SFTP_USER.key(), - SftpConfigOptions.SFTP_PASSWORD.key()); - if (!result.isSuccess()) { - throw new FileConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - super.prepare(pluginConfig); - hadoopConf = SftpConf.buildWithConfig(pluginConfig); - } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java index e8116c35f18..22f42d2bf9e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java @@ -17,18 +17,27 @@ package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory; import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; import com.google.auto.service.AutoService; @AutoService(Factory.class) -public class SftpFileSinkFactory implements TableSinkFactory { +public class SftpFileSinkFactory extends BaseMultipleTableFileSinkFactory { @Override public String factoryIdentifier() { return FileSystemType.SFTP.getFileSystemPluginName(); @@ -43,6 +52,9 @@ public OptionRule optionRule() { .required(SftpConfigOptions.SFTP_USER) .required(SftpConfigOptions.SFTP_PASSWORD) .optional(BaseSinkConfig.FILE_FORMAT_TYPE) + .optional(BaseSinkConfig.SCHEMA_SAVE_MODE) + .optional(BaseSinkConfig.DATA_SAVE_MODE) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .conditional( BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, @@ -93,4 +105,12 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.TIME_FORMAT) .build(); } + + @Override + public TableSink + createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new SftpFileSink(readonlyConfig, catalogTable); + } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java index d20823adc19..04e6620a757 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; @@ -78,7 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "Sftp file source connector only support read [text, csv, json, xml] files"); } String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key()); - hadoopConf = SftpConf.buildWithConfig(pluginConfig); + hadoopConf = SftpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig)); readStrategy = ReadStrategyFactory.of( pluginConfig.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key())); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index 9645268882e..7d61e33d437 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -19,22 +19,33 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.container.TestContainerId; import org.apache.seatunnel.e2e.common.container.TestHelper; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; +import org.apache.commons.lang3.StringUtils; + import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.GenericContainer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.stream.Stream; @DisabledOnContainer( @@ -102,6 +113,10 @@ public void startUp() throws Exception { } @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK}, + disabledReason = "Flink dosen't support multi-table at now") public void testSftpFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { TestHelper helper = new TestHelper(container); @@ -131,6 +146,69 @@ public void testSftpFileReadAndWrite(TestContainer container) helper.execute("/xml/sftp_file_xml_to_assert.conf"); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK}, + disabledReason = "Flink dosen't support multi-table at now") + public void testMultipleTableAndSaveMode(TestContainer container) + throws IOException, InterruptedException { + TestHelper helper = new TestHelper(container); + // test mult table and save_mode:RECREATE_SCHEMA DROP_DATA + String homePath = "/home/seatunnel"; + String path1 = "/tmp/multiple_1/seatunnel/text/source_1"; + String path2 = "/tmp/multiple_1/seatunnel/text/source_2"; + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 0); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 0); + helper.execute("/text/multiple_fake_to_sftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); + helper.execute("/text/multiple_fake_to_sftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); + // test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST APPEND_DATA + String path3 = "/tmp/multiple_2/seatunnel/text/source_1"; + String path4 = "/tmp/multiple_2/seatunnel/text/source_2"; + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 0); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 0); + helper.execute("/text/multiple_fake_to_sftp_file_text_2.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 1); + helper.execute("/text/multiple_fake_to_sftp_file_text_2.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 2); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2); + } + + @SneakyThrows + private List getFileListFromContainer(String path) { + String command = "ls -1 " + path; + ExecCreateCmdResponse execCreateCmdResponse = + dockerClient + .execCreateCmd(sftpContainer.getContainerId()) + .withCmd("sh", "-c", command) + .withAttachStdout(true) + .withAttachStderr(true) + .exec(); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + dockerClient + .execStartCmd(execCreateCmdResponse.getId()) + .exec(new ExecStartResultCallback(outputStream, System.err)) + .awaitCompletion(); + + String output = new String(outputStream.toByteArray(), StandardCharsets.UTF_8).trim(); + List fileList = new ArrayList<>(); + log.info("container path file list is :{}", output); + String[] files = output.split("\n"); + for (String file : files) { + if (StringUtils.isNotEmpty(file)) { + log.info("container path file name is :{}", file); + fileList.add(file); + } + } + return fileList; + } + @AfterAll @Override public void tearDown() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text.conf new file mode 100644 index 00000000000..9d659f27ca2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text.conf @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + result_table_name = "sftp" + tables_configs = [ + { + schema = { + table = "source_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "source_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} + + +sink { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/multiple_1/seatunnel/text/${table_name}" + source_table_name = "sftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="RECREATE_SCHEMA" + "data_save_mode"="DROP_DATA" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_2.conf new file mode 100644 index 00000000000..7ec0143bb87 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_2.conf @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + result_table_name = "sftp" + tables_configs = [ + { + schema = { + table = "source_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "source_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} + +sink { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/multiple_2/seatunnel/text/${table_name}" + source_table_name = "sftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST" + "data_save_mode"="APPEND_DATA" + } +} \ No newline at end of file From 8500b7dcb58b19f4288c8bd3996b7d5ba4864f92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sun, 15 Sep 2024 10:36:53 +0800 Subject: [PATCH 2/5] [feature] sftp support multiple and save_mode 2 --- .../apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index 7d61e33d437..68746e19d93 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -113,10 +113,6 @@ public void startUp() throws Exception { } @TestTemplate - @DisabledOnContainer( - value = {}, - type = {EngineType.FLINK}, - disabledReason = "Flink dosen't support multi-table at now") public void testSftpFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { TestHelper helper = new TestHelper(container); From fc1b4ab0d75d329bfb4fcd787ee9d468260bb063 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sun, 15 Sep 2024 21:33:26 +0800 Subject: [PATCH 3/5] [feature] sftp support multiple and save_mode 3 --- .../apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index 68746e19d93..c461f7fda81 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -145,7 +145,7 @@ public void testSftpFileReadAndWrite(TestContainer container) @TestTemplate @DisabledOnContainer( value = {}, - type = {EngineType.FLINK}, + type = {EngineType.FLINK, EngineType.SPARK}, disabledReason = "Flink dosen't support multi-table at now") public void testMultipleTableAndSaveMode(TestContainer container) throws IOException, InterruptedException { From 042d492d25545d91133e4d825e8101bbae252d14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sun, 29 Sep 2024 13:35:14 +0800 Subject: [PATCH 4/5] [feature] sftp support multiple and save_mode 4 --- .../e2e/connector/file/fstp/SftpFileIT.java | 32 ++++++++++++++++--- ...ltiple_fake_to_sftp_file_text_append.conf} | 0 ...ke_to_sftp_file_text_recreate_schema.conf} | 0 3 files changed, 27 insertions(+), 5 deletions(-) rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/{multiple_fake_to_sftp_file_text_2.conf => multiple_fake_to_sftp_file_text_append.conf} (100%) rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/{multiple_fake_to_sftp_file_text.conf => multiple_fake_to_sftp_file_text_recreate_schema.conf} (100%) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index c461f7fda81..9b4af990c72 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -145,7 +145,7 @@ public void testSftpFileReadAndWrite(TestContainer container) @TestTemplate @DisabledOnContainer( value = {}, - type = {EngineType.FLINK, EngineType.SPARK}, + type = {EngineType.FLINK}, disabledReason = "Flink dosen't support multi-table at now") public void testMultipleTableAndSaveMode(TestContainer container) throws IOException, InterruptedException { @@ -154,23 +154,27 @@ public void testMultipleTableAndSaveMode(TestContainer container) String homePath = "/home/seatunnel"; String path1 = "/tmp/multiple_1/seatunnel/text/source_1"; String path2 = "/tmp/multiple_1/seatunnel/text/source_2"; + deleteFileFromContainer(homePath + path1); + deleteFileFromContainer(homePath + path2); Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 0); Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 0); - helper.execute("/text/multiple_fake_to_sftp_file_text.conf"); + helper.execute("/text/multiple_fake_to_sftp_file_text_recreate_schema.conf"); Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); - helper.execute("/text/multiple_fake_to_sftp_file_text.conf"); + helper.execute("/text/multiple_fake_to_sftp_file_text_recreate_schema.conf"); Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); // test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST APPEND_DATA String path3 = "/tmp/multiple_2/seatunnel/text/source_1"; String path4 = "/tmp/multiple_2/seatunnel/text/source_2"; + deleteFileFromContainer(homePath + path3); + deleteFileFromContainer(homePath + path4); Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 0); Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 0); - helper.execute("/text/multiple_fake_to_sftp_file_text_2.conf"); + helper.execute("/text/multiple_fake_to_sftp_file_text_append.conf"); Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 1); Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 1); - helper.execute("/text/multiple_fake_to_sftp_file_text_2.conf"); + helper.execute("/text/multiple_fake_to_sftp_file_text_append.conf"); Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 2); Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2); } @@ -205,6 +209,24 @@ private List getFileListFromContainer(String path) { return fileList; } + @SneakyThrows + private void deleteFileFromContainer(String path) { + String command = "rm -rf " + path; + ExecCreateCmdResponse execCreateCmdResponse = + dockerClient + .execCreateCmd(sftpContainer.getContainerId()) + .withCmd("sh", "-c", command) + .withAttachStdout(true) + .withAttachStderr(true) + .exec(); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + dockerClient + .execStartCmd(execCreateCmdResponse.getId()) + .exec(new ExecStartResultCallback(outputStream, System.err)) + .awaitCompletion(); + } + @AfterAll @Override public void tearDown() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_append.conf similarity index 100% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_2.conf rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_append.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_recreate_schema.conf similarity index 100% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text.conf rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/multiple_fake_to_sftp_file_text_recreate_schema.conf From 167c0ccd23691eadd1be70f7befa883c1bc618d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Sun, 29 Sep 2024 14:36:44 +0800 Subject: [PATCH 5/5] [feature] sftp support multiple and save_mode 5 --- .../apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index 9acb984e38c..0f9f7dc61d0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.container.TestContainerId; import org.apache.seatunnel.e2e.common.container.TestHelper; @@ -150,10 +149,6 @@ public void testSftpFileReadAndWrite(TestContainer container) } @TestTemplate - @DisabledOnContainer( - value = {}, - type = {EngineType.FLINK}, - disabledReason = "Flink dosen't support multi-table at now") public void testMultipleTableAndSaveMode(TestContainer container) throws IOException, InterruptedException { TestHelper helper = new TestHelper(container);