-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Data type mapping for postgres export #5438
Conversation
1520ec1
to
958c4de
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like only Binary and Boolean type handler needed changes. Were you able to verify that for other types, the output between export and stream are consistent?
if (sourceConfig.getEngine() == EngineType.MYSQL) { | ||
final Map<String, Map<String, String>> tableColumnDataTypeMap = getColumnDataTypeMap( | ||
final Map<String, Map<String, String>> tableColumnDataTypeMap = getMySQLColumnDataTypeMap( | ||
(MySqlSchemaManager) schemaManager); | ||
dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap); | ||
} else { | ||
dbTableMetadata = new DbTableMetadata(dbMetadata, Collections.emptyMap()); | ||
final Map<String, Map<String, String>> tableColumnDataTypeMap = getPostgresColumnDataTypeMap( | ||
(PostgresSchemaManager) schemaManager); | ||
dbTableMetadata = new DbTableMetadata(dbMetadata, tableColumnDataTypeMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This if-else block can be moved to a method, so here we only have:
DbTableMetadata dbTableMetadata = getDbTableMetadata(dbMetadata);
@@ -117,6 +122,38 @@ public List<String> getPrimaryKeys(final String fullTableName) { | |||
throw new RuntimeException("Failed to get primary keys for table " + fullTableName); | |||
} | |||
|
|||
|
|||
public Map<String, String> getColumnDataTypes(final String fullTableName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add this method declaration to the SchemaManager
interface? This way, we may be able to get rid of the if-else condition in RdsService class where this method is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return sourceConfig.getTableNames().stream() | ||
.collect(Collectors.toMap( | ||
fullTableName -> fullTableName, | ||
fullTableName -> schemaManager.getColumnDataTypes(fullTableName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we put getColumnDataTypes
into SchemaManager interface, I think we can combine this method with the corresponding MySQL method and get rid of the if-else conditional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made the changes
958c4de
to
8cc0e79
Compare
Yes, I verified the consistency between stream and export, only binary and boolean had changes. |
try (Connection connection = connectionManager.getConnection()) { | ||
final DatabaseMetaData metaData = connection.getMetaData(); | ||
// Retrieve column metadata | ||
try (ResultSet columns = metaData.getColumns(database, null, table, null)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
schema
should be used here:
try (ResultSet columns = metaData.getColumns(database, null, table, null)) { | |
try (ResultSet columns = metaData.getColumns(database, schema, table, null)) { |
} | ||
return columnsToDataType; | ||
} catch (final Exception e) { | ||
LOG.error("Failed to get dataTypes for database {} table {}, retrying", database, table, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to Include schema name in the error message as well.
} catch (final Exception e) { | ||
LOG.error("Failed to get dataTypes for database {} table {}, retrying", database, table, e); | ||
if (retry == NUM_OF_RETRIES) { | ||
throw new RuntimeException(String.format("Failed to get dataTypes for database %s table %s after " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here, include schema name in the error message as well.
@@ -184,5 +186,13 @@ private void transformEvent(final Event event, final String fullTableName, final | |||
event.put(entry.getKey(), data); | |||
} | |||
} | |||
if (engineType == EngineType.POSTGRES) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove the TODO line around Line 180?
if (value instanceof Map) { | ||
Object data = ((Map<?, ?>)value).get(BYTES_KEY); | ||
byte[] bytes = ((String) data).getBytes(StandardCharsets.ISO_8859_1); | ||
String hexString = "\\x" + bytesToHex(bytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: return it directly, no need to create a variable.
return value.toString(); | ||
} | ||
|
||
private static String bytesToHex(byte[] bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why static?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it
8cc0e79
to
52b108b
Compare
return new DbTableMetadata(dbMetadata, tableColumnDataTypeMap); | ||
} | ||
|
||
private Map<String, Map<String, String>> getColumnDataTypeMap(final SchemaManager schemaManager) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think these method could be inside the config object itself.
return value.toString(); | ||
} | ||
|
||
private String bytesToHex(byte[] bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private methods makes it hard to unit test them. This looks like a static utility that can live in a utility class that enables better unit testing of the logic inside this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method won't be used across classes mostly. Shouldn't we keep it here as it has more relevance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a separate util class
Signed-off-by: Divyansh Bokadia <[email protected]> Signed-off-by: Divyansh Bokadia <[email protected]> Data type mapping for postgres export Signed-off-by: Divyansh Bokadia <[email protected]> Data type mapping for postgres export Data type mapping for postgres export
52b108b
to
531f92f
Compare
Data type mapping for postgres export
Description
Issues Resolved
Contributes to #5309
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.