diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java index 56d21688d8c..8c5d7c5475e 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java @@ -18,11 +18,25 @@ package org.apache.seatunnel.common.utils; import java.time.Instant; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.SignStyle; +import java.time.temporal.TemporalAccessor; +import java.time.temporal.TemporalQueries; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static java.time.temporal.ChronoField.DAY_OF_MONTH; +import static java.time.temporal.ChronoField.MONTH_OF_YEAR; +import static java.time.temporal.ChronoField.YEAR; public class DateTimeUtils { @@ -48,6 +62,162 @@ public class DateTimeUtils { FORMATTER_MAP.put( Formatter.YYYY_MM_DD_HH_MM_SS_ISO8601, DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_ISO8601.value)); + FORMATTER_MAP.put( + Formatter.YYYY_MM_DD_HH_MM_SS_SSS_ISO8601, + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SSS_ISO8601.value)); + FORMATTER_MAP.put( + Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS_ISO8601, + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS_ISO8601.value)); + FORMATTER_MAP.put( + Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSSSSS_ISO8601, + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSSSSS_ISO8601.value)); + } + + // if the datatime string length is 19, find the DateTimeFormatter from this map + public static final Map YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP = + new 
LinkedHashMap<>(); + public static Set> + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP_ENTRY_SET = new LinkedHashSet<>(); + + // if the datatime string length bigger than 19, find the DateTimeFormatter from this map + public static final Map YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP = + new LinkedHashMap<>(); + public static Set> + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP_ENTRY_SET = new LinkedHashSet<>(); + + // if the datatime string length is 14, use this formatter + public static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS_14_FORMATTER = + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_NO_SPLIT.value); + + static { + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS.value)); + + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}.*"), + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral(' ') + .append(DateTimeFormatter.ISO_LOCAL_TIME) + .toFormatter()); + + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_ISO8601.value)); + + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}.*"), + DateTimeFormatter.ISO_LOCAL_DATE_TIME); + + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}/\\d{2}/\\d{2}\\s\\d{2}:\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SLASH.value)); + + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}/\\d{2}/\\d{2}\\s\\d{2}:\\d{2}.*"), + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append( + new DateTimeFormatterBuilder() + .appendValue(YEAR, 4, 10, SignStyle.EXCEEDS_PAD) + .appendLiteral('/') + .appendValue(MONTH_OF_YEAR, 2) + .appendLiteral('/') + .appendValue(DAY_OF_MONTH, 2) + 
.toFormatter()) + .appendLiteral(' ') + .append(DateTimeFormatter.ISO_LOCAL_TIME) + .toFormatter()); + + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}\\.\\d{2}\\.\\d{2}\\s\\d{2}:\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SPOT.value)); + + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}\\.\\d{2}\\.\\d{2}\\s\\d{2}:\\d{2}.*"), + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append( + new DateTimeFormatterBuilder() + .appendValue(YEAR, 4, 10, SignStyle.EXCEEDS_PAD) + .appendLiteral('.') + .appendValue(MONTH_OF_YEAR, 2) + .appendLiteral('.') + .appendValue(DAY_OF_MONTH, 2) + .toFormatter()) + .appendLiteral(' ') + .append(DateTimeFormatter.ISO_LOCAL_TIME) + .toFormatter()); + + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.put( + Pattern.compile("\\d{4}年\\d{2}月\\d{2}日\\s\\d{2}时\\d{2}分\\d{2}秒"), + DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH时mm分ss秒")); + + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP_ENTRY_SET.addAll( + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.entrySet()); + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP_ENTRY_SET.addAll( + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.entrySet()); + } + + /** + * gave a datetime string and return the {@link DateTimeFormatter} which can be used to parse + * it. 
+ * + * @param dateTime eg: 2020-02-03 12:12:10.101 + * @return the DateTimeFormatter matched, will return null when not matched any pattern + */ + public static DateTimeFormatter matchDateTimeFormatter(String dateTime) { + if (dateTime.length() == 19) { + for (Map.Entry entry : + YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP_ENTRY_SET) { + if (entry.getKey().matcher(dateTime).matches()) { + return entry.getValue(); + } + } + } else if (dateTime.length() > 19) { + for (Map.Entry entry : + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP_ENTRY_SET) { + if (entry.getKey().matcher(dateTime).matches()) { + return entry.getValue(); + } + } + } else if (dateTime.length() == 14) { + return YYYY_MM_DD_HH_MM_SS_14_FORMATTER; + } + return null; + } + + public static LocalDateTime parse(String dateTime, DateTimeFormatter dateTimeFormatter) { + TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(dateTime); + LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); + LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); + return LocalDateTime.of(localDate, localTime); + } + + /** + * gave a datetime string and return {@link LocalDateTime} + * + *

Because the formatter has to be selected by matching regular expressions, there + * is some performance cost. When tested on an 8c16g macOS machine, the largest slowdown + * compared to passing the formatter directly was for + * 'Pattern.compile("\\d{4}\\.\\d{2}\\.\\d{2}\\s\\d{2}:\\d{2}.*")', where the time rose from 4.5 + * seconds to 10 seconds over 10 million (1000w) parse calls. + * + *

Analysis shows two main reasons. The first is that this regular expression is the fourth + * entry in the map, so three other regular expressions have to be tried before it matches. + * + *

Another reason is to support the length of non fixed millisecond bits (minimum 0, maximum + * 9), we used {@link DateTimeFormatter#ISO_LOCAL_TIME}, which also increases the time for time + * conversion. + * + * @param dateTime eg: 2020-02-03 12:12:10.101 + * @return {@link LocalDateTime} + */ + public static LocalDateTime parse(String dateTime) { + DateTimeFormatter dateTimeFormatter = matchDateTimeFormatter(dateTime); + return LocalDateTime.parse(dateTime, dateTimeFormatter); } public static LocalDateTime parse(String dateTime, Formatter formatter) { @@ -78,7 +248,10 @@ public enum Formatter { YYYY_MM_DD_HH_MM_SS_SPOT("yyyy.MM.dd HH:mm:ss"), YYYY_MM_DD_HH_MM_SS_SLASH("yyyy/MM/dd HH:mm:ss"), YYYY_MM_DD_HH_MM_SS_NO_SPLIT("yyyyMMddHHmmss"), - YYYY_MM_DD_HH_MM_SS_ISO8601("yyyy-MM-dd'T'HH:mm:ss"); + YYYY_MM_DD_HH_MM_SS_ISO8601("yyyy-MM-dd'T'HH:mm:ss"), + YYYY_MM_DD_HH_MM_SS_SSS_ISO8601("yyyy-MM-dd'T'HH:mm:ss.SSS"), + YYYY_MM_DD_HH_MM_SS_SSSSSS_ISO8601("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"), + YYYY_MM_DD_HH_MM_SS_SSSSSSSSS_ISO8601("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS"); private final String value; diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java index 1895201c4ed..1f2d7f6fc58 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java @@ -19,8 +19,15 @@ import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.SignStyle; import java.util.HashMap; import java.util.Map; +import java.util.regex.Pattern; + +import static java.time.temporal.ChronoField.DAY_OF_MONTH; +import static java.time.temporal.ChronoField.MONTH_OF_YEAR; +import static java.time.temporal.ChronoField.YEAR; public class DateUtils { private static final Map FORMATTER_MAP = new HashMap<>(); 
@@ -36,6 +43,106 @@ public class DateUtils { DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_SLASH.value)); } + public static final Pattern[] PATTERN_ARRAY = + new Pattern[] { + Pattern.compile("\\d{4}-\\d{2}-\\d{2}"), + Pattern.compile("\\d{4}年\\d{2}月\\d{2}日"), + Pattern.compile("\\d{4}/\\d{2}/\\d{2}"), + Pattern.compile("\\d{4}\\.\\d{2}\\.\\d{2}"), + Pattern.compile("\\d{8}") + }; + + public static final Map DATE_FORMATTER_MAP = new HashMap(); + + static { + DATE_FORMATTER_MAP.put( + PATTERN_ARRAY[0], + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .toFormatter()); + + DATE_FORMATTER_MAP.put( + PATTERN_ARRAY[1], + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append( + new DateTimeFormatterBuilder() + .appendValue(YEAR, 4, 10, SignStyle.EXCEEDS_PAD) + .appendLiteral("年") + .appendValue(MONTH_OF_YEAR, 2) + .appendLiteral("月") + .appendValue(DAY_OF_MONTH, 2) + .appendLiteral("日") + .toFormatter()) + .toFormatter()); + + DATE_FORMATTER_MAP.put( + PATTERN_ARRAY[2], + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append( + new DateTimeFormatterBuilder() + .appendValue(YEAR, 4, 10, SignStyle.EXCEEDS_PAD) + .appendLiteral('/') + .appendValue(MONTH_OF_YEAR, 2) + .appendLiteral('/') + .appendValue(DAY_OF_MONTH, 2) + .toFormatter()) + .toFormatter()); + + DATE_FORMATTER_MAP.put( + PATTERN_ARRAY[3], + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append( + new DateTimeFormatterBuilder() + .appendValue(YEAR, 4, 10, SignStyle.EXCEEDS_PAD) + .appendLiteral('.') + .appendValue(MONTH_OF_YEAR, 2) + .appendLiteral('.') + .appendValue(DAY_OF_MONTH, 2) + .toFormatter()) + .toFormatter()); + + DATE_FORMATTER_MAP.put( + PATTERN_ARRAY[4], + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append( + new DateTimeFormatterBuilder() + .appendValue(YEAR, 4, 10, SignStyle.EXCEEDS_PAD) + .appendValue(MONTH_OF_YEAR, 2) + .appendValue(DAY_OF_MONTH, 2) + .toFormatter()) + 
.toFormatter()); + } + + /** + * gave a date string and return the {@link DateTimeFormatter} which can be used to parse it. + * + * @param dateTime eg: 2020-02-03 + * @return the DateTimeFormatter matched, will return null when not matched any pattern in + * {@link #PATTERN_ARRAY} + */ + public static DateTimeFormatter matchDateFormatter(String dateTime) { + for (int j = 0; j < PATTERN_ARRAY.length; j++) { + if (PATTERN_ARRAY[j].matcher(dateTime).matches()) { + return DATE_FORMATTER_MAP.get(PATTERN_ARRAY[j]); + } + } + return null; + } + + public static LocalDate parse(String date) { + DateTimeFormatter dateTimeFormatter = matchDateFormatter(date); + return parse(date, dateTimeFormatter); + } + + public static LocalDate parse(String date, DateTimeFormatter dateTimeFormatter) { + return LocalDate.parse(date, dateTimeFormatter); + } + public static LocalDate parse(String date, Formatter formatter) { return LocalDate.parse(date, FORMATTER_MAP.get(formatter)); } diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java index ef1f971ce5a..d4ee0462de4 100644 --- a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java @@ -24,6 +24,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.format.DateTimeFormatter; public class DateTimeUtilsTest { @@ -54,4 +55,170 @@ public void testParseTimestamp() { Assertions.assertEquals(2023, parse.getYear()); Assertions.assertEquals(22, parse.getDayOfMonth()); } + + @Test + public void testAutoDateTimeFormatter() { + String datetimeStr = "2020-10-10 10:10:10"; + Assertions.assertEquals("2020-10-10T10:10:10", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020-10-10T10:10:10"; + Assertions.assertEquals("2020-10-10T10:10:10", 
DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020/10/10 10:10:10"; + Assertions.assertEquals("2020-10-10T10:10:10", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020年10月10日 10时10分10秒"; + Assertions.assertEquals("2020-10-10T10:10:10", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020.10.10 10:10:10"; + Assertions.assertEquals("2020-10-10T10:10:10", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "20201010101010"; + Assertions.assertEquals("2020-10-10T10:10:10", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020-10-10 10:10:10.201"; + Assertions.assertEquals( + "2020-10-10T10:10:10.201", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020-10-10 10:10:10.201111"; + Assertions.assertEquals( + "2020-10-10T10:10:10.201111", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020-10-10 10:10:10.201111001"; + Assertions.assertEquals( + "2020-10-10T10:10:10.201111001", DateTimeUtils.parse(datetimeStr).toString()); + } + + @Test + public void testMatchDateTimeFormatter() { + String datetimeStr = "2020-10-10 10:10:10"; + Assertions.assertEquals( + "2020-10-10T10:10:10", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "2020-10-10T10:10:10"; + Assertions.assertEquals( + "2020-10-10T10:10:10", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "2020/10/10 10:10:10"; + Assertions.assertEquals( + "2020-10-10T10:10:10", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "2020年10月10日 10时10分10秒"; + Assertions.assertEquals( + "2020-10-10T10:10:10", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "2020.10.10 10:10:10"; + Assertions.assertEquals( + "2020-10-10T10:10:10", + 
DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "20201010101010"; + Assertions.assertEquals( + "2020-10-10T10:10:10", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "2020-10-10 10:10:10.201"; + Assertions.assertEquals( + "2020-10-10T10:10:10.201", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "2020-10-10 10:10:10.201111"; + Assertions.assertEquals( + "2020-10-10T10:10:10.201111", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + + datetimeStr = "2020-10-10 10:10:10.201111001"; + Assertions.assertEquals( + "2020-10-10T10:10:10.201111001", + DateTimeUtils.parse(datetimeStr, DateTimeUtils.matchDateTimeFormatter(datetimeStr)) + .toString()); + } + + @Test + public void testPerformance() { + String datetimeStr = "2020-10-10 10:10:10"; + DateTimeFormatter dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(datetimeStr); + String datetimeStr1 = "20201010101010"; + DateTimeFormatter dateTimeFormatter1 = DateTimeUtils.matchDateTimeFormatter(datetimeStr1); + String datetimeStr2 = "2020.10.10 10:10:10.100"; + DateTimeFormatter dateTimeFormatter2 = DateTimeUtils.matchDateTimeFormatter(datetimeStr2); + String datetimeStr3 = "2020.10.10 10:10:10"; + DateTimeFormatter dateTimeFormatter3 = DateTimeUtils.matchDateTimeFormatter(datetimeStr3); + long t1 = System.currentTimeMillis(); + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr, dateTimeFormatter); + } + long t2 = System.currentTimeMillis(); + // Use an explicit time format 'yyyy-MM-dd HH:mm:ss' for processing, use time: 4552ms + System.out.println((t2 - t1) + ""); + + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr); + } + long t3 = System.currentTimeMillis(); + // If format is not specified, the system 
automatically obtains the format 'yyyy-MM-dd + // HH:mm:ss' for processing, use time: 6082ms + System.out.println((t3 - t2) + ""); + + long t4 = System.currentTimeMillis(); + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr1, dateTimeFormatter1); + } + long t5 = System.currentTimeMillis(); + // Use an explicit time format 'yyyyMMddHHmmss' for processing, use time: 4610ms + System.out.println((t5 - t4) + ""); + + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr1); + } + long t6 = System.currentTimeMillis(); + // If format is not specified, the system automatically obtains the format 'yyyyMMddHHmmss' + // for processing, use time: 4842ms + + System.out.println((t6 - t5) + ""); + + long t7 = System.currentTimeMillis(); + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr2, dateTimeFormatter2); + } + long t8 = System.currentTimeMillis(); + // Use an explicit time format 'yyyy.MM.dd HH:mm:ss.SSS' for processing, use time: 8162ms + System.out.println((t8 - t7) + ""); + + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr2); + } + long t9 = System.currentTimeMillis(); + // If format is not specified, the system automatically obtains the format 'yyyy.MM.dd + // HH:mm:ss.SSS' for processing, use time: 11366ms + System.out.println((t9 - t8) + ""); + + long t10 = System.currentTimeMillis(); + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr3, dateTimeFormatter3); + } + long t11 = System.currentTimeMillis(); + // Use an explicit time format 'yyyy.MM.dd HH:mm:ss' for processing, use time: 4405ms + System.out.println((t11 - t10) + ""); + + for (int i = 0; i < 10000000; i++) { + DateTimeUtils.parse(datetimeStr3); + } + long t12 = System.currentTimeMillis(); + // If format is not specified, the system automatically obtains the format 'yyyy.MM.dd + // HH:mm:ss' for processing, use time: 7771ms + System.out.println((t12 - t11) + ""); + } } diff --git 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateUtilsTest.java new file mode 100644 index 00000000000..b7426c9d16f --- /dev/null +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateUtilsTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.common.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DateUtilsTest { + + @Test + public void testAutoDateFormatter() { + String datetimeStr = "2020-10-10"; + Assertions.assertEquals("2020-10-10", DateUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020年10月10日"; + Assertions.assertEquals("2020-10-10", DateUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020/10/10"; + Assertions.assertEquals("2020-10-10", DateUtils.parse(datetimeStr).toString()); + + datetimeStr = "2020.10.10"; + Assertions.assertEquals("2020-10-10", DateUtils.parse(datetimeStr).toString()); + + datetimeStr = "20201010"; + Assertions.assertEquals("2020-10-10", DateUtils.parse(datetimeStr).toString()); + } + + @Test + public void testMatchDateTimeFormatter() { + String datetimeStr = "2020-10-10"; + Assertions.assertEquals( + "2020-10-10", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + + datetimeStr = "2020年10月10日"; + Assertions.assertEquals( + "2020-10-10", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + + datetimeStr = "2020/10/10"; + Assertions.assertEquals( + "2020-10-10", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + + datetimeStr = "2020.10.10"; + Assertions.assertEquals( + "2020-10-10", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + + datetimeStr = "20201010"; + Assertions.assertEquals( + "2020-10-10", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + } +} diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java index bc73dbf1511..d60fb46c8c9 100644 --- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java @@ -76,12 +76,12 @@ private boolean pass( SeaTunnelDataType type = rowType.getFieldType(index); Object value = rowData.getField(index); - + String fieldName = rowType.getFieldName(index); Boolean typeChecked = checkType(value, assertFieldRule.getFieldType()); if (Boolean.FALSE.equals(typeChecked)) { return Boolean.FALSE; } - Boolean valueChecked = checkValue(value, type, assertFieldRule.getFieldRules()); + Boolean valueChecked = checkValue(value, type, assertFieldRule.getFieldRules(), fieldName); if (Boolean.FALSE.equals(valueChecked)) { return Boolean.FALSE; } @@ -91,10 +91,11 @@ private boolean pass( private Boolean checkValue( Object value, SeaTunnelDataType type, - List fieldValueRules) { + List fieldValueRules, + String fieldName) { Optional failValueRule = fieldValueRules.stream() - .filter(valueRule -> !pass(value, type, valueRule)) + .filter(valueRule -> !pass(value, type, valueRule, fieldName)) .findFirst(); if (failValueRule.isPresent()) { return Boolean.FALSE; @@ -104,7 +105,10 @@ private Boolean checkValue( } private boolean pass( - Object value, SeaTunnelDataType type, AssertFieldRule.AssertRule valueRule) { + Object value, + SeaTunnelDataType type, + AssertFieldRule.AssertRule valueRule, + String fieldName) { AssertFieldRule.AssertRuleType ruleType = valueRule.getRuleType(); boolean isPass = true; if (ruleType != null) { @@ -112,7 +116,7 @@ private boolean pass( } if (Objects.nonNull(value) && valueRule.getEqualTo() != null) { - isPass = isPass && compareValue(value, type, valueRule); + isPass = isPass && compareValue(value, type, valueRule, fieldName); } return isPass; } @@ -156,17 +160,21 @@ private boolean checkAssertRule( } private boolean compareValue( - Object value, 
SeaTunnelDataType type, AssertFieldRule.AssertRule valueRule) { + Object value, + SeaTunnelDataType type, + AssertFieldRule.AssertRule valueRule, + String fieldName) { Object config = valueRule.getEqualTo(); String confJsonStr = JsonUtils.toJsonString(config); JsonToRowConverters converters = new JsonToRowConverters(true, false); - JsonToRowConverters.JsonToRowConverter converter = converters.createConverter(type); + JsonToRowConverters.JsonToObjectConverter converter = converters.createConverter(type); Object confValue; try { confValue = - converter.convert(JsonUtils.stringToJsonNode(JsonUtils.toJsonString(config))); + converter.convert( + JsonUtils.stringToJsonNode(JsonUtils.toJsonString(config)), fieldName); } catch (IOException e) { throw CommonError.jsonOperationError("Assert", confJsonStr, e); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java index fa33f45ad69..bd6809aad1c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java @@ -147,9 +147,6 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) { TextDeserializationSchema.Builder builder = TextDeserializationSchema.builder() .delimiter(TextFormatConstant.PLACEHOLDER) - .dateFormatter(dateFormat) - .dateTimeFormatter(datetimeFormat) - .timeFormatter(timeFormat) .textLineSplitor(textLineSplitor); if (isMergePartition) { deserializationSchema = @@ -187,9 +184,6 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { 
TextDeserializationSchema.Builder builder = TextDeserializationSchema.builder() .delimiter(fieldDelimiter) - .dateFormatter(dateFormat) - .dateTimeFormatter(datetimeFormat) - .timeFormatter(timeFormat) .textLineSplitor(textLineSplitor); if (isMergePartition) { deserializationSchema = diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/exception/HttpConnectorErrorCode.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/exception/HttpConnectorErrorCode.java index c12180f20a7..bbea694ce91 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/exception/HttpConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/exception/HttpConnectorErrorCode.java @@ -20,7 +20,8 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; public enum HttpConnectorErrorCode implements SeaTunnelErrorCode { - FIELD_DATA_IS_INCONSISTENT("HTTP-01", "The field data is inconsistent"); + FIELD_DATA_IS_INCONSISTENT("HTTP-01", "The field data is inconsistent"), + REQUEST_FAILED("HTTP-02", "The request is failed"); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java index 34402e300ef..4e3f3ab1a59 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java +++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java @@ -73,6 +73,7 @@ public HttpSource(Config pluginConfig) { "PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, result.getMsg())); } + this.httpParameter.buildWithConfig(pluginConfig); buildSchemaWithConfig(pluginConfig); buildPagingWithConfig(pluginConfig); @@ -94,9 +95,6 @@ private void buildPagingWithConfig(Config pluginConfig) { if (pluginConfig.hasPath(HttpConfig.PAGEING.key())) { pageInfo = new PageInfo(); Config pageConfig = pluginConfig.getConfig(HttpConfig.PAGEING.key()); - if (pageConfig.hasPath(HttpConfig.TOTAL_PAGE_SIZE.key())) { - pageInfo.setTotalPageSize(pageConfig.getLong(HttpConfig.TOTAL_PAGE_SIZE.key())); - } if (pageConfig.hasPath(HttpConfig.TOTAL_PAGE_SIZE.key())) { pageInfo.setTotalPageSize(pageConfig.getLong(HttpConfig.TOTAL_PAGE_SIZE.key())); } else { diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index 0bd0185e81b..db0d36d9b63 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -136,10 +136,11 @@ public void pollAndCollectData(Collector output) throws Exception response.getCode(), response.getContent()); } else { - log.error( - "http client execute exception, http response status code:[{}], content:[{}]", - response.getCode(), - response.getContent()); + String msg = + String.format( + "http client execute exception, http response status 
code:[%s], content:[%s]", + response.getCode(), response.getContent()); + throw new HttpConnectorException(HttpConnectorErrorCode.REQUEST_FAILED, msg); } } @@ -166,12 +167,11 @@ public void internalPollNext(Collector output) throws Exception { updateRequestParam(info); pollAndCollectData(output); pageIndex += 1; + Thread.sleep(10); } } else { pollAndCollectData(output); } - } catch (Exception e) { - log.error(e.getMessage(), e); } finally { if (Boundedness.BOUNDED.equals(context.getBoundedness()) && noMoreElementFlag) { // signal to the source that we have reached the end of the data. @@ -189,7 +189,7 @@ private void collect(Collector output, String data) throws IOExcep if (contentJson != null) { data = JsonUtils.stringToJsonNode(getPartOfJson(data)).toString(); } - if (jsonField != null) { + if (jsonField != null && contentJson == null) { this.initJsonPath(jsonField); data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_json_to_assert.conf index 217373b8c7a..46fa6e6f42d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_json_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_json_to_assert.conf @@ -26,6 +26,9 @@ source { url = "http://mockserver:1080/example/http" method = "GET" format = "json" + date_format="yyyy-MM-dd" + datetime_format="yyyy-MM-dd'T'HH:mm:ss" + time_format="HH:mm:ss" schema = { fields { c_map = "map" diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java index 45072c32dcd..4d00725dd50 100644 --- a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java @@ -63,7 +63,7 @@ public class CompatibleKafkaConnectDeserializationSchema private transient Method keyConverterMethod; private transient Method valueConverterMethod; private final SeaTunnelRowType seaTunnelRowType; - private final JsonToRowConverters.JsonToRowConverter runtimeConverter; + private final JsonToRowConverters.JsonToObjectConverter runtimeConverter; private final boolean keySchemaEnable; private final boolean valueSchemaEnable; /** Object mapper for parsing the JSON. 
*/ @@ -83,7 +83,7 @@ public CompatibleKafkaConnectDeserializationSchema( // Runtime converter this.runtimeConverter = new JsonToRowConverters(failOnMissingField, ignoreParseErrors) - .createConverter(checkNotNull(seaTunnelRowType)); + .createRowConverter(checkNotNull(seaTunnelRowType)); } @Override @@ -130,7 +130,7 @@ private SeaTunnelRow convertJsonNode(JsonNode jsonNode) { try { org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode jsonData = objectMapper.readTree(jsonNode.toString()); - return (SeaTunnelRow) runtimeConverter.convert(jsonData); + return (SeaTunnelRow) runtimeConverter.convert(jsonData, null); } catch (Throwable t) { throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), t); } diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java index 1f3925192c7..afc2f070c60 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java @@ -58,7 +58,7 @@ public class JsonDeserializationSchema implements DeserializationSchema fieldFormatterMap = new HashMap<>(); + public JsonToRowConverters(boolean failOnMissingField, boolean ignoreParseErrors) { this.failOnMissingField = failOnMissingField; this.ignoreParseErrors = ignoreParseErrors; } /** Creates a runtime converter which is null safe. */ - public JsonToRowConverter createConverter(SeaTunnelDataType type) { + public JsonToObjectConverter createConverter(SeaTunnelDataType type) { return wrapIntoNullableConverter(createNotNullConverter(type)); } /** Creates a runtime converter which assuming input object is not null. 
*/ - private JsonToRowConverter createNotNullConverter(SeaTunnelDataType type) { + private JsonToObjectConverter createNotNullConverter(SeaTunnelDataType type) { SqlType sqlType = type.getSqlType(); switch (sqlType) { - case ROW: - return createRowConverter((SeaTunnelRowType) type); case NULL: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return null; } }; case BOOLEAN: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToBoolean(jsonNode); } }; case TINYINT: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return Byte.parseByte(jsonNode.asText().trim()); } }; case SMALLINT: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return Short.parseShort(jsonNode.asText().trim()); } }; case INT: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToInt(jsonNode); } }; case BIGINT: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToLong(jsonNode); } }; case DATE: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { - return convertToLocalDate(jsonNode); + public Object convert(JsonNode jsonNode, String 
fieldName) { + return convertToLocalDate(jsonNode, fieldName); } }; case TIME: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToLocalTime(jsonNode); } }; case TIMESTAMP: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { - return convertToLocalDateTime(jsonNode); + public Object convert(JsonNode jsonNode, String fieldName) { + return convertToLocalDateTime(jsonNode, fieldName); } }; case FLOAT: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToFloat(jsonNode); } }; case DOUBLE: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToDouble(jsonNode); } }; case STRING: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToString(jsonNode); } }; case BYTES: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToBytes(jsonNode); } }; case DECIMAL: - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { return convertToBigDecimal(jsonNode); } }; @@ -193,6 +195,8 @@ public Object convert(JsonNode jsonNode) { return createArrayConverter((ArrayType) type); case MAP: return 
createMapConverter((MapType) type); + case ROW: + return createRowConverter((SeaTunnelRowType) type); default: throw new SeaTunnelJsonFormatException( CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, @@ -245,8 +249,15 @@ private float convertToFloat(JsonNode jsonNode) { } } - private LocalDate convertToLocalDate(JsonNode jsonNode) { - return ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate()); + private LocalDate convertToLocalDate(JsonNode jsonNode, String fieldName) { + String dateStr = jsonNode.asText(); + DateTimeFormatter dateFormatter = fieldFormatterMap.get(fieldName); + if (dateFormatter == null) { + dateFormatter = DateUtils.matchDateFormatter(dateStr); + fieldFormatterMap.put(fieldName, dateFormatter); + } + + return dateFormatter.parse(jsonNode.asText()).query(TemporalQueries.localDate()); } private LocalTime convertToLocalTime(JsonNode jsonNode) { @@ -254,9 +265,15 @@ private LocalTime convertToLocalTime(JsonNode jsonNode) { return parsedTime.query(TemporalQueries.localTime()); } - private LocalDateTime convertToLocalDateTime(JsonNode jsonNode) { - TemporalAccessor parsedTimestamp = - DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(jsonNode.asText()); + private LocalDateTime convertToLocalDateTime(JsonNode jsonNode, String fieldName) { + String datetimeStr = jsonNode.asText(); + DateTimeFormatter dateTimeFormatter = fieldFormatterMap.get(fieldName); + if (dateTimeFormatter == null) { + dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(datetimeStr); + fieldFormatterMap.put(fieldName, dateTimeFormatter); + } + + TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(datetimeStr); LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); return LocalDateTime.of(localDate, localTime); @@ -289,8 +306,8 @@ private BigDecimal convertToBigDecimal(JsonNode jsonNode) { return bigDecimal; } - private JsonToRowConverter 
createRowConverter(SeaTunnelRowType rowType) { - final JsonToRowConverter[] fieldConverters = + public JsonToObjectConverter createRowConverter(SeaTunnelRowType rowType) { + final JsonToObjectConverter[] fieldConverters = Arrays.stream(rowType.getFieldTypes()) .map( new Function, Object>() { @@ -300,17 +317,20 @@ public Object apply(SeaTunnelDataType seaTunnelDataType) { } }) .toArray( - new IntFunction() { + new IntFunction() { @Override - public JsonToRowConverter[] apply(int value) { - return new JsonToRowConverter[value]; + public JsonToObjectConverter[] apply(int value) { + return new JsonToObjectConverter[value]; } }); final String[] fieldNames = rowType.getFieldNames(); - return new JsonToRowConverter() { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public SeaTunnelRow convert(JsonNode jsonNode, String rowFieldName) { + if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) { + return null; + } int arity = fieldNames.length; SeaTunnelRow row = new SeaTunnelRow(arity); for (int i = 0; i < arity; i++) { @@ -322,6 +342,9 @@ public Object convert(JsonNode jsonNode) { field = jsonNode.get(fieldName); } try { + if (StringUtils.isNotBlank(rowFieldName)) { + fieldName = rowFieldName + "." 
+ fieldName; + } Object convertedField = convertField(fieldConverters[i], fieldName, field); row.setField(i, convertedField); } catch (Throwable t) { @@ -336,27 +359,27 @@ public Object convert(JsonNode jsonNode) { }; } - private JsonToRowConverter createArrayConverter(ArrayType type) { - JsonToRowConverter valueConverter = createConverter(type.getElementType()); - return new JsonToRowConverter() { + private JsonToObjectConverter createArrayConverter(ArrayType type) { + JsonToObjectConverter valueConverter = createConverter(type.getElementType()); + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { Object arr = Array.newInstance(type.getElementType().getTypeClass(), jsonNode.size()); for (int i = 0; i < jsonNode.size(); i++) { - Array.set(arr, i, valueConverter.convert(jsonNode.get(i))); + Array.set(arr, i, valueConverter.convert(jsonNode.get(i), fieldName)); } return arr; } }; } - private JsonToRowConverter createMapConverter(MapType type) { - JsonToRowConverter keyConverter = createConverter(type.getKeyType()); - JsonToRowConverter valueConverter = createConverter(type.getValueType()); - return new JsonToRowConverter() { + private JsonToObjectConverter createMapConverter(MapType type) { + JsonToObjectConverter keyConverter = createConverter(type.getKeyType()); + JsonToObjectConverter valueConverter = createConverter(type.getValueType()); + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { Map value = new HashMap<>(); jsonNode.fields() .forEachRemaining( @@ -373,8 +396,9 @@ public void accept(Map.Entry entry) { FORMAT, entry.getKey(), e); } value.put( - keyConverter.convert(keyNode), - valueConverter.convert(entry.getValue())); + keyConverter.convert(keyNode, fieldName + ".key"), + valueConverter.convert( + entry.getValue(), fieldName + ".value")); } }); 
return value; @@ -383,7 +407,7 @@ public void accept(Map.Entry entry) { } private Object convertField( - JsonToRowConverter fieldConverter, String fieldName, JsonNode field) { + JsonToObjectConverter fieldConverter, String fieldName, JsonNode field) { if (field == null) { if (failOnMissingField) { throw new IllegalArgumentException( @@ -392,19 +416,19 @@ private Object convertField( return null; } } else { - return fieldConverter.convert(field); + return fieldConverter.convert(field, fieldName); } } - private JsonToRowConverter wrapIntoNullableConverter(JsonToRowConverter converter) { - return new JsonToRowConverter() { + private JsonToObjectConverter wrapIntoNullableConverter(JsonToObjectConverter converter) { + return new JsonToObjectConverter() { @Override - public Object convert(JsonNode jsonNode) { + public Object convert(JsonNode jsonNode, String fieldName) { if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) { return null; } try { - return converter.convert(jsonNode); + return converter.convert(jsonNode, fieldName); } catch (RuntimeException e) { if (!ignoreParseErrors) { throw e; @@ -418,9 +442,8 @@ public Object convert(JsonNode jsonNode) { /** * Runtime converter that converts {@link JsonNode}s into objects of internal data structures. */ - @FunctionalInterface - public interface JsonToRowConverter extends Serializable { - Object convert(JsonNode jsonNode); + public interface JsonToObjectConverter extends Serializable { + Object convert(JsonNode jsonNode, String fieldName); } /** Exception which refers to parse errors in converters. 
*/ diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java index 7a816258115..f552d7519ac 100644 --- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java @@ -100,6 +100,9 @@ public void testSerDe() throws Exception { root.putObject("map").put("element", 123); root.putObject("multiSet").put("element", 2); root.putObject("map2map").putObject("inner_map").put("key", 234); + ObjectNode rowFieldNodes = root.deepCopy(); + rowFieldNodes.put("date", "1990-10-14T12:12:43.123"); + root.putIfAbsent("row", rowFieldNodes); byte[] serializedJson = objectMapper.writeValueAsBytes(root); @@ -117,7 +120,8 @@ public void testSerDe() throws Exception { "timestamp9", "map", "multiSet", - "map2map" + "map2map", + "row" }, new SeaTunnelDataType[] { BOOLEAN_TYPE, @@ -131,13 +135,42 @@ public void testSerDe() throws Exception { LocalTimeType.LOCAL_DATE_TIME_TYPE, new MapType(STRING_TYPE, LONG_TYPE), new MapType(STRING_TYPE, INT_TYPE), - new MapType(STRING_TYPE, new MapType(STRING_TYPE, INT_TYPE)) + new MapType(STRING_TYPE, new MapType(STRING_TYPE, INT_TYPE)), + new SeaTunnelRowType( + new String[] { + "bool", + "int", + "longValue", + "float", + "name", + "date", + "time", + "timestamp3", + "timestamp9", + "map", + "multiSet", + "map2map" + }, + new SeaTunnelDataType[] { + BOOLEAN_TYPE, + INT_TYPE, + LONG_TYPE, + FLOAT_TYPE, + STRING_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE, + LocalTimeType.LOCAL_TIME_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE, + new MapType(STRING_TYPE, LONG_TYPE), + new MapType(STRING_TYPE, INT_TYPE), + new MapType(STRING_TYPE, new 
MapType(STRING_TYPE, INT_TYPE)) + }) }); JsonDeserializationSchema deserializationSchema = new JsonDeserializationSchema(false, false, schema); - SeaTunnelRow expected = new SeaTunnelRow(12); + SeaTunnelRow expected = new SeaTunnelRow(13); expected.setField(0, true); expected.setField(1, intValue); expected.setField(2, longValue); @@ -151,6 +184,22 @@ public void testSerDe() throws Exception { expected.setField(10, multiSet); expected.setField(11, nestedMap); + SeaTunnelRow rowFieldRow = new SeaTunnelRow(12); + rowFieldRow.setField(0, true); + rowFieldRow.setField(1, intValue); + rowFieldRow.setField(2, longValue); + rowFieldRow.setField(3, floatValue); + rowFieldRow.setField(4, name); + rowFieldRow.setField(5, timestamp3.toLocalDateTime()); + rowFieldRow.setField(6, time); + rowFieldRow.setField(7, timestamp3.toLocalDateTime()); + rowFieldRow.setField(8, timestamp9.toLocalDateTime()); + rowFieldRow.setField(9, map); + rowFieldRow.setField(10, multiSet); + rowFieldRow.setField(11, nestedMap); + + expected.setField(12, rowFieldRow); + SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(serializedJson); assertEquals(expected, seaTunnelRow); @@ -423,62 +472,75 @@ public void testMapConverterKeyType() throws JsonProcessingException { JsonToRowConverters converters = new JsonToRowConverters(true, false); - JsonToRowConverters.JsonToRowConverter stringConverter = + JsonToRowConverters.JsonToObjectConverter stringConverter = converters.createConverter(stringKeyMapType); - JsonToRowConverters.JsonToRowConverter booleanConverter = + JsonToRowConverters.JsonToObjectConverter booleanConverter = converters.createConverter(booleanKeyMapType); - JsonToRowConverters.JsonToRowConverter tinyintConverter = + JsonToRowConverters.JsonToObjectConverter tinyintConverter = converters.createConverter(tinyintKeyMapType); - JsonToRowConverters.JsonToRowConverter smallintConverter = + JsonToRowConverters.JsonToObjectConverter smallintConverter = 
converters.createConverter(smallintKeyMapType); - JsonToRowConverters.JsonToRowConverter intConverter = + JsonToRowConverters.JsonToObjectConverter intConverter = converters.createConverter(intKeyMapType); - JsonToRowConverters.JsonToRowConverter bigintConverter = + JsonToRowConverters.JsonToObjectConverter bigintConverter = converters.createConverter(bigintKeyMapType); - JsonToRowConverters.JsonToRowConverter floatConverter = + JsonToRowConverters.JsonToObjectConverter floatConverter = converters.createConverter(floatKeyMapType); - JsonToRowConverters.JsonToRowConverter doubleConverter = + JsonToRowConverters.JsonToObjectConverter doubleConverter = converters.createConverter(doubleKeyMapType); - JsonToRowConverters.JsonToRowConverter dateConverter = + JsonToRowConverters.JsonToObjectConverter dateConverter = converters.createConverter(dateKeyMapType); - JsonToRowConverters.JsonToRowConverter timeConverter = + JsonToRowConverters.JsonToObjectConverter timeConverter = converters.createConverter(timeKeyMapType); - JsonToRowConverters.JsonToRowConverter timestampConverter = + JsonToRowConverters.JsonToObjectConverter timestampConverter = converters.createConverter(timestampKeyMapType); - JsonToRowConverters.JsonToRowConverter decimalConverter = + JsonToRowConverters.JsonToObjectConverter decimalConverter = converters.createConverter(decimalKeyMapType); - assertMapKeyType("{\"abc\": \"xxx\"}", stringConverter, "abc"); - assertMapKeyType("{\"false\": \"xxx\"}", booleanConverter, false); - assertMapKeyType("{\"1\": \"xxx\"}", tinyintConverter, (byte) 1); - assertMapKeyType("{\"12\": \"xxx\"}", smallintConverter, (short) 12); - assertMapKeyType("{\"123\": \"xxx\"}", intConverter, 123); - assertMapKeyType("{\"12345\": \"xxx\"}", bigintConverter, 12345L); - assertMapKeyType("{\"1.0001\": \"xxx\"}", floatConverter, 1.0001f); - assertMapKeyType("{\"999.9999\": \"xxx\"}", doubleConverter, 999.9999); - assertMapKeyType("{\"9999.23\": \"xxx\"}", decimalConverter, 
BigDecimal.valueOf(9999.23)); + assertMapKeyType("{\"abc\": \"xxx\"}", stringConverter, "abc", "stringConverter"); + assertMapKeyType("{\"false\": \"xxx\"}", booleanConverter, false, "booleanConverter"); + assertMapKeyType("{\"1\": \"xxx\"}", tinyintConverter, (byte) 1, "tinyintConverter"); + assertMapKeyType("{\"12\": \"xxx\"}", smallintConverter, (short) 12, "smallintConverter"); + assertMapKeyType("{\"123\": \"xxx\"}", intConverter, 123, "intConverter"); + assertMapKeyType("{\"12345\": \"xxx\"}", bigintConverter, 12345L, "bigintConverter"); + assertMapKeyType("{\"1.0001\": \"xxx\"}", floatConverter, 1.0001f, "floatConverter"); + assertMapKeyType("{\"999.9999\": \"xxx\"}", doubleConverter, 999.9999, "doubleConverter"); + assertMapKeyType( + "{\"9999.23\": \"xxx\"}", + decimalConverter, + BigDecimal.valueOf(9999.23), + "decimalConverter"); LocalDate date = DateTimeFormatter.ISO_LOCAL_DATE .parse("2024-01-26") .query(TemporalQueries.localDate()); - assertMapKeyType("{\"2024-01-26\": \"xxx\"}", dateConverter, date); + assertMapKeyType( + "{\"2024-01-26\": \"xxx\"}", dateConverter, date, "iso_local_date_string_map"); LocalTime time = JsonToRowConverters.TIME_FORMAT .parse("12:00:12.001") .query(TemporalQueries.localTime()); - assertMapKeyType("{\"12:00:12.001\": \"xxx\"}", timeConverter, time); + assertMapKeyType( + "{\"12:00:12.001\": \"xxx\"}", timeConverter, time, "time_format_string_map"); LocalDateTime timestamp = LocalDateTime.of(date, time); - assertMapKeyType("{\"2024-01-26T12:00:12.001\": \"xxx\"}", timestampConverter, timestamp); + assertMapKeyType( + "{\"2024-01-26T12:00:12.001\": \"xxx\"}", + timestampConverter, + timestamp, + "timestamp_string_map"); } private void assertMapKeyType( - String payload, JsonToRowConverters.JsonToRowConverter converter, Object expect) + String payload, + JsonToRowConverters.JsonToObjectConverter converter, + Object expect, + String fieldName) throws JsonProcessingException { JsonNode keyMapNode = 
JsonUtils.stringToJsonNode(payload); - Map keyMap = (Map) converter.convert(keyMapNode); + Map keyMap = (Map) converter.convert(keyMapNode, fieldName); assertEquals(expect, keyMap.keySet().iterator().next()); } } diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java index c4ba37996a2..f6f552732ac 100644 --- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java @@ -41,32 +41,41 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.time.temporal.TemporalAccessor; +import java.time.temporal.TemporalQueries; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; public class TextDeserializationSchema implements DeserializationSchema { private final SeaTunnelRowType seaTunnelRowType; private final String[] separators; - private final DateUtils.Formatter dateFormatter; - private final DateTimeUtils.Formatter dateTimeFormatter; - private final TimeUtils.Formatter timeFormatter; private final String encoding; private final TextLineSplitor splitor; + @SuppressWarnings("MagicNumber") + public static final DateTimeFormatter TIME_FORMAT = + new DateTimeFormatterBuilder() + .appendPattern("HH:mm:ss") + .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) + .toFormatter(); + + public Map fieldFormatterMap = new HashMap<>(); + private TextDeserializationSchema( 
@NonNull SeaTunnelRowType seaTunnelRowType, String[] separators, - DateUtils.Formatter dateFormatter, - DateTimeUtils.Formatter dateTimeFormatter, - TimeUtils.Formatter timeFormatter, String encoding, TextLineSplitor splitor) { this.seaTunnelRowType = seaTunnelRowType; this.separators = separators; - this.dateFormatter = dateFormatter; - this.dateTimeFormatter = dateTimeFormatter; - this.timeFormatter = timeFormatter; this.encoding = encoding; this.splitor = splitor; } @@ -129,13 +138,7 @@ public Builder textLineSplitor(TextLineSplitor splitor) { public TextDeserializationSchema build() { return new TextDeserializationSchema( - seaTunnelRowType, - separators, - dateFormatter, - dateTimeFormatter, - timeFormatter, - encoding, - textLineSplitor); + seaTunnelRowType, separators, encoding, textLineSplitor); } } @@ -145,7 +148,12 @@ public SeaTunnelRow deserialize(byte[] message) throws IOException { Map splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType, 0); Object[] objects = new Object[seaTunnelRowType.getTotalFields()]; for (int i = 0; i < objects.length; i++) { - objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i), 0); + objects[i] = + convert( + splitsMap.get(i), + seaTunnelRowType.getFieldType(i), + 0, + seaTunnelRowType.getFieldNames()[i]); } return new SeaTunnelRow(objects); } @@ -172,7 +180,8 @@ private Map splitLineBySeaTunnelRowType( return splitsMap; } - private Object convert(String field, SeaTunnelDataType fieldType, int level) { + private Object convert( + String field, SeaTunnelDataType fieldType, int level, String fieldName) { if (StringUtils.isBlank(field)) { return null; } @@ -182,7 +191,7 @@ private Object convert(String field, SeaTunnelDataType fieldType, int level) String[] elements = field.split(separators[level + 1]); ArrayList objectArrayList = new ArrayList<>(); for (String element : elements) { - objectArrayList.add(convert(element, elementType, level + 1)); + objectArrayList.add(convert(element, 
elementType, level + 1, fieldName)); } switch (elementType.getSqlType()) { case STRING: @@ -216,11 +225,11 @@ private Object convert(String field, SeaTunnelDataType fieldType, int level) for (String kv : kvs) { String[] splits = kv.split(separators[level + 2]); if (splits.length < 2) { - objectMap.put(convert(splits[0], keyType, level + 1), null); + objectMap.put(convert(splits[0], keyType, level + 1, fieldName), null); } else { objectMap.put( - convert(splits[0], keyType, level + 1), - convert(splits[1], valueType, level + 1)); + convert(splits[0], keyType, level + 1, fieldName), + convert(splits[1], valueType, level + 1, fieldName)); } } return objectMap; @@ -247,21 +256,39 @@ private Object convert(String field, SeaTunnelDataType fieldType, int level) case BYTES: return field.getBytes(StandardCharsets.UTF_8); case DATE: - return DateUtils.parse(field, dateFormatter); + DateTimeFormatter dateFormatter = fieldFormatterMap.get(fieldName); + if (dateFormatter == null) { + dateFormatter = DateUtils.matchDateFormatter(field); + fieldFormatterMap.put(fieldName, dateFormatter); + } + + return dateFormatter.parse(field).query(TemporalQueries.localDate()); case TIME: - return TimeUtils.parse(field, timeFormatter); + TemporalAccessor parsedTime = TIME_FORMAT.parse(field); + return parsedTime.query(TemporalQueries.localTime()); case TIMESTAMP: - return DateTimeUtils.parse(field, dateTimeFormatter); + DateTimeFormatter dateTimeFormatter = fieldFormatterMap.get(fieldName); + if (dateTimeFormatter == null) { + dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(field); + fieldFormatterMap.put(fieldName, dateTimeFormatter); + } + + TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(field); + LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); + LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); + return LocalDateTime.of(localDate, localTime); case ROW: Map splitsMap = splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) 
fieldType, level + 1); Object[] objects = new Object[splitsMap.size()]; + String[] eleFieldNames = ((SeaTunnelRowType) fieldType).getFieldNames(); for (int i = 0; i < objects.length; i++) { objects[i] = convert( splitsMap.get(i), ((SeaTunnelRowType) fieldType).getFieldType(i), - level + 1); + level + 1, + fieldName + "." + eleFieldNames[i]); } return new SeaTunnelRow(objects); default: diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java index 6f5397e887b..34ebee0ba5d 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java @@ -53,7 +53,7 @@ public class JsonPathTransform extends MultipleFieldOutputTransform { private final JsonPathTransformConfig config; private final SeaTunnelRowType seaTunnelRowType; - private JsonToRowConverters.JsonToRowConverter[] converters; + private JsonToRowConverters.JsonToObjectConverter[] converters; private SeaTunnelRowType outputSeaTunnelRowType; private int[] srcFieldIndexArr; @@ -83,7 +83,7 @@ private void initConverters() { this.config.getColumnConfigs().stream() .map(ColumnConfig::getDestType) .map(jsonToRowConverters::createConverter) - .toArray(JsonToRowConverters.JsonToRowConverter[]::new); + .toArray(JsonToRowConverters.JsonToObjectConverter[]::new); } private void initOutputSeaTunnelRowType() { @@ -136,7 +136,7 @@ private Object doTransform( SeaTunnelDataType inputDataType, Object value, ColumnConfig columnConfig, - JsonToRowConverters.JsonToRowConverter converter) { + JsonToRowConverters.JsonToObjectConverter converter) { if (value == null) { return null; } @@ -166,7 +166,7 @@ private Object doTransform( } Object result = JSON_PATH_CACHE.get(columnConfig.getPath()).read(jsonString); JsonNode jsonNode = 
JsonUtils.toJsonNode(result); - return converter.convert(jsonNode); + return converter.convert(jsonNode, null); } catch (JsonPathException e) { throw new TransformException(JSON_PATH_COMPILE_ERROR, e.getMessage()); }