[KYUUBI #3122] GetInfo supports returning server/engine info
### _Why are the changes needed?_

Workaround for #3032; closes #3323

Some ODBC drivers, e.g. the [Databricks ODBC driver](https://www.databricks.com/spark/odbc-drivers-download), are known to depend on `TGetInfoType.CLI_DBMS_VER` and `TGetInfoType.CLI_DBMS_NAME` to check server compatibility and capabilities.

This PR introduces a new configuration, `kyuubi.server.info.provider=SERVER/ENGINE`, so that GetInfo can return either server or engine information.
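
As a sketch, the global form of this setting would go in `kyuubi-defaults.conf` (a hedged example; the property can also be set per session, as the new tests below do):

```properties
# Hypothetical kyuubi-defaults.conf entry: report the engine's name/version
# through GetInfo instead of the Kyuubi server's (the default is SERVER)
kyuubi.server.info.provider=ENGINE
```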

Since Beeline calls GetInfo during the initialization phase, to preserve Beeline's fast-open experience, in async launch mode the server info is returned while the engine is not yet ready, regardless of `kyuubi.server.info.provider`.
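
For illustration, a minimal JDBC client sketch (not part of this patch) shows where the GetInfo values surface: the Hive JDBC driver answers `DatabaseMetaData.getDatabaseProductName`/`getDatabaseProductVersion` via `GetInfo(CLI_DBMS_NAME)`/`GetInfo(CLI_DBMS_VER)`. The host, port, and URL layout below are assumptions for a local deployment:

```scala
import java.sql.DriverManager

object GetInfoSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: Kyuubi configs go after '?', per the documented URL form
    // jdbc:hive2://host:port/db;sessionVars?kyuubiConfs#engineVars
    val url = "jdbc:hive2://localhost:10009/default?kyuubi.server.info.provider=ENGINE"
    val conn = DriverManager.getConnection(url)
    try {
      val md = conn.getMetaData
      println(md.getDatabaseProductName)    // e.g. "Spark SQL" once the engine is up
      println(md.getDatabaseProductVersion) // e.g. the engine's Spark version
    } finally conn.close()
  }
}
```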

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

Tested with Power BI:

![image](https://user-images.githubusercontent.com/26535726/188945975-0d0fc95c-f989-4025-ad7d-c024e23ec328.png)

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3122 from pan3793/info.

Closes #3122

742bdbe [Cheng Pan] nit
bb85d2b [Cheng Pan] style
fd75238 [Cheng Pan] fix
9ddb2af [Cheng Pan] nit
fd8f797 [Cheng Pan] fix ut
840205e [Cheng Pan] nit
f9996d5 [Cheng Pan] GetInfo supports returning engine info

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
pan3793 committed Sep 8, 2022
1 parent 5589406 commit d9f1d0b
Showing 14 changed files with 220 additions and 82 deletions.
1 change: 1 addition & 0 deletions docs/deployment/settings.md
@@ -416,6 +416,7 @@ kyuubi.operation.status.polling.timeout|PT5S|Timeout(ms) for long polling asynch

Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi.server.info.provider|SERVER|The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. <li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi engine information.</li>|string|1.6.1
kyuubi.server.limit.connections.per.ipaddress|&lt;undefined&gt;|Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect.|int|1.6.0
kyuubi.server.limit.connections.per.user|&lt;undefined&gt;|Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect.|int|1.6.0
kyuubi.server.limit.connections.per.user.ipaddress|&lt;undefined&gt;|Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect.|int|1.6.0
FlinkSessionImpl.scala
@@ -19,11 +19,13 @@ package org.apache.kyuubi.engine.flink.session

import scala.util.control.NonFatal

import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.table.client.gateway.SqlExecutionException
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.session.{AbstractSession, SessionManager}

@@ -66,4 +68,13 @@ class FlinkSessionImpl(
}
super.open()
}

override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
infoType match {
case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
TGetInfoValue.stringValue("Apache Flink")
case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(EnvironmentInformation.getVersion)
case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
}
}
}
HiveSessionImpl.scala
@@ -21,9 +21,11 @@ import java.util.HashMap

import scala.collection.JavaConverters._

import org.apache.hive.common.util.HiveVersionInfo
import org.apache.hive.service.cli.session.HiveSession
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.hive.events.HiveSessionEvent
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
@@ -54,6 +56,19 @@ class HiveSessionImpl(
super.runOperation(operation)
}

override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
infoType match {
case TGetInfoType.CLI_SERVER_NAME => TGetInfoValue.stringValue("Hive")
case TGetInfoType.CLI_DBMS_NAME => TGetInfoValue.stringValue("Apache Hive")
case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(HiveVersionInfo.getVersion)
case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented")
case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN |
TGetInfoType.CLI_MAX_TABLE_NAME_LEN => TGetInfoValue.lenValue(128)
case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
}
}

override def close(): Unit = {
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
SparkSessionImpl.scala
@@ -17,9 +17,10 @@

package org.apache.kyuubi.engine.spark.session

-import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
import org.apache.spark.sql.{AnalysisException, SparkSession}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
@@ -75,11 +76,24 @@ class SparkSessionImpl(
super.runOperation(operation)
}

override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
infoType match {
case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
TGetInfoValue.stringValue("Spark SQL")
case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(org.apache.spark.SPARK_VERSION)
case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented")
case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN |
TGetInfoType.CLI_MAX_TABLE_NAME_LEN => TGetInfoValue.lenValue(128)
case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
}
}

override def close(): Unit = {
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
super.close()
-spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable(_))
+spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable)
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
}
}
SparkOperationSuite.scala
@@ -26,10 +26,12 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hive.service.rpc.thrift._
import org.apache.spark.SPARK_VERSION
import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.types._

import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
@@ -55,6 +57,17 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
}
}

test("audit Spark engine MetaData") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
assert(metaData.getDatabaseProductName === "Spark SQL")
assert(metaData.getDatabaseProductVersion === SPARK_VERSION)
val ver = SemanticVersion(SPARK_VERSION)
assert(metaData.getDatabaseMajorVersion === ver.majorVersion)
assert(metaData.getDatabaseMinorVersion === ver.minorVersion)
}
}

test("get columns operation") {
val tableName = "spark_get_col_operation"
var schema = new StructType()
FlinkOperationSuite.scala
@@ -17,6 +17,8 @@

package org.apache.kyuubi.it.flink.operation

import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType}

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.it.flink.WithKyuubiServerAndFlinkMiniCluster
@@ -69,4 +71,26 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
assert(success)
}
}

test("server info provider - server") {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi (Incubating)")
}
}
}

test("server info provider - engine") {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Flink")
}
}
}
}
KyuubiOperationHiveEnginePerUserSuite.scala
@@ -17,6 +17,8 @@

package org.apache.kyuubi.it.hive.operation

import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType}

import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
@@ -37,4 +39,26 @@ class KyuubiOperationHiveEnginePerUserSuite extends WithKyuubiServer with HiveEn
}

override protected def jdbcUrl: String = getJdbcUrl

test("server info provider - server") {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi (Incubating)")
}
}
}

test("server info provider - engine") {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Hive")
}
}
}
}
KyuubiConf.scala
@@ -1691,6 +1691,16 @@ object KyuubiConf {
.stringConf
.createOptional

val SERVER_INFO_PROVIDER: ConfigEntry[String] =
buildConf("kyuubi.server.info.provider")
.doc("The server information provider name, some clients may rely on this information" +
" to check the server compatibilities and functionalities." +
" <li>SERVER: Return Kyuubi server information.</li>" +
" <li>ENGINE: Return Kyuubi engine information.</li>")
.version("1.6.1")
.stringConf
.createWithDefault("SERVER")

val ENGINE_SPARK_SHOW_PROGRESS: ConfigEntry[Boolean] =
buildConf("kyuubi.session.engine.spark.showProgress")
.doc("When true, show the progress bar in the spark engine log.")
AbstractSession.scala
@@ -19,10 +19,10 @@ package org.apache.kyuubi.session

import scala.collection.JavaConverters._

-import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
+import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@@ -55,7 +55,7 @@ abstract class AbstractSession(

val normalizedConf: Map[String, String] = sessionManager.validateAndNormalizeConf(conf)

-override lazy val name: Option[String] = normalizedConf.get(KyuubiConf.SESSION_NAME.key)
+override lazy val name: Option[String] = normalizedConf.get(SESSION_NAME.key)

final private val opHandleSet = new java.util.HashSet[OperationHandle]

@@ -108,8 +108,8 @@

override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
infoType match {
-case TGetInfoType.CLI_SERVER_NAME => TGetInfoValue.stringValue("Apache Kyuubi (Incubating)")
-case TGetInfoType.CLI_DBMS_NAME => TGetInfoValue.stringValue("Apache Kyuubi (Incubating)")
+case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
+  TGetInfoValue.stringValue("Apache Kyuubi (Incubating)")
case TGetInfoType.CLI_DBMS_VER => TGetInfoValue.stringValue(org.apache.kyuubi.KYUUBI_VERSION)
case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented")
case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
SparkMetadataTests.scala
@@ -22,7 +22,6 @@ import java.sql.{DatabaseMetaData, ResultSet, SQLException, SQLFeatureNotSupport
import scala.util.Random

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException}
-import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._

// For both `in-memory` and `hive` external catalog
@@ -294,7 +293,7 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
}
}

test("audit Kyuubi Hive JDBC connection MetaData") {
test("audit Kyuubi Hive JDBC connection common MetaData") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
Seq(
@@ -405,13 +404,8 @@

assert(metaData.allTablesAreSelectable)
assert(metaData.getClientInfoProperties.next)
-assert(metaData.getDatabaseProductName === "Apache Kyuubi (Incubating)")
-assert(metaData.getDatabaseProductVersion === KYUUBI_VERSION)
assert(metaData.getDriverName === "Kyuubi Project Hive JDBC Shaded Client")
assert(metaData.getDriverVersion === KYUUBI_VERSION)
-val ver = SemanticVersion(KYUUBI_VERSION)
-assert(metaData.getDatabaseMajorVersion === ver.majorVersion)
-assert(metaData.getDatabaseMinorVersion === ver.minorVersion)
assert(
metaData.getIdentifierQuoteString === " ",
"This method returns a space \" \" if identifier quoting is not supported")
@@ -455,21 +449,16 @@
assert(metaData.getDefaultTransactionIsolation === java.sql.Connection.TRANSACTION_NONE)
assert(!metaData.supportsTransactions)
assert(!metaData.getProcedureColumns("", "%", "%", "%").next())
-try {
-assert(!metaData.getPrimaryKeys("", "default", "src").next())
-} catch {
-case e: Exception =>
-assert(e.isInstanceOf[SQLException])
-assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
-}
+val e1 = intercept[SQLException] {
+metaData.getPrimaryKeys("", "default", "src").next()
+}
+assert(e1.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
assert(!metaData.getImportedKeys("", "default", "").next())
-try {
-assert(!metaData.getCrossReference("", "default", "src", "", "default", "src2").next())
-} catch {
-case e: Exception =>
-assert(e.isInstanceOf[SQLException])
-assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
-}

+val e2 = intercept[SQLException] {
+metaData.getCrossReference("", "default", "src", "", "default", "src2").next()
+}
+assert(e2.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
assert(!metaData.getIndexInfo("", "default", "src", true, true).next())

assert(metaData.supportsResultSetType(new Random().nextInt()))
TFrontendServiceSuite.scala
@@ -26,15 +26,15 @@ import org.scalatest.time._

import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_BIND_HOST, FRONTEND_CONNECTION_URL_USE_HOSTNAME, FRONTEND_THRIFT_BINARY_BIND_HOST, FRONTEND_THRIFT_BINARY_BIND_PORT}
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.{OperationHandle, TClientTestUtils}
import org.apache.kyuubi.service.TFrontendService.FeServiceServerContext
import org.apache.kyuubi.session.{AbstractSession, SessionHandle}

class TFrontendServiceSuite extends KyuubiFunSuite {

protected val server = new NoopTBinaryFrontendServer()
-protected val conf = KyuubiConf()
+protected val conf: KyuubiConf = KyuubiConf()
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
.set("kyuubi.test.server.should.fail", "false")
.set(KyuubiConf.SESSION_CHECK_INTERVAL, Duration.ofSeconds(5).toMillis)
@@ -54,8 +54,8 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
}

override def afterAll(): Unit = {
-server.getServices.foreach(_.stop())
super.afterAll()
+server.getServices.foreach(_.stop())
}

private def checkOperationResult(
@@ -166,8 +166,7 @@
val req = new TGetInfoReq()
req.setSessionHandle(handle)
req.setInfoType(TGetInfoType.CLI_DBMS_VER)
-val resp = client.GetInfo(req)
-assert(resp.getInfoValue.getStringValue === org.apache.kyuubi.KYUUBI_VERSION)
+assert(client.GetInfo(req).getInfoValue.getStringValue === org.apache.kyuubi.KYUUBI_VERSION)
req.setInfoType(TGetInfoType.CLI_SERVER_NAME)
assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi (Incubating)")
req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
KyuubiSessionImpl.scala
@@ -199,12 +199,13 @@ class KyuubiSessionImpl(
}

override def getInfo(infoType: TGetInfoType): TGetInfoValue = {
-if (client != null) {
-withAcquireRelease() {
-client.getInfo(infoType).getInfoValue
-}
-} else {
-super.getInfo(infoType)
-}
+sessionConf.get(SERVER_INFO_PROVIDER) match {
+case "SERVER" => super.getInfo(infoType)
+case "ENGINE" => withAcquireRelease() {
+waitForEngineLaunched()
+client.getInfo(infoType).getInfoValue
+}
+case unknown => throw new IllegalArgumentException(s"Unknown server info provider $unknown")
+}
}
}
KyuubiOperationPerConnectionSuite.scala
@@ -214,7 +214,9 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
}

test("transfer the TGetInfoReq to kyuubi engine side to verify the connection valid") {
-withSessionConf(Map.empty)(Map(KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))() {
+withSessionConf(Map.empty)(Map(
+KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE",
+KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))() {
withJdbcStatement() { statement =>
val conn = statement.getConnection.asInstanceOf[KyuubiConnection]
assert(conn.isValid(3000))