Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source CockroachDB: Fix cockroach Datatypes #7819

Merged
merged 10 commits into from
Nov 12, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
- name: Cockroachdb
sourceDefinitionId: 9fa5862c-da7c-11eb-8d19-0242ac130003
dockerRepository: airbyte/source-cockroachdb
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/cockroachdb
sourceType: database
- name: Commercetools
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-cockroachdb:0.1.2"
- dockerImage: "airbyte/source-cockroachdb:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-cockroachdb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'org.apache.commons:commons-lang3:3.11'
implementation "org.postgresql:postgresql:42.2.18"
implementation "org.postgresql:postgresql:42.3.1"

testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
Expand All @@ -28,7 +29,7 @@ public class CockroachDbSource extends AbstractJdbcSource {
static final String DRIVER_CLASS = "org.postgresql.Driver";

public CockroachDbSource() {
super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration());
super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration(), new CockroachJdbcSourceOperations());
}

@Override
Expand Down Expand Up @@ -87,4 +88,9 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed source: {}", CockroachDbSource.class);
}

@Override
protected JdbcSourceOperations getSourceOperations() {
return new CockroachJdbcSourceOperations();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.airbyte.integrations.source.cockroachdb;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;

/**
* Class is the responsible for special Cockroach DataTypes handling
*/
public class CockroachJdbcSourceOperations extends JdbcSourceOperations {

@Override
protected void putBoolean(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
if ("bit".equalsIgnoreCase(resultSet.getMetaData().getColumnTypeName(index))) {
node.put(columnName, resultSet.getByte(index));
} else {
node.put(columnName, resultSet.getBoolean(index));
}
}

@Override
protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, resultSet.getDouble(index));
}

@Override
protected void putNumber(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, resultSet.getBigDecimal(index));
}

@Override
public JsonNode rowToJson(ResultSet queryContext) throws SQLException {
final int columnCount = queryContext.getMetaData().getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
try {
queryContext.getObject(i);
if (!queryContext.wasNull()) {
setJsonField(queryContext, i, jsonNode);
}
} catch (SQLException e) {
putCockroachSpecialDataType(queryContext, i, jsonNode);
}
}
return jsonNode;
}

private void putCockroachSpecialDataType(ResultSet resultSet, int index, ObjectNode node) throws SQLException {
String columnType = resultSet.getMetaData().getColumnTypeName(index);
String columnName = resultSet.getMetaData().getColumnName(index);
try {
if ("numeric".equalsIgnoreCase(columnType)) {
final double value = resultSet.getDouble(index);
node.put(columnName, value);
} else {
node.put(columnName, (Double) null);
}
} catch (final SQLException e) {
node.put(columnName, (Double) null);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be an else branch here? Otherwise the result may be ignored.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added else statement and handling of SQLException as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know what types may go to the else branch or give a SQLException? Depending on what type it can be, defaulting to null may or may not be the best solution.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,14 @@ protected void initTests() {
.addExpectedValues("{sky,road,car}", null)
.build());

// TODO https://github.com/airbytehq/airbyte/issues/4408
// BIT type is currently parsed as a Boolean which is incorrect
// addDataTypeTestData(
// TestDataHolder.builder()
// .sourceType("bit")
// .fullSourceDataType("BIT(3)")
// .airbyteType(JsonSchemaPrimitive.NUMBER)
// .addInsertValues("B'101'")
// //.addExpectedValues("101")
// .build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("bit")
.fullSourceDataType("BIT(3)")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("B'101'")
.addExpectedValues("101")
.build());

addDataTypeTestData(
TestDataHolder.builder()
Expand Down Expand Up @@ -203,39 +201,32 @@ protected void initTests() {
" ", null)
.build());

// TODO https://github.com/airbytehq/airbyte/issues/4408
// JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" so it doesnt
// suppose to handle BC dates
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'1999-01-08'", "null") // "'199-10-10 BC'"
.addExpectedValues("1999-01-08T00:00:00Z", null) // , "199-10-10 BC")
.addInsertValues("'1999-01-08'", "null")
.addExpectedValues("1999-01-08T00:00:00Z", null)
.build());

// TODO https://github.com/airbytehq/airbyte/issues/4408
// Values "'-Infinity'", "'Infinity'", "'Nan'" will not be parsed due to:
// JdbcUtils -> setJsonField contains:
// case FLOAT, DOUBLE -> o.put(columnName, nullIfInvalid(() -> r.getDouble(i), Double::isFinite));
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("float8")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("'123'", "'1234567890.1234567'", "null")
.addExpectedValues("123.0", "1.2345678901234567E9", null)
.addInsertValues("'123'", "'1234567890.1234567'", "null", "'infinity'",
"'+infinity'", "'+inf'", "'inf'", "'-inf'", "'-infinity'", "'nan'")
.addExpectedValues("123.0", "1.2345678901234567E9", null,
"Infinity", "Infinity", "Infinity", "Infinity", "-Infinity", "-Infinity", "NaN")
.build());

// TODO https://github.com/airbytehq/airbyte/issues/4408
// Values "'-Infinity'", "'Infinity'", "'Nan'" will not be parsed due to:
// JdbcUtils -> setJsonField contains:
// case FLOAT, DOUBLE -> o.put(columnName, nullIfInvalid(() -> r.getDouble(i), Double::isFinite));
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("float")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("'123'", "'1234567890.1234567'", "null")
.addExpectedValues("123.0", "1.2345678901234567E9", null)
.addInsertValues("'123'", "'1234567890.1234567'", "null", "'infinity'",
"'+infinity'", "'+inf'", "'inf'", "'-inf'", "'-infinity'", "'nan'")
.addExpectedValues("123.0", "1.2345678901234567E9", null, "Infinity",
"Infinity", "Infinity", "Infinity", "-Infinity", "-Infinity", "NaN")
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -286,16 +277,12 @@ protected void initTests() {
.addExpectedValues("99999", null)
.build());

// TODO https://github.com/airbytehq/airbyte/issues/4408
// The decimal type in CockroachDB may contain 'Nan', inf, infinity, +inf, +infinity, -inf,
// -infinity types, but in JdbcUtils-> rowToJson we try to map it like this, so it fails
// case NUMERIC, DECIMAL -> o.put(columnName, nullIfInvalid(() -> r.getBigDecimal(i)));
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("decimal")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("99999", "5.1", "0", "null")
.addExpectedValues("99999", "5.1", "0", null)
.addInsertValues("'+inf'", "999", "'-inf'", "'+infinity'", "'-infinity'", "'nan'")
.addExpectedValues("Infinity", "999", "-Infinity", "Infinity", "-Infinity", "NaN")
.build());

addDataTypeTestData(
Expand All @@ -315,25 +302,24 @@ protected void initTests() {
.addExpectedValues("a", "abc", "Миші йдуть;", "櫻花分店", "", null, "\\xF0\\x9F\\x9A\\x80")
.build());

// TODO https://github.com/airbytehq/airbyte/issues/4408
// JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" for both Date and Time types.
// So Time only (04:05:06) would be represented like "1970-01-01T04:05:06Z" which is incorrect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is still relevant since the returned value still have 1970-01-01 in the full date string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Returned back the comment, regarding the DateTime format. However, all java source connectors will send full DateTime in JSON for now. As we need standardization for all related DateTime formats.
Please, see:

public static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'";

So this behavior will be changed when we will sure all destination connectors are able to parse date and time formats.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Make sense.

// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("null")
.addInsertValues("'04:05:06'", null)
.addExpectedValues("1970-01-01T04:05:06Z")
.addNullExpectedValue()
.build());

// https://github.com/airbytehq/airbyte/issues/4408
// TODO JdbcUtils-> DATE_FORMAT is set as ""yyyy-MM-dd'T'HH:mm:ss'Z'"" for both Date and Time types.
// So Time only (04:05:06) would be represented like "1970-01-01T04:05:06Z" which is incorrect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same is related to this one.

// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timetz")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("null")
.addInsertValues("'04:05:06Z'", null)
.addExpectedValues("1970-01-01T04:05:06Z")
.addNullExpectedValue()
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class CockroachDbSourceTest {
.toDefaultConfiguredCatalog(CATALOG);
private static final Set<AirbyteMessage> ASCII_MESSAGES = Sets.newHashSet(
createRecord(STREAM_NAME, SCHEMA_NAME,
map("id", new BigDecimal("1.0"), "name", "goku", "power", null)),
map("id", new BigDecimal("1.0"), "name", "goku", "power", Double.POSITIVE_INFINITY)),
createRecord(STREAM_NAME, SCHEMA_NAME,
map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)),
createRecord(STREAM_NAME, SCHEMA_NAME, map("id", null, "name", "piccolo", "power", null)));
createRecord(STREAM_NAME, SCHEMA_NAME, map("id", Double.NaN, "name", "piccolo", "power", Double.NEGATIVE_INFINITY)));

private static final Set<AirbyteMessage> UTF8_MESSAGES = Sets.newHashSet(
createRecord(STREAM_NAME, SCHEMA_NAME,
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/cockroachdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,6 @@ Your database user should now be ready for use with Airbyte.

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.3 | 2021-10-10 | [7819](https://github.com/airbytehq/airbyte/pull/7819) | Fixed Datatype errors during Cockroach DB parsing |
| 0.1.2 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |