Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #2721] Implement dedicated set/get catalog/database operators #2728

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions docs/deployment/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.engine.jdbc.connection.user</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The user is used for connecting to server</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.jdbc.driver.class</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The driver class for jdbc engine connection</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.jdbc.type</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The short name of jdbc type</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.operation.convert.catalog.database.enabled</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.operation.log.dir.root</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>engine_operation_logs</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Root directory for query operation log at engine-side.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.engine.pool.name</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>engine-pool</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The name of engine pool.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
<code>kyuubi.engine.pool.size</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>-1</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,24 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage

private lazy val resultMaxRowsDefault = getConf.get(ENGINE_FLINK_MAX_ROWS)

private lazy val operationConvertCatalogDatabaseDefault =
getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)

override def newExecuteStatementOperation(
session: Session,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
val flinkSession = session.asInstanceOf[FlinkSessionImpl]
if (flinkSession.sessionContext.getConfigMap.getOrDefault(
ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key,
operationConvertCatalogDatabaseDefault.toString).toBoolean) {
val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
if (catalogDatabaseOperation != null) {
return catalogDatabaseOperation
}
}
val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
OPERATION_PLAN_ONLY_MODE.key,
operationModeDefault)
Expand All @@ -59,6 +70,26 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
addOperation(op)
}

override def newSetCurrentCatalogOperation(session: Session, catalog: String): Operation = {
val op = new SetCurrentCatalog(session, catalog)
addOperation(op)
}

override def newGetCurrentCatalogOperation(session: Session): Operation = {
val op = new GetCurrentCatalog(session)
addOperation(op)
}

override def newSetCurrentDatabaseOperation(session: Session, database: String): Operation = {
val op = new SetCurrentDatabase(session, database)
addOperation(op)
}

override def newGetCurrentDatabaseOperation(session: Session): Operation = {
val op = new GetCurrentDatabase(session)
addOperation(op)
}

override def newGetTypeInfoOperation(session: Session): Operation = {
val op = new GetTypeInfo(session)
addOperation(op)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session

class GetCurrentCatalog(session: Session)
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {

override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val catalog = tableEnv.getCurrentCatalog
resultSet = ResultSetUtil.stringListToResultSet(List(catalog), TABLE_CAT)
} catch onError()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
import org.apache.kyuubi.session.Session

class GetCurrentDatabase(session: Session)
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {

override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val database = tableEnv.getCurrentDatabase
resultSet = ResultSetUtil.stringListToResultSet(List(database), TABLE_SCHEM)
} catch onError()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session

class SetCurrentCatalog(session: Session, catalog: String)
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {

override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
tableEnv.useCatalog(catalog)
setHasResultSet(false)
} catch onError()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session

class SetCurrentDatabase(session: Session, database: String)
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {

override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
tableEnv.useDatabase(database)
setHasResultSet(false)
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsRe
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.OperationModes.NONE
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.engine.flink.result.Constants
Expand All @@ -37,7 +37,7 @@ import org.apache.kyuubi.service.ServiceState._

class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
override def withKyuubiConf: Map[String, String] =
Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> NONE.toString)
Map(OPERATION_PLAN_ONLY_MODE.key -> NONE.toString)

override protected def jdbcUrl: String =
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
Expand Down Expand Up @@ -665,6 +665,22 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
})
}

test("execute statement - set/get catalog") {
withSessionConf()(
Map(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key -> "true"))(
Map.empty) {
withJdbcStatement() { statement =>
statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
val catalog = statement.getConnection.getCatalog
assert(catalog == "default_catalog")
statement.getConnection.setCatalog("cat_a")
val changedCatalog = statement.getConnection.getCatalog
assert(changedCatalog == "cat_a")
assert(statement.execute("drop catalog cat_a"))
}
}
}

test("execute statement - create/alter/drop database") {
// TODO: validate table results after FLINK-25558 is resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@link3280 I see you comment it's fixed by FLINK-24685, should we update it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the comment could be deleted now.

withJdbcStatement()({ statement =>
Expand All @@ -674,6 +690,22 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
})
}

test("execute statement - set/get database") {
withSessionConf()(
Map(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key -> "true"))(
Map.empty) {
withJdbcStatement()({ statement =>
statement.executeQuery("create database db_a")
val schema = statement.getConnection.getSchema
assert(schema == "default_database")
statement.getConnection.setSchema("db_a")
val changedSchema = statement.getConnection.getSchema
assert(changedSchema == "db_a")
assert(statement.execute("drop database db_a"))
})
}
}

test("execute statement - create/alter/drop table") {
// TODO: validate table results after FLINK-25558 is resolved
withJdbcStatement()({ statement =>
Expand Down Expand Up @@ -777,7 +809,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}

test("ensure result max rows") {
withSessionConf()(Map(KyuubiConf.ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
withJdbcStatement() { statement =>
statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
val resultSet = statement.executeQuery(s"select a from tbl_src")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.hive.operation

import scala.collection.JavaConverters._

import org.apache.hive.service.cli.operation.Operation

import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session

class GetCurrentCatalog(session: Session)
extends HiveOperation(OperationType.EXECUTE_STATEMENT, session) {
// Hive does not support catalog
override val internalHiveOperation: Operation =
delegatedOperationManager.newExecuteStatementOperation(
hive,
"SELECT '' AS TABLE_CAT",
Map.empty[String, String].asJava,
false,
0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.hive.operation

import scala.collection.JavaConverters._

import org.apache.hive.service.cli.operation.Operation

import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session

class GetCurrentDatabase(session: Session)
extends HiveOperation(OperationType.EXECUTE_STATEMENT, session) {

override val internalHiveOperation: Operation =
delegatedOperationManager.newExecuteStatementOperation(
hive,
"SELECT current_database()",
Map.empty[String, String].asJava,
false,
0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.List
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.rpc.thrift.TRowSet

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
Expand All @@ -36,10 +37,36 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
if (session.sessionManager.getConf.get(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED)) {
val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
if (catalogDatabaseOperation != null) {
return catalogDatabaseOperation
}
}
val operation = new ExecuteStatement(session, statement, confOverlay, runAsync, queryTimeout)
addOperation(operation)
}

override def newSetCurrentCatalogOperation(session: Session, catalog: String): Operation = {
val op = new SetCurrentCatalog(session, catalog)
addOperation(op)
}

override def newGetCurrentCatalogOperation(session: Session): Operation = {
val op = new GetCurrentCatalog(session)
addOperation(op)
}

override def newSetCurrentDatabaseOperation(session: Session, database: String): Operation = {
val op = new SetCurrentDatabase(session, database)
addOperation(op)
}

override def newGetCurrentDatabaseOperation(session: Session): Operation = {
val op = new GetCurrentDatabase(session)
addOperation(op)
}

override def newGetTypeInfoOperation(session: Session): Operation = {
val operation = new GetTypeInfo(session)
addOperation(operation)
Expand Down
Loading