Skip to content

Commit

Permalink
NIFI-14275 Fix quoting configuration handling in UpdateDatabaseTable …
Browse files Browse the repository at this point in the history
…during ALTER
  • Loading branch information
sfc-gh-mgemra committed Feb 18, 2025
1 parent af8cedd commit 95c7ae1
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (final Statement s = conn.createStatement()) {
final DatabaseMetaData databaseMetaData = conn.getMetaData();
final String quoteString = databaseMetaData.getIdentifierQuoteString();

// Determine whether the table exists
TableSchema tableSchema = null;
try {
Expand All @@ -507,8 +510,6 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(
boolean tableCreated = false;
if (tableSchema == null) {
if (createIfNotExists) {
final DatabaseMetaData databaseMetaData = conn.getMetaData();
final String quoteString = databaseMetaData.getIdentifierQuoteString();

// Create a TableSchema from the record, adding all columns
for (RecordField recordField : schema.getFields()) {
Expand All @@ -523,22 +524,16 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(
columnName = recordFieldName;
}

final String qualifiedColumnName;
if (quoteColumnNames) {
qualifiedColumnName = s.enquoteIdentifier(columnName, true);
} else {
qualifiedColumnName = columnName;
}

final int dataType = DataTypeUtils.getSQLTypeValue(recordField.getDataType());
columns.add(new ColumnDescription(qualifiedColumnName, dataType, required, null, recordField.isNullable()));
final String quotedColumnName = enquoteIdentifier(columnName, quoteString, quoteColumnNames);
columns.add(new ColumnDescription(quotedColumnName, dataType, required, null, recordField.isNullable()));
getLogger().debug("Adding column {} to table {}", columnName, tableName);
}

final String qualifiedCatalogName = catalogName == null ? null : s.enquoteIdentifier(catalogName, quoteTableName);
final String qualifiedSchemaName = schemaName == null ? null : s.enquoteIdentifier(schemaName, quoteTableName);
final String qualifiedTableName = s.enquoteIdentifier(tableName, quoteTableName);
tableSchema = new TableSchema(qualifiedCatalogName, qualifiedSchemaName, qualifiedTableName, columns, translateFieldNames, normalizer, primaryKeyColumnNames, quoteString);
final String quotedCatalogName = enquoteIdentifier(catalogName, quoteString, quoteTableName);
final String quotedSchemaName = enquoteIdentifier(schemaName, quoteString, quoteTableName);
final String quotedTableName = enquoteIdentifier(tableName, quoteString, quoteTableName);
tableSchema = new TableSchema(quotedCatalogName, quotedSchemaName, quotedTableName, columns, translateFieldNames, normalizer, primaryKeyColumnNames, quoteString);

final TableDefinition tableDefinition = getTableDefinition(tableSchema);
final StatementRequest statementRequest = new StandardStatementRequest(StatementType.CREATE, tableDefinition);
Expand Down Expand Up @@ -583,15 +578,16 @@ private synchronized OutputMetadataHolder checkAndUpdateTableSchema(
if (!columnsToAdd.isEmpty()) {
final List<ColumnDefinition> columnDefinitions = columnsToAdd.stream().map(columnDescription ->
new StandardColumnDefinition(
columnDescription.getColumnName(),
enquoteIdentifier(columnDescription.getColumnName(), quoteString, quoteColumnNames),
columnDescription.getDataType(),
columnDescription.isNullable() ? ColumnDefinition.Nullable.YES : ColumnDefinition.Nullable.UNKNOWN,
columnDescription.isRequired()
)
)
.map(ColumnDefinition.class::cast)
.toList();
final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, columnDefinitions);
final String qualifiedTableName = enquoteIdentifier(tableName, quoteString, quoteTableName);
final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), qualifiedTableName, columnDefinitions);
final StatementRequest statementRequest = new StandardStatementRequest(StatementType.ALTER, tableDefinition);
final StatementResponse statementResponse = databaseDialectService.getStatement(statementRequest);

Expand Down Expand Up @@ -700,6 +696,13 @@ private TableDefinition getTableDefinition(final TableSchema tableSchema) {
);
}

private String enquoteIdentifier(final String identifier, final String quotedIdentifierString, final boolean quoteIdentifier) {
if (identifier != null && quoteIdentifier) {
return quotedIdentifierString + identifier + quotedIdentifierString;
}
return identifier;
}

private static class OutputMetadataHolder {
private final RecordSchema outputSchema;
private final Map<String, String> fieldMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,43 +114,24 @@ default String getTableAliasClause(String tableName) {
return "AS " + tableName;
}

/**
* Table Quote String usage limited to statement generation methods within DatabaseAdapter
*
* @return Table Quote String
*/
default String getTableQuoteString() {
// ANSI standard is a double quote
return "\"";
}

default String getColumnQuoteString() {
// ANSI standard is a double quote
return "\"";
}

default boolean supportsCreateTableIfNotExists() {
return false;
}

/**
* Generates a CREATE TABLE statement using the specified table schema
* @param tableSchema The table schema including column information
* @param quoteTableName Whether to quote the table name in the generated DDL
* @param quoteColumnNames Whether to quote column names in the generated DDL
* @return A String containing DDL to create the specified table
*/
default String getCreateTableStatement(TableSchema tableSchema, boolean quoteTableName, boolean quoteColumnNames) {
default String getCreateTableStatement(TableSchema tableSchema) {
StringBuilder createTableStatement = new StringBuilder();

List<ColumnDescription> columns = tableSchema.getColumnsAsList();
List<String> columnsAndDatatypes = new ArrayList<>(columns.size());
Set<String> primaryKeyColumnNames = tableSchema.getPrimaryKeyColumnNames();
for (ColumnDescription column : columns) {
StringBuilder sb = new StringBuilder()
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(column.getColumnName())
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(" ")
.append(getSQLForDataType(column.getDataType()))
.append(column.isNullable() ? "" : " NOT NULL")
Expand All @@ -159,32 +140,28 @@ default String getCreateTableStatement(TableSchema tableSchema, boolean quoteTab
}

createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
.append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(), tableSchema))
.append(generateTableName(tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(), tableSchema))
.append(" (")
.append(String.join(", ", columnsAndDatatypes))
.append(") ");

return createTableStatement.toString();
}

default String getAlterTableStatement(String tableName, List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
default String getAlterTableStatement(String tableName, List<ColumnDescription> columnsToAdd) {
StringBuilder createTableStatement = new StringBuilder();

List<String> columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
for (ColumnDescription column : columnsToAdd) {
StringBuilder sb = new StringBuilder()
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(column.getColumnName())
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(" ")
.append(getSQLForDataType(column.getDataType()));
columnsAndDatatypes.add(sb.toString());
}

createTableStatement.append("ALTER TABLE ")
.append(quoteTableName ? getTableQuoteString() : "")
.append(tableName)
.append(quoteTableName ? getTableQuoteString() : "")
.append(" ADD COLUMNS (")
.append(String.join(", ", columnsAndDatatypes))
.append(") ");
Expand All @@ -208,39 +185,19 @@ default String getSQLForDataType(int sqlType) {
return JDBCType.valueOf(sqlType).getName();
}

default String generateTableName(final boolean quoteTableName, final String catalog, final String schemaName, final String tableName, final TableSchema tableSchema) {
default String generateTableName(final String catalog, final String schemaName, final String tableName, final TableSchema tableSchema) {
final StringBuilder tableNameBuilder = new StringBuilder();
if (catalog != null) {
if (quoteTableName) {
tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
.append(catalog)
.append(tableSchema.getQuotedIdentifierString());
} else {
tableNameBuilder.append(catalog);
}

tableNameBuilder.append(catalog);
tableNameBuilder.append(".");
}

if (schemaName != null) {
if (quoteTableName) {
tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
.append(schemaName)
.append(tableSchema.getQuotedIdentifierString());
} else {
tableNameBuilder.append(schemaName);
}

tableNameBuilder.append(schemaName);
tableNameBuilder.append(".");
}

if (quoteTableName) {
tableNameBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
} else {
tableNameBuilder.append(tableName);
}
tableNameBuilder.append(tableName);

return tableNameBuilder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ public StatementResponse getStatement(final StatementRequest statementRequest) {
final String sql;

if (StatementType.ALTER == statementType) {
sql = databaseAdapter.getAlterTableStatement(tableDefinition.tableName(), columnDescriptions, true, true);
sql = databaseAdapter.getAlterTableStatement(tableDefinition.tableName(), columnDescriptions);
} else if (StatementType.CREATE == statementType) {
final TableSchema tableSchema = getTableSchema(tableDefinition);
sql = databaseAdapter.getCreateTableStatement(tableSchema, false, false);
sql = databaseAdapter.getCreateTableStatement(tableSchema);
} else if (StatementType.UPSERT == statementType) {
sql = databaseAdapter.getUpsertStatement(tableDefinition.tableName(), columnNames, primaryKeyColumnNames);
} else if (StatementType.INSERT_IGNORE == statementType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,20 @@ public String getSelectStatement(String tableName, String columnNames, String wh
}

@Override
public String getAlterTableStatement(final String tableName, final List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
public String getAlterTableStatement(final String tableName, final List<ColumnDescription> columnsToAdd) {
List<String> columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
for (ColumnDescription column : columnsToAdd) {
String dataType = getSQLForDataType(column.getDataType());
StringBuilder sb = new StringBuilder("ADD ")
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(column.getColumnName())
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(" ")
.append(dataType);
columnsAndDatatypes.add(sb.toString());
}

StringBuilder alterTableStatement = new StringBuilder();
return alterTableStatement.append("ALTER TABLE ")
.append(quoteTableName ? getTableQuoteString() : "")
.append(tableName)
.append(quoteTableName ? getTableQuoteString() : "")
.append(" ")
.append(String.join(", ", columnsAndDatatypes))
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,40 +121,26 @@ public String getInsertIgnoreStatement(String table, List<String> columnNames, C
return statementStringBuilder.toString();
}

@Override
public String getTableQuoteString() {
return "`";
}

@Override
public String getColumnQuoteString() {
return "`";
}

@Override
public boolean supportsCreateTableIfNotExists() {
return true;
}

@Override
public String getAlterTableStatement(final String tableName, final List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
public String getAlterTableStatement(final String tableName, final List<ColumnDescription> columnsToAdd) {
List<String> columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
for (ColumnDescription column : columnsToAdd) {
String dataType = getSQLForDataType(column.getDataType());
StringBuilder sb = new StringBuilder("ADD COLUMN ")
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(column.getColumnName())
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(" ")
.append(dataType);
columnsAndDatatypes.add(sb.toString());
}

StringBuilder alterTableStatement = new StringBuilder();
return alterTableStatement.append("ALTER TABLE ")
.append(quoteTableName ? getTableQuoteString() : "")
.append(tableName)
.append(quoteTableName ? getTableQuoteString() : "")
.append(" ")
.append(String.join(", ", columnsAndDatatypes))
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,25 +200,21 @@ private String getColumnAssignment(String table, String columnName, String newTa
}

@Override
public String getAlterTableStatement(String tableName, List<ColumnDescription> columnsToAdd, final boolean quoteTableName, final boolean quoteColumnNames) {
public String getAlterTableStatement(String tableName, List<ColumnDescription> columnsToAdd) {
StringBuilder createTableStatement = new StringBuilder();

List<String> columnsAndDatatypes = new ArrayList<>(columnsToAdd.size());
for (ColumnDescription column : columnsToAdd) {
String dataType = getSQLForDataType(column.getDataType());
StringBuilder sb = new StringBuilder()
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(column.getColumnName())
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(" ")
.append(dataType);
columnsAndDatatypes.add(sb.toString());
}

createTableStatement.append("ALTER TABLE ")
.append(quoteTableName ? getTableQuoteString() : "")
.append(tableName)
.append(quoteTableName ? getTableQuoteString() : "")
.append(" ADD (")
.append(String.join(", ", columnsAndDatatypes))
.append(") ");
Expand Down Expand Up @@ -256,15 +252,13 @@ public boolean supportsCreateTableIfNotExists() {
/**
* Generates a CREATE TABLE statement using the specified table schema
* @param tableSchema The table schema including column information
* @param quoteTableName Whether to quote the table name in the generated DDL
* @param quoteColumnNames Whether to quote column names in the generated DDL
* @return A String containing DDL to create the specified table
*/
@Override
public String getCreateTableStatement(TableSchema tableSchema, boolean quoteTableName, boolean quoteColumnNames) {
public String getCreateTableStatement(TableSchema tableSchema) {
StringBuilder createTableStatement = new StringBuilder()
.append("DECLARE\n\tsql_stmt long;\nBEGIN\n\tsql_stmt:='CREATE TABLE ")
.append(generateTableName(quoteTableName, tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(), tableSchema))
.append(generateTableName(tableSchema.getCatalogName(), tableSchema.getSchemaName(), tableSchema.getTableName(), tableSchema))
.append(" (");

List<ColumnDescription> columns = tableSchema.getColumnsAsList();
Expand All @@ -273,9 +267,7 @@ public String getCreateTableStatement(TableSchema tableSchema, boolean quoteTabl
ColumnDescription column = columns.get(i);
createTableStatement
.append((i != 0) ? ", " : "")
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(column.getColumnName())
.append(quoteColumnNames ? getColumnQuoteString() : "")
.append(" ")
.append(getSQLForDataType(column.getDataType()))
.append(column.isNullable() ? "" : " NOT NULL")
Expand Down
Loading

0 comments on commit 95c7ae1

Please sign in to comment.