diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md index 15b6de0c6f8..1cbda880139 100644 --- a/docs/en/connector-v2/source/CosFile.md +++ b/docs/en/connector-v2/source/CosFile.md @@ -343,6 +343,7 @@ The compress codec of archive files and the details that supported as the follow | ZIP | txt,json,excel,xml | .zip | | TAR | txt,json,excel,xml | .tar | | TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,xml | .gz | | NONE | all | .* | ### encoding [string] diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md index 6d114813769..59f3852cb0d 100644 --- a/docs/en/connector-v2/source/FtpFile.md +++ b/docs/en/connector-v2/source/FtpFile.md @@ -328,6 +328,7 @@ The compress codec of archive files and the details that supported as the follow | ZIP | txt,json,excel,xml | .zip | | TAR | txt,json,excel,xml | .tar | | TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,xml | .gz | | NONE | all | .* | ### encoding [string] diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index 405dfff820f..161b0e63183 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -144,6 +144,7 @@ The compress codec of archive files and the details that supported as the follow | ZIP | txt,json,excel,xml | .zip | | TAR | txt,json,excel,xml | .tar | | TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,xml | .gz | | NONE | all | .* | ### encoding [string] diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 65f287f057b..8923a031607 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -322,6 +322,7 @@ The compress codec of archive files and the details that supported as the follow | ZIP | txt,json,excel,xml | .zip | | TAR | txt,json,excel,xml | .tar | | TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,xml | .gz | | NONE | all | .* | ### encoding [string] @@ -490,4 +491,6 @@ sink { - [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/seatunnel/pull/2980)) - [Improve] Support extract partition from SeaTunnelRow fields ([3085](https://github.com/apache/seatunnel/pull/3085)) - [Improve] Support parse field from file path ([2985](https://github.com/apache/seatunnel/pull/2985)) +### 2.3.9-beta 2024-11-12 +- [Improve] Support parse field from file path ([8019](https://github.com/apache/seatunnel/issues/8019)) diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md index 933439edc9f..1db5d62a441 100644 --- a/docs/en/connector-v2/source/OssJindoFile.md +++ b/docs/en/connector-v2/source/OssJindoFile.md @@ -335,6 +335,7 @@ The compress codec of archive files and the details that supported as the follow | ZIP | txt,json,excel,xml | .zip | | TAR | txt,json,excel,xml | .tar | | TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,xml | .gz | | NONE | all | .* | ### encoding [string] diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index 4834b025bc3..ba4b71cfe93 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -299,6 +299,7 @@ The compress codec of archive files and the details that supported as the follow | ZIP | txt,json,excel,xml | .zip | | TAR | txt,json,excel,xml | .tar | | TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,xml | .gz | | NONE | all | .* | ### 
encoding [string] diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md index 95c710110a0..8e80bedd4b3 100644 --- a/docs/en/connector-v2/source/SftpFile.md +++ b/docs/en/connector-v2/source/SftpFile.md @@ -235,11 +235,12 @@ The compress codec of files and the details that supported as the following show The compress codec of archive files and the details that supported as the following shown: | archive_compress_codec | file_format | archive_compress_suffix | -|------------------------|--------------------|-------------------------| -| ZIP | txt,json,excel,xml | .zip | -| TAR | txt,json,excel,xml | .tar | -| TAR_GZ | txt,json,excel,xml | .tar.gz | -| NONE | all | .* | +|--------------------|--------------------|---------------------| +| ZIP | txt,json,excel,xml | .zip | +| TAR | txt,json,excel,xml | .tar | +| TAR_GZ | txt,json,excel,xml | .tar.gz | +| GZ | txt,json,xml | .gz | +| NONE | all | .* | ### encoding [string] @@ -384,4 +385,4 @@ sink { Console { } } -``` \ No newline at end of file +``` diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java index da30887a824..b90990d4348 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java @@ -35,6 +35,7 @@ public enum ArchiveCompressFormat { ZIP(".zip"), TAR(".tar"), TAR_GZ(".tar.gz"), + GZ(".gz"), ; private final String archiveCompressCodec; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index 00d90d84195..a1a99d32cac 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -238,6 +238,11 @@ protected void resolveArchiveCompressedInputStream( } } break; + case GZ: + GzipCompressorInputStream gzipIn = + new GzipCompressorInputStream(hadoopFileSystemProxy.getInputStream(path)); + readProcess(path, tableId, output, copyInputStream(gzipIn), partitionsMap, path); + break; case NONE: readProcess( path, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index 0175f26f585..ed055a3a303 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -50,6 +50,8 @@ import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; @@ -149,6 +151,13 @@ public class LocalFileIT extends TestSuiteBase { "/seatunnel/read/tar_gz/txt/multifile/multiTarGz.tar.gz", container); + Path txtGz = + convertToGzFile( + Lists.newArrayList(ContainerUtil.getResourcesFile("/text/e2e.txt")), + "e2e-txt-gz"); + ContainerUtil.copyFileIntoContainers( + txtGz, "/seatunnel/read/gz/txt/single/e2e-txt-gz.gz", container); + Path jsonZip = convertToZipFile( Lists.newArrayList( @@ -168,6 +177,14 @@ public class LocalFileIT extends TestSuiteBase { "/seatunnel/read/zip/json/multifile/multiJson.zip", container); + Path jsonGz = + convertToGzFile( + Lists.newArrayList( + ContainerUtil.getResourcesFile("/json/e2e.json")), + "e2e-json-gz"); + ContainerUtil.copyFileIntoContainers( + jsonGz, "/seatunnel/read/gz/json/single/e2e-json-gz.gz", container); + ContainerUtil.copyFileIntoContainers( "/text/e2e_gbk.txt", "/seatunnel/read/encoding/text/e2e_gbk.txt", @@ -193,6 +210,13 @@ public class LocalFileIT extends TestSuiteBase { ContainerUtil.copyFileIntoContainers( xmlZip, "/seatunnel/read/zip/xml/single/e2e-xml.zip", container); + Path xmlGz = + convertToGzFile( + Lists.newArrayList(ContainerUtil.getResourcesFile("/xml/e2e.xml")), + "e2e-xml-gz"); + ContainerUtil.copyFileIntoContainers( + xmlGz, "/seatunnel/read/gz/xml/single/e2e-xml-gz.gz", container); + Path txtLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt")); ContainerUtil.copyFileIntoContainers( txtLzo, "/seatunnel/read/lzo_text/e2e.txt", container); @@ -313,6 +337,7 @@ public void testLocalFileReadAndWrite(TestContainer container) /** Compressed file test */ // test read single local text file with zip compression helper.execute("/text/local_file_zip_text_to_assert.conf"); + helper.execute("/text/local_file_gz_text_to_assert.conf"); // test read multi local text file with zip compression helper.execute("/text/local_file_multi_zip_text_to_assert.conf"); // test read single local text file with tar compression @@ -325,10 +350,12 @@ public void testLocalFileReadAndWrite(TestContainer container) helper.execute("/text/local_file_multi_tar_gz_text_to_assert.conf"); // test read single local json file with zip compression helper.execute("/json/local_file_json_zip_to_assert.conf"); + helper.execute("/json/local_file_json_gz_to_assert.conf"); // test read multi local json file with zip compression helper.execute("/json/local_file_json_multi_zip_to_assert.conf"); // test read single local xml file with zip compression helper.execute("/xml/local_file_zip_xml_to_assert.conf"); + helper.execute("/xml/local_file_gz_xml_to_assert.conf"); // test read single local excel file with zip compression helper.execute("/excel/local_excel_zip_to_assert.conf"); // test read multi local excel file with zip compression @@ -551,4 +578,29 @@ public FileVisitResult visitFile( return tarGzFilePath; } + + public Path convertToGzFile(List files, String name) throws IOException { + if (files == null || files.isEmpty()) { + throw new IllegalArgumentException("File list is empty or invalid"); + } + + File firstFile = files.get(0); + Path gzFilePath = Paths.get(firstFile.getParent(), String.format("%s.gz", name)); + + try 
(FileInputStream fis = new FileInputStream(firstFile); + FileOutputStream fos = new FileOutputStream(gzFilePath.toFile()); + GZIPOutputStream gzos = new GZIPOutputStream(fos)) { + + byte[] buffer = new byte[2048]; + int length; + + while ((length = fis.read(buffer)) > 0) { + gzos.write(buffer, 0, length); + } + gzos.finish(); + } catch (IOException e) { + e.printStackTrace(); + } + return gzFilePath; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_gz_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_gz_to_assert.conf new file mode 100644 index 00000000000..0433aa5f5cb --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_gz_to_assert.conf @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + LocalFile { + path = "/seatunnel/read/gz/json/single/e2e-json-gz.gz" + file_format_type = "json" + archive_compress_codec = "gz" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp + } + } + } + result_table_name = "fake" + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + }, + { + rule_type = MIN_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_gz_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_gz_text_to_assert.conf new file mode 100644 index 00000000000..d4f71e9901c --- 
/dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_gz_text_to_assert.conf @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + LocalFile { + path = "/seatunnel/read/gz/txt/single/e2e-txt-gz.gz" + file_format_type = "text" + archive_compress_codec = "gz" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + result_table_name = "fake" + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + }, + { + rule_type = MIN_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_gz_xml_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_gz_xml_to_assert.conf new file mode 100644 index 00000000000..2a216a18ee5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_gz_xml_to_assert.conf @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + LocalFile { + path = "/seatunnel/read/gz/xml/single/e2e-xml-gz.gz" + file_format_type = "xml" + archive_compress_codec = "gz" + xml_row_tag = "RECORD" + xml_use_attr_format = true + schema = { + fields { + c_bytes = "tinyint" + c_short = "smallint" + c_int = "int" + c_bigint = "bigint" + c_string = "string" + c_double = "double" + c_float = "float" + c_decimal = "decimal(10, 2)" + c_boolean = "boolean" + c_map = "map" + c_array = "array" + c_date = "date" + c_datetime = "timestamp" + c_time = "time" + } + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 1 + }, + { + rule_type = MIN_ROW + rule_value = 1 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +}
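
A note on the new read path: the `GZ` branch added to `AbstractReadStrategy` wraps the raw file stream in Commons Compress's `GzipCompressorInputStream` and hands the decompressed bytes to `readProcess` via `copyInputStream`. For readers who want to reproduce just the decompression step outside the connector, here is a minimal standalone sketch (it assumes `commons-compress` is on the classpath; the class name, file path, and plain line-by-line reader are illustrative only and are not the connector's actual code):

```java
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class GzReadSketch {
    public static void main(String[] args) throws IOException {
        // Illustrative path; the e2e test copies its gzipped resource to
        // /seatunnel/read/gz/txt/single/e2e-txt-gz.gz inside the container.
        String gzFile = "/seatunnel/read/gz/txt/single/e2e-txt-gz.gz";
        // Same idea as the GZ case in AbstractReadStrategy: wrap the raw stream,
        // then consume the decompressed content with a format-specific reader.
        try (GzipCompressorInputStream gzIn =
                        new GzipCompressorInputStream(Files.newInputStream(Paths.get(gzFile)));
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(gzIn, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```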
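
On the e2e side, `convertToGzFile` gzips only the first file of the list (GZ wraps a single stream, so unlike ZIP/TAR there is no multi-file variant), and it declares `throws IOException` while also catching and printing the exception internally. A possible tightened variant of the helper, sketched under the assumption that letting the exception propagate is acceptable for the test (the class name here is illustrative):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class GzWriteSketch {
    /** Gzip the first file of the list into parentDir/name.gz and return the resulting path. */
    public static Path convertToGzFile(List<File> files, String name) throws IOException {
        if (files == null || files.isEmpty()) {
            throw new IllegalArgumentException("File list is empty or invalid");
        }
        File firstFile = files.get(0);
        Path gzFilePath = Paths.get(firstFile.getParent(), String.format("%s.gz", name));
        // try-with-resources finishes and closes the gzip stream; any IOException
        // propagates to the caller instead of being swallowed.
        try (GZIPOutputStream gzos = new GZIPOutputStream(Files.newOutputStream(gzFilePath))) {
            Files.copy(firstFile.toPath(), gzos);
        }
        return gzFilePath;
    }
}
```

Pairing the two sketches gives the same round trip the new LocalFileIT cases exercise: compress a resource with the JDK's `GZIPOutputStream`, read it back through `GzipCompressorInputStream`, and check that the content is unchanged.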