Skip to content

Commit

Permalink
GetColumns
Browse files Browse the repository at this point in the history
  • Loading branch information
cxzl25 committed Sep 19, 2022
1 parent 8e5e6c5 commit 69763bd
Showing 1 changed file with 48 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ class GetColumns(
override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val resolver = tableEnv.asInstanceOf[StreamTableEnvironmentImpl]
.getCatalogManager.getSchemaResolver
val resolver = tableEnv match {
case impl: StreamTableEnvironmentImpl =>
impl.getCatalogManager.getSchemaResolver
case _ =>
throw new UnsupportedOperationException(
"Unsupported Operation type GetColumns. You can execute " +
"DESCRIBE statement instead to get column infos.")
}

val catalogName =
if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
Expand All @@ -66,43 +72,15 @@ class GetColumns(
case Failure(_) => (tableName, None)
}
}
.filter {
case (_, None) => false
case _ => true
}.flatMap { case (tableName, flinkTable) =>
.filter { _._2.isDefined }
.flatMap { case (tableName, flinkTable) =>
val schema = flinkTable.get.getUnresolvedSchema.resolve(resolver)
val rows = schema.getColumns.asScala.toArray.zipWithIndex
schema.getColumns.asScala.toArray.zipWithIndex
.filter(c =>
columnNameRegex.pattern.matcher(c._1.getName).matches())
.map { case (column, pos) =>
val logicalType = column.getDataType.getLogicalType
Row.of(
catalogName, // TABLE_CAT
schemaName, // TABLE_SCHEM
tableName, // TABLE_NAME
column.getName, // COLUMN_NAME
Integer.valueOf(toJavaSQLType(logicalType)), // DATA_TYPE
logicalType.toString.replace(" NOT NULL", ""), // TYPE_NAME
null, // COLUMN_SIZE
null, // BUFFER_LENGTH
getDecimalDigits(logicalType), // DECIMAL_DIGITS
getNumPrecRadix(logicalType), // NUM_PREC_RADIX
Integer.valueOf(if (logicalType.isNullable) 1 else 0), // NULLABLE
column.getComment.orElse(""), // REMARKS
null, // COLUMN_DEF
null, // SQL_DATA_TYPE
null, // SQL_DATETIME_SUB
null, // CHAR_OCTET_LENGTH
Integer.valueOf(pos), // ORDINAL_POSITION
"YES", // IS_NULLABLE
null, // SCOPE_CATALOG
null, // SCOPE_SCHEMA
null, // SCOPE_TABLE
null, // SOURCE_DATA_TYPE
"NO" // IS_AUTO_INCREMENT
)
toColumnResult(catalogName, schemaName, tableName, column, pos)
}
rows
}
}
}
Expand Down Expand Up @@ -137,6 +115,42 @@ class GetColumns(
} catch onError()
}

private def toColumnResult(
catalogName: String,
schemaName: String,
tableName: String,
column: Column,
pos: Int): Row = {
val logicalType = column.getDataType.getLogicalType
// format: off
Row.of(
catalogName, // TABLE_CAT
schemaName, // TABLE_SCHEM
tableName, // TABLE_NAME
column.getName, // COLUMN_NAME
Integer.valueOf(toJavaSQLType(logicalType)), // DATA_TYPE
logicalType.toString.replace(" NOT NULL", ""), // TYPE_NAME
null, // COLUMN_SIZE
null, // BUFFER_LENGTH
getDecimalDigits(logicalType), // DECIMAL_DIGITS
getNumPrecRadix(logicalType), // NUM_PREC_RADIX
Integer.valueOf(if (logicalType.isNullable) 1 else 0), // NULLABLE
column.getComment.orElse(""), // REMARKS
null, // COLUMN_DEF
null, // SQL_DATA_TYPE
null, // SQL_DATETIME_SUB
null, // CHAR_OCTET_LENGTH
Integer.valueOf(pos), // ORDINAL_POSITION
"YES", // IS_NULLABLE
null, // SCOPE_CATALOG
null, // SCOPE_SCHEMA
null, // SCOPE_TABLE
null, // SOURCE_DATA_TYPE
"NO" // IS_AUTO_INCREMENT
)
// format: on
}

private def toJavaSQLType(flinkType: LogicalType): Int = flinkType.getClass match {
case c: Class[_] if c == classOf[NullType] => java.sql.Types.NULL
case c: Class[_] if c == classOf[BooleanType] => java.sql.Types.BOOLEAN
Expand Down

0 comments on commit 69763bd

Please sign in to comment.