diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java index c979653525c..b5380d916fc 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java @@ -51,6 +51,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL; @@ -128,7 +130,16 @@ public SourceSplitEnumerator restoreEn private List initColumnsIndex(InfluxDB influxdb) { // query one row to get column info - String query = sourceConfig.getSql() + QUERY_LIMIT; + String sql = sourceConfig.getSql(); + String query = sql + QUERY_LIMIT; + // if sql contains tz(), can't be append QUERY_LIMIT at last . see bug #4231 + int start = containTzFunction(sql.toLowerCase()); + if (start > 0) { + StringBuilder tmpSql = new StringBuilder(sql); + tmpSql.insert(start - 1, QUERY_LIMIT).append(" "); + query = tmpSql.toString(); + } + try { QueryResult queryResult = influxdb.query(new Query(query, sourceConfig.getDatabase())); @@ -145,4 +156,14 @@ private List initColumnsIndex(InfluxDB influxdb) { e); } } + + private static int containTzFunction(String sql) { + Pattern pattern = Pattern.compile("tz\\(.*\\)"); + Matcher matcher = pattern.matcher(sql); + if (matcher.find()) { + int start = matcher.start(); + return start; + } + return -1; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index d6667966220..ddc7afadacb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -208,6 +208,40 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte } } + @TestTemplate + public void testInfluxdbWithTz(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/influxdb-to-influxdb-with-tz.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + String sourceSql = + String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT); + String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT); + QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE)); + QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE)); + // assert data count + Assertions.assertEquals( + sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size()); + // assert data values + List> sourceValues = + sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); + List> sinkValues = + sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); + int rowSize = sourceValues.size(); + int colSize = sourceValues.get(0).size(); + + for (int row = 0; row < rowSize; row++) { + for (int col = 0; col < colSize; col++) { + Object sourceColValue = sourceValues.get(row).get(col); + Object sinkColValue = sinkValues.get(row).get(col); + + if (!Objects.deepEquals(sourceColValue, sinkColValue)) { + Assertions.assertEquals(sourceColValue, sinkColValue); + } + } + } + } + private void initializeInfluxDBClient() throws ConnectException { InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl); influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf new file mode 100644 index 00000000000..4b7666130da --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb-with-tz.conf @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + InfluxDB { + url = "http://influxdb-host:8086" + sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source tz('Asia/Shanghai')" + database = "test" + schema { + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + } +} + +transform { +} + +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + key_time = "time" + key_tags = ["label"] + batch_size = 1 + } +} \ No newline at end of file