Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Sep 7, 2022
1 parent 9ddb2af commit fd75238
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hive.service.rpc.thrift._
import org.apache.spark.SPARK_VERSION
import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.types._

import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
Expand All @@ -55,6 +57,17 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
}
}

// Verifies that the Spark SQL engine advertises Spark's own identity through
// the JDBC DatabaseMetaData surface: product name, full version string, and
// the major/minor components parsed from SPARK_VERSION.
test("audit Spark engine MetaData") {
  withJdbcStatement() { statement =>
    val md = statement.getConnection.getMetaData
    val sparkVer = SemanticVersion(SPARK_VERSION)
    assert(md.getDatabaseProductName === "Spark SQL")
    assert(md.getDatabaseProductVersion === SPARK_VERSION)
    assert(md.getDatabaseMajorVersion === sparkVer.majorVersion)
    assert(md.getDatabaseMinorVersion === sparkVer.minorVersion)
  }
}

test("get columns operation") {
val tableName = "spark_get_col_operation"
var schema = new StructType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
}

test("server info provider - server") {
withSessionConf(Map(
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false",
KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
Expand All @@ -86,9 +84,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
}

test("server info provider - engine") {
withSessionConf(Map(
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false",
KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class KyuubiOperationHiveEnginePerUserSuite extends WithKyuubiServer with HiveEn
override protected def jdbcUrl: String = getJdbcUrl

test("server info provider - server") {
withSessionConf(Map(
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false",
KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
Expand All @@ -54,9 +52,7 @@ class KyuubiOperationHiveEnginePerUserSuite extends WithKyuubiServer with HiveEn
}

test("server info provider - engine") {
withSessionConf(Map(
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false",
KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.sql.{DatabaseMetaData, ResultSet, SQLException, SQLFeatureNotSupport
import scala.util.Random

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException}
import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._

// For both `in-memory` and `hive` external catalog
Expand Down Expand Up @@ -294,7 +293,7 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
}
}

test("audit Kyuubi Hive JDBC connection MetaData") {
test("audit Kyuubi Hive JDBC connection common MetaData") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
Seq(
Expand Down Expand Up @@ -405,13 +404,8 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {

assert(metaData.allTablesAreSelectable)
assert(metaData.getClientInfoProperties.next)
assert(metaData.getDatabaseProductName === "Apache Kyuubi (Incubating)")
assert(metaData.getDatabaseProductVersion === KYUUBI_VERSION)
assert(metaData.getDriverName === "Kyuubi Project Hive JDBC Shaded Client")
assert(metaData.getDriverVersion === KYUUBI_VERSION)
val ver = SemanticVersion(KYUUBI_VERSION)
assert(metaData.getDatabaseMajorVersion === ver.majorVersion)
assert(metaData.getDatabaseMinorVersion === ver.minorVersion)
assert(
metaData.getIdentifierQuoteString === " ",
"This method returns a space \" \" if identifier quoting is not supported")
Expand Down Expand Up @@ -455,21 +449,16 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
assert(metaData.getDefaultTransactionIsolation === java.sql.Connection.TRANSACTION_NONE)
assert(!metaData.supportsTransactions)
assert(!metaData.getProcedureColumns("", "%", "%", "%").next())
try {
assert(!metaData.getPrimaryKeys("", "default", "src").next())
} catch {
case e: Exception =>
assert(e.isInstanceOf[SQLException])
assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
val e1 = intercept[SQLException] {
metaData.getPrimaryKeys("", "default", "src").next()
}
assert(e1.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
assert(!metaData.getImportedKeys("", "default", "").next())
try {
assert(!metaData.getCrossReference("", "default", "src", "", "default", "src2").next())
} catch {
case e: Exception =>
assert(e.isInstanceOf[SQLException])
assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))

val e2 = intercept[SQLException] {
metaData.getCrossReference("", "default", "src", "", "default", "src2").next()
}
assert(e2.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
assert(!metaData.getIndexInfo("", "default", "src", true, true).next())

assert(metaData.supportsResultSetType(new Random().nextInt()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,22 +199,12 @@ class KyuubiSessionImpl(
}

override def getInfo(infoType: TGetInfoType): TGetInfoValue = {
val provider = sessionConf.get(SERVER_INFO_PROVIDER)

if (client == null) {
if (provider != "SERVER") {
warn("The engine client haven't initialized, return the Kyuubi Server info " +
s"instead of $provider")
}
return super.getInfo(infoType)
}

waitForEngineLaunched()
provider match {
sessionConf.get(SERVER_INFO_PROVIDER) match {
case "SERVER" => super.getInfo(infoType)
case "ENGINE" => withAcquireRelease() {
client.getInfo(infoType).getInfoValue
}
waitForEngineLaunched()
client.getInfo(infoType).getInfoValue
}
case unknown => throw new IllegalArgumentException(s"Unknown server info provider $unknown")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetInfoReq, TGetInfoType, TStatusCode}
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.{Utils, WithKyuubiServer, WithSimpleDFSService}
import org.apache.kyuubi.{KYUUBI_VERSION, Utils, WithKyuubiServer, WithSimpleDFSService}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}

Expand All @@ -43,6 +44,17 @@ class KyuubiOperationPerUserSuite
conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", getHadoopConfDir)
}

// Verifies that, when the server answers metadata requests itself, the JDBC
// DatabaseMetaData reports Kyuubi's product name, KYUUBI_VERSION, and the
// major/minor numbers derived from that version string.
test("audit Kyuubi server MetaData") {
  withJdbcStatement() { statement =>
    val md = statement.getConnection.getMetaData
    val kyuubiVer = SemanticVersion(KYUUBI_VERSION)
    assert(md.getDatabaseProductName === "Apache Kyuubi (Incubating)")
    assert(md.getDatabaseProductVersion === KYUUBI_VERSION)
    assert(md.getDatabaseMajorVersion === kyuubiVer.majorVersion)
    assert(md.getDatabaseMinorVersion === kyuubiVer.minorVersion)
  }
}

test("kyuubi defined function - system_user/session_user") {
withJdbcStatement() { statement =>
val rs = statement.executeQuery("SELECT system_user(), session_user()")
Expand Down Expand Up @@ -245,9 +257,7 @@ class KyuubiOperationPerUserSuite
}

test("server info provider - server") {
withSessionConf(Map(
KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER",
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))()() {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
Expand All @@ -258,9 +268,7 @@ class KyuubiOperationPerUserSuite
}

test("server info provider - engine") {
withSessionConf(Map(
KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE",
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"))()() {
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
req.setSessionHandle(handle)
Expand Down

0 comments on commit fd75238

Please sign in to comment.