Skip to content


[KYUUBI #3519] Flink SQL Engine - GetColumns Operation
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [x] [Run test]( locally before make a pull request

Closes #3519 from cxzl25/flink_get_columns.

Closes #3519

ab81776 [sychen] add column size
efdf1b9 [sychen] indent
630e907 [sychen] refactor
48a79d5 [sychen] add ut
69763bd [sychen] GetColumns
8e5e6c5 [sychen] GetColumns

Authored-by: sychen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
cxzl25 authored and pan3793 committed Sep 28, 2022
1 parent 12a85fe commit 8419b7b
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,14 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
schemaName: String,
tableName: String,
columnName: String): Operation = {
throw new UnsupportedOperationException(
"Unsupported Operation type GetColumns. You can execute " +
"DESCRIBE statement instead to get column infos.")
val op = new GetColumns(
session = session,
catalogNameOrEmpty = catalogName,
schemaNamePattern = schemaName,
tableNamePattern = tableName,
columnNamePattern = columnName)


override def newGetFunctionsOperation(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

package org.apache.kyuubi.engine.flink.operation

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row

import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.schema.SchemaHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

class GetColumns(
session: Session,
catalogNameOrEmpty: String,
schemaNamePattern: String,
tableNamePattern: String,
columnNamePattern: String)
extends FlinkOperation(session) {

override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val resolver = tableEnv match {
case impl: StreamTableEnvironmentImpl =>
case _ =>
throw new UnsupportedOperationException(
"Unsupported Operation type GetColumns. You can execute " +
"DESCRIBE statement instead to get column infos.")

val catalogName =
if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
else catalogNameOrEmpty

val schemaNameRegex = toJavaRegex(schemaNamePattern)
val tableNameRegex = toJavaRegex(tableNamePattern)
val columnNameRegex = toJavaRegex(columnNamePattern).r

val columns = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
.flatMap { schemaName =>
.filter { _._2.isDefined }
.flatMap { case (tableName, flinkTable) =>
val resolvedSchema = flinkTable.get.getUnresolvedSchema.resolve(resolver)
.filter { case (column, _) =>
.map { case (column, pos) =>
toColumnResult(catalogName, schemaName, tableName, column, pos)

resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
Column.physical(TABLE_CAT, DataTypes.STRING),
Column.physical(TABLE_SCHEM, DataTypes.STRING),
Column.physical(TABLE_NAME, DataTypes.STRING),
Column.physical(COLUMN_NAME, DataTypes.STRING),
Column.physical(DATA_TYPE, DataTypes.INT),
Column.physical(TYPE_NAME, DataTypes.STRING),
Column.physical(COLUMN_SIZE, DataTypes.INT),
Column.physical(BUFFER_LENGTH, DataTypes.TINYINT),
Column.physical(DECIMAL_DIGITS, DataTypes.INT),
Column.physical(NUM_PREC_RADIX, DataTypes.INT),
Column.physical(NULLABLE, DataTypes.INT),
Column.physical(REMARKS, DataTypes.STRING),
Column.physical(COLUMN_DEF, DataTypes.STRING),
Column.physical(SQL_DATA_TYPE, DataTypes.INT),
Column.physical(SQL_DATETIME_SUB, DataTypes.INT),
Column.physical(CHAR_OCTET_LENGTH, DataTypes.INT),
Column.physical(ORDINAL_POSITION, DataTypes.INT),
Column.physical(IS_NULLABLE, DataTypes.STRING),
Column.physical(SCOPE_CATALOG, DataTypes.STRING),
Column.physical(SCOPE_SCHEMA, DataTypes.STRING),
Column.physical(SCOPE_TABLE, DataTypes.STRING),
Column.physical(SOURCE_DATA_TYPE, DataTypes.SMALLINT),
Column.physical(IS_AUTO_INCREMENT, DataTypes.STRING))
} catch onError()

private def toColumnResult(
catalogName: String,
schemaName: String,
tableName: String,
column: Column,
pos: Int): Row = {
val logicalType = column.getDataType.getLogicalType
// format: off
catalogName, // TABLE_CAT
schemaName, // TABLE_SCHEM
tableName, // TABLE_NAME
column.getName, // COLUMN_NAME
Integer.valueOf(toJavaSQLType(logicalType)), // DATA_TYPE
logicalType.toString.replace(" NOT NULL", ""), // TYPE_NAME
getColumnSize(logicalType), // COLUMN_SIZE
getDecimalDigits(logicalType), // DECIMAL_DIGITS
getNumPrecRadix(logicalType), // NUM_PREC_RADIX
Integer.valueOf(if (logicalType.isNullable) 1 else 0), // NULLABLE
column.getComment.orElse(null), // REMARKS
null, // COLUMN_DEF
null, // SQL_DATA_TYPE
Integer.valueOf(pos), // ORDINAL_POSITION
if (logicalType.isNullable) "YES" else "NO", // IS_NULLABLE
null, // SCOPE_TABLE
// format: on

private def toJavaSQLType(flinkType: LogicalType): Int = flinkType.getClass match {
case c: Class[_] if c == classOf[NullType] => java.sql.Types.NULL
case c: Class[_] if c == classOf[BooleanType] => java.sql.Types.BOOLEAN
case c: Class[_] if c == classOf[TinyIntType] => java.sql.Types.TINYINT
case c: Class[_] if c == classOf[SmallIntType] => java.sql.Types.SMALLINT
case c: Class[_] if c == classOf[IntType] => java.sql.Types.INTEGER
case c: Class[_] if c == classOf[BigIntType] => java.sql.Types.BIGINT
case c: Class[_] if c == classOf[FloatType] => java.sql.Types.FLOAT
case c: Class[_] if c == classOf[DoubleType] => java.sql.Types.DOUBLE
case c: Class[_] if c == classOf[CharType] => java.sql.Types.CHAR
case c: Class[_] if c == classOf[VarCharType] => java.sql.Types.VARCHAR
case c: Class[_] if c == classOf[DecimalType] => java.sql.Types.DECIMAL
case c: Class[_] if c == classOf[DateType] => java.sql.Types.DATE
case c: Class[_] if c == classOf[TimestampType] => java.sql.Types.TIMESTAMP
case c: Class[_] if c == classOf[DayTimeIntervalType] => java.sql.Types.OTHER
case c: Class[_] if c == classOf[YearMonthIntervalType] => java.sql.Types.OTHER
case c: Class[_] if c == classOf[ZonedTimestampType] => java.sql.Types.TIMESTAMP
case c: Class[_] if c == classOf[TimeType] => java.sql.Types.TIME
case c: Class[_] if c == classOf[BinaryType] => java.sql.Types.BINARY
case c: Class[_] if c == classOf[VarBinaryType] => java.sql.Types.BINARY
case c: Class[_] if c == classOf[ArrayType] => java.sql.Types.ARRAY
case c: Class[_] if c == classOf[MapType] => java.sql.Types.JAVA_OBJECT
case c: Class[_] if c == classOf[MultisetType] => java.sql.Types.JAVA_OBJECT
case c: Class[_] if c == classOf[StructuredType] => java.sql.Types.STRUCT
case c: Class[_] if c == classOf[DistinctType] => java.sql.Types.OTHER
case c: Class[_] if c == classOf[RawType[_]] => java.sql.Types.OTHER
case c: Class[_] if c == classOf[RowType] => java.sql.Types.STRUCT
case c: Class[_] if c == classOf[SymbolType[_]] => java.sql.Types.OTHER
case _ => java.sql.Types.OTHER

private def getColumnSize(flinkType: LogicalType): Integer = flinkType.getClass match {
case c: Class[_] if c == classOf[TinyIntType] => 3
case c: Class[_] if c == classOf[SmallIntType] => 5
case c: Class[_] if c == classOf[IntType] => 10
case c: Class[_] if c == classOf[DateType] => 10
case c: Class[_] if c == classOf[BigIntType] => 19
case c: Class[_] if c == classOf[FloatType] => 7
case c: Class[_] if c == classOf[DoubleType] => 15
case c: Class[_] if c == classOf[DecimalType] => flinkType.asInstanceOf[DecimalType].getScale
case c: Class[_] if c == classOf[VarCharType] => Integer.MAX_VALUE
case c: Class[_] if c == classOf[BinaryType] => Integer.MAX_VALUE
case c: Class[_] if c == classOf[TimestampType] => 29
case _ => null

private def getDecimalDigits(flinkType: LogicalType): Integer = flinkType.getClass match {
case c: Class[_] if c == classOf[BooleanType] => 0
case c: Class[_] if c == classOf[TinyIntType] => 0
case c: Class[_] if c == classOf[SmallIntType] => 0
case c: Class[_] if c == classOf[IntType] => 0
case c: Class[_] if c == classOf[BigIntType] => 0
case c: Class[_] if c == classOf[FloatType] => 7
case c: Class[_] if c == classOf[DoubleType] => 15
case c: Class[_] if c == classOf[DecimalType] => flinkType.asInstanceOf[DecimalType].getScale
case c: Class[_] if c == classOf[TimestampType] => 9
case _ => null

private def getNumPrecRadix(flinkType: LogicalType): Integer = flinkType.getClass match {
case c: Class[_]
if c == classOf[TinyIntType] || c == classOf[SmallIntType]
|| c == classOf[IntType] || c == classOf[BigIntType]
|| c == classOf[FloatType] || c == classOf[DoubleType]
|| c == classOf[DecimalType] => 10
case _ => null
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.kyuubi.engine.flink.operation

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row

import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.schema.SchemaHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

Expand All @@ -45,22 +43,17 @@ class GetTables(
if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
else catalogNameOrEmpty

val schemaNameRegex = toJavaRegex(schemaNamePattern).r
val tableNameRegex = toJavaRegex(tableNamePattern).r
val schemaNameRegex = toJavaRegex(schemaNamePattern)
val tableNameRegex = toJavaRegex(tableNamePattern)

val tables = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
.filter { schemaName => schemaNameRegex.pattern.matcher(schemaName).matches() }
SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
.flatMap { schemaName =>
.filter { tableName => tableNameRegex.pattern.matcher(tableName).matches() }
.map { tableName =>
val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
Try(flinkCatalog.getTable(objPath)) match {
case Success(flinkTable) => (tableName, Some(flinkTable))
case Failure(_) => (tableName, None)
.filter {
case (_, None) => false
case (_, Some(flinkTable)) => tableTypes.contains(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.

package org.apache.kyuubi.engine.flink.schema

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

import org.apache.flink.table.catalog.{Catalog, CatalogBaseTable, ObjectIdentifier}

object SchemaHelper {

def getSchemasWithPattern(flinkCatalog: Catalog, schemaNamePattern: String): Array[String] = {
val p = schemaNamePattern.r.pattern
.filter { schemaName => p.matcher(schemaName).matches() }

def getTablesWithPattern(
flinkCatalog: Catalog,
schemaName: String,
tableNamePattern: String): Array[String] = {
val p = tableNamePattern.r.pattern
.filter { tableName => p.matcher(tableName).matches() }

def getFlinkTablesWithPattern(
flinkCatalog: Catalog,
catalogName: String,
schemaName: String,
tableNamePattern: String): Array[(String, Option[CatalogBaseTable])] = {
getTablesWithPattern(flinkCatalog, schemaName, tableNamePattern).map { tableName =>
getFlinkTable(flinkCatalog, catalogName, schemaName, tableName)

def getFlinkTable(
flinkCatalog: Catalog,
catalogName: String,
schemaName: String,
tableName: String): (String, Option[CatalogBaseTable]) = {
val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
Try(flinkCatalog.getTable(objPath)) match {
case Success(flinkTable) => (tableName, Some(flinkTable))
case Failure(_) => (tableName, None)

0 comments on commit 8419b7b

Please sign in to comment.