Skip to content

Commit

Permalink
[bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: TaoZex <[email protected]>
  • Loading branch information
ic4y and TaoZex authored Mar 15, 2023
1 parent c6b633f commit ac61409
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public interface FetchTask<Split> {
/** Returns current task is running or not. */
boolean isRunning();

/** Close this task */
void shutdown();

/** Returns the split that the task used. */
Split getSplit();

Expand All @@ -63,5 +66,7 @@ interface Context {
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);

List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);

void close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,20 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}
if (snapshotSplitReadTask != null) {
snapshotSplitReadTask.shutdown();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
"Failed to close the scan fetcher in {} seconds.",
"Failed to close the scan fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
executorService.shutdownNow();
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,20 @@ private void checkReadException() {
@Override
public void close() {
try {
if (taskContext != null) {
taskContext.close();
}
if (streamFetchTask != null) {
streamFetchTask.shutdown();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
"Failed to close the stream fetcher in {} seconds.",
"Failed to close the stream fetcher in {} seconds. Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
executorService.shutdownNow();
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;

import java.sql.SQLException;
import java.util.List;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;

/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
Expand Down Expand Up @@ -104,12 +100,7 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
@Override
public MySqlSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final MySqlConnection jdbcConnection =
createMySqlConnection(taskSourceConfig.getDbzConfiguration());
final BinaryLogClient binaryLogClient =
createBinaryClient(taskSourceConfig.getDbzConfiguration());
return new MySqlSourceFetchTaskContext(
taskSourceConfig, this, jdbcConnection, binaryLogClient);
return new MySqlSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,21 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;

/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@Slf4j
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
Expand All @@ -89,13 +95,10 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private MySqlErrorHandler errorHandler;

public MySqlSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
MySqlConnection connection,
BinaryLogClient binaryLogClient) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.connection = connection;
this.binaryLogClient = binaryLogClient;
this.connection = createMySqlConnection(sourceConfig.getDbzConfiguration());
this.binaryLogClient = createBinaryClient(sourceConfig.getDbzConfiguration());
this.metadataProvider = new MySqlEventMetadataProvider();
}

Expand Down Expand Up @@ -159,6 +162,18 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
}

@Override
public void close() {
try {
this.connection.close();
this.binaryLogClient.disconnect();
} catch (SQLException e) {
log.warn("Failed to close connection", e);
} catch (IOException e) {
log.warn("Failed to close binaryLogClient", e);
}
}

@Override
public MySqlSourceConfig getSourceConfig() {
return (MySqlSourceConfig) sourceConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public static String quote(TableId tableId) {
private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize)
throws SQLException {
final Connection connection = jdbc.connection();
// Add MySQL metadata locks to prevent modification of table structure.
connection.setAutoCommit(false);
final PreparedStatement statement =
connection.prepareStatement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;

import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
Expand Down Expand Up @@ -104,13 +103,8 @@ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId ta
@Override
public SqlServerSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
final SqlServerConnection jdbcConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
final SqlServerConnection metaDataConnection =
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());

return new SqlServerSourceFetchTaskContext(
taskSourceConfig, this, jdbcConnection, metaDataConnection);
return new SqlServerSourceFetchTaskContext(taskSourceConfig, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;

/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@Slf4j
public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {

private final SqlServerConnection dataConnection;
Expand All @@ -83,13 +88,11 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;

public SqlServerSourceFetchTaskContext(
JdbcSourceConfig sourceConfig,
JdbcDataSourceDialect dataSourceDialect,
SqlServerConnection dataConnection,
SqlServerConnection metadataConnection) {
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
this.dataConnection = dataConnection;
this.metadataConnection = metadataConnection;

this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new SqlServerEventMetadataProvider();
}

Expand Down Expand Up @@ -158,6 +161,16 @@ public void configure(SourceSplitBase sourceSplitBase) {
this.errorHandler = new SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
}

@Override
public void close() {
try {
this.dataConnection.close();
this.metadataConnection.close();
} catch (SQLException e) {
log.warn("Failed to close connection", e);
}
}

@Override
public SqlServerSourceConfig getSourceConfig() {
return (SqlServerSourceConfig) sourceConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down

0 comments on commit ac61409

Please sign in to comment.