From 95598cb17c7a3f21fa3c741bc996ba81488cd2e1 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Mon, 29 Aug 2022 15:11:42 +0200 Subject: [PATCH 1/5] Merge upstream changes --- Dockerfile | 4 +- kafka-connect-jdbc/README.md | 1 + kafka-connect-jdbc/pom.xml | 28 ++- .../connect/jdbc/JdbcSinkConnector.java | 37 +++- .../connect/jdbc/JdbcSourceConnector.java | 24 ++- .../connect/jdbc/dialect/DatabaseDialect.java | 15 ++ .../jdbc/dialect/GenericDatabaseDialect.java | 29 ++- .../jdbc/dialect/OracleDatabaseDialect.java | 2 +- .../dialect/PostgreSqlDatabaseDialect.java | 36 ++++ .../dialect/SqlServerDatabaseDialect.java | 6 + .../connect/jdbc/sink/BufferedRecords.java | 64 ++----- .../connect/jdbc/source/BulkTableQuerier.java | 2 +- .../source/JdbcSourceConnectorConfig.java | 167 ++++++++++++++++-- .../connect/jdbc/source/JdbcSourceTask.java | 69 ++++++-- .../jdbc/source/TableMonitorThread.java | 24 ++- .../connect/jdbc/source/TableQuerier.java | 15 ++ .../source/TimestampIncrementingCriteria.java | 2 +- .../TimestampIncrementingTableQuerier.java | 4 +- .../connect/jdbc/util/DateTimeUtils.java | 31 ++-- .../connect/jdbc/JdbcSinkConnectorTest.java | 97 ++++++++++ .../connect/jdbc/JdbcSourceConnectorTest.java | 138 +++++++++++++-- .../connect/jdbc/dialect/BaseDialectTest.java | 7 +- .../dialect/GenericDatabaseDialectTest.java | 41 ++++- .../dialect/OracleDatabaseDialectTest.java | 16 +- .../PostgreSqlDatabaseDialectTest.java | 40 ++++- .../jdbc/sink/BufferedRecordsTest.java | 10 +- .../integration/MicrosoftSqlServerSinkIT.java | 62 +++++-- .../source/JdbcSourceTaskLifecycleTest.java | 59 +++++++ .../jdbc/source/JdbcSourceTaskUpdateTest.java | 20 ++- .../jdbc/source/TableMonitorThreadTest.java | 65 +++++-- .../TimestampIncrementingCriteriaTest.java | 8 +- .../source/integration/MSSQLDateTimeIT.java | 3 +- .../connect/jdbc/util/DateTimeUtilsTest.java | 40 ++++- 33 files changed, 971 insertions(+), 195 deletions(-) create mode 100644 kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSinkConnectorTest.java diff --git a/Dockerfile b/Dockerfile index 1d241e9f..dcb9abae 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -ARG BASE_IMAGE=radarbase/kafka-connect-transform-keyvalue:7.0.1 +ARG BASE_IMAGE=radarbase/kafka-connect-transform-keyvalue:7.2.1 FROM maven:3.8-jdk-11 as builder @@ -34,7 +34,7 @@ LABEL description="Kafka JDBC connector" ENV CONNECT_PLUGIN_PATH /usr/share/kafka-connect/plugins # To isolate the classpath from the plugin path as recommended -COPY --from=builder /code/kafka-connect-jdbc/target/components/packages/confluentinc-kafka-connect-jdbc-10.3.2/confluentinc-kafka-connect-jdbc-10.3.2/ ${CONNECT_PLUGIN_PATH}/kafka-connect-jdbc/ +COPY --from=builder /code/kafka-connect-jdbc/target/components/packages/confluentinc-kafka-connect-jdbc-10.5.2/confluentinc-kafka-connect-jdbc-10.5.2/ ${CONNECT_PLUGIN_PATH}/kafka-connect-jdbc/ # Load topics validator COPY ./docker/kafka-wait /usr/bin/kafka-wait diff --git a/kafka-connect-jdbc/README.md b/kafka-connect-jdbc/README.md index b563e537..71d80a2a 100644 --- a/kafka-connect-jdbc/README.md +++ b/kafka-connect-jdbc/README.md @@ -23,6 +23,7 @@ Contributions can only be accepted if they contain appropriate testing. 
For exam - Source Code: https://github.com/confluentinc/kafka-connect-jdbc - Issue Tracker: https://github.com/confluentinc/kafka-connect-jdbc/issues +- Learn how to work with the connector's source code by reading our [Development and Contribution guidelines](CONTRIBUTING.md). # Information diff --git a/kafka-connect-jdbc/pom.xml b/kafka-connect-jdbc/pom.xml index 9f1bb956..bc7131c5 100644 --- a/kafka-connect-jdbc/pom.xml +++ b/kafka-connect-jdbc/pom.xml @@ -14,6 +14,7 @@ ~ specific language governing permissions and limitations under the License. --> + 4.0.0 @@ -25,7 +26,7 @@ io.confluent kafka-connect-jdbc jar - 10.3.2 + 10.5.2 kafka-connect-jdbc Confluent, Inc. @@ -48,7 +49,7 @@ scm:git:git://github.com/confluentinc/kafka-connect-jdbc.git scm:git:git@github.com:confluentinc/kafka-connect-jdbc.git https://github.com/confluentinc/kafka-connect-jdbc - v10.3.1 + v10.5.2 @@ -56,10 +57,12 @@ 2.4 0.11.1 3.25.2 - 42.2.19 19.7.0.0 8.4.1.jre8 + 42.3.3 1.3.1 + 1.7.36 + 1.2.19 Confluent Community License UTF-8 target/${project.artifactId}-${project.version}-package @@ -73,10 +76,10 @@ https://packages.confluent.io/maven/ - confluent + Confluent https://packages.confluent.io/maven/ @@ -121,7 +124,6 @@ com.microsoft.sqlserver mssql-jdbc ${mssqlserver.jdbc.driver.version} - runtime net.sourceforge.jtds @@ -169,13 +171,14 @@ org.slf4j - slf4j-log4j12 + slf4j-reload4j + ${slf4j.version} test - - io.confluent - confluent-log4j + ch.qos.reload4j + reload4j + ${reload4j.version} test @@ -201,6 +204,12 @@ connect-runtime ${kafka.version} test + + + org.slf4j + slf4j-log4j12 + + org.apache.kafka @@ -270,6 +279,7 @@ -Xlint:all + -Werror diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSinkConnector.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSinkConnector.java index 1f800434..cfe0c4f3 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSinkConnector.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSinkConnector.java @@ -15,8 +15,15 @@ package io.confluent.connect.jdbc; +import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.DELETE_ENABLED; +import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.PK_MODE; +import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY; + +import java.util.Locale; +import java.util.Optional; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; import org.slf4j.Logger; @@ -65,8 +72,34 @@ public ConfigDef config() { @Override public Config validate(Map connectorConfigs) { - // TODO cross-fields validation here: pkFields against the pkMode - return super.validate(connectorConfigs); + /** get configuration parsed and validated individually */ + Config config = super.validate(connectorConfigs); + + return validateDeleteEnabledPkMode(config); + } + + private Config validateDeleteEnabledPkMode(Config config) { + configValue(config, DELETE_ENABLED) + .filter(deleteEnabled -> Boolean.TRUE.equals(deleteEnabled.value())) + .ifPresent(deleteEnabled -> configValue(config, PK_MODE) + .ifPresent(pkMode -> { + if (!RECORD_KEY.name().toLowerCase(Locale.ROOT).equals(pkMode.value()) + && !RECORD_KEY.name().toUpperCase(Locale.ROOT).equals(pkMode.value())) { + String conflictMsg = "Deletes are only supported for pk.mode record_key"; + pkMode.addErrorMessage(conflictMsg); + 
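                      // record the same conflict against delete.enabled so that both offending settings surface the error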
deleteEnabled.addErrorMessage(conflictMsg); + } + })); + return config; + } + + /** only if individual validation passed. */ + private Optional configValue(Config config, String name) { + return config.configValues() + .stream() + .filter(cfg -> name.equals(cfg.name()) + && cfg.errorMessages().isEmpty()) + .findFirst(); } @Override diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java index 4146eef0..fc3a2e80 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java @@ -15,8 +15,10 @@ package io.confluent.connect.jdbc; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceConnector; @@ -92,6 +94,8 @@ public void start(Map properties) throws ConnectException { cachedConnectionProvider.getConnection(); long tablePollMs = config.getLong(JdbcSourceConnectorConfig.TABLE_POLL_INTERVAL_MS_CONFIG); + long tableStartupLimitMs = + config.getLong(JdbcSourceConnectorConfig.TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_CONFIG); List whitelist = config.getList(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG); Set whitelistSet = whitelist.isEmpty() ? null : new HashSet<>(whitelist); List blacklist = config.getList(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG); @@ -117,9 +121,11 @@ public void start(Map properties) throws ConnectException { dialect, cachedConnectionProvider, context, + tableStartupLimitMs, tablePollMs, whitelistSet, - blacklistSet + blacklistSet, + Time.SYSTEM ); if (query.isEmpty()) { tableMonitorThread.start(); @@ -135,6 +141,15 @@ public Class taskClass() { return JdbcSourceTask.class; } + @Override + public Config validate(Map connectorConfigs) { + Config config = super.validate(connectorConfigs); + JdbcSourceConnectorConfig jdbcSourceConnectorConfig + = new JdbcSourceConnectorConfig(connectorConfigs); + jdbcSourceConnectorConfig.validateMultiConfigs(config); + return config; + } + @Override public List> taskConfigs(int maxTasks) { String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); @@ -154,8 +169,11 @@ public List> taskConfigs(int maxTasks) { + "the list of tables from the database yet" ); } else if (currentTables.isEmpty()) { - taskConfigs = Collections.emptyList(); - log.warn("No tasks will be run because no tables were found"); + taskConfigs = new ArrayList<>(1); + log.warn("No tables were found so there's no work to be done."); + Map taskProps = new HashMap<>(configProperties); + taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "[]"); + taskConfigs.add(taskProps); } else { int numGroups = Math.min(currentTables.size(), maxTasks); List> tablesGrouped = diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java index d034c086..37a9e761 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/DatabaseDialect.java @@ -32,6 +32,7 @@ import 
io.confluent.connect.jdbc.sink.metadata.SchemaPair; import io.confluent.connect.jdbc.sink.metadata.SinkRecordField; import io.confluent.connect.jdbc.source.ColumnMapping; +import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode; import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria; import io.confluent.connect.jdbc.util.ColumnDefinition; import io.confluent.connect.jdbc.util.ColumnId; @@ -202,6 +203,20 @@ Timestamp currentTimeOnDB( */ boolean tableExists(Connection connection, TableId tableId) throws SQLException; + + /** + * Set the isolation mode for the connection. + * Isolation modes can differ by database so this provides an interface for + * the mode to be overridden. + * + * @param connection the database connection; may not be null + * @param transactionIsolationMode the transaction isolation config + */ + void setConnectionIsolationMode( + Connection connection, + TransactionIsolationMode transactionIsolationMode + ); + /** * Create the definition for the columns described by the database metadata using the current * schema and catalog patterns defined in the configuration. diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java index eafa557e..7f33a306 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -17,7 +17,9 @@ import java.time.ZoneOffset; import java.util.TimeZone; + import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; @@ -76,6 +78,7 @@ import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig; import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.NumericMapping; import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TimestampGranularity; +import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode; import io.confluent.connect.jdbc.source.JdbcSourceTaskConfig; import io.confluent.connect.jdbc.source.TimestampIncrementingCriteria; import io.confluent.connect.jdbc.util.ColumnDefinition; @@ -599,6 +602,30 @@ public boolean tableExists( } } + public void setConnectionIsolationMode( + Connection connection, + TransactionIsolationMode transactionIsolationMode + ) { + if (transactionIsolationMode + == TransactionIsolationMode.DEFAULT) { + return; + } + int isolationMode = TransactionIsolationMode.get( + transactionIsolationMode + ); + try { + DatabaseMetaData metadata = connection.getMetaData(); + if (metadata.supportsTransactionIsolationLevel(isolationMode)) { + connection.setTransactionIsolation(isolationMode); + } else { + throw new ConfigException("Transaction Isolation level not supported by database"); + } + } catch (SQLException | ConfigException ex) { + log.warn("Unable to set transaction.isolation.mode: " + transactionIsolationMode.name() + + ". 
No transaction isolation mode will be set for the queries: " + ex.getMessage()); + } + } + protected String displayableTableTypes(String[] types, String delim) { return Arrays.stream(types).sorted().collect(Collectors.joining(delim)); } @@ -1391,7 +1418,7 @@ protected ColumnConverter columnConverterFor( case Types.TIMESTAMP: { return rs -> { Timestamp timestamp = rs.getTimestamp(col, DateTimeUtils.getTimeZoneCalendar(timeZone)); - return tsGranularity.fromTimestamp.apply(timestamp); + return tsGranularity.fromTimestamp.apply(timestamp, timeZone); }; } diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java index 7294f765..f393f599 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialect.java @@ -200,7 +200,7 @@ protected String getSqlType(SinkRecordField field) { case BOOLEAN: return "NUMBER(1,0)"; case STRING: - return "CLOB"; + return "VARCHAR2(4000)"; case BYTES: return "BLOB"; default: diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java index c2a94b9d..131cea20 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java @@ -571,4 +571,40 @@ protected String valueTypeCast(TableDefinition tableDefn, ColumnId columnId) { } return ""; } + + @Override + protected int decimalScale(ColumnDefinition defn) { + if (defn.scale() == NUMERIC_TYPE_SCALE_UNSET) { + return NUMERIC_TYPE_SCALE_HIGH; + } + + // Postgres requires DECIMAL/NUMERIC columns to have a precision greater than zero + // If the precision appears to be zero, it's because the user didn't define a fixed precision + // for the column. + if (defn.precision() == 0) { + // In that case, a scale of zero indicates that there also isn't a fixed scale defined for + // the column. 
Instead of treating that column as if its scale is actually zero (which can + // cause issues since it may contain values that aren't possible with a scale of zero, like + // 12.12), we fall back on NUMERIC_TYPE_SCALE_HIGH to try to avoid loss of precision + if (defn.scale() == 0) { + log.debug( + "Column {} does not appear to have a fixed scale defined; defaulting to {}", + defn.id(), + NUMERIC_TYPE_SCALE_HIGH + ); + return NUMERIC_TYPE_SCALE_HIGH; + } else { + // Should never happen, but if it does may signal an edge case + // that we need to add new logic for + log.warn( + "Column {} has a precision of zero, but a non-zero scale of {}", + defn.id(), + defn.scale() + ); + } + } + + return defn.scale(); + } + } diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java index 16d0b5a9..50330d0d 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; @@ -236,6 +237,11 @@ protected static java.sql.Timestamp dateTimeOffsetFrom(String value, TimeZone ti return java.sql.Timestamp.from(zdt.toInstant()); } + @Override + protected Integer getSqlTypeForSchema(Schema schema) { + return schema.type() == Schema.Type.BYTES ? 
Types.VARBINARY : null; + } + @Override protected String getSqlType(SinkRecordField field) { if (field.schemaName() != null) { diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java index 100f6d37..cda5df24 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java @@ -25,11 +25,11 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.sql.BatchUpdateException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.stream.Collectors; import io.confluent.connect.jdbc.dialect.DatabaseDialect; @@ -39,7 +39,6 @@ import io.confluent.connect.jdbc.util.ColumnId; import io.confluent.connect.jdbc.util.TableId; -import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.InsertMode.INSERT; import static java.util.Objects.isNull; import static java.util.Objects.nonNull; @@ -184,28 +183,8 @@ public List flush() throws SQLException { updateStatementBinder.bindRecord(record); } } - Optional totalUpdateCount = executeUpdates(); - long totalDeleteCount = executeDeletes(); - - final long expectedCount = updateRecordCount(); - log.trace("{} records:{} resulting in totalUpdateCount:{} totalDeleteCount:{}", - config.insertMode, records.size(), totalUpdateCount, totalDeleteCount - ); - if (totalUpdateCount.filter(total -> total != expectedCount).isPresent() - && config.insertMode == INSERT) { - throw new ConnectException(String.format( - "Update count (%d) did not sum up to total number of records inserted (%d)", - totalUpdateCount.get(), - expectedCount - )); - } - if (!totalUpdateCount.isPresent()) { - log.info( - "{} records:{} , but no count of the number of rows it affected is available", - config.insertMode, - records.size() - ); - } + executeUpdates(); + executeDeletes(); final List flushedRecords = records; records = new ArrayList<>(); @@ -213,39 +192,26 @@ public List flush() throws SQLException { return flushedRecords; } - /** - * @return an optional count of all updated rows or an empty optional if no info is available - */ - private Optional executeUpdates() throws SQLException { - Optional count = Optional.empty(); - for (int updateCount : updatePreparedStatement.executeBatch()) { - if (updateCount != Statement.SUCCESS_NO_INFO) { - count = count.isPresent() - ? 
count.map(total -> total + updateCount) - : Optional.of((long) updateCount); + private void executeUpdates() throws SQLException { + int[] batchStatus = updatePreparedStatement.executeBatch(); + for (int updateCount : batchStatus) { + if (updateCount == Statement.EXECUTE_FAILED) { + throw new BatchUpdateException( + "Execution failed for part of the batch update", batchStatus); } } - return count; } - private long executeDeletes() throws SQLException { - long totalDeleteCount = 0; + private void executeDeletes() throws SQLException { if (nonNull(deletePreparedStatement)) { - for (int updateCount : deletePreparedStatement.executeBatch()) { - if (updateCount != Statement.SUCCESS_NO_INFO) { - totalDeleteCount += updateCount; + int[] batchStatus = deletePreparedStatement.executeBatch(); + for (int updateCount : batchStatus) { + if (updateCount == Statement.EXECUTE_FAILED) { + throw new BatchUpdateException( + "Execution failed for part of the batch delete", batchStatus); } } } - return totalDeleteCount; - } - - private long updateRecordCount() { - return records - .stream() - // ignore deletes - .filter(record -> nonNull(record.value()) || !config.deleteEnabled) - .count(); } public void close() throws SQLException { diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java index 96553873..2f2d0613 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java @@ -70,7 +70,7 @@ protected void createPreparedStatement(Connection db) throws SQLException { String queryStr = builder.toString(); recordQuery(queryStr); - log.debug("{} prepared SQL query: {}", this, queryStr); + log.trace("{} prepared SQL query: {}", this, queryStr); stmt = dialect.createPreparedStatement(db, queryStr); } diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index a033db22..17bebf0b 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -15,6 +15,7 @@ package io.confluent.connect.jdbc.source; +import java.sql.Connection; import java.sql.Timestamp; import java.time.ZoneId; import java.util.Arrays; @@ -26,15 +27,21 @@ import java.util.TimeZone; import java.util.concurrent.atomic.AtomicReference; +import com.microsoft.sqlserver.jdbc.SQLServerConnection; +import io.confluent.connect.jdbc.dialect.DatabaseDialect; +import io.confluent.connect.jdbc.dialect.DatabaseDialects; import io.confluent.connect.jdbc.util.DatabaseDialectRecommender; import io.confluent.connect.jdbc.util.DateTimeUtils; import io.confluent.connect.jdbc.util.EnumRecommender; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.TimeZoneValidator; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.regex.Pattern; + import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Recommender; @@ -42,6 +49,7 @@ import 
org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; @@ -51,7 +59,7 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceConnectorConfig.class); - private static final Pattern INVALID_CHARS = Pattern.compile("[^a-zA-Z0-9._-]"); + private static Pattern INVALID_CHARS = Pattern.compile("[^a-zA-Z0-9._-]"); public static final String CONNECTION_PREFIX = "connection."; @@ -63,6 +71,7 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { + "``jdbc:sqlserver://localhost;instance=SQLEXPRESS;" + "databaseName=db_name``"; private static final String CONNECTION_URL_DISPLAY = "JDBC URL"; + private static final String CONNECTION_URL_DEFAULT = ""; public static final String CONNECTION_USER_CONFIG = CONNECTION_PREFIX + "user"; private static final String CONNECTION_USER_DOC = "JDBC connection user."; @@ -199,12 +208,17 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { + "built-in representations \n" + " * nanos_long: represents timestamp values as nanos since epoch\n" + " * nanos_string: represents timestamp values as nanos since epoch in string\n" - + " * nanos_iso_datetime_string: uses the iso format 'yyyy-MM-dd'T'HH:mm:ss.n'\n"; + + " * nanos_iso_datetime_string: uses iso format 'yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'\n"; public static final String TIMESTAMP_GRANULARITY_DISPLAY = "Timestamp granularity for " + "timestamp columns"; private static final EnumRecommender TIMESTAMP_GRANULARITY_RECOMMENDER = EnumRecommender.in(TimestampGranularity.values()); + /* The amount of time to wait for the table monitoring thread to complete initial table read */ + public static final String TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_CONFIG = + "table.monitoring.startup.polling.limit.ms"; + public static final long TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_DEFAULT = 10 * 1000; + public static final String TABLE_POLL_INTERVAL_MS_CONFIG = "table.poll.interval.ms"; private static final String TABLE_POLL_INTERVAL_MS_DOC = "Frequency in ms to poll for new or removed tables, which may result in updated task " @@ -300,10 +314,16 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { public static final String QUERY_SUFFIX_CONFIG = "query.suffix"; public static final String QUERY_SUFFIX_DEFAULT = ""; - public static final String QUERY_SUFFIX_DOC = + public static final String QUERY_SUFFIX_DOC = "Suffix to append at the end of the generated query."; public static final String QUERY_SUFFIX_DISPLAY = "Query suffix"; + public static final String QUERY_RETRIES_CONFIG = "query.retry.attempts"; + public static final String QUERY_RETRIES_DEFAULT = "-1"; + public static final String QUERY_RETRIES_DOC = + "Number of times to retry SQL exceptions encountered when executing queries."; + public static final String QUERY_RETRIES_DISPLAY = "Query Retry Attempts"; + private static final EnumRecommender QUOTE_METHOD_RECOMMENDER = EnumRecommender.in(QuoteMethod.values()); @@ -330,6 +350,28 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { + " In most cases it only makes sense to have either TABLE or VIEW."; private static final String TABLE_TYPE_DISPLAY = "Table Types"; + public static final String 
TRANSACTION_ISOLATION_MODE_DEFAULT = + TransactionIsolationMode.DEFAULT.name(); + public static final String TRANSACTION_ISOLATION_MODE_CONFIG = "transaction.isolation.mode"; + private static final String TRANSACTION_ISOLATION_MODE_DOC = + "Mode to control which transaction isolation level is used when running queries " + + "against the database. By default no explicit transaction isolation" + + "mode is set. SQL_SERVER_SNAPSHOT will only work" + + "against a connector configured to write to Sql Server. " + + " Options include:\n" + + " * DEFAULT\n " + + " * READ_UNCOMMITED\n" + + " * READ_COMMITED\n" + + " * REPEATABLE_READ\n" + + " * SERIALIZABLE\n" + + " * SQL_SERVER_SNAPSHOT\n"; + private static final String TRANSACTION_ISOLATION_MODE_DISPLAY = "Transaction Isolation Mode"; + + private static final EnumRecommender TRANSACTION_ISOLATION_MODE_RECOMMENDER = + EnumRecommender.in(TransactionIsolationMode.values()); + + private static final String SqlServerDatabaseDialectName = "SqlServerDatabaseDialect"; + public static ConfigDef baseConfigDef() { ConfigDef config = new ConfigDef(); addDatabaseOptions(config); @@ -338,11 +380,51 @@ public static ConfigDef baseConfigDef() { return config; } + public Config validateMultiConfigs(Config config) { + HashMap configValues = new HashMap<>(); + config.configValues().stream() + .filter((configValue) -> + configValue.name().equals( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ) + ).forEach(configValue -> configValues.putIfAbsent(configValue.name(), configValue)); + + TransactionIsolationMode transactionIsolationMode = + TransactionIsolationMode.valueOf( + this.getString(TRANSACTION_ISOLATION_MODE_CONFIG) + ); + if (transactionIsolationMode == TransactionIsolationMode.SQL_SERVER_SNAPSHOT) { + DatabaseDialect dialect; + final String dialectName = this.getString(JdbcSourceConnectorConfig.DIALECT_NAME_CONFIG); + if (dialectName != null && !dialectName.trim().isEmpty()) { + dialect = DatabaseDialects.create(dialectName, this); + } else { + dialect = DatabaseDialects.findBestFor(this.getString(CONNECTION_URL_CONFIG), this); + } + if (!dialect.name().equals( + DatabaseDialects.create( + SqlServerDatabaseDialectName, this + ).name() + ) + ) { + configValues + .get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG) + .addErrorMessage("Isolation mode of `" + + TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name() + + "` can only be configured with a Sql Server Dialect" + ); + } + } + + return config; + } + private static final void addDatabaseOptions(ConfigDef config) { int orderInGroup = 0; config.define( CONNECTION_URL_CONFIG, Type.STRING, + CONNECTION_URL_DEFAULT, Importance.HIGH, CONNECTION_URL_DOC, DATABASE_GROUP, @@ -565,7 +647,29 @@ private static final void addModeOptions(ConfigDef config) { MODE_GROUP, ++orderInGroup, Width.MEDIUM, - QUERY_SUFFIX_DISPLAY); + QUERY_SUFFIX_DISPLAY + ).define( + TRANSACTION_ISOLATION_MODE_CONFIG, + Type.STRING, + TRANSACTION_ISOLATION_MODE_DEFAULT, + Importance.LOW, + TRANSACTION_ISOLATION_MODE_DOC, + MODE_GROUP, + ++orderInGroup, + Width.MEDIUM, + TRANSACTION_ISOLATION_MODE_DISPLAY, + TRANSACTION_ISOLATION_MODE_RECOMMENDER + ).define( + QUERY_RETRIES_CONFIG, + Type.INT, + QUERY_RETRIES_DEFAULT, + Importance.LOW, + QUERY_RETRIES_DOC, + MODE_GROUP, + ++orderInGroup, + Width.MEDIUM, + QUERY_RETRIES_DISPLAY + ); } private static final void addConnectorOptions(ConfigDef config) { @@ -600,6 +704,11 @@ private static final void addConnectorOptions(ConfigDef config) { ++orderInGroup, Width.SHORT, 
BATCH_MAX_ROWS_DISPLAY + ).defineInternal( + TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_CONFIG, + Type.LONG, + TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_DEFAULT, + Importance.LOW ).define( TABLE_POLL_INTERVAL_MS_CONFIG, Type.LONG, @@ -680,10 +789,6 @@ public void ensureValid(final String name, final Object value) { public JdbcSourceConnectorConfig(Map props) { super(CONFIG_DEF, props); - String mode = getString(JdbcSourceConnectorConfig.MODE_CONFIG); - if (mode.equals(JdbcSourceConnectorConfig.MODE_UNSPECIFIED)) { - throw new ConfigException("Query mode must be specified"); - } } public String topicPrefix() { @@ -820,16 +925,16 @@ public enum TimestampGranularity { CONNECT_LOGICAL(optional -> optional ? org.apache.kafka.connect.data.Timestamp.builder().optional().build() : org.apache.kafka.connect.data.Timestamp.builder().build(), - timestamp -> timestamp, - timestamp -> (Timestamp) timestamp), + (timestamp, tz) -> timestamp, + (timestamp, tz) -> (Timestamp) timestamp), NANOS_LONG(optional -> optional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA, - DateTimeUtils::toEpochNanos, - epochNanos -> DateTimeUtils.toTimestamp((Long) epochNanos)), + (timestamp, tz) -> DateTimeUtils.toEpochNanos(timestamp), + (epochNanos, tz) -> DateTimeUtils.toTimestamp((Long) epochNanos)), NANOS_STRING(optional -> optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA, - DateTimeUtils::toEpochNanosString, - epochNanosString -> { + (timestamp, tz) -> DateTimeUtils.toEpochNanosString(timestamp), + (epochNanosString, tz) -> { try { return DateTimeUtils.toTimestamp((String) epochNanosString); } catch (NumberFormatException e) { @@ -843,11 +948,12 @@ public enum TimestampGranularity { NANOS_ISO_DATETIME_STRING(optional -> optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA, DateTimeUtils::toIsoDateTimeString, - isoDateTimeString -> DateTimeUtils.toTimestampFromIsoDateTime((String) isoDateTimeString)); + (isoDateTimeString, tz) -> + DateTimeUtils.toTimestampFromIsoDateTime((String) isoDateTimeString, tz)); public final Function schemaFunction; - public final Function fromTimestamp; - public final Function toTimestamp; + public final BiFunction fromTimestamp; + public final BiFunction toTimestamp; public static final String DEFAULT = CONNECT_LOGICAL.name().toLowerCase(Locale.ROOT); @@ -864,14 +970,37 @@ public static TimestampGranularity get(JdbcSourceConnectorConfig config) { } TimestampGranularity(Function schemaFunction, - Function fromTimestamp, - Function toTimestamp) { + BiFunction fromTimestamp, + BiFunction toTimestamp) { this.schemaFunction = schemaFunction; this.fromTimestamp = fromTimestamp; this.toTimestamp = toTimestamp; } } + public enum TransactionIsolationMode { + DEFAULT, READ_UNCOMMITTED, READ_COMMITTED, + REPEATABLE_READ, SERIALIZABLE, SQL_SERVER_SNAPSHOT; + + public static int get(TransactionIsolationMode mode) { + switch (mode) { + case READ_UNCOMMITTED: + return Connection.TRANSACTION_READ_UNCOMMITTED; + case READ_COMMITTED: + return Connection.TRANSACTION_READ_COMMITTED; + case REPEATABLE_READ: + return Connection.TRANSACTION_REPEATABLE_READ; + case SERIALIZABLE: + return Connection.TRANSACTION_SERIALIZABLE; + case SQL_SERVER_SNAPSHOT: + return SQLServerConnection.TRANSACTION_SNAPSHOT; + default: + return -1; + } + } + } + + protected JdbcSourceConnectorConfig(ConfigDef subclassConfigDef, Map props) { super(subclassConfigDef, props); } diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java 
b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java index b09f1f57..5cd8a6ab 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java @@ -51,6 +51,7 @@ import io.confluent.connect.jdbc.util.ColumnId; import io.confluent.connect.jdbc.util.TableId; import io.confluent.connect.jdbc.util.Version; +import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode; /** * JdbcSourceTask is a Kafka Connect SourceTask implementation that reads from JDBC databases and @@ -65,11 +66,14 @@ public class JdbcSourceTask extends SourceTask { private Time time; private JdbcSourceTaskConfig config; private DatabaseDialect dialect; - private CachedConnectionProvider cachedConnectionProvider; - private PriorityQueue tableQueue = new PriorityQueue(); + //Visible for Testing + CachedConnectionProvider cachedConnectionProvider; + PriorityQueue tableQueue = new PriorityQueue<>(); private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicLong taskThreadId = new AtomicLong(0); + int maxRetriesPerQuerier; + public JdbcSourceTask() { this.time = new SystemTime(); } @@ -92,6 +96,24 @@ public void start(Map properties) { throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e); } + List tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG); + String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); + + if ((tables.isEmpty() && query.isEmpty())) { + throw new ConnectException("Task is being killed because" + + " it was not assigned a table nor a query to execute." + + " If run in table mode please make sure that the tables" + + " exist on the database. If the table does exist on" + + " the database, we recommend using the fully qualified" + + " table name."); + } + + if ((!tables.isEmpty() && !query.isEmpty())) { + throw new ConnectException("Invalid configuration: a JdbcSourceTask" + + " cannot have both a table and a query assigned to it"); + } + + final String url = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG); final int maxConnAttempts = config.getInt(JdbcSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG); final long retryBackoff = config.getLong(JdbcSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG); @@ -106,12 +128,17 @@ public void start(Map properties) { cachedConnectionProvider = connectionProvider(maxConnAttempts, retryBackoff); - List tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG); - String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); - if ((tables.isEmpty() && query.isEmpty()) || (!tables.isEmpty() && !query.isEmpty())) { - throw new ConnectException("Invalid configuration: each JdbcSourceTask must have at " - + "least one table assigned to it or one query specified"); - } + + dialect.setConnectionIsolationMode( + cachedConnectionProvider.getConnection(), + TransactionIsolationMode + .valueOf( + config.getString( + JdbcSourceConnectorConfig + .TRANSACTION_ISOLATION_MODE_CONFIG + ) + ) + ); TableQuerier.QueryMode queryMode = !query.isEmpty() ? 
TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE; List tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY @@ -207,10 +234,10 @@ public void start(Map properties) { if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) { tableQueue.add( new BulkTableQuerier( - dialect, - queryMode, - tableOrQuery, - topicPrefix, + dialect, + queryMode, + tableOrQuery, + topicPrefix, suffix ) ); @@ -267,6 +294,8 @@ public void start(Map properties) { running.set(true); taskThreadId.set(Thread.currentThread().getId()); log.info("Started JDBC source task"); + + maxRetriesPerQuerier = config.getInt(JdbcSourceConnectorConfig.QUERY_RETRIES_CONFIG); } protected CachedConnectionProvider connectionProvider(int maxConnAttempts, long retryBackoff) { @@ -388,6 +417,7 @@ public List poll() throws InterruptedException { while (results.size() < batchMaxRows && (hadNext = querier.next())) { results.add(querier.extractRecord()); } + querier.resetRetryCount(); if (!hadNext) { // If we finished processing the results from the current query, we can reset and send @@ -421,8 +451,21 @@ public List poll() throws InterruptedException { closeResources(); throw new ConnectException(sqle); } catch (SQLException sqle) { - log.error("SQL exception while running query for table: {}", querier, sqle); + log.error( + "SQL exception while running query for table: {}, {}." + + " Attempting retry {} of {} attempts.", + querier, + sqle, + querier.getAttemptedRetryCount() + 1, + maxRetriesPerQuerier + ); resetAndRequeueHead(querier, true); + if (maxRetriesPerQuerier > 0 + && querier.getAttemptedRetryCount() >= maxRetriesPerQuerier) { + closeResources(); + throw new ConnectException("Failed to Query table after retries", sqle); + } + querier.incrementRetryCount(); return null; } catch (Throwable t) { log.error("Failed to run query for table: {}", querier, t); diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java index 449eeb23..86136dbb 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java @@ -19,6 +19,9 @@ import io.confluent.connect.jdbc.util.ConnectionProvider; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.TableId; + +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; @@ -46,26 +49,32 @@ public class TableMonitorThread extends Thread { private final ConnectionProvider connectionProvider; private final ConnectorContext context; private final CountDownLatch shutdownLatch; + private final long startupMs; private final long pollMs; private final Set whitelist; private final Set blacklist; private final AtomicReference> tables; + private final Time time; public TableMonitorThread(DatabaseDialect dialect, ConnectionProvider connectionProvider, ConnectorContext context, + long startupMs, long pollMs, Set whitelist, - Set blacklist + Set blacklist, + Time time ) { this.dialect = dialect; this.connectionProvider = connectionProvider; this.context = context; this.shutdownLatch = new CountDownLatch(1); + this.startupMs = startupMs; this.pollMs = pollMs; this.whitelist = whitelist; this.blacklist = blacklist; this.tables = new 
AtomicReference<>(); + this.time = time; } @Override @@ -98,6 +107,7 @@ public void run() { * successfully yet */ public List tables() { + awaitTablesReady(startupMs); List tablesSnapshot = tables.get(); if (tablesSnapshot == null) { return null; @@ -145,6 +155,15 @@ public List tables() { return tablesSnapshot; } + private void awaitTablesReady(long timeoutMs) { + try { + time.waitObject(tables, () -> tables.get() != null, time.milliseconds() + timeoutMs); + } catch (InterruptedException | TimeoutException e) { + log.warn("Timed out or interrupted while awaiting for tables being read."); + return; + } + } + public void shutdown() { log.info("Shutting down thread monitoring tables."); shutdownLatch.countDown(); @@ -189,6 +208,9 @@ private boolean updateTables() { } List priorTablesSnapshot = tables.getAndSet(filteredTables); + synchronized (tables) { + tables.notifyAll(); + } return !Objects.equals(priorTablesSnapshot, filteredTables); } diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java index c127485e..25fcf155 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java @@ -57,6 +57,8 @@ public enum QueryMode { protected SchemaMapping schemaMapping; private String loggedQueryString; + private int attemptedRetries; + public TableQuerier( DatabaseDialect dialect, QueryMode mode, @@ -71,6 +73,7 @@ public TableQuerier( this.topicPrefix = topicPrefix; this.lastUpdate = 0; this.suffix = suffix; + this.attemptedRetries = 0; } public long getLastUpdate() { @@ -119,6 +122,18 @@ public void reset(long now, boolean resetOffset) { lastUpdate = now; } + public int getAttemptedRetryCount() { + return attemptedRetries; + } + + public void incrementRetryCount() { + attemptedRetries++; + } + + public void resetRetryCount() { + attemptedRetries = 0; + } + private void releaseLocksQuietly() { if (db != null) { try { diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteria.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteria.java index 7784842d..8b01be2a 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteria.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteria.java @@ -230,7 +230,7 @@ protected Timestamp extractOffsetTimestamp( ) { caseAdjustedTimestampColumns.computeIfAbsent(schema, this::findCaseSensitiveTimestampColumns); for (String timestampColumn : caseAdjustedTimestampColumns.get(schema)) { - Timestamp ts = timestampGranularity.toTimestamp.apply(record.get(timestampColumn)); + Timestamp ts = timestampGranularity.toTimestamp.apply(record.get(timestampColumn), timeZone); if (ts != null) { return ts; } diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java index 49226ebf..11d1217f 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java @@ -152,7 +152,7 @@ protected void createPreparedStatement(Connection db) 
throws SQLException { String queryString = builder.toString(); recordQuery(queryString); - log.debug("{} prepared SQL query: {}", this, queryString); + log.trace("{} prepared SQL query: {}", this, queryString); stmt = dialect.createPreparedStatement(db, queryString); } @@ -269,4 +269,4 @@ public String toString() { + ", timestampColumns=" + timestampColumnNames + '}'; } -} +} \ No newline at end of file diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/util/DateTimeUtils.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/util/DateTimeUtils.java index a468d3f0..46155027 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/util/DateTimeUtils.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/util/DateTimeUtils.java @@ -18,6 +18,7 @@ import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.time.LocalDateTime; +import java.time.chrono.ChronoZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Calendar; import java.util.Date; @@ -34,7 +35,7 @@ public class DateTimeUtils { static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); static final long NANOSECONDS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); static final DateTimeFormatter ISO_DATE_TIME_NANOS_FORMAT = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.n"); + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS"); private static final ThreadLocal> TIMEZONE_CALENDARS = ThreadLocal.withInitial(HashMap::new); @@ -76,14 +77,6 @@ public static String formatTimestamp(Date date, TimeZone timeZone) { }).format(date); } - public static String formatTimestamptz(Date date, TimeZone timeZone) { - return TIMEZONE_TIMESTAMP_FORMATS.get().computeIfAbsent(timeZone, aTimeZone -> { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSX"); - sdf.setTimeZone(aTimeZone); - return sdf; - }).format(date); - } - private static Long convertToEpochNanos(Timestamp t) { Long epochMillis = TimeUnit.SECONDS.toNanos(t.getTime() / MILLISECONDS_PER_SECOND); Long nanosInSecond = TimeUnit.NANOSECONDS.toNanos(t.getNanos()); @@ -119,11 +112,14 @@ public static String toEpochNanosString(Timestamp timestamp) { * Get the iso date-time string with nano precision for the given {@link Timestamp}. 
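    * The value is rendered in the supplied time zone with nanosecond precision.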
* * @param timestamp the Java timestamp value + * @param tz the timezone of the source database * @return the string iso date time */ - public static String toIsoDateTimeString(Timestamp timestamp) { + public static String toIsoDateTimeString(Timestamp timestamp, TimeZone tz) { return Optional.ofNullable(timestamp) - .map(t -> t.toLocalDateTime().format(ISO_DATE_TIME_NANOS_FORMAT)) + .map(Timestamp::toInstant) + .map(t -> t.atZone(tz.toZoneId())) + .map(t -> t.format(ISO_DATE_TIME_NANOS_FORMAT)) .orElse(null); } @@ -159,13 +155,16 @@ public static Timestamp toTimestamp(String nanos) throws NumberFormatException { /** * Get {@link Timestamp} from epoch with nano precision * - * @param isoDateTime iso dateTime format "yyyy-MM-dd'T'HH:mm:ss.n" + * @param isoDT iso dateTime format "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS" + * @param tz the timezone of the source database * @return the equivalent java sql Timestamp */ - public static Timestamp toTimestampFromIsoDateTime(String isoDateTime) { - return Optional.ofNullable(isoDateTime) - .map(i -> LocalDateTime.parse(isoDateTime, ISO_DATE_TIME_NANOS_FORMAT)) - .map(Timestamp::valueOf) + public static Timestamp toTimestampFromIsoDateTime(String isoDT, TimeZone tz) { + return Optional.ofNullable(isoDT) + .map(i -> LocalDateTime.parse(isoDT, ISO_DATE_TIME_NANOS_FORMAT)) + .map(t -> t.atZone(tz.toZoneId())) + .map(ChronoZonedDateTime::toInstant) + .map(Timestamp::from) .orElse(null); } diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSinkConnectorTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSinkConnectorTest.java new file mode 100644 index 00000000..d7c49bf1 --- /dev/null +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSinkConnectorTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.connect.jdbc; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.common.config.Config; + +import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.PK_MODE; +import static java.util.Collections.EMPTY_LIST; +import static java.util.Collections.singletonList; +import static org.junit.Assert.*; + +import org.junit.Test; + +public class JdbcSinkConnectorTest { + + @Test + public void testValidationWhenDeleteEnabled() { + + JdbcSinkConnector connector = new JdbcSinkConnector(); + + Map connConfig = new HashMap<>(); + connConfig.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector"); + connConfig.put("delete.enabled", "true"); + + connConfig.put("pk.mode", "record_key"); + assertEquals("'pk.mode must be 'RECORD_KEY/record_key' when 'delete.enabled' == true", + EMPTY_LIST, configErrors(connector.validate(connConfig), PK_MODE)); + + connConfig.put("pk.mode", "RECORD_KEY"); + assertEquals("pk.mode must be 'RECORD_KEY/record_key' when 'delete.enabled' == true", + EMPTY_LIST, configErrors(connector.validate(connConfig), PK_MODE)); + + connConfig.put("pk.mode", "none"); + + final String conflictMsg = "Deletes are only supported for pk.mode record_key"; + + assertEquals("'record_key' is the only valid mode when 'delete.enabled' == true", + singletonList(conflictMsg), + configErrors(connector.validate(connConfig), PK_MODE)); + } + + @Test + public void testValidationWhenDeleteNotEnabled() { + + JdbcSinkConnector connector = new JdbcSinkConnector(); + + Map connConfig = new HashMap<>(); + connConfig.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector"); + connConfig.put("delete.enabled", "false"); + + connConfig.put("pk.mode", "none"); + assertEquals("any defined mode is valid when 'delete.enabled' == false", + EMPTY_LIST, configErrors(connector.validate(connConfig), PK_MODE)); + } + + @Test + public void testValidationWhenPKModeInvalid() { + + JdbcSinkConnector connector = new JdbcSinkConnector(); + + Map connConfig = new HashMap<>(); + connConfig.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector"); + connConfig.put("delete.enabled", "false"); + connConfig.put("pk.mode", "gibberish"); + + assertEquals("no double reporting for unknown pk.mode", + 1, configErrors(connector.validate(connConfig), PK_MODE).size()); + } + + + private List configErrors(Config config, String propertyName) { + return config.configValues() + .stream() + .flatMap(cfg -> propertyName.equals(cfg.name()) ? 
+ cfg.errorMessages().stream() : Stream.empty()) + .collect(Collectors.toList()); + } +} diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSourceConnectorTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSourceConnectorTest.java index 455581e8..50d33475 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSourceConnectorTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/JdbcSourceConnectorTest.java @@ -15,6 +15,8 @@ package io.confluent.connect.jdbc; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.errors.ConnectException; import org.easymock.EasyMock; @@ -48,6 +50,7 @@ import io.confluent.connect.jdbc.util.TableId; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -58,7 +61,7 @@ public class JdbcSourceConnectorTest { private JdbcSourceConnector connector; private EmbeddedDerby db; - private Map connProps; + private Map props; public static class MockJdbcSourceConnector extends JdbcSourceConnector { CachedConnectionProvider provider; @@ -84,10 +87,10 @@ protected CachedConnectionProvider connectionProvider( public void setup() { connector = new JdbcSourceConnector(); db = new EmbeddedDerby(); - connProps = new HashMap<>(); - connProps.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, db.getUrl()); - connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_BULK); - connProps.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-"); + props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, db.getUrl()); + props.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_BULK); + props.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-"); } @After @@ -139,19 +142,19 @@ public void testStartStop() throws Exception { PowerMock.replayAll(); - connector.start(connProps); + connector.start(props); connector.stop(); PowerMock.verifyAll(); } @Test - public void testNoTablesNoTasks() throws Exception { + public void testNoTablesSpawnsSingleTask() throws Exception { // Tests case where there are no readable tables and ensures that no tasks // are returned to be run - connector.start(connProps); + connector.start(props); List> configs = connector.taskConfigs(3); - assertTrue(configs.isEmpty()); + assertEquals(1, configs.size()); connector.stop(); } @@ -170,7 +173,7 @@ public void testPartitioningOneTable() throws Exception { EasyMock.replay(connectorContext); connector.initialize(connectorContext); - connector.start(connProps); + connector.start(props); assertTrue( "Connector should have request task reconfiguration after reading tables from the database", taskReconfigurationLatch.await(10, TimeUnit.SECONDS) @@ -201,7 +204,7 @@ public void testPartitioningManyTables() throws Exception { EasyMock.replay(connectorContext); connector.initialize(connectorContext); - connector.start(connProps); + connector.start(props); assertTrue( "Connector should have request task reconfiguration after reading tables from the database", taskReconfigurationLatch.await(10, TimeUnit.SECONDS) @@ -227,8 +230,8 @@ public void testPartitioningQuery() throws Exception { db.createTable("test1", "id", "INT NOT NULL"); db.createTable("test2", "id", "INT NOT NULL"); final String 
sample_query = "SELECT foo, bar FROM sample_table"; - connProps.put(JdbcSourceConnectorConfig.QUERY_CONFIG, sample_query); - connector.start(connProps); + props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, sample_query); + connector.start(props); List> configs = connector.taskConfigs(3); assertEquals(1, configs.size()); assertTaskConfigsHaveParentConfigs(configs); @@ -242,9 +245,9 @@ public void testPartitioningQuery() throws Exception { @Test(expected = ConnectException.class) public void testConflictingQueryTableSettings() { final String sample_query = "SELECT foo, bar FROM sample_table"; - connProps.put(JdbcSourceConnectorConfig.QUERY_CONFIG, sample_query); - connProps.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "foo,bar"); - connector.start(connProps); + props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, sample_query); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "foo,bar"); + connector.start(props); } private void assertTaskConfigsHaveParentConfigs(List> configs) { @@ -254,6 +257,109 @@ private void assertTaskConfigsHaveParentConfigs(List> config } } + + @Test + public void testSqlServerIsolationModeWithCorrectDialect() { + + props.put(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG, "SQL_SERVER_SNAPSHOT"); + props.put(JdbcSourceConnectorConfig.DIALECT_NAME_CONFIG, "SqlServerDatabaseDialect"); + + Config config = connector.validate(props); + HashMap configValues = new HashMap<>(); + config.configValues().stream() + .filter((configValue) -> + configValue.name().equals( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ) + ).forEach(configValue -> configValues.putIfAbsent(configValue.name(), configValue)); + + assertTrue( + configValues.get( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ).errorMessages().isEmpty() + ); + } + + @Test + public void testSqlServerIsolationModeIncorrectDialect() { + props.put(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG, "SQL_SERVER_SNAPSHOT"); + props.put(JdbcSourceConnectorConfig.DIALECT_NAME_CONFIG, "MySqlDatabaseDialect"); + + Config config = connector.validate(props); + HashMap configValues = new HashMap<>(); + config.configValues().stream() + .filter((configValue) -> + configValue.name().equals( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ) + ).forEach(configValue -> configValues.putIfAbsent(configValue.name(), configValue)); + + List errors = configValues.get( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ).errorMessages(); + assertFalse(errors.isEmpty()); + assertEquals(1, errors.size()); + assertTrue(errors.get(0).contains( + "Isolation mode of `SQL_SERVER_SNAPSHOT` can only be" + + " configured with a Sql Server Dialect") + ); + } + + @Test + public void testSqlServerIsolationModeWithCorrectUrl() { + List sqlServerConnectionUrlTypes = new ArrayList<>(); + sqlServerConnectionUrlTypes.add("jdbc:sqlserver://localhost;user=Me"); + sqlServerConnectionUrlTypes.add("jdbc:microsoft:sqlserver://localhost;user=Me"); + sqlServerConnectionUrlTypes.add("jdbc:jtds:sqlserver://localhost;user=Me"); + + for (String urlType : sqlServerConnectionUrlTypes) { + props.put(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG, "SQL_SERVER_SNAPSHOT"); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, urlType); + + Config config = connector.validate(props); + HashMap configValues = new HashMap<>(); + config.configValues().stream() + .filter((configValue) -> + configValue.name().equals( + 
JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ) + ).forEach(configValue -> configValues.putIfAbsent(configValue.name(), configValue)); + + assertTrue( + configValues.get( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ).errorMessages().isEmpty() + ); + } + } + + @Test + public void testSqlServerIsolationModeWithIncorrectUrl() { + props.put(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG, "SQL_SERVER_SNAPSHOT"); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:mysql://localhost:3306/sakila?profileSQL=true"); + + Config config = connector.validate(props); + HashMap configValues = new HashMap<>(); + config.configValues().stream() + .filter((configValue) -> + configValue.name().equals( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ) + ).forEach(configValue -> configValues.putIfAbsent(configValue.name(), configValue)); + + List errors = configValues.get( + JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG + ).errorMessages(); + assertFalse(errors.isEmpty()); + assertEquals(1, errors.size()); + assertTrue(errors.get(0).contains( + "Isolation mode of `SQL_SERVER_SNAPSHOT` can only be" + + " configured with a Sql Server Dialect") + ); + + + } + private String tables(String... names) { List tableIds = new ArrayList<>(); for (String name : names) { diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java index 7fa9235f..9156a85c 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/BaseDialectTest.java @@ -480,7 +480,12 @@ public void bindFieldNull() throws SQLException { ); int index = 0; for (Schema schema : nullableTypes) { - verifyBindField(++index, schema, null).setObject(index, null); + final Integer sqlType = dialect.getSqlTypeForSchema(schema); + if (sqlType == null) { + verifyBindField(++index, schema, null).setObject(index, null); + } else { + verifyBindField(++index, schema, null).setNull(index, sqlType); + } } } diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java index e8e66cfd..353b506f 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialectTest.java @@ -16,6 +16,7 @@ package io.confluent.connect.jdbc.dialect; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -526,6 +527,44 @@ public void writeColumnSpec() { verifyWriteColumnSpec("foo DUMMY NULL", new SinkRecordField(Schema.OPTIONAL_INT32_SCHEMA, "foo", false)); } + @Test + public void setTransactionIsolationModes() throws SQLException { + GenericDatabaseDialect dialect = dummyDialect(); + Connection conn = db.getConnection(); + + // READ_UNCOMMITTED + dialect.setConnectionIsolationMode(conn, + JdbcSourceConnectorConfig.TransactionIsolationMode.READ_UNCOMMITTED + ); + assertEquals(conn.getTransactionIsolation(), Connection.TRANSACTION_READ_UNCOMMITTED); + + // READ_COMMITTED + 
dialect.setConnectionIsolationMode(conn, + JdbcSourceConnectorConfig.TransactionIsolationMode.READ_COMMITTED + ); + assertEquals(conn.getTransactionIsolation(), Connection.TRANSACTION_READ_COMMITTED); + + // REPEATABLE READ + dialect.setConnectionIsolationMode(conn, + JdbcSourceConnectorConfig.TransactionIsolationMode.REPEATABLE_READ + ); + assertEquals(conn.getTransactionIsolation(), Connection.TRANSACTION_REPEATABLE_READ); + + // SERIALIZABLE + dialect.setConnectionIsolationMode(conn, + JdbcSourceConnectorConfig.TransactionIsolationMode.SERIALIZABLE + ); + assertEquals(conn.getTransactionIsolation(), Connection.TRANSACTION_SERIALIZABLE); + + // this transaction isolation mode is not supported. No error is expected. + // Just a warning. Old isolation mode is maintained. + dialect.setConnectionIsolationMode(conn, + JdbcSourceConnectorConfig.TransactionIsolationMode.SQL_SERVER_SNAPSHOT + ); + // confirm transaction isolation mode does not change. + assertEquals(conn.getTransactionIsolation(), Connection.TRANSACTION_SERIALIZABLE); + } + @Test public void shouldSanitizeUrlWithoutCredentialsInProperties() { assertSanitizedUrl( @@ -610,4 +649,4 @@ public void shouldAddExtraProperties() { assertFalse(modified.containsKey("foo2")); assertFalse(modified.containsKey("connection.foo2")); } -} +} \ No newline at end of file diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java index fddd0ae4..d01fdbbe 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/OracleDatabaseDialectTest.java @@ -79,7 +79,7 @@ public void shouldMapPrimitiveSchemaTypeToSqlTypes() { assertPrimitiveMapping(Type.FLOAT64, "BINARY_DOUBLE"); assertPrimitiveMapping(Type.BOOLEAN, "NUMBER(1,0)"); assertPrimitiveMapping(Type.BYTES, "BLOB"); - assertPrimitiveMapping(Type.STRING, "CLOB"); + assertPrimitiveMapping(Type.STRING, "VARCHAR2(4000)"); } @Test @@ -99,7 +99,7 @@ public void shouldMapDataTypes() { verifyDataTypeMapping("BINARY_FLOAT", Schema.FLOAT32_SCHEMA); verifyDataTypeMapping("BINARY_DOUBLE", Schema.FLOAT64_SCHEMA); verifyDataTypeMapping("NUMBER(1,0)", Schema.BOOLEAN_SCHEMA); - verifyDataTypeMapping("CLOB", Schema.STRING_SCHEMA); + verifyDataTypeMapping("VARCHAR2(4000)", Schema.STRING_SCHEMA); verifyDataTypeMapping("BLOB", Schema.BYTES_SCHEMA); verifyDataTypeMapping("NUMBER(*,0)", Decimal.schema(0)); verifyDataTypeMapping("NUMBER(*,42)", Decimal.schema(42)); @@ -126,8 +126,8 @@ public void shouldMapTimestampSchemaTypeToTimestampSqlType() { @Test public void shouldBuildCreateQueryStatement() { String expected = "CREATE TABLE \"myTable\" (\n" + "\"c1\" NUMBER(10,0) NOT NULL,\n" + - "\"c2\" NUMBER(19,0) NOT NULL,\n" + "\"c3\" CLOB NOT NULL,\n" + - "\"c4\" CLOB NULL,\n" + "\"c5\" DATE DEFAULT '2001-03-15',\n" + + "\"c2\" NUMBER(19,0) NOT NULL,\n" + "\"c3\" VARCHAR2(4000) NOT NULL,\n" + + "\"c4\" VARCHAR2(4000) NULL,\n" + "\"c5\" DATE DEFAULT '2001-03-15',\n" + "\"c6\" DATE DEFAULT '00:00:00.000',\n" + "\"c7\" TIMESTAMP DEFAULT '2001-03-15 00:00:00.000',\n" + "\"c8\" NUMBER(*,4) NULL,\n" + @@ -144,8 +144,8 @@ public void shouldBuildAlterTableStatement() { "ALTER TABLE \"myTable\" ADD(\n" + "\"c1\" NUMBER(10,0) NOT NULL,\n" + "\"c2\" NUMBER(19,0) NOT NULL,\n" + - "\"c3\" CLOB NOT NULL,\n" + - "\"c4\" CLOB NULL,\n" + + "\"c3\" VARCHAR2(4000) NOT 
NULL,\n" + + "\"c4\" VARCHAR2(4000) NULL,\n" + "\"c5\" DATE DEFAULT '2001-03-15',\n" + "\"c6\" DATE DEFAULT '00:00:00.000',\n" + "\"c7\" TIMESTAMP DEFAULT '2001-03-15 00:00:00.000',\n" + @@ -163,8 +163,8 @@ public void shouldBuildAlterTableStatement() { "ALTER TABLE myTable ADD(\n" + "c1 NUMBER(10,0) NOT NULL,\n" + "c2 NUMBER(19,0) NOT NULL,\n" + - "c3 CLOB NOT NULL,\n" + - "c4 CLOB NULL,\n" + + "c3 VARCHAR2(4000) NOT NULL,\n" + + "c4 VARCHAR2(4000) NULL,\n" + "c5 DATE DEFAULT '2001-03-15',\n" + "c6 DATE DEFAULT '00:00:00.000',\n" + "c7 TIMESTAMP DEFAULT '2001-03-15 00:00:00.000',\n" + diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index ed6cce19..c8e993b4 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -15,15 +15,16 @@ package io.confluent.connect.jdbc.dialect; +import io.confluent.connect.jdbc.util.ColumnDefinition; import io.confluent.connect.jdbc.util.ColumnId; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.TableDefinition; import io.confluent.connect.jdbc.util.TableDefinitionBuilder; import io.confluent.connect.jdbc.util.TableId; + import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Time; @@ -555,4 +556,41 @@ public void shouldTruncateTableNames() { actualTableId = dialect.parseTableIdentifier(tableFqn); assertEquals(expectedTableId, actualTableId); } + + @Test + public void shouldFallBackOnUnknownDecimalScale() { + ColumnId columnId = new ColumnId(new TableId("catalog", "schema", "table"), "column"); + ColumnDefinition definition = mock(ColumnDefinition.class); + when(definition.id()).thenReturn(columnId); + + when(definition.precision()).thenReturn(4); + when(definition.scale()).thenReturn(GenericDatabaseDialect.NUMERIC_TYPE_SCALE_UNSET); + + assertEquals(GenericDatabaseDialect.NUMERIC_TYPE_SCALE_HIGH, dialect.decimalScale(definition)); + } + + @Test + public void shouldFallBackOnUnfixedDecimalScale() { + ColumnId columnId = new ColumnId(new TableId("catalog", "schema", "table"), "column"); + ColumnDefinition definition = mock(ColumnDefinition.class); + when(definition.id()).thenReturn(columnId); + + when(definition.precision()).thenReturn(0); + when(definition.scale()).thenReturn(0); + + assertEquals(GenericDatabaseDialect.NUMERIC_TYPE_SCALE_HIGH, dialect.decimalScale(definition)); + } + + @Test + public void shouldNotFallBackOnKnownDecimalScale() { + ColumnId columnId = new ColumnId(new TableId("catalog", "schema", "table"), "column"); + ColumnDefinition definition = mock(ColumnDefinition.class); + when(definition.id()).thenReturn(columnId); + + when(definition.precision()).thenReturn(0); + when(definition.scale()).thenReturn(5); + + assertEquals(5, dialect.decimalScale(definition)); + } + } diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java index f22597d6..2de93007 100644 --- 
a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/BufferedRecordsTest.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.config.ConfigException; import io.confluent.connect.jdbc.util.ColumnDefinition; import io.confluent.connect.jdbc.util.TableDefinition; -import java.sql.Types; + import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -31,10 +31,12 @@ import org.mockito.Mockito; import java.io.IOException; +import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -453,8 +455,8 @@ public void testMultipleDeletesWithSchemaBatchedTogether() throws SQLException { assertEquals(Collections.singletonList(recordB), buffer.flush()); } - @Test - public void testFlushSuccessNoInfo() throws SQLException { + @Test(expected = BatchUpdateException.class) + public void testFlushExecuteFailed() throws SQLException { final String url = sqliteHelper.sqliteUri(); final JdbcSinkConfig config = new JdbcSinkConfig(props); @@ -462,7 +464,7 @@ public void testFlushSuccessNoInfo() throws SQLException { int[] batchResponse = new int[2]; batchResponse[0] = Statement.SUCCESS_NO_INFO; - batchResponse[1] = Statement.SUCCESS_NO_INFO; + batchResponse[1] = Statement.EXECUTE_FAILED; final ColumnDefinition colDefMock = mock(ColumnDefinition.class); when(colDefMock.type()).thenReturn(Types.VARCHAR); diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/integration/MicrosoftSqlServerSinkIT.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/integration/MicrosoftSqlServerSinkIT.java index e05d3f29..a2568b25 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/integration/MicrosoftSqlServerSinkIT.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/sink/integration/MicrosoftSqlServerSinkIT.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; @@ -35,10 +36,7 @@ public class MicrosoftSqlServerSinkIT extends BaseConnectorIT { private static final Logger log = LoggerFactory.getLogger(MicrosoftSqlServerSinkIT.class); private static final String CONNECTOR_NAME = "jdbc-sink-connector"; - private static final int TASKS_MAX = 3; private static final String MSSQL_URL = "jdbc:sqlserver://0.0.0.0:1433"; - private static final String MSSQL_Table = "example_table"; - private static final List KAFKA_TOPICS = Collections.singletonList(MSSQL_Table); private Map props; private Connection connection; private JsonConverter jsonConverter; @@ -47,6 +45,7 @@ public class MicrosoftSqlServerSinkIT extends BaseConnectorIT { private static final String PASS = "reallyStrongPwd123"; // test creds @ClassRule + @SuppressWarnings("deprecation") public static final FixedHostPortGenericContainer mssqlServer = new FixedHostPortGenericContainer<>("microsoft/mssql-server-linux:latest") .withEnv("ACCEPT_EULA","Y") @@ -77,20 +76,22 @@ public void close() throws SQLException { */ @Test public void 
verifyConnectorFailsWhenTableNameS() throws Exception { + final String table = "example_table"; + // Setup up props for the sink connector - props = configProperties(); + props = configProperties(table); // create table - String sql = "CREATE TABLE guest." + MSSQL_Table + String sql = "CREATE TABLE guest." + table + " (id int NULL, last_name VARCHAR(50), created_at DATETIME2 NOT NULL);"; PreparedStatement createStmt = connection.prepareStatement(sql); executeSQL(createStmt); // Create topic in Kafka - KAFKA_TOPICS.forEach(topic -> connect.kafka().createTopic(topic, 1)); + connect.kafka().createTopic(table, 1); // Configure sink connector - configureAndWaitForConnector(); + configureAndWaitForConnector(1); //create record and produce it Timestamp t = Timestamp.from( @@ -106,23 +107,47 @@ public void verifyConnectorFailsWhenTableNameS() throws Exception { .put("last_name", "Brams") .put("created_at", t); - String kafkaValue = new String(jsonConverter.fromConnectData(MSSQL_Table, schema, struct)); - connect.kafka().produce(MSSQL_Table, null, kafkaValue); - - //sleep till it fails - Thread.sleep(Duration.ofSeconds(30).toMillis()); + String kafkaValue = new String(jsonConverter.fromConnectData(table, schema, struct)); + connect.kafka().produce(table, null, kafkaValue); //verify that connector failed because it cannot find the table. assertTasksFailedWithTrace( CONNECTOR_NAME, - Math.min(KAFKA_TOPICS.size(), TASKS_MAX), + 1, "Table \"dbo\".\"" - + MSSQL_Table + + table + "\" is missing and auto-creation is disabled" ); } - private Map configProperties() { + /** + * Verify that inserting a null BYTES value succeeds. + */ + @Test + public void verifyNullBYTESValue() throws Exception { + final String table = "optional_bytes"; + + props = configProperties(table); + props.put("auto.create", "true"); + + connect.kafka().createTopic(table, 1); + configureAndWaitForConnector(1); + + final Schema schema = SchemaBuilder.struct().name("com.example.OptionalBytes") + .field("id", Schema.INT32_SCHEMA) + .field("optional_bytes", Schema.OPTIONAL_BYTES_SCHEMA) + .build(); + final Struct struct = new Struct(schema) + .put("id", 1) + .put("optional_bytes", null); + + String kafkaValue = new String(jsonConverter.fromConnectData(table, schema, struct)); + connect.kafka().produce(table, null, kafkaValue); + + waitForCommittedRecords(CONNECTOR_NAME, Collections.singleton(table), 1, 1, TimeUnit.MINUTES.toMillis(2)); + } + + private Map configProperties(String topic) { // Create a hashmap to setup sink connector config properties Map props = new HashMap<>(); @@ -138,7 +163,7 @@ private Map configProperties() { props.put(JdbcSinkConfig.CONNECTION_USER, USER); props.put(JdbcSinkConfig.CONNECTION_PASSWORD, PASS); props.put("pk.mode", "none"); - props.put("topics", MSSQL_Table); + props.put("topics", topic); return props; } @@ -151,12 +176,11 @@ private void executeSQL(PreparedStatement stmt) throws Exception { } } - private void configureAndWaitForConnector() throws Exception { + private void configureAndWaitForConnector(int numTasks) throws Exception { // start a sink connector connect.configureConnector(CONNECTOR_NAME, props); // wait for tasks to spin up - int minimumNumTasks = Math.min(KAFKA_TOPICS.size(), TASKS_MAX); - waitForConnectorToStart(CONNECTOR_NAME, minimumNumTasks); + waitForConnectorToStart(CONNECTOR_NAME, numTasks); } } diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java 
b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java index d96c6c4a..067d066f 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java @@ -26,15 +26,23 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.sql.Connection; +import java.sql.SQLException; import java.sql.SQLNonTransientException; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import io.confluent.connect.jdbc.util.CachedConnectionProvider; +import static org.easymock.EasyMock.anyBoolean; +import static org.easymock.EasyMock.anyLong; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.StringContains.containsString; @@ -42,6 +50,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; + @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") public class JdbcSourceTaskLifecycleTest extends JdbcSourceTaskTestBase { @@ -137,6 +146,7 @@ protected CachedConnectionProvider connectionProvider( // Should request a connection, then should close it on stop() EasyMock.expect(mockCachedConnectionProvider.getConnection()).andReturn(db.getConnection()); + EasyMock.expect(mockCachedConnectionProvider.getConnection()).andReturn(db.getConnection()); mockCachedConnectionProvider.close(); PowerMock.expectLastCall(); @@ -308,6 +318,55 @@ public void testNonTransientSQLExceptionThrows() throws Exception { assertThat(e.getMessage(), containsString("not_existing_table")); } + @Test(expected = ConnectException.class) + public void testTransientSQLExceptionRetries() throws Exception { + + int retryMax = 2; //max times to retry + TableQuerier bulkTableQuerier = EasyMock.createMock(BulkTableQuerier.class); + + for (int i = 0; i < retryMax+1; i++) { + expect(bulkTableQuerier.querying()).andReturn(true); + bulkTableQuerier.maybeStartQuery(anyObject()); + expectLastCall().andThrow(new SQLException("This is a transient exception")); + + expect(bulkTableQuerier.getAttemptedRetryCount()).andReturn(i); + // Called another time in error logging + expect(bulkTableQuerier.getAttemptedRetryCount()).andReturn(i); + bulkTableQuerier.incrementRetryCount(); + expectLastCall().once(); + bulkTableQuerier.reset(anyLong(), anyBoolean()); + } + + replay(bulkTableQuerier); + JdbcSourceTask mockedTask = setUpMockedTask(bulkTableQuerier, retryMax); + + for (int i = 0; i < retryMax+1; i++) { + mockedTask.poll(); + } + } + + + private JdbcSourceTask setUpMockedTask(TableQuerier bulkTableQuerier, int retryMax) throws Exception { + CachedConnectionProvider mockCachedConnectionProvider = EasyMock.createMock(CachedConnectionProvider.class); + for (int i = 0; i < retryMax+1; i++) { + expect(mockCachedConnectionProvider.getConnection()).andReturn(null); + } + replay(mockCachedConnectionProvider); + + PriorityQueue priorityQueue = new PriorityQueue<>(); + priorityQueue.add(bulkTableQuerier); + + + JdbcSourceTask mockedTask = new JdbcSourceTask(time); + 
mockedTask.start(singleTableConfig()); + + mockedTask.tableQueue = priorityQueue; + mockedTask.cachedConnectionProvider = mockCachedConnectionProvider; + mockedTask.maxRetriesPerQuerier = retryMax; + + return mockedTask; + } + private static void validatePollResultTable(List records, int expected, String table) { assertEquals(expected, records.size()); diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskUpdateTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskUpdateTest.java index be440ec5..6474ba62 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskUpdateTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceTaskUpdateTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.fail; import java.sql.Timestamp; -import java.time.Duration; import java.time.ZoneOffset; import java.util.Arrays; import java.util.Collections; @@ -42,7 +41,6 @@ import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import io.confluent.connect.jdbc.util.DateTimeUtils; @@ -865,6 +863,24 @@ public void testCustomQueryWithTimestamp() throws Exception { PowerMock.verifyAll(); } + @Test (expected = ConnectException.class) + public void testTaskFailsIfNoQueryOrTablesConfigProvided() { + initializeTask(); + Map props = new HashMap<>(); + props.put(JdbcSourceTaskConfig.TABLES_CONFIG, "[]"); + props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, ""); + task.start(props); + } + + @Test (expected = ConnectException.class) + public void testTaskFailsIfBothQueryAndTablesConfigProvided() { + initializeTask(); + Map props = new HashMap<>(); + props.put(JdbcSourceTaskConfig.TABLES_CONFIG, "[dbo.table]"); + props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, "Select * from some table"); + task.start(props); + } + private void startTask(String timestampColumn, String incrementingColumn, String query) { startTask(timestampColumn, incrementingColumn, query, 0L, "UTC"); } diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java index 219a5ab6..e58603f5 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java @@ -15,6 +15,8 @@ package io.confluent.connect.jdbc.source; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.errors.ConnectException; import org.easymock.EasyMock; @@ -51,6 +53,7 @@ @PrepareForTest({JdbcSourceTask.class}) @PowerMockIgnore("javax.management.*") public class TableMonitorThreadTest { + private static final long STARTUP_LIMIT = 50; private static final long POLL_INTERVAL = 100; private final static TableId FOO = new TableId(null, null, "foo"); @@ -83,12 +86,13 @@ public class TableMonitorThreadTest { @Mock private Connection connection; @Mock private DatabaseDialect dialect; @Mock private ConnectorContext context; + @Mock private Time time; @Test public void testSingleLookup() throws Exception { 
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); expectTableNames(LIST_FOO, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -100,11 +104,10 @@ public void testSingleLookup() throws Exception { } @Test - public void testTablesNotBlockedOnUpdateThread() throws Exception { + public void testTablesBlockingTimeoutOnUpdateThread() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); - + STARTUP_LIMIT, 0, null, null, time); CountDownLatch connectionRequested = new CountDownLatch(1); CountDownLatch connectionCompleted = new CountDownLatch(1); @@ -115,7 +118,13 @@ public void testTablesNotBlockedOnUpdateThread() throws Exception { return connection; }).anyTimes(); - EasyMock.replay(connectionProvider, connection, dialect); + EasyMock.expect(time.milliseconds()).andReturn(0L).anyTimes(); + time.waitObject(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(STARTUP_LIMIT)); + EasyMock.expectLastCall() + .andThrow(new TimeoutException()) + .anyTimes(); + + EasyMock.replay(connectionProvider, connection, dialect, time); // Haven't had a chance to start the first table read; should return null to signify that no // attempt to list tables on the database has succeeded yet @@ -141,6 +150,34 @@ public void testTablesNotBlockedOnUpdateThread() throws Exception { // Have completed a table read; should return an empty list (instead of null) to signify that // we've been able to read the tables from the database, but just can't find any to query assertEquals(Collections.emptyList(), tableMonitorThread.tables()); + + EasyMock.verify(time); + } + + @Test + public void testTablesBlockingWithDeadlineOnUpdateThread() throws Exception { + EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()); + tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, + STARTUP_LIMIT, POLL_INTERVAL, null, null, time); + + EasyMock.expect(dialect.tableIds(EasyMock.eq(connection))).andReturn(Collections.emptyList()); + EasyMock.expect(connectionProvider.getConnection()).andReturn(connection); + + long currentTime = System.currentTimeMillis(); + EasyMock.expect(time.milliseconds()).andReturn(currentTime); + time.waitObject( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.eq(currentTime + STARTUP_LIMIT)); + + EasyMock.replay(connectionProvider, connection, dialect, time); + + tableMonitorThread.start(); + tableMonitorThread.join(); + + assertEquals(Collections.emptyList(), tableMonitorThread.tables()); + + EasyMock.verify(time); } @Test @@ -148,7 +185,7 @@ public void testWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("foo", "bar")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, whitelist, null); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); expectTableNames(LIST_FOO_BAR, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -164,7 +201,7 @@ public void testBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("bar", "baz")); 
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, blacklist); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); expectTableNames(LIST_FOO_BAR_BAZ, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -179,7 +216,7 @@ public void testBlacklist() throws Exception { public void testReconfigOnUpdate() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); expectTableNames(LIST_FOO); expectTableNames(LIST_FOO, checkTableNames("foo")); context.requestTaskReconfiguration(); @@ -207,7 +244,7 @@ public void testReconfigOnUpdate() throws Exception { @Test public void testInvalidConnection() throws Exception { tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); EasyMock.expect(connectionProvider.getConnection()).andThrow(new ConnectException("Simulated error with the db.")); CountDownLatch errorLatch = new CountDownLatch(1); @@ -230,7 +267,7 @@ public void testInvalidConnection() throws Exception { public void testDuplicates() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, null); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -248,7 +285,7 @@ public void testDuplicateWithUnqualifiedWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("dup")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, whitelist, null); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); expectTableNames(LIST_DUP_ONLY, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -267,7 +304,7 @@ public void testDuplicateWithUnqualifiedBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, blacklist); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -286,7 +323,7 @@ public void testDuplicateWithQualifiedWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, whitelist, null); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -301,7 +338,7 @@ public void 
testDuplicateWithQualifiedBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - POLL_INTERVAL, null, blacklist); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteriaTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteriaTest.java index 9fcc4f6a..c0874ecd 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteriaTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteriaTest.java @@ -221,8 +221,8 @@ public void extractWithTsColumnIsoDateTimeString() throws Exception { .field(TS2_COLUMN.name(), SchemaBuilder.STRING_SCHEMA) .build(); record = new Struct(schema) - .put(TS1_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS1)) - .put(TS2_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS2)); + .put(TS1_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS1, utcTimeZone)) + .put(TS2_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS2, utcTimeZone)); assertExtractedOffset(-1, TS1, schema, record, TimestampGranularity.NANOS_ISO_DATETIME_STRING); } @@ -273,8 +273,8 @@ public void extractWithTsColumnIsoDateTimeStringNanosConfig() throws Exception { .field(TS2_COLUMN.name(), SchemaBuilder.STRING_SCHEMA) .build(); record = new Struct(schema) - .put(TS1_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS1)) - .put(TS2_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS2)); + .put(TS1_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS1, utcTimeZone)) + .put(TS2_COLUMN.name(), DateTimeUtils.toIsoDateTimeString(TS2, utcTimeZone)); assertExtractedOffset(-1, TS1, schema, record, TimestampGranularity.NANOS_STRING); } diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLDateTimeIT.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLDateTimeIT.java index 99b16c89..ed8a807b 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLDateTimeIT.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/source/integration/MSSQLDateTimeIT.java @@ -88,6 +88,7 @@ public class MSSQLDateTimeIT extends BaseConnectorIT { private Connection connection; @ClassRule + @SuppressWarnings("deprecation") public static final FixedHostPortGenericContainer mssqlServer = new FixedHostPortGenericContainer<>("microsoft/mssql-server-linux:latest") .withEnv("ACCEPT_EULA","Y") @@ -292,4 +293,4 @@ private void configureAndWaitForConnector() throws Exception { int minimumNumTasks = Math.min(KAFKA_TOPICS.size(), TASKS_MAX); waitForConnectorToStart(CONNECTOR_NAME, minimumNumTasks); } -} \ No newline at end of file +} diff --git a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/util/DateTimeUtilsTest.java b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/util/DateTimeUtilsTest.java index ad122080..5205d85c 100644 --- a/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/util/DateTimeUtilsTest.java +++ b/kafka-connect-jdbc/src/test/java/io/confluent/connect/jdbc/util/DateTimeUtilsTest.java @@ -20,12 
+20,16 @@ import java.sql.Time; import java.sql.Timestamp; import java.time.Instant; +import java.time.ZoneOffset; +import java.util.TimeZone; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class DateTimeUtilsTest { + private TimeZone utcTimeZone = TimeZone.getTimeZone(ZoneOffset.UTC); + @Test public void testTimestampToNanosLong() { Timestamp timestamp = Timestamp.from(Instant.now()); @@ -62,15 +66,43 @@ public void testTimestampToNanosStringNull() { public void testTimestampToIsoDateTime() { Timestamp timestamp = Timestamp.from(Instant.now()); timestamp.setNanos(141362049); - String isoDateTime = DateTimeUtils.toIsoDateTimeString(timestamp); - assertEquals(timestamp, DateTimeUtils.toTimestampFromIsoDateTime(isoDateTime)); + String isoDateTime = DateTimeUtils.toIsoDateTimeString(timestamp, utcTimeZone); + assertEquals("141362049", isoDateTime.substring(isoDateTime.lastIndexOf('.') + 1)); + assertEquals(timestamp, DateTimeUtils.toTimestampFromIsoDateTime(isoDateTime, utcTimeZone)); + } + + @Test + public void testTimestampToIsoDateTimeNanosLeading0s() { + Timestamp timestamp = Timestamp.from(Instant.now()); + timestamp.setNanos(1); + String isoDateTime = DateTimeUtils.toIsoDateTimeString(timestamp, utcTimeZone); + assertEquals("000000001", isoDateTime.substring(isoDateTime.lastIndexOf('.') + 1)); + assertEquals(timestamp, DateTimeUtils.toTimestampFromIsoDateTime(isoDateTime, utcTimeZone)); + } + + @Test + public void testTimestampToIsoDateTimeNanosTrailing0s() { + Timestamp timestamp = Timestamp.from(Instant.now()); + timestamp.setNanos(100); + String isoDateTime = DateTimeUtils.toIsoDateTimeString(timestamp, utcTimeZone); + assertEquals("000000100", isoDateTime.substring(isoDateTime.lastIndexOf('.') + 1)); + assertEquals(timestamp, DateTimeUtils.toTimestampFromIsoDateTime(isoDateTime, utcTimeZone)); + } + + @Test + public void testTimestampToIsoDateTimeNanos0s() { + Timestamp timestamp = Timestamp.from(Instant.now()); + timestamp.setNanos(0); + String isoDateTime = DateTimeUtils.toIsoDateTimeString(timestamp, utcTimeZone); + assertEquals("000000000", isoDateTime.substring(isoDateTime.lastIndexOf('.') + 1)); + assertEquals(timestamp, DateTimeUtils.toTimestampFromIsoDateTime(isoDateTime, utcTimeZone)); } @Test public void testTimestampToIsoDateTimeNull() { - String isoDateTime = DateTimeUtils.toIsoDateTimeString(null); + String isoDateTime = DateTimeUtils.toIsoDateTimeString(null, utcTimeZone); assertNull(isoDateTime); - Timestamp timestamp = DateTimeUtils.toTimestampFromIsoDateTime(null); + Timestamp timestamp = DateTimeUtils.toTimestampFromIsoDateTime(null, utcTimeZone); assertNull(timestamp); } } From 852de2ab17a7c0d0e4a1cad04aad0eb6e181a913 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 30 Aug 2022 16:32:34 +0200 Subject: [PATCH 2/5] No longer depend on zookeeper for readiness check --- Dockerfile | 2 +- docker/kafka-wait | 15 +++++---------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/Dockerfile b/Dockerfile index dcb9abae..2efb16c0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,7 +13,7 @@ # limitations under the License. 
ARG BASE_IMAGE=radarbase/kafka-connect-transform-keyvalue:7.2.1 -FROM maven:3.8-jdk-11 as builder +FROM --platform=$BUILDPLATFORM maven:3.8-jdk-11 as builder # Make kafka-connect-jdbc source folder RUN mkdir /code /code/kafka-connect-jdbc diff --git a/docker/kafka-wait b/docker/kafka-wait index 56f70cdb..3ac86168 100755 --- a/docker/kafka-wait +++ b/docker/kafka-wait @@ -1,8 +1,8 @@ #!/bin/bash # Check if variables exist -if [ -z "$CONNECT_ZOOKEEPER_CONNECT" ]; then - echo "CONNECT_ZOOKEEPER_CONNECT is not defined" +if [ -z "$CONNECT_BOOTSTRAP_SERVERS" ]; then + echo "CONNECT_BOOTSTRAP_SERVERS is not defined" exit 2 fi @@ -18,14 +18,9 @@ max_timeout=32 tries=10 timeout=1 while true; do - ZOOKEEPER_CHECK=$(zookeeper-shell ${CONNECT_ZOOKEEPER_CONNECT} <<< "ls /brokers/ids" | tail -1) - echo "Zookeeper response: ${ZOOKEEPER_CHECK}" - # ZOOKEEPER_CHECK="${ZOOKEEPER_CHECK##*$'\n'}" - ZOOKEEPER_CHECK="$(echo -e "${ZOOKEEPER_CHECK}" | tr -d '[:space:]' | tr -d '[' | tr -d ']')" + KAFKA_CHECK=$(kafka-broker-api-versions --bootstrap-server "$CONNECT_BOOTSTRAP_SERVERS" | grep "(id: " | wc -l) - IFS=',' read -r -a array <<< ${ZOOKEEPER_CHECK} - LENGTH=${#array[@]} - if [ "$LENGTH" -eq "$KAFKA_BROKERS" ]; then + if [ "$KAFKA_CHECK" -ge "$KAFKA_BROKERS" ]; then echo "Kafka brokers available." break fi @@ -35,7 +30,7 @@ while true; do echo "FAILED: KAFKA BROKERs NOT READY." exit 5 fi - echo "Expected $KAFKA_BROKERS brokers but found only $LENGTH. Waiting $timeout second before retrying ..." + echo "Expected $KAFKA_BROKERS brokers but found only $KAFKA_CHECK. Waiting $timeout second before retrying ..." sleep ${timeout} if [ ${timeout} -lt ${max_timeout} ]; then timeout=$((timeout * 2)) From 0ed2c0f66e0a82f9f4351e51e9598197b973cf8c Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 30 Aug 2022 16:36:31 +0200 Subject: [PATCH 3/5] Fix merge errors and warnings --- .../connect/jdbc/dialect/TimescaleDBDatabaseDialect.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/TimescaleDBDatabaseDialect.java b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/TimescaleDBDatabaseDialect.java index d76f7163..589ac137 100644 --- a/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/TimescaleDBDatabaseDialect.java +++ b/kafka-connect-jdbc/src/main/java/io/confluent/connect/jdbc/dialect/TimescaleDBDatabaseDialect.java @@ -57,7 +57,6 @@ public DatabaseDialect create(AbstractConfig config) { static final String CHUNK_TIME_INTERVAL = "1 day"; - static final String DELIMITER = ";"; static final String HYPERTABLE_WARNING = "A result was returned when none was expected"; static final String TIME_COLUMN = "time"; @@ -114,7 +113,7 @@ public String buildCreateSchemaStatement(TableId table) { private Optional getTimeField(Collection fields) { return fields.stream() - .filter(p -> p.name().toLowerCase().equals(TIME_COLUMN)) + .filter(p -> p.name().equalsIgnoreCase(TIME_COLUMN)) .findFirst(); } @@ -135,8 +134,8 @@ public void applyDdlStatements(Connection connection, @Override protected String getSqlType(SinkRecordField field) { if (field.schemaName() != null) { - if (field.schemaName().equals(Timestamp.LOGICAL_NAME)) { - return "TIMESTAMPTZ"; + if (field.schemaName().equals(Timestamp.LOGICAL_NAME)) { + return "TIMESTAMPTZ"; } } return super.getSqlType(field); @@ -148,7 +147,7 @@ protected void formatColumnValue(ExpressionBuilder builder, String schemaName, if (schemaName != null) { if 
(schemaName.equals(org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME)) { builder.appendStringQuoted( - DateTimeUtils.formatTimestamptz((java.util.Date) value, super.timeZone())); + DateTimeUtils.formatTimestamp((java.util.Date) value, super.timeZone())); } } super.formatColumnValue(builder, schemaName, schemaParameters, type, value); From cbd0ae61c6ba85373834df1f099bd7afeb87a1b4 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 30 Aug 2022 16:42:43 +0200 Subject: [PATCH 4/5] Bump github action versions --- .github/workflows/docker.yml | 12 ++++++------ .github/workflows/main.yml | 6 +++--- .github/workflows/release.yml | 14 +++++++------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index e5e419d4..068713b7 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -20,19 +20,19 @@ jobs: # Steps represent a sequence of tasks that will be executed as part of the job steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 # Add Docker labels and tags - name: Docker meta id: docker_meta - uses: docker/metadata-action@v3 + uses: docker/metadata-action@v4 with: images: ${{ env.DOCKER_IMAGE }} tags: | type=ref,event=branch - name: Cache Docker layers - uses: actions/cache@v2 + uses: actions/cache@v3 with: path: /tmp/.buildx-cache # Key is named differently to avoid collision @@ -42,17 +42,17 @@ jobs: - name: Set up Docker Buildx id: buildx - uses: docker/setup-buildx-action@v1 + uses: docker/setup-buildx-action@v2 - name: Login to DockerHub - uses: docker/login-action@v1 + uses: docker/login-action@v2 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push id: docker_build - uses: docker/build-push-action@v2 + uses: docker/build-push-action@v3 with: # Allow running the image on the architectures supported by openjdk:11-jre-slim push: true diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6d43c14e..97eff903 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -23,12 +23,12 @@ jobs: # Steps represent a sequence of tasks that will be executed as part of the job steps: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up JDK 11 - uses: actions/setup-java@v2 + uses: actions/setup-java@v3 with: - distribution: zulu + distribution: temurin java-version: 11 cache: maven diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ac5f40ea..b8836278 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,10 +16,10 @@ jobs: # Steps represent a sequence of tasks that will be executed as part of the job steps: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - - uses: actions/checkout@v2 - - uses: actions/setup-java@v2 + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 with: - distribution: zulu + distribution: temurin java-version: 11 cache: maven @@ -41,12 +41,12 @@ jobs: # Steps represent a sequence of tasks that will be executed as part of the job steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 # Add Docker labels and tags - name: Docker meta id: docker_meta - uses: docker/metadata-action@v3 + uses: docker/metadata-action@v4 with: images: ${{ env.DOCKER_IMAGE }} tags: | @@ -54,14 +54,14 @@ jobs: type=semver,pattern={{major}}.{{minor}} - name: Login to DockerHub - uses: 
docker/login-action@v1 + uses: docker/login-action@v2 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Build and push id: docker_build - uses: docker/build-push-action@v2 + uses: docker/build-push-action@v3 with: # Allow running the image on the architectures supported by openjdk:11-jre-slim push: true From 30174a98ecc97e36f1c194ab63687c922476ab56 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Wed, 31 Aug 2022 11:40:45 +0200 Subject: [PATCH 5/5] Make the wait optional if env variables are missing --- docker/kafka-wait | 99 +++++++++++++++++++++++------------------------ 1 file changed, 49 insertions(+), 50 deletions(-) diff --git a/docker/kafka-wait b/docker/kafka-wait index 3ac86168..915f38b7 100755 --- a/docker/kafka-wait +++ b/docker/kafka-wait @@ -1,60 +1,59 @@ #!/bin/bash +max_timeout=32 + # Check if variables exist if [ -z "$CONNECT_BOOTSTRAP_SERVERS" ]; then echo "CONNECT_BOOTSTRAP_SERVERS is not defined" - exit 2 +else + KAFKA_BROKERS=${KAFKA_BROKERS:-3} + + tries=10 + timeout=1 + while true; do + KAFKA_CHECK=$(kafka-broker-api-versions --bootstrap-server "$CONNECT_BOOTSTRAP_SERVERS" | grep "(id: " | wc -l) + + if [ "$KAFKA_CHECK" -ge "$KAFKA_BROKERS" ]; then + echo "Kafka brokers available." + break + fi + + tries=$((tries - 1)) + if [ ${tries} -eq 0 ]; then + echo "FAILED: KAFKA BROKERs NOT READY." + exit 5 + fi + echo "Expected $KAFKA_BROKERS brokers but found only $KAFKA_CHECK. Waiting $timeout second before retrying ..." + sleep ${timeout} + if [ ${timeout} -lt ${max_timeout} ]; then + timeout=$((timeout * 2)) + fi + done + + echo "Kafka is available." fi if [ -z "$CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL" ]; then echo "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is not defined" - exit 4 +else + tries=10 + timeout=1 + while true; do + if wget --spider -q "${CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL}/subjects" 2>/dev/null; then + echo "Schema registry available." + break + fi + tries=$((tries - 1)) + if [ $tries -eq 0 ]; then + echo "FAILED TO REACH SCHEMA REGISTRY." + exit 6 + fi + echo "Failed to reach schema registry. Retrying in ${timeout} seconds." + sleep ${timeout} + if [ ${timeout} -lt ${max_timeout} ]; then + timeout=$((timeout * 2)) + fi + done + + echo "Schema registry is available." fi - -KAFKA_BROKERS=${KAFKA_BROKERS:-3} - -max_timeout=32 - -tries=10 -timeout=1 -while true; do - KAFKA_CHECK=$(kafka-broker-api-versions --bootstrap-server "$CONNECT_BOOTSTRAP_SERVERS" | grep "(id: " | wc -l) - - if [ "$KAFKA_CHECK" -ge "$KAFKA_BROKERS" ]; then - echo "Kafka brokers available." - break - fi - - tries=$((tries - 1)) - if [ ${tries} -eq 0 ]; then - echo "FAILED: KAFKA BROKERs NOT READY." - exit 5 - fi - echo "Expected $KAFKA_BROKERS brokers but found only $KAFKA_CHECK. Waiting $timeout second before retrying ..." - sleep ${timeout} - if [ ${timeout} -lt ${max_timeout} ]; then - timeout=$((timeout * 2)) - fi -done - -tries=10 -timeout=1 -while true; do - if wget --spider -q "${CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL}/subjects" 2>/dev/null; then - echo "Schema registry available." - break - fi - tries=$((tries - 1)) - if [ $tries -eq 0 ]; then - echo "FAILED TO REACH SCHEMA REGISTRY." - exit 6 - fi - echo "Failed to reach schema registry. Retrying in ${timeout} seconds." - sleep ${timeout} - if [ ${timeout} -lt ${max_timeout} ]; then - timeout=$((timeout * 2)) - fi -done - - -echo "Kafka is available. Ready to go!"
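
For reference, a minimal usage sketch of the reworked docker/kafka-wait helper introduced in the last patch. This is a sketch only; the broker address and schema registry URL below are assumptions for illustration, not part of the patch.

    # With neither variable set, both checks are now skipped: the script only logs
    # that CONNECT_BOOTSTRAP_SERVERS / CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL are
    # not defined and returns successfully, instead of exiting with code 2 or 4.
    unset CONNECT_BOOTSTRAP_SERVERS CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
    ./docker/kafka-wait

    # Wait for a single-broker cluster; KAFKA_BROKERS defaults to 3 when unset.
    export CONNECT_BOOTSTRAP_SERVERS=kafka-1:9092   # assumed broker address
    export KAFKA_BROKERS=1
    ./docker/kafka-wait

    # Additionally wait for the schema registry before starting Kafka Connect.
    export CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081   # assumed URL
    ./docker/kafka-wait

Each check retries up to 10 times with an exponential backoff capped at 32 seconds, as in the previous version of the script; only the hard failure on missing environment variables is removed.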