Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6253] Support running JDBC engine on YARN AM #6275

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.jdbc.connection.provider | &lt;undefined&gt; | A JDBC connection provider plugin for the Kyuubi Server to establish a connection to the JDBC URL. The configuration value should be a subclass of `org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider`. Kyuubi provides the following built-in implementations: <li>doris: For establishing Doris connections.</li> <li>mysql: For establishing MySQL connections.</li> <li>phoenix: For establishing Phoenix connections.</li> <li>postgresql: For establishing PostgreSQL connections.</li><li>starrocks: For establishing StarRocks connections.</li><li>impala: For establishing Impala connections.</li><li>clickhouse: For establishing clickhouse connections.</li> | string | 1.6.0 |
| kyuubi.engine.jdbc.connection.url | &lt;undefined&gt; | The server url that engine will connect to | string | 1.6.0 |
| kyuubi.engine.jdbc.connection.user | &lt;undefined&gt; | The user is used for connecting to server | string | 1.6.0 |
| kyuubi.engine.jdbc.deploy.mode | LOCAL | Configures the jdbc engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.10.0 |
| kyuubi.engine.jdbc.driver.class | &lt;undefined&gt; | The driver class for JDBC engine connection | string | 1.6.0 |
| kyuubi.engine.jdbc.extra.classpath | &lt;undefined&gt; | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 |
| kyuubi.engine.jdbc.fetch.size | 1000 | The fetch size of JDBC engine | int | 1.9.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
*/
package org.apache.kyuubi.engine.jdbc

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, JDBC_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_JDBC_INITIALIZE_SQL
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_DEPLOY_MODE, ENGINE_JDBC_INITIALIZE_SQL, ENGINE_KEYTAB, ENGINE_PRINCIPAL}
import org.apache.kyuubi.engine.deploy.DeployMode
import org.apache.kyuubi.engine.jdbc.JdbcSQLEngine.currentEngine
import org.apache.kyuubi.engine.jdbc.util.KyuubiJdbcUtils
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
Expand All @@ -38,6 +43,7 @@ class JdbcSQLEngine extends Serverable("JdbcSQLEngine") {
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => {
selfExited = true
currentEngine.foreach(_.stop())
})
}
Expand Down Expand Up @@ -66,15 +72,45 @@ object JdbcSQLEngine extends Logging {

def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)

try {
Utils.fromCommandLineArgs(args, kyuubiConf)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)
val proxyUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
require(proxyUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set")
val realUser = UserGroupInformation.getLoginUser
val principal = kyuubiConf.get(ENGINE_PRINCIPAL)
val keytab = kyuubiConf.get(ENGINE_KEYTAB)

startEngine()
val ugi = DeployMode.withName(kyuubiConf.get(ENGINE_JDBC_DEPLOY_MODE)) match {
case DeployMode.LOCAL
if UserGroupInformation.isSecurityEnabled && principal.isDefined && keytab.isDefined =>
UserGroupInformation.loginUserFromKeytab(principal.get, keytab.get)
UserGroupInformation.getCurrentUser
case DeployMode.LOCAL if proxyUser.get != realUser.getShortUserName =>
val newUGI = UserGroupInformation.createProxyUser(proxyUser.get, realUser)
newUGI.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
}
}
})
newUGI
case _ =>
UserGroupInformation.getCurrentUser
}

KyuubiJdbcUtils.initializeJdbcSession(kyuubiConf, kyuubiConf.get(ENGINE_JDBC_INITIALIZE_SQL))
ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
startEngine()
KyuubiJdbcUtils.initializeJdbcSession(
kyuubiConf,
kyuubiConf.get(ENGINE_JDBC_INITIALIZE_SQL))
}
})
} catch {
case t: Throwable if currentEngine.isDefined =>
currentEngine.foreach { engine =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.deploy

import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_JDBC_EXTRA_CLASSPATH
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.jdbc.JdbcSQLEngine

object JdbcYarnModeSubmitter extends EngineYarnModeSubmitter {

def main(args: Array[String]): Unit = {
Utils.fromCommandLineArgs(args, kyuubiConf)

if (UserGroupInformation.isSecurityEnabled) {
require(
kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL).isDefined
&& kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).isDefined,
s"${KyuubiConf.ENGINE_PRINCIPAL.key} and " +
s"${KyuubiConf.ENGINE_KEYTAB.key} must be set when submit " +
s"${JdbcSQLEngine.getClass.getSimpleName.stripSuffix("$")} to YARN")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, this condition could be relaxed. but this can be done later, independently. cc @cxzl25

for hive, we require keytab to run MR jobs, but for JDBC, most DB does not require kerberos authentication.

run()
}

override var engineType: String = "jdbc"

override def engineMainClass(): String = JdbcSQLEngine.getClass.getName

/**
* Jar list for the Hive engine.
*/
override def engineExtraJars(): Seq[File] = {
val hadoopCp = sys.env.get("HIVE_HADOOP_CLASSPATH")
val extraCp = kyuubiConf.get(ENGINE_JDBC_EXTRA_CLASSPATH)
val jars = new ListBuffer[File]
hadoopCp.foreach(cp => parseClasspath(cp, jars))
extraCp.foreach(cp => parseClasspath(cp, jars))
jars.toSeq
}

private[jdbc] def parseClasspath(classpath: String, jars: ListBuffer[File]): Unit = {
classpath.split(":").filter(_.nonEmpty).foreach { cp =>
if (cp.endsWith("/*")) {
val dir = cp.substring(0, cp.length - 2)
new File(dir) match {
case f if f.isDirectory =>
f.listFiles().filter(_.getName.endsWith(".jar")).foreach(jars += _)
case _ =>
}
} else {
jars += new File(cp)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class JdbcSessionManager(name: String)
}

private def stopSession(): Unit = {
JdbcSQLEngine.currentEngine.foreach(_.stop())
JdbcSQLEngine.currentEngine.foreach { engine =>
engine.selfExited = true
engine.stop()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.clickhouse

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.jdbc.WithJdbcEngine

trait WithClickHouseEngine extends WithJdbcEngine with WithClickHouseContainer {
Expand All @@ -29,6 +30,7 @@ trait WithClickHouseEngine extends WithJdbcEngine with WithClickHouseContainer {
ENGINE_JDBC_CONNECTION_PASSWORD.key -> container.password,
ENGINE_TYPE.key -> "jdbc",
ENGINE_JDBC_SHORT_NAME.key -> "clickhouse",
KYUUBI_SESSION_USER_KEY -> "kyuubi",
ENGINE_JDBC_DRIVER_CLASS.key -> container.driverClassName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.deploy

import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION}
import org.apache.kyuubi.util.JavaUtils

class JdbcYarnModeSubmitterSuite extends KyuubiFunSuite {
val jdbcEngineHome: String = JavaUtils.getCodeSourceLocation(getClass).split("/target")(0)

test("hadoop class path") {
val jars = new ListBuffer[File]
val classpath =
s"$jdbcEngineHome/target/scala-$SCALA_COMPILE_VERSION/jars/*:" +
s"$jdbcEngineHome/target/kyuubi-jdbc-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
JdbcYarnModeSubmitter.parseClasspath(classpath, jars)
assert(jars.nonEmpty)
assert(jars.exists(
_.getName == s"kyuubi-jdbc-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.doris

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.jdbc.WithJdbcEngine

trait WithDorisEngine extends WithJdbcEngine with WithDorisContainer {
Expand All @@ -28,5 +29,6 @@ trait WithDorisEngine extends WithJdbcEngine with WithDorisContainer {
ENGINE_JDBC_CONNECTION_PASSWORD.key -> "",
ENGINE_TYPE.key -> "jdbc",
ENGINE_JDBC_SHORT_NAME.key -> "doris",
KYUUBI_SESSION_USER_KEY -> "kyuubi",
ENGINE_JDBC_DRIVER_CLASS.key -> "com.mysql.cj.jdbc.Driver")
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.impala

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.jdbc.WithJdbcEngine

trait WithImpalaEngine extends WithJdbcEngine with WithImpalaContainer {
Expand All @@ -26,5 +27,6 @@ trait WithImpalaEngine extends WithJdbcEngine with WithImpalaContainer {
ENGINE_JDBC_CONNECTION_URL.key -> hiveServerJdbcUrl,
ENGINE_TYPE.key -> "jdbc",
ENGINE_JDBC_SHORT_NAME.key -> "impala",
KYUUBI_SESSION_USER_KEY -> "kyuubi",
ENGINE_JDBC_DRIVER_CLASS.key -> ImpalaConnectionProvider.driverClass)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import org.testcontainers.utility.DockerImageName

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.jdbc.WithJdbcEngine

trait WithMySQLEngine extends WithJdbcEngine with TestContainerForAll {
Expand All @@ -40,6 +41,7 @@ trait WithMySQLEngine extends WithJdbcEngine with TestContainerForAll {
ENGINE_JDBC_CONNECTION_PASSWORD.key -> mysqlContainer.password,
ENGINE_TYPE.key -> "jdbc",
ENGINE_JDBC_SHORT_NAME.key -> "mysql",
KYUUBI_SESSION_USER_KEY -> "kyuubi",
ENGINE_JDBC_DRIVER_CLASS.key -> "com.mysql.cj.jdbc.Driver")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.jdbc.phoenix

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.jdbc.WithJdbcEngine

trait WithPhoenixEngine extends WithJdbcEngine with WithPhoenixContainer {
Expand All @@ -32,6 +33,7 @@ trait WithPhoenixEngine extends WithJdbcEngine with WithPhoenixContainer {
ENGINE_JDBC_CONNECTION_PASSWORD.key -> "",
ENGINE_TYPE.key -> "jdbc",
ENGINE_JDBC_SHORT_NAME.key -> "phoenix",
KYUUBI_SESSION_USER_KEY -> "kyuubi",
ENGINE_JDBC_DRIVER_CLASS.key -> "org.apache.phoenix.queryserver.client.Driver")

private def getConnectString: String = s"$jdbcUrlPrefix=http://$queryServerUrl;$serialization"
Expand Down
Loading
Loading