[DBZ-PGYB] Restrict retries when retry count reaches limit #166

Merged
Dockerfile (2 changes: 1 addition & 1 deletion)
@@ -17,7 +17,7 @@ RUN rm -rf debezium-connector-vitess
WORKDIR /

# Copy the Debezium Connector for Postgres adapted for YugabyteDB
-COPY debezium-connector-postgres/target/debezium-connector-yugabytedb-*.jar $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres
+COPY debezium-connector-postgres/target/debezium-connector-yugabytedb-*.jar $KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres/

# Set the TLS version to be used by Kafka processes
ENV KAFKA_OPTS="-Djdk.tls.client.protocols=TLSv1.2 -javaagent:/kafka/etc/jmx_prometheus_javaagent-0.17.2.jar=8080:/kafka/etc/jmx-exporter/metrics.yml"
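Note on the COPY change above: when the source of a COPY instruction is a wildcard, Docker writes the matched file to a plain file with the destination's name unless the destination is an existing directory or ends with a slash, so the trailing slash guarantees the connector jar lands inside a debezium-connector-postgres plugin directory instead of being written as a file of that name.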
PostgresConnectorConfig.java
@@ -542,6 +542,8 @@ public static SchemaRefreshMode parse(String value) {
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240;
protected static final int DEFAULT_MAX_RETRIES = 6;
public static final Pattern YB_HOSTNAME_PATTERN = Pattern.compile("^[a-zA-Z0-9-_.,:]+$");
+public static final int YB_DEFAULT_ERRORS_MAX_RETRIES = 60;
+public static final long YB_DEFAULT_RETRIABLE_RESTART_WAIT = 30000L;

public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
.withDefault(DEFAULT_PORT);
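Taken together, these defaults mean a task that keeps hitting retriable errors waits about 30 seconds between attempts and gives up after 60 of them, i.e. roughly 30 minutes of automatic retrying before a manual restart is required (assuming each attempt waits out the full restart interval).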
@@ -628,6 +630,28 @@ public static SchemaRefreshMode parse(String value) {
.withDescription("Whether or not to take a consistent snapshot of the tables. " +
        "Disabling this option may result in duplication of some already snapshot data in the streaming phase.");

+public static final Field MAX_RETRIES_ON_ERROR = Field.create(ERRORS_MAX_RETRIES)
+        .withDisplayName("The maximum number of retries")
+        .withType(Type.INT)
+        .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 24))
+        .withWidth(Width.MEDIUM)
+        .withImportance(Importance.LOW)
+        .withDefault(YB_DEFAULT_ERRORS_MAX_RETRIES)
+        .withValidation(Field::isInteger)
+        .withDescription(
+                "The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
+
+public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms")
+        .withDisplayName("Retriable restart wait (ms)")
+        .withType(Type.LONG)
+        .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 18))
+        .withWidth(Width.MEDIUM)
+        .withImportance(Importance.LOW)
+        .withDefault(YB_DEFAULT_RETRIABLE_RESTART_WAIT)
+        .withDescription(
+                "Time to wait before restarting connector after retriable exception occurs. Defaults to " + YB_DEFAULT_RETRIABLE_RESTART_WAIT + "ms.")
+        .withValidation(Field::isPositiveLong);
+
public enum AutoCreateMode implements EnumeratedValue {
/**
* No Publication will be created, it's expected the user
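For context, a minimal sketch of how a deployment might override the two new settings when building a Debezium Configuration programmatically. It assumes ERRORS_MAX_RETRIES resolves to the documented errors.max.retries property name; the class name and values shown are illustrative only.

import io.debezium.config.Configuration;

public class RetrySettingsSketch {
    public static void main(String[] args) {
        // Illustrative overrides of the two retry-related settings defined above
        // (the defaults are 60 retries and a 30000 ms restart wait).
        Configuration config = Configuration.create()
                .with("errors.max.retries", "10")
                .with("retriable.restart.connector.wait.ms", "10000")
                .build();

        System.out.println(config.getString("errors.max.retries"));
        System.out.println(config.getString("retriable.restart.connector.wait.ms"));
    }
}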
PostgresConnectorTask.java
@@ -72,8 +72,18 @@ public class PostgresConnectorTask extends BaseSourceTask<PostgresPartition, Pos

@Override
public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> start(Configuration config) {
+final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
+queue = new ChangeEventQueue.Builder<DataChangeEvent>()
+        .pollInterval(connectorConfig.getPollInterval())
+        .maxBatchSize(connectorConfig.getMaxBatchSize())
+        .maxQueueSize(connectorConfig.getMaxQueueSize())
+        .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
+        .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
+        .build();
+
+errorHandler = new PostgresErrorHandler(connectorConfig, queue, errorHandler);
+
try {
-final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
final TopicNamingStrategy<TableId> topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
final Snapshotter snapshotter = connectorConfig.getSnapshotter();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
@@ -182,16 +192,6 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
throw new DebeziumException(e);
}

-queue = new ChangeEventQueue.Builder<DataChangeEvent>()
-        .pollInterval(connectorConfig.getPollInterval())
-        .maxBatchSize(connectorConfig.getMaxBatchSize())
-        .maxQueueSize(connectorConfig.getMaxQueueSize())
-        .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
-        .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
-        .build();
-
-errorHandler = new PostgresErrorHandler(connectorConfig, queue, errorHandler);
-
final PostgresEventMetadataProvider metadataProvider = new PostgresEventMetadataProvider();

SignalProcessor<PostgresPartition, PostgresOffsetContext> signalProcessor = new SignalProcessor<>(
@@ -265,11 +265,16 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
}
}
catch (Exception exception) {
-// YB Note: Catch all the exceptions and retry.
-LOGGER.warn("Received exception, will be retrying", exception);
-throw new RetriableException(exception);
+LOGGER.warn("Received exception, will be setting producer throwable", exception);
+errorHandler.setProducerThrowable(new RetriableException(exception));
+
+if (errorHandler.getRetries() == connectorConfig.getMaxRetriesOnError()) {
+    throw new ConnectException("Maximum number of retries attempted, manually restart "
+            + "the connector after fixing the error", exception);
+} else {
+    throw new RetriableException(exception);
+}
}

}

public ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext, int maxRetries, Duration retryDelay)
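To make the behaviour of the new catch block in PostgresConnectorTask.start() easier to follow, here is a small self-contained sketch of the same retry-capping pattern. SimpleErrorHandler is a hypothetical stand-in, not Debezium's ErrorHandler; it only models the idea that each setProducerThrowable() call counts as one retry and that the task fails permanently once the configured maximum is reached.

import java.util.concurrent.atomic.AtomicInteger;

public class RetryLimitSketch {

    // Hypothetical stand-in for an error handler that counts recorded failures.
    static class SimpleErrorHandler {
        private final AtomicInteger retries = new AtomicInteger(0);

        void setProducerThrowable(Throwable t) {
            retries.incrementAndGet(); // each recorded failure counts as one retry
        }

        int getRetries() {
            return retries.get();
        }
    }

    public static void main(String[] args) {
        final int maxRetriesOnError = 3; // analogous to errors.max.retries
        SimpleErrorHandler errorHandler = new SimpleErrorHandler();

        while (true) {
            try {
                throw new IllegalStateException("simulated connection error");
            }
            catch (Exception exception) {
                errorHandler.setProducerThrowable(exception);
                if (errorHandler.getRetries() == maxRetriesOnError) {
                    // Here the connector task would throw ConnectException so that
                    // Kafka Connect stops retrying until a manual restart.
                    System.out.println("Maximum number of retries attempted, giving up");
                    return;
                }
                System.out.println("Retry " + errorHandler.getRetries() + " of " + maxRetriesOnError);
            }
        }
    }
}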