Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][mysql-cdc/jdbc] Support mysql 5.5 versions #6710

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions docs/en/connector-v2/source/MySQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL

## Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|------------|-------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
| MySQL | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x </li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
| Datasource | Supported versions | Driver | Url | Maven |
|------------|------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
| MySQL | <li> [MySQL](https://dev.mysql.com/doc): 5.5, 5.6, 5.7, 8.0.x </li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |

## Using Dependency

Expand Down Expand Up @@ -92,9 +92,11 @@ server-id = 223344
log_bin = mysql-bin
expire_logs_days = 10
binlog_format = row
# mysql 5.6+ requires binlog_row_image to be set to FULL
binlog_row_image = FULL

# enable gtid mode
# mysql 5.6+ requires gtid_mode to be set to ON
gtid_mode = on
enforce_gtid_consistency = on
```
Expand All @@ -107,6 +109,21 @@ enforce_gtid_consistency = on

4. Confirm your changes by checking the binlog status once more:

MySQL 5.5:

```sql
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name | Value |
+--------------------------+----------------+
| binlog_format | ROW |
| log_bin | ON |
+--------------------------+----------------+
5 rows in set (0.00 sec)
```

MySQL 5.6+:

```sql
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo
default:
break;
}
return MySqlTypeConverter.INSTANCE.convert(builder.build());
return MySqlTypeConverter.DEFAULT_INSTANCE.convert(builder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;

import com.google.common.base.Preconditions;
import com.mysql.cj.MysqlType;
Expand All @@ -39,6 +40,7 @@
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
Expand All @@ -56,9 +58,14 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
SYS_DATABASES.add("sys");
}

private MySqlVersion version;
private MySqlTypeConverter typeConverter;

public MySqlCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo, null);
this.version = resolveVersion();
this.typeConverter = new MySqlTypeConverter(version);
}

@Override
Expand Down Expand Up @@ -130,7 +137,8 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
// e.g. `varchar(10)` is 40
long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
// e.g. `timestamp(3)` is 3
int timePrecision = resultSet.getInt("DATETIME_PRECISION");
int timePrecision =
MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION");

Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));
Expand All @@ -152,12 +160,13 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
.defaultValue(defaultValue)
.comment(comment)
.build();
return MySqlTypeConverter.INSTANCE.convert(typeDefine);
return typeConverter.convert(typeDefine);
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return MysqlCreateTableSqlBuilder.builder(tablePath, table).build(table.getCatalogName());
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter)
.build(table.getCatalogName());
}

@Override
Expand All @@ -179,7 +188,8 @@ protected String getDropDatabaseSql(String databaseName) {
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new MySqlTypeMapper());
return CatalogUtils.getCatalogTable(
defaultConnection, sqlQuery, new MySqlTypeMapper(typeConverter));
}

@Override
Expand All @@ -193,4 +203,18 @@ public String getExistDataSql(TablePath tablePath) {
"SELECT * FROM `%s`.`%s` LIMIT 1;",
tablePath.getDatabaseName(), tablePath.getTableName());
}

private MySqlVersion resolveVersion() {
try (Statement statement = getConnection(defaultUrl).createStatement();
ResultSet resultSet = statement.executeQuery("SELECT VERSION()")) {
resultSet.next();
return MySqlVersion.parse(resultSet.getString(1));
} catch (Exception e) {
log.info(
"Failed to get mysql version, fallback to default version: {}",
MySqlVersion.V_5_7,
e);
return MySqlVersion.V_5_7;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
Expand Down Expand Up @@ -57,20 +58,23 @@ public class MysqlCreateTableSqlBuilder {

private String fieldIde;

private MysqlCreateTableSqlBuilder(String tableName) {
private final MySqlTypeConverter typeConverter;

private MysqlCreateTableSqlBuilder(String tableName, MySqlTypeConverter typeConverter) {
checkNotNull(tableName, "tableName must not be null");
this.tableName = tableName;
this.typeConverter = typeConverter;
}

public static MysqlCreateTableSqlBuilder builder(
TablePath tablePath, CatalogTable catalogTable) {
TablePath tablePath, CatalogTable catalogTable, MySqlTypeConverter typeConverter) {
checkNotNull(tablePath, "tablePath must not be null");
checkNotNull(catalogTable, "catalogTable must not be null");

TableSchema tableSchema = catalogTable.getTableSchema();
checkNotNull(tableSchema, "tableSchema must not be null");

return new MysqlCreateTableSqlBuilder(tablePath.getTableName())
return new MysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter)
.comment(catalogTable.getComment())
// todo: set charset and collate
.engine(null)
Expand Down Expand Up @@ -167,10 +171,16 @@ private String buildColumnIdentifySql(Column column, String catalogName) {
final List<String> columnSqls = new ArrayList<>();
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
boolean isSupportDef = true;
if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {

if ((SqlType.TIME.equals(column.getDataType().getSqlType())
|| SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
&& column.getScale() != null) {
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
columnSqls.add(column.getSourceType());
} else {
BasicTypeDefine<MysqlType> typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
}
// nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public SeaTunnelDataType<?> toSeaTunnelType(
.scale(scale)
.build();

return MySqlTypeConverter.INSTANCE.convert(typeDefine).getDataType();
return MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine).getDataType();
}

@Override
Expand All @@ -122,7 +122,8 @@ public MysqlType toConnectorType(
.nullable(true)
.build();

BasicTypeDefine<MysqlType> typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
BasicTypeDefine<MysqlType> typeDefine =
MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
return typeDefine.getNativeType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ public class MySqlTypeConverter implements TypeConverter<BasicTypeDefine<MysqlTy
public static final long POWER_2_24 = (long) Math.pow(2, 24);
public static final long POWER_2_32 = (long) Math.pow(2, 32);
public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
public static final MySqlTypeConverter INSTANCE = new MySqlTypeConverter();
public static final MySqlTypeConverter DEFAULT_INSTANCE =
new MySqlTypeConverter(MySqlVersion.V_5_7);

private final MySqlVersion version;

public MySqlTypeConverter(MySqlVersion version) {
this.version = version;
}

@Override
public String identifier() {
Expand Down Expand Up @@ -462,7 +469,9 @@ public BasicTypeDefine<MysqlType> reconvert(Column column) {
case TIME:
builder.nativeType(MysqlType.TIME);
builder.dataType(MYSQL_TIME);
if (column.getScale() != null && column.getScale() > 0) {
if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
builder.columnType(MYSQL_TIME);
} else if (column.getScale() != null && column.getScale() > 0) {
int timeScale = column.getScale();
if (timeScale > MAX_TIME_SCALE) {
timeScale = MAX_TIME_SCALE;
Expand All @@ -484,7 +493,9 @@ public BasicTypeDefine<MysqlType> reconvert(Column column) {
case TIMESTAMP:
builder.nativeType(MysqlType.DATETIME);
builder.dataType(MYSQL_DATETIME);
if (column.getScale() != null && column.getScale() > 0) {
if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
builder.columnType(MYSQL_DATETIME);
} else if (column.getScale() != null && column.getScale() > 0) {
int timestampScale = column.getScale();
if (timestampScale > MAX_TIMESTAMP_SCALE) {
timestampScale = MAX_TIMESTAMP_SCALE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,19 @@

public class MySqlTypeMapper implements JdbcDialectTypeMapper {

private MySqlTypeConverter typeConverter;

public MySqlTypeMapper() {
this(MySqlTypeConverter.DEFAULT_INSTANCE);
}

public MySqlTypeMapper(MySqlTypeConverter typeConverter) {
this.typeConverter = typeConverter;
}

@Override
public Column mappingColumn(BasicTypeDefine typeDefine) {
return MySqlTypeConverter.INSTANCE.convert(typeDefine);
return typeConverter.convert(typeDefine);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;

public enum MySqlVersion {
V_5_5,
V_5_6,
V_5_7,
V_8;

public static MySqlVersion parse(String version) {
if (version != null) {
if (version.startsWith("5.5")) {
return V_5_5;
}
if (version.startsWith("5.6")) {
return V_5_6;
}
if (version.startsWith("5.7")) {
return V_5_7;
}
if (version.startsWith("8.0")) {
return V_8;
}
}
throw new UnsupportedOperationException("Unsupported MySQL version: " + version);
}

public boolean isBefore(MySqlVersion version) {
return this.compareTo(version) < 0;
}

public boolean isAtOrBefore(MySqlVersion version) {
return this.compareTo(version) <= 0;
}

public boolean isAfter(MySqlVersion version) {
return this.compareTo(version) > 0;
}

public boolean isAtOrAfter(MySqlVersion version) {
return this.compareTo(version) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -91,7 +92,8 @@ public void testBuild() {
"User table");

String createTableSql =
MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable)
MysqlCreateTableSqlBuilder.builder(
tablePath, catalogTable, MySqlTypeConverter.DEFAULT_INSTANCE)
.build(DatabaseIdentifier.MYSQL);
// create table sql is change; The old unit tests are no longer applicable
String expect =
Expand Down
Loading
Loading