Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector] update pgsql catalog for save mode #6080

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,19 @@ protected String getCreateDatabaseSql(String databaseName) {
return "CREATE DATABASE \"" + databaseName + "\"";
}

public String getExistDataSql(TablePath tablePath) {
String schemaName = tablePath.getSchemaName();
String tableName = tablePath.getTableName();
return String.format("select * from \"%s\".\"%s\" limit 1", schemaName, tableName);
}

@Override
protected String getTruncateTableSql(TablePath tablePath) {
String schemaName = tablePath.getSchemaName();
String tableName = tablePath.getTableName();
return "TRUNCATE TABLE \"" + schemaName + "\".\"" + tableName + "\"";
}

@Override
protected String getDropDatabaseSql(String databaseName) {
return "DROP DATABASE \"" + databaseName + "\"";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,131 @@ public void tearDown() {
POSTGRESQL_CONTAINER.stop();
}
}

@TestTemplate
public void testCatalogForSaveMode(TestContainer container)
throws IOException, InterruptedException {
String schema = "public";
String databaseName = POSTGRESQL_CONTAINER.getDatabaseName();
TablePath tablePathPG = TablePath.of(databaseName, "public", "pg_e2e_source_table");
TablePath tablePathPG_Sink = TablePath.of(databaseName, "public", "pg_ide_sink_table_2");
PostgresCatalog postgresCatalog =
new PostgresCatalog(
DatabaseIdentifier.POSTGRESQL,
POSTGRESQL_CONTAINER.getUsername(),
POSTGRESQL_CONTAINER.getPassword(),
JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
schema);
postgresCatalog.open();
CatalogTable catalogTable = postgresCatalog.getTable(tablePathPG);
// sink tableExists ?
boolean tableExistsBefore = postgresCatalog.tableExists(tablePathPG_Sink);
Assertions.assertFalse(tableExistsBefore);
// create table
postgresCatalog.createTable(tablePathPG_Sink, catalogTable, true);
boolean tableExistsAfter = postgresCatalog.tableExists(tablePathPG_Sink);
Assertions.assertTrue(tableExistsAfter);
// isExistsData ?
boolean existsDataBefore = postgresCatalog.isExistsData(tablePathPG_Sink);
Assertions.assertFalse(existsDataBefore);
// insert one data
String customSql =
"INSERT INTO\n"
+ " pg_ide_sink_table_2 (gid,\n"
+ " text_col,\n"
+ " varchar_col,\n"
+ " char_col,\n"
+ " boolean_col,\n"
+ " smallint_col,\n"
+ " integer_col,\n"
+ " bigint_col,\n"
+ " decimal_col,\n"
+ " numeric_col,\n"
+ " real_col,\n"
+ " double_precision_col,\n"
+ " smallserial_col,\n"
+ " serial_col,\n"
+ " bigserial_col,\n"
+ " date_col,\n"
+ " timestamp_col,\n"
+ " bpchar_col,\n"
+ " age,\n"
+ " name,\n"
+ " point,\n"
+ " linestring,\n"
+ " polygon_colums,\n"
+ " multipoint,\n"
+ " multilinestring,\n"
+ " multipolygon,\n"
+ " geometrycollection,\n"
+ " geog,\n"
+ " json_col,\n"
+ " jsonb_col, \n"
+ " xml_col \n"
+ " )\n"
+ "VALUES\n"
+ " (\n"
+ " '"
+ 999
+ "',\n"
+ " 'Hello World',\n"
+ " 'Test',\n"
+ " 'Testing',\n"
+ " true,\n"
+ " 10,\n"
+ " 100,\n"
+ " 1000,\n"
+ " 10.55,\n"
+ " 8.8888,\n"
+ " 3.14,\n"
+ " 3.14159265,\n"
+ " 1,\n"
+ " 100,\n"
+ " 10000,\n"
+ " '2023-05-07',\n"
+ " '2023-05-07 14:30:00',\n"
+ " 'Testing',\n"
+ " 21,\n"
+ " 'Leblanc',\n"
+ " ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),\n"
+ " ST_GeomFromText(\n"
+ " 'LINESTRING(-122.3451 47.5924, -122.3449 47.5923)',\n"
+ " 4326\n"
+ " ),\n"
+ " ST_GeomFromText(\n"
+ " 'POLYGON((-122.3453 47.5922, -122.3453 47.5926, -122.3448 47.5926, -122.3448 47.5922, -122.3453 47.5922))',\n"
+ " 4326\n"
+ " ),\n"
+ " ST_GeomFromText(\n"
+ " 'MULTIPOINT(-122.3459 47.5927, -122.3445 47.5918)',\n"
+ " 4326\n"
+ " ),\n"
+ " ST_GeomFromText(\n"
+ " 'MULTILINESTRING((-122.3463 47.5920, -122.3461 47.5919),(-122.3459 47.5924, -122.3457 47.5923))',\n"
+ " 4326\n"
+ " ),\n"
+ " ST_GeomFromText(\n"
+ " 'MULTIPOLYGON(((-122.3458 47.5925, -122.3458 47.5928, -122.3454 47.5928, -122.3454 47.5925, -122.3458 47.5925)),((-122.3453 47.5921, -122.3453 47.5924, -122.3448 47.5924, -122.3448 47.5921, -122.3453 47.5921)))',\n"
+ " 4326\n"
+ " ),\n"
+ " ST_GeomFromText(\n"
+ " 'GEOMETRYCOLLECTION(POINT(-122.3462 47.5921), LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n"
+ " 4326\n"
+ " ),\n"
+ " ST_GeographyFromText('POINT(-122.3452 47.5925)'),\n"
+ " '{\"key\":\"test\"}',\n"
+ " '{\"key\":\"test\"}',\n"
+ " '<XX:NewSize>test</XX:NewSize>'\n"
+ " )";
postgresCatalog.executeSql(tablePathPG_Sink, customSql);
boolean existsDataAfter = postgresCatalog.isExistsData(tablePathPG_Sink);
Assertions.assertTrue(existsDataAfter);
// truncateTable
postgresCatalog.truncateTable(tablePathPG_Sink, true);
Assertions.assertFalse(postgresCatalog.isExistsData(tablePathPG_Sink));
// drop table
postgresCatalog.dropTable(tablePathPG_Sink, true);
Assertions.assertFalse(postgresCatalog.tableExists(tablePathPG_Sink));
postgresCatalog.close();
}
}