Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][connector] add mysql cdc reader #3455

Merged
merged 2 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connec
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/ from https://github.com/apache/flink
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/ from https://github.com/apache/iceberg
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/base from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql from https://github.com/ververica/flink-cdc-connectors
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors
generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
<artifactId>connector-cdc-base</artifactId>

<properties>
<debezium.version>1.6.4.Final</debezium.version>
<hikaricp.version>4.0.3</hikaricp.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.seatunnel.connectors.cdc.base.config;

import org.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.seatunnel.connectors.cdc.base.option.SourceOptions;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** A {@link SourceConfig.Factory} to provide {@link SourceConfig} of JDBC data source. */
@SuppressWarnings("checkstyle:MagicNumber")
public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<JdbcSourceConfig> {

private static final long serialVersionUID = 1L;

protected int port;
protected String hostname;
protected String username;
protected String password;
protected List<String> databaseList;
protected List<String> tableList;
protected StartupConfig startupConfig;
protected StopConfig stopConfig;
protected boolean includeSchemaChanges = false;
protected double distributionFactorUpper = 1000.0d;
protected double distributionFactorLower = 0.05d;
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
protected Duration connectTimeout = JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue();
protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
protected Properties dbzProperties;

/** Integer port number of the database server. */
public JdbcSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
return this;
}

/** Integer port number of the database server. */
public JdbcSourceConfigFactory port(int port) {
this.port = port;
return this;
}

/**
* An optional list of regular expressions that match database names to be monitored; any
* database name not included in the whitelist will be excluded from monitoring. By default all
* databases will be monitored.
*/
public JdbcSourceConfigFactory databaseList(String... databaseList) {
this.databaseList = Arrays.asList(databaseList);
return this;
}

/**
* An optional list of regular expressions that match fully-qualified table identifiers for
* tables to be monitored; any table not included in the list will be excluded from monitoring.
* Each identifier is of the form databaseName.tableName. by default the connector will monitor
* every non-system table in each monitored database.
*/
public JdbcSourceConfigFactory tableList(String... tableList) {
this.tableList = Arrays.asList(tableList);
return this;
}

/** Name of the user to use when connecting to the database server. */
public JdbcSourceConfigFactory username(String username) {
this.username = username;
return this;
}

/** Password to use when connecting to the database server. */
public JdbcSourceConfigFactory password(String password) {
this.password = password;
return this;
}

/**
* The session time zone in database server, e.g. "America/Los_Angeles". It controls how the
* TIMESTAMP type converted to STRING. See more
* https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
*/
public JdbcSourceConfigFactory serverTimeZone(String timeZone) {
this.serverTimeZone = timeZone;
return this;
}

/**
* The split size (number of rows) of table snapshot, captured tables are split into multiple
* splits when read the snapshot of table.
*/
public JdbcSourceConfigFactory splitSize(int splitSize) {
this.splitSize = splitSize;
return this;
}

/**
* The upper bound of split key evenly distribution factor, the factor is used to determine
* whether the table is evenly distribution or not.
*/
public JdbcSourceConfigFactory distributionFactorUpper(double distributionFactorUpper) {
this.distributionFactorUpper = distributionFactorUpper;
return this;
}

/**
* The lower bound of split key evenly distribution factor, the factor is used to determine
* whether the table is evenly distribution or not.
*/
public JdbcSourceConfigFactory distributionFactorLower(double distributionFactorLower) {
this.distributionFactorLower = distributionFactorLower;
return this;
}

/** The maximum fetch size for per poll when read table snapshot. */
public JdbcSourceConfigFactory fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}

/**
* The maximum time that the connector should wait after trying to connect to the database
* server before timing out.
*/
public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

/** The connection pool size. */
public JdbcSourceConfigFactory connectionPoolSize(int connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
return this;
}

/** The max retry times to get connection. */
public JdbcSourceConfigFactory connectMaxRetries(int connectMaxRetries) {
this.connectMaxRetries = connectMaxRetries;
return this;
}

/** Whether the {@link SourceConfig} should output the schema changes or not. */
public JdbcSourceConfigFactory includeSchemaChanges(boolean includeSchemaChanges) {
this.includeSchemaChanges = includeSchemaChanges;
return this;
}

/** The Debezium connector properties. For example, "snapshot.mode". */
public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
this.dbzProperties = properties;
return this;
}

/** Specifies the startup options. */
public JdbcSourceConfigFactory startupOptions(StartupConfig startupConfig) {
this.startupConfig = startupConfig;
return this;
}

@Override
public abstract JdbcSourceConfig create(int subtask);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.seatunnel.connectors.cdc.base.option;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import org.seatunnel.connectors.cdc.base.source.IncrementalSource;

import java.time.Duration;

/** Configurations for {@link IncrementalSource} of JDBC data source. */
@SuppressWarnings("checkstyle:MagicNumber")
public class JdbcSourceOptions extends SourceOptions {

public static final Option<String> HOSTNAME =
Options.key("hostname")
.stringType()
.noDefaultValue()
.withDescription("IP address or hostname of the database server.");

public static final Option<Integer> PORT =
Options.key("port")
.intType()
.defaultValue(3306)
.withDescription("Integer port number of the database server.");

public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"Name of the database to use when connecting to the database server.");

public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("Password to use when connecting to the database server.");

public static final Option<String> DATABASE_NAME =
Options.key("database-name")
.stringType()
.noDefaultValue()
.withDescription("Database name of the database to monitor.");

public static final Option<String> TABLE_NAME =
Options.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("Table name of the database to monitor.");

public static final Option<String> SERVER_TIME_ZONE =
Options.key("server-time-zone")
.stringType()
.defaultValue("UTC")
.withDescription("The session time zone in database server.");

public static final Option<String> SERVER_ID =
Options.key("server-id")
.stringType()
.noDefaultValue()
.withDescription(
"A numeric ID or a numeric ID range of this database client, "
+ "The numeric ID syntax is like '5400', the numeric ID range syntax "
+ "is like '5400-5408', The numeric ID range syntax is recommended when "
+ "'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all "
+ "currently-running database processes in the MySQL cluster. This connector"
+ " joins the MySQL cluster as another server (with this unique ID) "
+ "so it can read the binlog. By default, a random number is generated between"
+ " 5400 and 6400, though we recommend setting an explicit value.");

public static final Option<Duration> CONNECT_TIMEOUT =
Options.key("connect.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The maximum time that the connector should wait after trying to connect to the database server before timing out.");

public static final Option<Integer> CONNECTION_POOL_SIZE =
Options.key("connection.pool.size")
.intType()
.defaultValue(20)
.withDescription("The connection pool size.");

public static final Option<Integer> CONNECT_MAX_RETRIES =
Options.key("connect.max-retries")
.intType()
.defaultValue(3)
.withDescription(
"The max retry times that the connector should retry to build database server connection.");
}
Loading