Skip to content

Commit

Permalink
[Refactor][Connector] Refactor connector dialect (#501) (#502)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Jan 26, 2025
1 parent fd024f4 commit 0400a78
Show file tree
Hide file tree
Showing 109 changed files with 1,070 additions and 350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface ConnectorFactory {

Dialect getDialect();

ConnectorParameterConverter getConnectorParameterConverter();
ParameterConverter getConnectorParameterConverter();

Executor getExecutor();

Expand All @@ -42,4 +42,10 @@ public interface ConnectorFactory {
StatementSplitter getStatementSplitter();

StatementParser getStatementParser();

MetricScript getMetricScript();

default Boolean showInFrontend() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.api;

public interface MetricScript {

default String selectFromTable() {
return "select * from ${table}";
}

default String baseActualValue(String uniqueKey) {
return "select count(1) as actual_value_"+ uniqueKey +" from ${invalidate_items_table}";
}

default String baseDirectActualValue(String uniqueKey, String invalidateItemsSql) {
return "select count(1) as actual_value_"+ uniqueKey +" from ( " + invalidateItemsSql + " ) t";
}

default String avgActualValue(String uniqueKey) {
return "select avg(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}

default String avgLengthActualValue(String uniqueKey) {
return "select avg(length(${column})) as actual_value_"+ uniqueKey +" from ${table}";
}

default String countDistinctActualValue(String uniqueKey) {
return "select count(distinct(${column})) as actual_value_"+ uniqueKey +" from ${table}";
}

default String histogramActualValue(String uniqueKey, String where) {
return "select concat(k, '\001', cast(count as varchar)) as actual_value_" + uniqueKey + " from (select if(${column} is null, 'NULL', cast(${column} as varchar)) as k, count(1) as count from ${table} " + where + " group by ${column} order by count desc limit 50) T";
}

default String maxActualValue(String uniqueKey) {
return "select max(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}

default String maxLengthActualValue(String uniqueKey) {
return "select max(length(${column})) as actual_value_"+ uniqueKey +" from ${table}";
}

default String minActualValue(String uniqueKey) {
return "select min(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}

default String minLengthActualValue(String uniqueKey) {
return "select min(length(${column})) as actual_value_"+ uniqueKey +" from ${table}";
}

default String stdDevActualValue(String uniqueKey) {
return "select stddev(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}

default String sumActualValue(String uniqueKey) {
return "select sum(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}

default String varianceActualValue(String uniqueKey) {
return "select variance(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}

default String groupByHavingCountForUnique() {
return " group by ${column} having count(1) = 1 ";
}

default String groupByHavingCountForDuplicate() {
return " group by ${column} having count(${column}) > 1 ";
}

default String columnInEnums() {
return " (${column} in ( ${enum_list} )) ";
}

default String columnNotInEnums() {
return " (${column} not in ( ${enum_list} ) or ${column} is null) ";
}

default String columnNotNull() {
return " ${column} is not null ";
}

default String columnIsNull() {
return " (${column} is null) ";
}

default String columnLengthCompare() {
return " length(${column}) ${comparator} ${length} ";
}

default String columnNotMatchRegex() {
return " ${column} not regexp '${regexp}' ";
}

default String columnMatchRegex() {
return " ${column} regexp '${regexp}' ";
}

default String columnGteMin() {
return " ${column} >= ${min} ";
}

default String columnLteMax() {
return " ${column} <= ${max} ";
}

default String columnIsBlank() {
return " (${column} is null or ${column} = '') ";
}

default String timeBetweenWithFormat() {
return " (DATE_FORMAT(${column}, '${datetime_format}') <= DATE_FORMAT(${deadline_time}, '${datetime_format}') ) AND (DATE_FORMAT(${column}, '${datetime_format}') >= DATE_FORMAT(${begin_time}, '${datetime_format}')) ";
}

default String dailyAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >=date_format(${data_time},'%Y-%m-%d')" +
" and data_time < date_add(date_format(${data_time},'%Y-%m-%d'), interval 1 DAY)" +
" and unique_code = ${unique_code}";
}

default String last7DayAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= date_sub(date_format(${data_time},'%Y-%m-%d'),interval 7 DAY)" +
" and data_time < date_add(date_format(${data_time},'%Y-%m-%d'),interval 1 DAY) and unique_code = ${unique_code}";
}

default String last30DayAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= date_sub(date_format(${data_time},'%Y-%m-%d'),interval 30 DAY)" +
" and data_time < date_add(date_format(${data_time},'%Y-%m-%d'),interval 1 DAY) and unique_code = ${unique_code}";
}

default String monthlyAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= date_format(curdate(), '%Y-%m-01') and data_time < date_add(date_format(${data_time},'%Y-%m-%d'),interval 1 DAY)" +
" and unique_code = ${unique_code}";
}

default String weeklyAvg(String uniqueKey) {
return "select round(avg(actual_value),2) as expected_value_" + uniqueKey +
" from dv_actual_values where data_time >= date_sub(${data_time},interval weekday(${data_time}) + 0 day)" +
" and data_time < date_add(date_format(${data_time},'%Y-%m-%d'),interval 1 DAY) and unique_code = ${unique_code}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import static io.datavines.common.ConfigConstants.*;

public interface ConnectorParameterConverter {
public interface ParameterConverter {

Map<String,Object> converter(Map<String,Object> parameter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-connector-spark</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
*/
package io.datavines.connector.plugin;

import io.datavines.connector.api.Connector;
import io.datavines.connector.api.ConnectorParameterConverter;
import io.datavines.connector.api.Dialect;
import io.datavines.connector.api.Executor;
import io.datavines.connector.api.*;

public class ClickHouseConnectorFactory extends AbstractJdbcConnectorFactory {

@Override
public ConnectorParameterConverter getConnectorParameterConverter() {
return new ClickHouseConnectorParameterConverter();
public ParameterConverter getConnectorParameterConverter() {
return new ClickHouseParameterConverter();
}

@Override
Expand All @@ -42,4 +39,9 @@ public Connector getConnector() {
public Executor getExecutor() {
return new ClickHouseExecutor(getDataSourceClient());
}

@Override
public MetricScript getMetricScript() {
return new ClickHouseMetricScript();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,10 @@
*/
package io.datavines.connector.plugin;

import java.util.Map;

import static io.datavines.common.ConfigConstants.STD_DEV_KEY;
import static io.datavines.common.ConfigConstants.VARIANCE_KEY;

public class ClickHouseDialect extends JdbcDialect {

@Override
public String getDriver() {
return "ru.yandex.clickhouse.ClickHouseDriver";
}

@Override
public Map<String, String> getDialectKeyMap() {
super.getDialectKeyMap();
dialectKeyMap.put(STD_DEV_KEY, "STDDEV_POP");
dialectKeyMap.put(VARIANCE_KEY, "VAR_POP");
return dialectKeyMap;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.connector.plugin;

public class ClickHouseMetricScript extends JdbcMetricScript {

@Override
public String stdDevActualValue(String uniqueKey) {
return "select STDDEV_POP(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}

@Override
public String varianceActualValue(String uniqueKey) {
return "select VAR_POP(${column}) as actual_value_"+ uniqueKey +" from ${table}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static io.datavines.common.ConfigConstants.*;

public class ClickHouseConnectorParameterConverter extends JdbcConnectorParameterConverter {
public class ClickHouseParameterConverter extends JdbcParameterConverter {

@Override
protected String getUrl(Map<String, Object> parameter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
public class DatabendConnectorFactory extends AbstractJdbcConnectorFactory {

@Override
public ConnectorParameterConverter getConnectorParameterConverter() {
return new DatabendConnectorParameterConverter();
public ParameterConverter getConnectorParameterConverter() {
return new DatabendParameterConverter();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import static io.datavines.common.ConfigConstants.*;

public class DatabendConnectorParameterConverter extends JdbcConnectorParameterConverter {
public class DatabendParameterConverter extends JdbcParameterConverter {

@Override
protected String getUrl(Map<String, Object> parameter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public Dialect getDialect() {
}

@Override
public ConnectorParameterConverter getConnectorParameterConverter() {
return new DmConnectorParameterConverter();
public ParameterConverter getConnectorParameterConverter() {
return new DmParameterConverter();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static io.datavines.common.ConfigConstants.*;
import static io.datavines.common.ConfigConstants.PROPERTIES;

public class DmConnectorParameterConverter extends JdbcConnectorParameterConverter {
public class DmParameterConverter extends JdbcParameterConverter {
@Override
protected String getUrl(Map<String, Object> parameter) {
// in dm jdbc url, the database is not need.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
public class DorisConnectorFactory extends AbstractJdbcConnectorFactory {

@Override
public ConnectorParameterConverter getConnectorParameterConverter() {
return new DorisConnectorParameterConverter();
public ParameterConverter getConnectorParameterConverter() {
return new DorisParameterConverter();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import static io.datavines.common.ConfigConstants.*;

public class DorisConnectorParameterConverter extends MysqlConnectorParameterConverter {
public class DorisParameterConverter extends MysqlParameterConverter {

@Override
protected String getUrl(Map<String, Object> parameter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public Dialect getDialect() {
}

@Override
public ConnectorParameterConverter getConnectorParameterConverter() {
return new FileConnectorParameterConverter();
public ParameterConverter getConnectorParameterConverter() {
return new FileParameterConverter();
}

@Override
Expand Down Expand Up @@ -74,4 +74,14 @@ public StatementSplitter getStatementSplitter() {
public StatementParser getStatementParser() {
return null;
}

@Override
public MetricScript getMetricScript() {
return null;
}

@Override
public Boolean showInFrontend() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package io.datavines.connector.plugin;

import io.datavines.connector.api.ConnectorParameterConverter;
import io.datavines.connector.api.ParameterConverter;

import java.util.Map;

public class FileConnectorParameterConverter implements ConnectorParameterConverter {
public class FileParameterConverter implements ParameterConverter {

@Override
public Map<String, Object> converter(Map<String, Object> parameter) {
Expand Down
Loading

0 comments on commit 0400a78

Please sign in to comment.