Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Postgres: support all Postgres 14 types #8726

Merged
merged 16 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class JdbcSourceOperations extends AbstractJdbcCompatibleSourceOperations

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceOperations.class);

private JDBCType safeGetJdbcType(final int columnTypeInt) {
protected JDBCType safeGetJdbcType(final int columnTypeInt) {
try {
return JDBCType.valueOf(columnTypeInt);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ public List<String> getValues() {
}

/**
 * Builds the prefixed test name by joining the namespace, the test number and the source type
 * with underscores.
 * <p>
 * Every whitespace character in the source type is replaced with an underscore, so a type such as
 * "character varying" contributes "character_varying" to the result.
 *
 * @return {@code nameSpace + "_" + testNumber + "_" + sourceType} with whitespace in the source
 *         type replaced by underscores
 */
public String getNameWithTestPrefix() {
  // source type may include space (e.g. "character varying")
  return nameSpace + "_" + testNumber + "_" + sourceType.replaceAll("\\s", "_");
}

public String getCreateSqlQuery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.17
LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,33 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.math.BigDecimal;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSourceOperations extends JdbcSourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceOperations.class);

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
Expand All @@ -25,16 +39,23 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
final String columnType = metadata.getColumnTypeName(i);
// attempt to access the column. this allows us to know if it is null before we do type-specific
// parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of
// checking for null values with jdbc.

if (metadata.getColumnTypeName(i).equalsIgnoreCase("money")) {
if (columnType.equalsIgnoreCase("money")) {
// when a column is of type MONEY, getObject will throw exception
// this is a bug that will not be fixed:
// https://github.com/pgjdbc/pgjdbc/issues/425
// https://github.com/pgjdbc/pgjdbc/issues/1835
queryContext.getString(i);
} else if (columnType.equalsIgnoreCase("bit")) {
// getObject will fail as it tries to parse the value as boolean
queryContext.getString(i);
} else if (columnType.equalsIgnoreCase("numeric") || columnType.equalsIgnoreCase("decimal")) {
// getObject will fail when the value is 'infinity'
queryContext.getDouble(i);
} else {
queryContext.getObject(i);
}
Expand All @@ -49,12 +70,106 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
return jsonNode;
}

/**
 * Copies the value of one result-set column into {@code json}, choosing the writer from the
 * column's Postgres type name first and from its JDBC type otherwise.
 *
 * @param resultSet result set positioned on the row being converted
 * @param colIndex 1-based index of the column to read
 * @param json object node that receives the value under the column's name
 * @throws SQLException if the metadata or the value cannot be read
 */
@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
  final PgResultSetMetaData meta = (PgResultSetMetaData) resultSet.getMetaData();
  final String fieldName = meta.getColumnName(colIndex);
  final String pgTypeName = meta.getColumnTypeName(colIndex);
  final JDBCType jdbcType = safeGetJdbcType(meta.getColumnType(colIndex));

  // Type-name overrides take precedence over the JDBC type reported by the driver.
  if (pgTypeName.equalsIgnoreCase("bool") || pgTypeName.equalsIgnoreCase("boolean")) {
    putBoolean(json, fieldName, resultSet, colIndex);
    return;
  }

  final boolean writtenAsString = pgTypeName.equalsIgnoreCase("bytea")
      || pgTypeName.equalsIgnoreCase("time")
      || pgTypeName.equalsIgnoreCase("timetz");
  if (writtenAsString) {
    putString(json, fieldName, resultSet, colIndex);
    return;
  }

  // Everything else is dispatched on the JDBC type.
  // https://www.postgresql.org/docs/14/datatype.html
  switch (jdbcType) {
    case BOOLEAN -> putBoolean(json, fieldName, resultSet, colIndex);
    case TINYINT, SMALLINT -> putShortInt(json, fieldName, resultSet, colIndex);
    case INTEGER -> putInteger(json, fieldName, resultSet, colIndex);
    case BIGINT -> putBigInt(json, fieldName, resultSet, colIndex);
    case FLOAT, DOUBLE -> putDouble(json, fieldName, resultSet, colIndex);
    case REAL -> putFloat(json, fieldName, resultSet, colIndex);
    case NUMERIC, DECIMAL -> putBigDecimal(json, fieldName, resultSet, colIndex);
    // BIT is a bit string in Postgres, e.g. '0100'
    case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, fieldName, resultSet, colIndex);
    case DATE -> putDate(json, fieldName, resultSet, colIndex);
    case TIME -> putTime(json, fieldName, resultSet, colIndex);
    case TIMESTAMP -> putTimestamp(json, fieldName, resultSet, colIndex);
    case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, fieldName, resultSet, colIndex);
    default -> putDefault(json, fieldName, resultSet, colIndex);
  }
}

/**
 * Resolves the JDBC type of a column described by {@code field}.
 * <p>
 * The Postgres type name is consulted first: {@code bool}/{@code boolean} resolve to
 * {@link JDBCType#BOOLEAN} and {@code bytea} resolves to {@link JDBCType#VARCHAR}. Any other
 * column is resolved from its numeric JDBC type code; if that code is not a known
 * {@link JDBCType}, a warning is logged and {@link JDBCType#VARCHAR} is returned.
 *
 * @param field column description carrying the internal type-name and type-code entries
 * @return the JDBC type to use for the column
 */
@Override
public JDBCType getFieldType(final JsonNode field) {
  try {
    final String pgTypeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText();

    // Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN
    final boolean isBooleanColumn = pgTypeName.equalsIgnoreCase("bool") || pgTypeName.equalsIgnoreCase("boolean");
    if (isBooleanColumn) {
      return JDBCType.BOOLEAN;
    }

    // BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a").
    // It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
    // https://www.postgresql.org/docs/14/datatype-binary.html
    if (pgTypeName.equalsIgnoreCase("bytea")) {
      return JDBCType.VARCHAR;
    }

    final int jdbcTypeCode = field.get(INTERNAL_COLUMN_TYPE).asInt();
    return JDBCType.valueOf(jdbcTypeCode);
  } catch (final IllegalArgumentException ex) {
    final String warning = String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
        field.get(INTERNAL_COLUMN_NAME),
        field.get(INTERNAL_SCHEMA_NAME),
        field.get(INTERNAL_TABLE_NAME),
        field.get(INTERNAL_COLUMN_TYPE));
    LOGGER.warn(warning);
    return JDBCType.VARCHAR;
  }
}

/**
 * Maps a JDBC type to the JSON schema primitive used to describe it.
 *
 * @param jdbcType JDBC type of the column
 * @return {@code BOOLEAN} for booleans, {@code NUMBER} for integer and floating-point types,
 *         {@code STRING_BINARY} for binary types, and {@code STRING} for everything else
 */
@Override
public JsonSchemaPrimitive getJsonType(final JDBCType jdbcType) {
  switch (jdbcType) {
    case BOOLEAN:
      return JsonSchemaPrimitive.BOOLEAN;
    case TINYINT:
    case SMALLINT:
    case INTEGER:
    case BIGINT:
    case FLOAT:
    case DOUBLE:
    case REAL:
    case NUMERIC:
    case DECIMAL:
      return JsonSchemaPrimitive.NUMBER;
    case BLOB:
    case BINARY:
    case VARBINARY:
    case LONGVARBINARY:
      return JsonSchemaPrimitive.STRING_BINARY;
    default:
      return JsonSchemaPrimitive.STRING;
  }
}

/**
 * Writes a boolean column into {@code node}. The column is read as a string and stored as
 * {@code true} only when that string equals "t", ignoring case.
 *
 * @param node object node that receives the value
 * @param columnName key under which the value is stored
 * @param resultSet result set positioned on the row being converted
 * @param index 1-based index of the column to read
 * @throws SQLException if the column cannot be read
 */
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final String rawValue = resultSet.getString(index);
  final boolean isTrue = rawValue.equalsIgnoreCase("t");
  node.put(columnName, isTrue);
}

protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
if (bigDecimal != null) {
node.put(columnName, bigDecimal);
return;
}

// ResultSet#getBigDecimal cannot handle Infinity, -Infinity, or NaN, and will throw exception,
// which becomes null. So we need to check these special values as string.
final String value = resultSet.getString(index);
if (value.equalsIgnoreCase("infinity")) {
node.put(columnName, Double.POSITIVE_INFINITY);
} else if (value.equalsIgnoreCase("-infinity")) {
node.put(columnName, Double.NEGATIVE_INFINITY);
} else if (value.equalsIgnoreCase("nan")) {
node.put(columnName, Double.NaN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does JSON have a concept of infinity and NaN? How are these represented when output from the connector itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Actually, JSON numbers do not support NaN or infinity. To support these three special values, we need to make sure that the destination can handle them. Otherwise, the destination will fail. Too bad, I need to revert this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like how we currently handle dates, you can output them in a string type with a format hint.

Then have normalization handle the special format hint for these special numbers accordingly for the destination

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChristopheDuong, nice.

Where is the code that handles this? I'd like to see what the format hint looks like.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the json, it's "string" type.

Then, on normalization, if we find a format hint, we know it's actually a date not a string:

and (definition["format"] == "date" or "date" in definition["format"])

Copy link
Contributor

@ChristopheDuong ChristopheDuong Dec 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example of catalog.json with date:

so you can have a special string-float type instead of using json's number in order to encode special float values:

"db_float_column": {
  "type": "string",
  "format": "float"
}

and normalization can handle it in SQL: For example in bigquery, based on https://stackoverflow.com/a/53692265:

there is no literal representation of NaN or infinity, but the following case-insensitive strings can be explicitly cast to float:

"NaN"
"inf" or "+inf"
"-inf"

case 
  when db_float_column = '-infinity' then cast("-inf" as float) 
  when db_float_column = '+infinity' then cast("+inf" as float)
  else cast(db_float_column as float) end

or differently for another destination

} else {
node.put(columnName, (BigDecimal) null);
}
}

/**
 * Writes a double-typed column into {@code node}, routing columns whose type name is exactly
 * "money" to {@code putMoney}.
 *
 * @param node object node that receives the value
 * @param columnName key under which the value is stored
 * @param resultSet result set positioned on the row being converted
 * @param index 1-based index of the column to read
 * @throws SQLException if the metadata or the value cannot be read
 */
@Override
protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
// The type-name check is a case-sensitive equals, unlike the equalsIgnoreCase checks used elsewhere in this class.
if (resultSet.getMetaData().getColumnTypeName(index).equals("money")) {
putMoney(node, columnName, resultSet, index);
} else {
// NOTE(review): the next two statements both handle the same column; the second writes
// columnName again after the superclass call. This looks like the before/after lines of a
// diff shown together - confirm which single statement is intended before relying on it.
super.putDouble(node, columnName, resultSet, index);
// Writes the getDouble result wrapped in DataTypeUtils.returnNullIfInvalid (presumably null when the read fails - verify).
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getDouble(index)));
}
}

Expand Down
Loading