Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#26004] Make load-balance property configurable #171

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.connector.postgresql.transforms.yugabytedb.Pair;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
Expand Down Expand Up @@ -600,6 +601,7 @@ public static SchemaRefreshMode parse(String value) {
public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$");
public static final int YB_DEFAULT_ERRORS_MAX_RETRIES = 60;
public static final long YB_DEFAULT_RETRIABLE_RESTART_WAIT = 30000L;
public static final boolean YB_DEFAULT_LOAD_BALANCE_CONNECTIONS = true;

public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
.withDefault(DEFAULT_PORT);
Expand Down Expand Up @@ -694,6 +696,13 @@ public static SchemaRefreshMode parse(String value) {
.withDescription("Whether or not to take a consistent snapshot of the tables." +
"Disabling this option may result in duplication of some already snapshot data in the streaming phase.");

public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections")
.withDisplayName("YB load balance connections")
.withType(Type.BOOLEAN)
.withDefault(YB_DEFAULT_LOAD_BALANCE_CONNECTIONS)
.withImportance(Importance.LOW)
.withDescription("Whether or not to add load-balance property to connection url");

public static final Field MAX_RETRIES_ON_ERROR = Field.create(ERRORS_MAX_RETRIES)
.withDisplayName("The maximum number of retries")
.withType(Type.INT)
Expand Down Expand Up @@ -1216,6 +1225,10 @@ public boolean isYbConsistentSnapshotEnabled() {
return getConfig().getBoolean(YB_CONSISTENT_SNAPSHOT);
}

public boolean ybShouldLoadBalanceConnections() {
return getConfig().getBoolean(YB_LOAD_BALANCE_CONNECTIONS);
}

protected Snapshotter getSnapshotter() {
return this.snapshotMode.getSnapshotter(getConfig());
}
Expand Down Expand Up @@ -1314,6 +1327,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
SNAPSHOT_MODE,
SNAPSHOT_MODE_CLASS,
YB_CONSISTENT_SNAPSHOT,
YB_LOAD_BALANCE_CONNECTIONS,
PRIMARY_KEY_HASH_COLUMNS,
HSTORE_HANDLING_MODE,
BINARY_HANDLING_MODE,
Expand Down Expand Up @@ -1368,17 +1382,37 @@ private static int validateFlushLsnSource(Configuration config, Field field, Fie
}
return 0;
}

public static Pair<String, String> findAndReplaceLoadBalancePropertyValues(Boolean loadBalance) {
String multiHostUrl = PostgresConnection.MULTI_HOST_URL_PATTERN;
String singleHostUrl = PostgresConnection.URL_PATTERN;
String value = loadBalance.toString();

if (multiHostUrl.contains("${" + PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS + "}")) {
multiHostUrl = multiHostUrl.replaceAll(
"\\$\\{" + PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS + "\\}", value);
}

if (singleHostUrl.contains("${" + PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS + "}")) {
singleHostUrl = singleHostUrl.replaceAll(
"\\$\\{" + PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS + "\\}", value);
}

return new Pair<>(multiHostUrl, singleHostUrl);
}

/**
* Method to get the connection factory depending on the provided hostname value.
* @param hostName the host(s) for the PostgreSQL/YugabyteDB instance
* @return a {@link io.debezium.jdbc.JdbcConnection.ConnectionFactory} instance
*/
public static JdbcConnection.ConnectionFactory getConnectionFactory(String hostName) {
public static JdbcConnection.ConnectionFactory getConnectionFactory(String hostName, Boolean loadBalance) {
// The first string in the pair contains multi host URL pattern while the second string contains single host URL pattern.
Pair<String,String> urlPatterns = findAndReplaceLoadBalancePropertyValues(loadBalance);
return hostName.contains(":")
? JdbcConnection.patternBasedFactory(PostgresConnection.MULTI_HOST_URL_PATTERN, com.yugabyte.Driver.class.getName(),
? JdbcConnection.patternBasedFactory(urlPatterns.getFirst(), com.yugabyte.Driver.class.getName(),
PostgresConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()))
: JdbcConnection.patternBasedFactory(PostgresConnection.URL_PATTERN, com.yugabyte.Driver.class.getName(),
: JdbcConnection.patternBasedFactory(urlPatterns.getSecond(), com.yugabyte.Driver.class.getName(),
PostgresConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
}

final Charset databaseCharset;
try (PostgresConnection tempConnection = new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL)) {
try (PostgresConnection tempConnection = new PostgresConnection(connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_GENERAL, connectorConfig.ybShouldLoadBalanceConnections())) {
databaseCharset = tempConnection.getDatabaseCharset();
}

Expand All @@ -105,7 +106,8 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
typeRegistry);

MainConnectionProvidingConnectionFactory<PostgresConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
() -> new PostgresConnection(connectorConfig.getJdbcConfig(), valueConverterBuilder, PostgresConnection.CONNECTION_GENERAL));
() -> new PostgresConnection(connectorConfig.getJdbcConfig(), valueConverterBuilder,
PostgresConnection.CONNECTION_GENERAL, connectorConfig.ybShouldLoadBalanceConnections()));
// Global JDBC connection used both for snapshotting and streaming.
// Must be able to resolve datatypes.
jdbcConnection = connectionFactory.mainConnection();
Expand Down Expand Up @@ -214,8 +216,10 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
connectorConfig.createHeartbeat(
topicNamingStrategy,
schemaNameAdjuster,
() -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
exception -> {
() -> new PostgresConnection(connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_GENERAL,
connectorConfig.ybShouldLoadBalanceConnections()),
exception -> {
String sqlErrorId = exception.getSQLState();
switch (sqlErrorId) {
case "57P01":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,12 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
final PostgresConnectorConfig postgresConfig = new PostgresConnectorConfig(config);
final ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
// Try to connect to the database ...
try (PostgresConnection connection = new PostgresConnection(postgresConfig.getJdbcConfig(), PostgresConnection.CONNECTION_VALIDATE_CONNECTION)) {
try (PostgresConnection connection = new PostgresConnection(postgresConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_VALIDATE_CONNECTION, postgresConfig.ybShouldLoadBalanceConnections())) {
try {
// Prepare connection without initial statement execution
connection.connection(false);
testConnection(connection);
testConnection(connection, postgresConfig.ybShouldLoadBalanceConnections());

// YB Note: This check validates that the WAL level is "logical" - skipping this
// since it is not applicable to YugabyteDB.
Expand All @@ -175,8 +176,9 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
checkLoginReplicationRoles(connection);
}
catch (SQLException e) {
LOGGER.error("Failed testing connection for {} with user '{}'", connection.connectionString(),
connection.username(), e);
LOGGER.error("Failed testing connection for {} with user '{}'",
connection.connectionString(postgresConfig.ybShouldLoadBalanceConnections()),
connection.username(), e);
hostnameValue.addErrorMessage("Error while validating connector config: " + e.getMessage());
}
}
Expand Down Expand Up @@ -236,9 +238,9 @@ private static void checkWalLevel(PostgresConnection connection, PostgresConnect
}
}

private static void testConnection(PostgresConnection connection) throws SQLException {
private static void testConnection(PostgresConnection connection, Boolean loadBalance) throws SQLException {
connection.execute("SELECT version()");
LOGGER.info("Successfully tested connection for {} with user '{}'", connection.connectionString(),
LOGGER.info("Successfully tested connection for {} with user '{}'", connection.connectionString(loadBalance),
connection.username());
}

Expand All @@ -251,7 +253,8 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
@Override
public List<TableId> getMatchingCollections(Configuration config) {
PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
try (PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL)) {
try (PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_GENERAL, connectorConfig.ybShouldLoadBalanceConnections())) {
return connection.readTableNames(connectorConfig.databaseName(), null, null, new String[]{ "TABLE" }).stream()
.filter(tableId -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.YugabyteDBServer;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.transforms.yugabytedb.Pair;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
Expand Down Expand Up @@ -71,12 +72,18 @@ public class PostgresConnection extends JdbcConnection {
private static final Pattern EXPRESSION_DEFAULT_PATTERN = Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] ?.+)+)+\\)");
private static Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class);

public static final String MULTI_HOST_URL_PATTERN = "jdbc:yugabytedb://${" + JdbcConfiguration.HOSTNAME + "}/${" + JdbcConfiguration.DATABASE + "}?load-balance=true";
public static final String MULTI_HOST_URL_PATTERN = "jdbc:yugabytedb://${" + JdbcConfiguration.HOSTNAME + "}/${"
+ JdbcConfiguration.DATABASE + "}?load-balance=${" + PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS
+ "}";
public static final String URL_PATTERN = "jdbc:yugabytedb://${" + JdbcConfiguration.HOSTNAME + "}:${"
+ JdbcConfiguration.PORT + "}/${" + JdbcConfiguration.DATABASE + "}";
+ JdbcConfiguration.PORT + "}/${" + JdbcConfiguration.DATABASE + "}?load-balance=${"
+ PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS
+ "}";
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(URL_PATTERN,
com.yugabyte.Driver.class.getName(),
PostgresConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()));
PostgresConnection.class.getClassLoader(),
JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()),
PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS);

/**
* Obtaining a replication slot may fail if there's a pending transaction. We're retrying to get a slot for 30 min.
Expand Down Expand Up @@ -124,8 +131,10 @@ public PostgresConnection(JdbcConfiguration config, PostgresValueConverterBuilde
}
}

public PostgresConnection(JdbcConfiguration config, PostgresValueConverterBuilder valueConverterBuilder, String connectionUsage) {
this(config, valueConverterBuilder, connectionUsage, PostgresConnectorConfig.getConnectionFactory(config.getHostname()));
public PostgresConnection(JdbcConfiguration config, PostgresValueConverterBuilder valueConverterBuilder,
String connectionUsage, Boolean loadBalance) {
this(config, valueConverterBuilder, connectionUsage,
PostgresConnectorConfig.getConnectionFactory(config.getHostname(), loadBalance));
}

/**
Expand All @@ -150,8 +159,10 @@ public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegis
this.jdbcConfig = config.getJdbcConfig();
}

public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegistry, String connectionUsage) {
this(config, typeRegistry, connectionUsage, PostgresConnectorConfig.getConnectionFactory(config.getJdbcConfig().getHostname()));
public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegistry, String connectionUsage,
Boolean loadBalance) {
this(config, typeRegistry, connectionUsage,
PostgresConnectorConfig.getConnectionFactory(config.getJdbcConfig().getHostname(), loadBalance));
}

/**
Expand All @@ -161,8 +172,8 @@ public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegis
* @param config {@link Configuration} instance, may not be null.
* @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
*/
public PostgresConnection(JdbcConfiguration config, String connectionUsage) {
this(config, null, connectionUsage);
public PostgresConnection(JdbcConfiguration config, String connectionUsage, Boolean loadBalance) {
this(config, null, connectionUsage, loadBalance);
}

static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration, String connectionUsage) {
Expand All @@ -178,12 +189,14 @@ static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration, Str
*
* @return a {@code String} where the variables in {@code urlPattern} are replaced with values from the configuration
*/
public String connectionString() {
public String connectionString(Boolean loadBalance) {
Pair<String, String> urlPatterns = PostgresConnectorConfig
.findAndReplaceLoadBalancePropertyValues(loadBalance);
String hostName = jdbcConfig.getHostname();
if (hostName.contains(":")) {
return connectionString(MULTI_HOST_URL_PATTERN);
return connectionString(urlPatterns.getFirst());
} else {
return connectionString(URL_PATTERN);
return connectionString(urlPatterns.getSecond());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configurat
}

private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
try (PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) {
try (PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_SLOT_INFO, connectorConfig.ybShouldLoadBalanceConnections())) {
return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName());
}
}
Expand Down Expand Up @@ -807,7 +808,8 @@ public synchronized void close(boolean dropSlot) {
}
if (dropSlotOnClose && dropSlot) {
// we're dropping the replication slot via a regular - i.e. not a replication - connection
try (PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_DROP_SLOT)) {
try (PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_DROP_SLOT, connectorConfig.ybShouldLoadBalanceConnections())) {
connection.dropReplicationSlot(slotName);
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.TableId;
Expand Down Expand Up @@ -52,7 +53,7 @@ public QueryInformationSchemaMetadata(Configuration config) {
connection = new PostgresConnection(
JdbcConfiguration.adapt(config.subset(CommonConnectorConfig.DATABASE_CONFIG_PREFIX, true)
.merge(config.subset(CommonConnectorConfig.DRIVER_CONFIG_PREFIX, true))),
"Debezium TimescaleDB metadata");
"Debezium TimescaleDB metadata", config.getBoolean(PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public void shouldValidateConfiguration() throws Exception {
validateConfigField(validatedConfig, PostgresConnectorConfig.LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST, null);
validateConfigField(validatedConfig, PostgresConnectorConfig.LOGICAL_DECODING_MESSAGE_PREFIX_INCLUDE_LIST, null);
validateConfigField(validatedConfig, PostgresConnectorConfig.YB_CONSISTENT_SNAPSHOT, Boolean.TRUE);
validateConfigField(validatedConfig, PostgresConnectorConfig.YB_LOAD_BALANCE_CONNECTIONS, Boolean.TRUE);
}

@Test
Expand Down
Loading