Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert ":tada: Extend logic for JDBC connectors to provide additional… #7969

Merged
merged 1 commit into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(9, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(6, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ public JsonSchemaPrimitive getType(final JDBCType bigQueryType) {
case REAL -> JsonSchemaPrimitive.NUMBER;
case NUMERIC, DECIMAL -> JsonSchemaPrimitive.NUMBER;
case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> JsonSchemaPrimitive.STRING;
case DATE -> JsonSchemaPrimitive.STRING_DATE;
case TIME -> JsonSchemaPrimitive.STRING_TIME;
case TIMESTAMP -> JsonSchemaPrimitive.STRING_DATETIME;
case DATE -> JsonSchemaPrimitive.STRING;
case TIME -> JsonSchemaPrimitive.STRING;
case TIMESTAMP -> JsonSchemaPrimitive.STRING;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING;
// since column types aren't necessarily meaningful to Airbyte, liberally convert all unrecgonised
// types to String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ private static void assertExpectedOutputTypes(final Connection connection) throw
.put("decimal", JsonSchemaPrimitive.NUMBER)
.put("char", JsonSchemaPrimitive.STRING)
.put("varchar", JsonSchemaPrimitive.STRING)
.put("date", JsonSchemaPrimitive.STRING_DATE)
.put("time", JsonSchemaPrimitive.STRING_TIME)
.put("timestamp", JsonSchemaPrimitive.STRING_DATETIME)
.put("date", JsonSchemaPrimitive.STRING)
.put("time", JsonSchemaPrimitive.STRING)
.put("timestamp", JsonSchemaPrimitive.STRING)
.put("binary1", JsonSchemaPrimitive.STRING)
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.cockroachdb;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -68,5 +64,4 @@ private void putCockroachSpecialDataType(ResultSet resultSet, int index, ObjectN
node.put(columnName, (Double) null);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ protected void initTests() {
.addNullExpectedValue()
.build());

// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timetz")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING),
Field.of(COL_ROW_ID, JsonSchemaPrimitive.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))),
Expand All @@ -138,7 +138,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_LAST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,8 @@ protected void tearDown(TestDestinationEnv testEnv) {
/* Helpers */

private String getCertificate() throws IOException, InterruptedException {
// To enable SSL connection on the server, we need to generate self-signed certificates for the
// server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will
// take effect after restart.
// To enable SSL connection on the server, we need to generate self-signed certificates for the server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will take effect after restart.
// The certificate for generating a user certificate has the extension *.arm.
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -keydb -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS + "\" -stash");
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -cert -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,32 +803,28 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_LAST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

private JsonSchemaPrimitive resolveJsonSchemaType() {
return getDriverClass().toLowerCase().contains("oracle") ? JsonSchemaPrimitive.STRING_DATETIME : JsonSchemaPrimitive.STRING_DATE;
}

protected List<AirbyteMessage> getTestMessages() {
return Lists.newArrayList(
new AirbyteMessage().withType(Type.RECORD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static JsonNode fieldsToJsonSchema(final List<Field> fields) {
.stream()
.collect(Collectors.toMap(
Field::getName,
field -> field.getType().getJsonSchemaTypeMap())))
field -> ImmutableMap.of("type", field.getTypeAsJsonSchemaString()))))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,11 @@

package io.airbyte.protocol.models;

import com.google.common.collect.ImmutableMap;

public enum JsonSchemaPrimitive {

STRING_DATE(ImmutableMap.of("type", "string", "format", "date")),
STRING_TIME(ImmutableMap.of("type", "string", "format", "time")),
STRING_DATETIME(ImmutableMap.of("type", "string", "format", "date-time")),
STRING(ImmutableMap.of("type", "string")),
NUMBER(ImmutableMap.of("type", "number")),
OBJECT(ImmutableMap.of("type", "object")),
ARRAY(ImmutableMap.of("type", "array")),
BOOLEAN(ImmutableMap.of("type", "boolean")),
NULL(ImmutableMap.of("type", "null"));

private final ImmutableMap<String, String> jsonSchemaTypeMap;

JsonSchemaPrimitive(ImmutableMap<String, String> jsonSchemaTypeMap) {
this.jsonSchemaTypeMap = jsonSchemaTypeMap;
}

public ImmutableMap<String, String> getJsonSchemaTypeMap() {
return jsonSchemaTypeMap;
}

STRING,
NUMBER,
OBJECT,
ARRAY,
BOOLEAN,
NULL;
}