diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9108faaf17169..938d1190125b7 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -99,7 +99,7 @@ - name: Cockroachdb sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003 dockerRepository: airbyte/source-cockroachdb - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/sources/cockroachdb sourceType: database - name: Commercetools diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ed6768f7e5a20..4bb697c8682ed 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -919,7 +919,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-cockroachdb:0.1.2" +- dockerImage: "airbyte/source-cockroachdb:0.1.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-cockroachdb/Dockerfile b/airbyte-integrations/connectors/source-cockroachdb/Dockerfile index 6c9ba122f5e5e..e7fa7e0b90106 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/Dockerfile +++ b/airbyte-integrations/connectors/source-cockroachdb/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-cockroachdb \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-cockroachdb/build.gradle 
b/airbyte-integrations/connectors/source-cockroachdb/build.gradle index 342895cc05168..07e996113ec3a 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/build.gradle +++ b/airbyte-integrations/connectors/source-cockroachdb/build.gradle @@ -17,7 +17,7 @@ dependencies { implementation project(':airbyte-integrations:connectors:source-relational-db') implementation 'org.apache.commons:commons-lang3:3.11' - implementation "org.postgresql:postgresql:42.2.18" + implementation "org.postgresql:postgresql:42.3.1" testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java index 5e74b69090f2a..631fd32259444 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java @@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; @@ -28,7 +29,7 @@ public class CockroachDbSource extends AbstractJdbcSource { static final String DRIVER_CLASS = "org.postgresql.Driver"; public CockroachDbSource() { - super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration()); + super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), new CockroachJdbcSourceOperations()); } @Override @@ -87,4 +88,9 @@ public static void main(final String[] args) throws 
Exception { LOGGER.info("completed source: {}", CockroachDbSource.class); } + @Override + protected JdbcSourceOperations getSourceOperations() { + return new CockroachJdbcSourceOperations(); + } + } diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java new file mode 100644 index 0000000000000..e24ef00a4caf5 --- /dev/null +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java @@ -0,0 +1,67 @@ +package io.airbyte.integrations.source.cockroachdb; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcSourceOperations; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; + +/** + * Handles CockroachDB-specific data types when converting JDBC rows to JSON: BIT columns are read as bytes, and NUMERIC columns fall back to a double read (or null) when the regular conversion throws SQLException + */ +public class CockroachJdbcSourceOperations extends JdbcSourceOperations { + + @Override + protected void putBoolean(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + if ("bit".equalsIgnoreCase(resultSet.getMetaData().getColumnTypeName(index))) { + node.put(columnName, resultSet.getByte(index)); + } else { + node.put(columnName, resultSet.getBoolean(index)); + } + } + + @Override + protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + node.put(columnName, resultSet.getDouble(index)); + } + + @Override + protected void putNumber(final ObjectNode node, final String 
columnName, final ResultSet resultSet, final int index) throws SQLException { + node.put(columnName, resultSet.getBigDecimal(index)); + } + + 
@Override + public JsonNode rowToJson(ResultSet queryContext) throws SQLException { + final int columnCount = queryContext.getMetaData().getColumnCount(); + final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + + for (int i = 1; i <= columnCount; i++) { + try { + queryContext.getObject(i); + if (!queryContext.wasNull()) { + setJsonField(queryContext, i, jsonNode); + } + } catch (SQLException e) { + putCockroachSpecialDataType(queryContext, i, jsonNode); + } + } + return jsonNode; + } + + private void putCockroachSpecialDataType(ResultSet resultSet, int index, ObjectNode node) throws SQLException { + String columnType = resultSet.getMetaData().getColumnTypeName(index); + String columnName = resultSet.getMetaData().getColumnName(index); + try { + if ("numeric".equalsIgnoreCase(columnType)) { + final double value = resultSet.getDouble(index); + node.put(columnName, value); + } else { + node.put(columnName, (Double) null); + } + } catch (final SQLException e) { + node.put(columnName, (Double) null); + } + } +} diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/test-integration/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceDatatypeTest.java b/airbyte-integrations/connectors/source-cockroachdb/src/test-integration/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceDatatypeTest.java index 18d22facd93dd..bac0377247848 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/test-integration/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/test-integration/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceDatatypeTest.java @@ -88,16 +88,14 @@ protected void initTests() { .addExpectedValues("{sky,road,car}", null) .build()); - // TODO https://github.com/airbytehq/airbyte/issues/4408 - // BIT type is currently parsed as a Boolean which is incorrect - // addDataTypeTestData( - // 
TestDataHolder.builder() - // .sourceType("bit") - // .fullSourceDataType("BIT(3)") - // .airbyteType(JsonSchemaPrimitive.NUMBER) - // .addInsertValues("B'101'") - // //.addExpectedValues("101") - // .build()); + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bit") + .fullSourceDataType("BIT(3)") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .addInsertValues("B'101'") + .addExpectedValues("101") + .build()); addDataTypeTestData( TestDataHolder.builder() @@ -203,39 +201,32 @@ protected void initTests() { " ", null) .build()); - // TODO https://github.com/airbytehq/airbyte/issues/4408 - // JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" so it doesnt - // suppose to handle BC dates addDataTypeTestData( TestDataHolder.builder() .sourceType("date") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("'1999-01-08'", "null") // "'199-10-10 BC'" - .addExpectedValues("1999-01-08T00:00:00Z", null) // , "199-10-10 BC") + .addInsertValues("'1999-01-08'", "null") + .addExpectedValues("1999-01-08T00:00:00Z", null) .build()); - // TODO https://github.com/airbytehq/airbyte/issues/4408 - // Values "'-Infinity'", "'Infinity'", "'Nan'" will not be parsed due to: - // JdbcUtils -> setJsonField contains: - // case FLOAT, DOUBLE -> o.put(columnName, nullIfInvalid(() -> r.getDouble(i), Double::isFinite)); addDataTypeTestData( TestDataHolder.builder() .sourceType("float8") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("'123'", "'1234567890.1234567'", "null") - .addExpectedValues("123.0", "1.2345678901234567E9", null) + .addInsertValues("'123'", "'1234567890.1234567'", "null", "'infinity'", + "'+infinity'", "'+inf'", "'inf'", "'-inf'", "'-infinity'", "'nan'") + .addExpectedValues("123.0", "1.2345678901234567E9", null, + "Infinity", "Infinity", "Infinity", "Infinity", "-Infinity", "-Infinity", "NaN") .build()); - // TODO https://github.com/airbytehq/airbyte/issues/4408 - // Values "'-Infinity'", "'Infinity'", "'Nan'" will not be parsed due 
to: - // JdbcUtils -> setJsonField contains: - // case FLOAT, DOUBLE -> o.put(columnName, nullIfInvalid(() -> r.getDouble(i), Double::isFinite)); addDataTypeTestData( TestDataHolder.builder() .sourceType("float") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("'123'", "'1234567890.1234567'", "null") - .addExpectedValues("123.0", "1.2345678901234567E9", null) + .addInsertValues("'123'", "'1234567890.1234567'", "null", "'infinity'", + "'+infinity'", "'+inf'", "'inf'", "'-inf'", "'-infinity'", "'nan'") + .addExpectedValues("123.0", "1.2345678901234567E9", null, "Infinity", + "Infinity", "Infinity", "Infinity", "-Infinity", "-Infinity", "NaN") .build()); addDataTypeTestData( @@ -286,16 +277,12 @@ protected void initTests() { .addExpectedValues("99999", null) .build()); - // TODO https://github.com/airbytehq/airbyte/issues/4408 - // The decimal type in CockroachDB may contain 'Nan', inf, infinity, +inf, +infinity, -inf, - // -infinity types, but in JdbcUtils-> rowToJson we try to map it like this, so it fails - // case NUMERIC, DECIMAL -> o.put(columnName, nullIfInvalid(() -> r.getBigDecimal(i))); addDataTypeTestData( TestDataHolder.builder() .sourceType("decimal") .airbyteType(JsonSchemaPrimitive.NUMBER) - .addInsertValues("99999", "5.1", "0", "null") - .addExpectedValues("99999", "5.1", "0", null) + .addInsertValues("'+inf'", "999", "'-inf'", "'+infinity'", "'-infinity'", "'nan'") + .addExpectedValues("Infinity", "999", "-Infinity", "Infinity", "-Infinity", "NaN") .build()); addDataTypeTestData( @@ -315,25 +302,24 @@ protected void initTests() { .addExpectedValues("a", "abc", "Миші йдуть;", "櫻花分店", "", null, "\\xF0\\x9F\\x9A\\x80") .build()); - // TODO https://github.com/airbytehq/airbyte/issues/4408 // JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" for both Date and Time types. 
- // So Time only (04:05:06) would be represented like "1970-01-01T04:05:06Z" which is incorrect + // Time (04:05:06) would be represented like "1970-01-01T04:05:06Z" addDataTypeTestData( TestDataHolder.builder() .sourceType("time") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") + .addInsertValues("'04:05:06'", null) + .addExpectedValues("1970-01-01T04:05:06Z") .addNullExpectedValue() .build()); - // https://github.com/airbytehq/airbyte/issues/4408 - // TODO JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" for both Date and Time types. - // So Time only (04:05:06) would be represented like "1970-01-01T04:05:06Z" which is incorrect + // Time (04:05:06) would be represented like "1970-01-01T04:05:06Z" addDataTypeTestData( TestDataHolder.builder() .sourceType("timetz") .airbyteType(JsonSchemaPrimitive.STRING) - .addInsertValues("null") + .addInsertValues("'04:05:06Z'", null) + .addExpectedValues("1970-01-01T04:05:06Z") .addNullExpectedValue() .build()); diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceTest.java b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceTest.java index ffc9b294a46e5..502de737fe8ae 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceTest.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSourceTest.java @@ -80,10 +80,10 @@ class CockroachDbSourceTest { .toDefaultConfiguredCatalog(CATALOG); private static final Set ASCII_MESSAGES = Sets.newHashSet( createRecord(STREAM_NAME, SCHEMA_NAME, - map("id", new BigDecimal("1.0"), "name", "goku", "power", null)), + map("id", new BigDecimal("1.0"), "name", "goku", "power", Double.POSITIVE_INFINITY)), createRecord(STREAM_NAME, SCHEMA_NAME, map("id", 
new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)), - createRecord(STREAM_NAME, SCHEMA_NAME, map("id", null, "name", "piccolo", "power", null))); + createRecord(STREAM_NAME, SCHEMA_NAME, map("id", Double.NaN, "name", "piccolo", "power", Double.NEGATIVE_INFINITY))); private static final Set UTF8_MESSAGES = Sets.newHashSet( createRecord(STREAM_NAME, SCHEMA_NAME, diff --git a/docs/integrations/sources/cockroachdb.md b/docs/integrations/sources/cockroachdb.md index 5f03415b8b933..fed85e0f305d1 100644 --- a/docs/integrations/sources/cockroachdb.md +++ b/docs/integrations/sources/cockroachdb.md @@ -95,5 +95,6 @@ Your database user should now be ready for use with Airbyte. | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.3 | 2021-10-10 | [7819](https://github.com/airbytehq/airbyte/pull/7819) | Fixed Datatype errors during Cockroach DB parsing | | 0.1.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |