From cdd179e7bf94a2271d063b7b94efe1c4bc83b110 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 26 Nov 2024 22:21:57 +0530 Subject: [PATCH 1/4] changes --- .../postgresql/PostgresConnectorConfig.java | 23 ++++++++++++ .../postgresql/PostgresConnectorTask.java | 36 +++++++++++-------- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 7d70a05260a..a4dba8a0299 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -541,6 +541,8 @@ public static SchemaRefreshMode parse(String value) { protected static final int DEFAULT_PORT = 5_433; protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; protected static final int DEFAULT_MAX_RETRIES = 6; + protected static final long DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS = 5_000; + protected static final long DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS = 120_000; public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$"); public static final Field PORT = RelationalDatabaseConnectorConfig.PORT @@ -1010,6 +1012,19 @@ public static AutoCreateMode parse(String value, String defaultValue) { return 0; }); + public static final Field RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS = Field.create("retriable.restart.exponential.delay.min.ms") + .withDisplayName("Minimum delay in case of retries for exponential backoff") + .withType(Type.LONG) + .withImportance(Importance.LOW) + .withDescription("The minimum delay between retry when following exponential backoff") + .withDefault(DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS); + public static final Field RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS = Field.create("retriable.restart.exponential.delay.max.ms") + .withDisplayName("Maximum delay in case of retries for exponential backoff") + .withType(Type.LONG) + .withImportance(Importance.LOW) + .withDescription("The maximum delay between retry when following exponential backoff") + .withDefault(DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS); + private final LogicalDecodingMessageFilter logicalDecodingMessageFilter; private final HStoreHandlingMode hStoreHandlingMode; private final IntervalHandlingMode intervalHandlingMode; @@ -1143,6 +1158,14 @@ public String primaryKeyHashColumns() { return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS); } + public long retriableRestartExponentialDelayMinMs() { + return getConfig().getLong(RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS); + } + + public long retriableRestartExponentialDelayMaxMs() { + return getConfig().getLong(RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS); + } + @Override public byte[] getUnavailableValuePlaceholder() { String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 99eaca22b00..b21090f346a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -72,8 +72,18 @@ public class PostgresConnectorTask extends BaseSourceTask start(Configuration config) { + final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config); + queue = new ChangeEventQueue.Builder() + .pollInterval(connectorConfig.getPollInterval()) + .maxBatchSize(connectorConfig.getMaxBatchSize()) + .maxQueueSize(connectorConfig.getMaxQueueSize()) + .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) + .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) + .build(); + + errorHandler = new PostgresErrorHandler(connectorConfig, queue, errorHandler); + try { - final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config); final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY); final Snapshotter snapshotter = connectorConfig.getSnapshotter(); final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster(); @@ -182,15 +192,15 @@ public ChangeEventSourceCoordinator st throw new DebeziumException(e); } - queue = new ChangeEventQueue.Builder() - .pollInterval(connectorConfig.getPollInterval()) - .maxBatchSize(connectorConfig.getMaxBatchSize()) - .maxQueueSize(connectorConfig.getMaxQueueSize()) - .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) - .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) - .build(); - - errorHandler = new PostgresErrorHandler(connectorConfig, queue, errorHandler); +// queue = new ChangeEventQueue.Builder() +// .pollInterval(connectorConfig.getPollInterval()) +// .maxBatchSize(connectorConfig.getMaxBatchSize()) +// .maxQueueSize(connectorConfig.getMaxQueueSize()) +// .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) +// .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) +// .build(); +// +// errorHandler = new PostgresErrorHandler(connectorConfig, queue, errorHandler); final PostgresEventMetadataProvider metadataProvider = new PostgresEventMetadataProvider(); @@ -265,11 +275,9 @@ public ChangeEventSourceCoordinator st } } catch (Exception exception) { - // YB Note: Catch all the exceptions and retry. - LOGGER.warn("Received exception, will be retrying", exception); - throw new RetriableException(exception); + LOGGER.warn("Received exception, will be setting producer throwable", exception); + errorHandler.setProducerThrowable(new RetriableException(exception)); } - } public ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext, int maxRetries, Duration retryDelay) From e23e3e6017a95b59b6cbc1a1172ff6e5e4f1cb44 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 27 Nov 2024 11:27:48 +0000 Subject: [PATCH 2/4] addressed review comments --- .../postgresql/PostgresConnectorConfig.java | 23 ------------------- .../postgresql/PostgresConnectorTask.java | 17 ++++++-------- 2 files changed, 7 insertions(+), 33 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index a4dba8a0299..7d70a05260a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -541,8 +541,6 @@ public static SchemaRefreshMode parse(String value) { protected static final int DEFAULT_PORT = 5_433; protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; protected static final int DEFAULT_MAX_RETRIES = 6; - protected static final long DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS = 5_000; - protected static final long DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS = 120_000; public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$"); public static final Field PORT = RelationalDatabaseConnectorConfig.PORT @@ -1012,19 +1010,6 @@ public static AutoCreateMode parse(String value, String defaultValue) { return 0; }); - public static final Field RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS = Field.create("retriable.restart.exponential.delay.min.ms") - .withDisplayName("Minimum delay in case of retries for exponential backoff") - .withType(Type.LONG) - .withImportance(Importance.LOW) - .withDescription("The minimum delay between retry when following exponential backoff") - .withDefault(DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS); - public static final Field RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS = Field.create("retriable.restart.exponential.delay.max.ms") - .withDisplayName("Maximum delay in case of retries for exponential backoff") - .withType(Type.LONG) - .withImportance(Importance.LOW) - .withDescription("The maximum delay between retry when following exponential backoff") - .withDefault(DEFAULT_RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS); - private final LogicalDecodingMessageFilter logicalDecodingMessageFilter; private final HStoreHandlingMode hStoreHandlingMode; private final IntervalHandlingMode intervalHandlingMode; @@ -1158,14 +1143,6 @@ public String primaryKeyHashColumns() { return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS); } - public long retriableRestartExponentialDelayMinMs() { - return getConfig().getLong(RETRIABLE_RESTART_EXPONENTIAL_DELAY_MIN_MS); - } - - public long retriableRestartExponentialDelayMaxMs() { - return getConfig().getLong(RETRIABLE_RESTART_EXPONENTIAL_DELAY_MAX_MS); - } - @Override public byte[] getUnavailableValuePlaceholder() { String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index b21090f346a..c9065736b93 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -192,16 +192,6 @@ public ChangeEventSourceCoordinator st throw new DebeziumException(e); } -// queue = new ChangeEventQueue.Builder() -// .pollInterval(connectorConfig.getPollInterval()) -// .maxBatchSize(connectorConfig.getMaxBatchSize()) -// .maxQueueSize(connectorConfig.getMaxQueueSize()) -// .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) -// .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) -// .build(); -// -// errorHandler = new PostgresErrorHandler(connectorConfig, queue, errorHandler); - final PostgresEventMetadataProvider metadataProvider = new PostgresEventMetadataProvider(); SignalProcessor signalProcessor = new SignalProcessor<>( @@ -277,6 +267,13 @@ public ChangeEventSourceCoordinator st catch (Exception exception) { LOGGER.warn("Received exception, will be setting producer throwable", exception); errorHandler.setProducerThrowable(new RetriableException(exception)); + + if (errorHandler.getRetries() == connectorConfig.getMaxRetriesOnError()) { + throw new ConnectException("Maximum number of retries attempted, manually restart " + + "the connector after fixing the error", exception); + } else { + throw new RetriableException(exception); + } } } From bca9b316109eee2c45509a3772f4361c06ad927c Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 28 Nov 2024 06:45:21 +0000 Subject: [PATCH 3/4] addressed review comments --- Dockerfile | 2 +- .../postgresql/PostgresConnectorConfig.java | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index b0464c0277c..fde22bcb377 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ RUN rm -rf debezium-connector-vitess WORKDIR / # Copy the Debezium Connector for Postgres adapted for YugabyteDB -COPY debezium-connector-postgres/target/debezium-connector-yugabytedb-*.jar $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres +COPY debezium-connector-postgres/target/debezium-connector-yugabytedb-*.jar $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres/ # Set the TLS version to be used by Kafka processes ENV KAFKA_OPTS="-Djdk.tls.client.protocols=TLSv1.2 -javaagent:/kafka/etc/jmx_prometheus_javaagent-0.17.2.jar=8080:/kafka/etc/jmx-exporter/metrics.yml" diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 7d70a05260a..162bd585ad8 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -542,6 +542,8 @@ public static SchemaRefreshMode parse(String value) { protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; protected static final int DEFAULT_MAX_RETRIES = 6; public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$"); + public static final int YB_DEFAULT_ERRORS_MAX_RETRIES = 90; + public static final long YB_DEFAULT_RETRIABLE_RESTART_WAIT = 20000L; public static final Field PORT = RelationalDatabaseConnectorConfig.PORT .withDefault(DEFAULT_PORT); @@ -628,6 +630,28 @@ public static SchemaRefreshMode parse(String value) { .withDescription("Whether or not to take a consistent snapshot of the tables." + "Disabling this option may result in duplication of some already snapshot data in the streaming phase."); + public static final Field MAX_RETRIES_ON_ERROR = Field.create(ERRORS_MAX_RETRIES) + .withDisplayName("The maximum number of retries") + .withType(Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 24)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDefault(YB_DEFAULT_ERRORS_MAX_RETRIES) + .withValidation(Field::isInteger) + .withDescription( + "The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries)."); + + public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms") + .withDisplayName("Retriable restart wait (ms)") + .withType(Type.LONG) + .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 18)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDefault(YB_DEFAULT_RETRIABLE_RESTART_WAIT) + .withDescription( + "Time to wait before restarting connector after retriable exception occurs. Defaults to " + YB_DEFAULT_RETRIABLE_RESTART_WAIT + "ms.") + .withValidation(Field::isPositiveLong); + public enum AutoCreateMode implements EnumeratedValue { /** * No Publication will be created, it's expected the user From 08a7a071e0d648c5b4a707cacef52683fb05576c Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 28 Nov 2024 08:49:33 +0000 Subject: [PATCH 4/4] change defailt values --- .../connector/postgresql/PostgresConnectorConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 162bd585ad8..74c50aed43a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -542,8 +542,8 @@ public static SchemaRefreshMode parse(String value) { protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; protected static final int DEFAULT_MAX_RETRIES = 6; public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$"); - public static final int YB_DEFAULT_ERRORS_MAX_RETRIES = 90; - public static final long YB_DEFAULT_RETRIABLE_RESTART_WAIT = 20000L; + public static final int YB_DEFAULT_ERRORS_MAX_RETRIES = 60; + public static final long YB_DEFAULT_RETRIABLE_RESTART_WAIT = 30000L; public static final Field PORT = RelationalDatabaseConnectorConfig.PORT .withDefault(DEFAULT_PORT);