Skip to content

Commit

Permalink
🎉 Extend logic for JDBC connectors to provide additional properties i…
Browse files Browse the repository at this point in the history
…n JSON schema (#7859)

* add date-time formats to json schema creation

* add jsonSchemaMap to enum

* fix tests and checkstyle

* remove python changes from PR

* remove star import

* Rename String_timestamp to String_Datetime

* Rename String_timestamp to String_Datetime

* fix checkstyle

* fix jdbc source tests

Co-authored-by: vmaltsev <[email protected]>
  • Loading branch information
VitaliiMaltsev and vmaltsev authored Nov 13, 2021
1 parent 6166164 commit 2fe927b
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(6, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(9, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ public JsonSchemaPrimitive getType(final JDBCType bigQueryType) {
case REAL -> JsonSchemaPrimitive.NUMBER;
case NUMERIC, DECIMAL -> JsonSchemaPrimitive.NUMBER;
case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> JsonSchemaPrimitive.STRING;
case DATE -> JsonSchemaPrimitive.STRING;
case TIME -> JsonSchemaPrimitive.STRING;
case TIMESTAMP -> JsonSchemaPrimitive.STRING;
case DATE -> JsonSchemaPrimitive.STRING_DATE;
case TIME -> JsonSchemaPrimitive.STRING_TIME;
case TIMESTAMP -> JsonSchemaPrimitive.STRING_DATETIME;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING;
// since column types aren't necessarily meaningful to Airbyte, liberally convert all unrecgonised
// types to String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ private static void assertExpectedOutputTypes(final Connection connection) throw
.put("decimal", JsonSchemaPrimitive.NUMBER)
.put("char", JsonSchemaPrimitive.STRING)
.put("varchar", JsonSchemaPrimitive.STRING)
.put("date", JsonSchemaPrimitive.STRING)
.put("time", JsonSchemaPrimitive.STRING)
.put("timestamp", JsonSchemaPrimitive.STRING)
.put("date", JsonSchemaPrimitive.STRING_DATE)
.put("time", JsonSchemaPrimitive.STRING_TIME)
.put("timestamp", JsonSchemaPrimitive.STRING_DATETIME)
.put("binary1", JsonSchemaPrimitive.STRING)
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.cockroachdb;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -64,4 +68,5 @@ private void putCockroachSpecialDataType(ResultSet resultSet, int index, ObjectN
node.put(columnName, (Double) null);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ protected void initTests() {
.addNullExpectedValue()
.build());

// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timetz")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE),
Field.of(COL_ROW_ID, JsonSchemaPrimitive.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))),
Expand All @@ -138,7 +138,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_LAST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,10 @@ protected void tearDown(TestDestinationEnv testEnv) {
/* Helpers */

private String getCertificate() throws IOException, InterruptedException {
// To enable SSL connection on the server, we need to generate self-signed certificates for the server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will take effect after restart.
// To enable SSL connection on the server, we need to generate self-signed certificates for the
// server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will
// take effect after restart.
// The certificate for generating a user certificate has the extension *.arm.
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -keydb -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS + "\" -stash");
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -cert -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,28 +803,32 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_LAST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

private JsonSchemaPrimitive resolveJsonSchemaType() {
return getDriverClass().toLowerCase().contains("oracle") ? JsonSchemaPrimitive.STRING_DATETIME : JsonSchemaPrimitive.STRING_DATE;
}

protected List<AirbyteMessage> getTestMessages() {
return Lists.newArrayList(
new AirbyteMessage().withType(Type.RECORD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static JsonNode fieldsToJsonSchema(final List<Field> fields) {
.stream()
.collect(Collectors.toMap(
Field::getName,
field -> ImmutableMap.of("type", field.getTypeAsJsonSchemaString()))))
field -> field.getType().getJsonSchemaTypeMap())))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,28 @@

package io.airbyte.protocol.models;

import com.google.common.collect.ImmutableMap;

public enum JsonSchemaPrimitive {
STRING,
NUMBER,
OBJECT,
ARRAY,
BOOLEAN,
NULL;

STRING_DATE(ImmutableMap.of("type", "string", "format", "date")),
STRING_TIME(ImmutableMap.of("type", "string", "format", "time")),
STRING_DATETIME(ImmutableMap.of("type", "string", "format", "date-time")),
STRING(ImmutableMap.of("type", "string")),
NUMBER(ImmutableMap.of("type", "number")),
OBJECT(ImmutableMap.of("type", "object")),
ARRAY(ImmutableMap.of("type", "array")),
BOOLEAN(ImmutableMap.of("type", "boolean")),
NULL(ImmutableMap.of("type", "null"));

private final ImmutableMap<String, String> jsonSchemaTypeMap;

JsonSchemaPrimitive(ImmutableMap<String, String> jsonSchemaTypeMap) {
this.jsonSchemaTypeMap = jsonSchemaTypeMap;
}

public ImmutableMap<String, String> getJsonSchemaTypeMap() {
return jsonSchemaTypeMap;
}

}

0 comments on commit 2fe927b

Please sign in to comment.