Skip to content

Commit

Permalink
Add skeleton to support all postgres types
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren committed Dec 12, 2021
1 parent 6843bc1 commit c64f2b4
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceOperations.class);

private JDBCType safeGetJdbcType(final int columnTypeInt) {
protected JDBCType safeGetJdbcType(final int columnTypeInt) {
try {
return JDBCType.valueOf(columnTypeInt);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,32 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSourceOperations extends JdbcSourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceOperations.class);

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
Expand All @@ -25,16 +38,23 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
final String columnType = metadata.getColumnTypeName(i);
// attempt to access the column. this allows us to know if it is null before we do type-specific
// parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of
// checking for null values with jdbc.

if (metadata.getColumnTypeName(i).equalsIgnoreCase("money")) {
if (columnType.equalsIgnoreCase("money")) {
// when a column is of type MONEY, getObject will throw exception
// this is a bug that will not be fixed:
// https://github.com/pgjdbc/pgjdbc/issues/425
// https://github.com/pgjdbc/pgjdbc/issues/1835
queryContext.getString(i);
} else if (columnType.equalsIgnoreCase("bit")) {
// getObject will fail as it tries to parse the value as boolean
queryContext.getString(i);
} else if (columnType.equalsIgnoreCase("numeric") || columnType.equalsIgnoreCase("decimal")) {
// getObject will fail when the value is 'infinity'
queryContext.getDouble(i);
} else {
queryContext.getObject(i);
}
Expand All @@ -49,12 +69,78 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
return jsonNode;
}

/**
 * Copies the value of one result-set column into {@code json}, keyed by the column name,
 * choosing the writer from the column's type.
 *
 * @param resultSet result set positioned on the row being converted
 * @param colIndex 1-based index of the column to read
 * @param json object node that receives the converted value
 * @throws SQLException if the metadata or the column value cannot be read
 */
@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
  final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();
  final String columnName = metadata.getColumnName(colIndex);
  final String columnTypeName = metadata.getColumnTypeName(colIndex);

  // Postgres boolean is mapped to JDBCType.BIT (see getFieldType), so it must be recognised by
  // its type name before the JDBC-type switch; otherwise it would be written as a bit string
  // while getFieldType reports the column as BOOLEAN.
  if (columnTypeName.equalsIgnoreCase("bool")) {
    putBoolean(json, columnName, resultSet, colIndex);
    return;
  }

  final JDBCType columnType = safeGetJdbcType(metadata.getColumnType(colIndex));

  // https://www.postgresql.org/docs/14/datatype.html
  switch (columnType) {
    case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
    case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex);
    case INTEGER -> putInteger(json, columnName, resultSet, colIndex);
    case BIGINT -> putBigInt(json, columnName, resultSet, colIndex);
    case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex);
    case REAL -> putFloat(json, columnName, resultSet, colIndex);
    case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex);
    // BIT is a bit string in Postgres, e.g. '0100'
    case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex);
    case DATE -> putDate(json, columnName, resultSet, colIndex);
    case TIME -> putTime(json, columnName, resultSet, colIndex);
    case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
    case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
    default -> putDefault(json, columnName, resultSet, colIndex);
  }
}

/**
 * Resolves the {@link JDBCType} of a column from its metadata node.
 *
 * @param field metadata node read through the {@code INTERNAL_COLUMN_*} keys
 * @return {@link JDBCType#BOOLEAN} when the type name is {@code bool}; otherwise the type matching
 *         the numeric code stored under {@code INTERNAL_COLUMN_TYPE}; {@link JDBCType#VARCHAR}
 *         when that code is not a valid {@link JDBCType}
 */
@Override
public JDBCType getFieldType(final JsonNode field) {
  try {
    final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText();
    // Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN
    if (typeName.equalsIgnoreCase("bool")) {
      return JDBCType.BOOLEAN;
    }

    final int typeCode = field.get(INTERNAL_COLUMN_TYPE).asInt();
    final JDBCType jdbcType = JDBCType.valueOf(typeCode);
    // Diagnostic output goes through the class logger at debug level instead of being printed
    // unconditionally to stdout on every call.
    LOGGER.debug("getFieldType: {} ({} {}) -> {}", typeName, typeCode, jdbcType.name(), getJsonType(jdbcType));
    return jdbcType;
  } catch (final IllegalArgumentException ex) {
    // JDBCType.valueOf rejects unknown codes; fall back to a string column.
    LOGGER.warn("Could not convert column: {} from table: {}.{} with type: {}. Casting to VARCHAR.",
        field.get(INTERNAL_COLUMN_NAME),
        field.get(INTERNAL_SCHEMA_NAME),
        field.get(INTERNAL_TABLE_NAME),
        field.get(INTERNAL_COLUMN_TYPE));
    return JDBCType.VARCHAR;
  }
}

/**
 * Maps a JDBC type to the JSON schema primitive used to describe it.
 *
 * @param jdbcType JDBC type of the column
 * @return BOOLEAN for boolean, NUMBER for every integer/floating/decimal type, STRING_BINARY for
 *         binary types, and STRING for everything else
 */
@Override
public JsonSchemaPrimitive getJsonType(final JDBCType jdbcType) {
  switch (jdbcType) {
    case BOOLEAN:
      return JsonSchemaPrimitive.BOOLEAN;
    case TINYINT:
    case SMALLINT:
    case INTEGER:
    case BIGINT:
    case FLOAT:
    case DOUBLE:
    case REAL:
    case NUMERIC:
    case DECIMAL:
      return JsonSchemaPrimitive.NUMBER;
    case BLOB:
    case BINARY:
    case VARBINARY:
    case LONGVARBINARY:
      return JsonSchemaPrimitive.STRING_BINARY;
    default:
      return JsonSchemaPrimitive.STRING;
  }
}

/**
 * Writes a boolean column into {@code node}. The column is read as text and is considered
 * {@code true} only when that text equals "t" (ignoring case); any other text yields
 * {@code false}.
 *
 * @param node object node that receives the value
 * @param columnName key under which the value is stored
 * @param resultSet result set positioned on the current row
 * @param index 1-based column index
 * @throws SQLException if the column cannot be read
 */
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final String rawValue = resultSet.getString(index);
  final boolean isTrue = rawValue.equalsIgnoreCase("t");
  node.put(columnName, isTrue);
}

// Writes the column at `index` into `node` under `columnName`.
// Columns whose type name is exactly "money" (case-sensitive comparison) are delegated to
// putMoney; every other column goes through the else branch below.
@Override
protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
if (resultSet.getMetaData().getColumnTypeName(index).equals("money")) {
putMoney(node, columnName, resultSet, index);
} else {
// NOTE(review): the next two statements look like the removed and added sides of one diff
// line; as written both run and the second overwrites the same key. Confirm which is intended.
super.putDouble(node, columnName, resultSet, index);
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getDouble(index)));
}
}

Expand Down
Loading

0 comments on commit c64f2b4

Please sign in to comment.