Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to run HiveSQLEngine on kerberized YARN #6199

Closed
Closed
2 changes: 2 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_DEPLOY_MODE, ENGINE_KEYTAB, ENGINE_PRINCIPAL}
import org.apache.kyuubi.engine.deploy.DeployMode
import org.apache.kyuubi.engine.hive.HiveSQLEngine.currentEngine
import org.apache.kyuubi.engine.hive.events.{HiveEngineEvent, HiveEventHandlerRegister}
import org.apache.kyuubi.events.EventBus
Expand Down Expand Up @@ -133,26 +135,37 @@ object HiveSQLEngine extends Logging {
SignalRegister.registerLogger(logger)
try {
Utils.fromCommandLineArgs(args, kyuubiConf)
val sessionUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
val proxyUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
require(proxyUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set")
val realUser = UserGroupInformation.getLoginUser

if (sessionUser.isEmpty || sessionUser.get == realUser.getShortUserName) {
startEngine()
} else {
val effectiveUser = UserGroupInformation.createProxyUser(sessionUser.get, realUser)
effectiveUser.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
HiveTBinaryFrontendService.renewDelegationToken(credentials)
val principal = kyuubiConf.get(ENGINE_PRINCIPAL)
val keytab = kyuubiConf.get(ENGINE_KEYTAB)

val ugi = DeployMode.withName(kyuubiConf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
case DeployMode.LOCAL
if UserGroupInformation.isSecurityEnabled && principal.isDefined && keytab.isDefined =>
UserGroupInformation.loginUserFromKeytab(principal.get, keytab.get)
UserGroupInformation.getCurrentUser
case DeployMode.LOCAL if proxyUser.get != realUser.getShortUserName =>
val newUGI = UserGroupInformation.createProxyUser(proxyUser.get, realUser)
newUGI.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
val engineCredentials =
kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
HiveTBinaryFrontendService.renewDelegationToken(credentials)
}
}
startEngine()
}
})
})
newUGI
case _ =>
UserGroupInformation.getCurrentUser
}

ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = startEngine()
})
} catch {
case t: Throwable =>
currentEngine match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.hive.HiveSQLEngine
Expand All @@ -29,7 +32,16 @@ object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

def main(args: Array[String]): Unit = {
Utils.fromCommandLineArgs(args, kyuubiConf)
submitApplication()

if (UserGroupInformation.isSecurityEnabled) {
require(
kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL).isDefined
&& kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).isDefined,
s"${KyuubiConf.ENGINE_PRINCIPAL.key} and " +
s"${KyuubiConf.ENGINE_KEYTAB.key} must be set when submit " +
s"${HiveSQLEngine.getClass.getSimpleName.stripSuffix("$")} to YARN")
}
run()
}

override var engineType: String = "hive"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils}

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED
import org.apache.kyuubi.config.KyuubiReservedKeys
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.util.command.CommandLineUtils._
Expand All @@ -34,7 +35,9 @@ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper {
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
CONF,
s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true")
s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true",
CONF,
s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi")
HiveSQLEngine.main(args)
super.beforeAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.hive.operation
import org.apache.commons.lang3.{JavaVersion, SystemUtils}

import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiReservedKeys
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.util.command.CommandLineUtils._
Expand All @@ -31,7 +32,9 @@ class HiveOperationSuite extends HiveEngineTests {
metastore.toFile.delete()
val args = Array(
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true")
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
CONF,
s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY}=kyuubi")
HiveSQLEngine.main(args)
super.beforeAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2836,6 +2836,20 @@ object KyuubiConf {
.stringConf
.createOptional

val ENGINE_PRINCIPAL: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.principal")
.doc("Kerberos principal for the kyuubi engine.")
.version("1.10.0")
.stringConf
.createOptional

val ENGINE_KEYTAB: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.keytab")
.doc("Kerberos keytab for the kyuubi engine.")
.version("1.10.0")
.stringConf
.createOptional

val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
package org.apache.kyuubi.engine.deploy.yarn

import java.io.{File, IOException}
import java.security.PrivilegedExceptionAction

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.KyuubiHadoopUtils
import org.apache.kyuubi.util.command.CommandLineUtils.confKeyValues
Expand Down Expand Up @@ -71,7 +74,43 @@ object ApplicationMaster extends Logging {
unregister(finalStatus, finalMsg)
}
})
runApplicationMaster()

val ugi = kyuubiConf.get(KyuubiConf.ENGINE_PRINCIPAL) match {
case Some(principalName) if UserGroupInformation.isSecurityEnabled =>
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
val keytabFilename = kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).orNull
if (!new File(keytabFilename).exists()) {
throw new KyuubiException(s"Keytab file: $keytabFilename does not exist")
} else {
info("Attempting to login to Kerberos " +
s"using principal: $principalName and keytab: $keytabFilename")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}

val newUGI = UserGroupInformation.getCurrentUser()

// Transfer YARN_AM_RM_TOKEN to the new user.
// Not transfer other tokens, such as HDFS_DELEGATION_TOKEN,
// to avoid "org.apache.hadoop.ipc.RemoteException(java.io.IOException):
// Delegation Token can be issued only with kerberos or web authentication"
// when engine tries to obtain new tokens.
KyuubiHadoopUtils.getTokenMap(originalCreds).values
.find(_.getKind == AMRMTokenIdentifier.KIND_NAME)
.foreach { token =>
newUGI.addToken(token)
}
newUGI
case _ =>
val appUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
require(appUser.isDefined, s"${KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY} is not set")
val newUGI = UserGroupInformation.createRemoteUser(appUser.get)
newUGI.addCredentials(UserGroupInformation.getCurrentUser.getCredentials)
newUGI
}

ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = runApplicationMaster()
})
} catch {
case t: Throwable =>
error("Error running ApplicationMaster", t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.kyuubi.engine.deploy.yarn
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The submitter process determine whether to login as well in kerberized environment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what "determine whether to login as well" means. Can you elaborate it?


import java.io._
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.PrivilegedExceptionAction
import java.util
import java.util.{Locale, Properties}
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
Expand All @@ -30,7 +32,8 @@ import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
Expand All @@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._
import org.apache.kyuubi.util.KyuubiHadoopUtils

Expand Down Expand Up @@ -81,6 +85,9 @@ abstract class EngineYarnModeSubmitter extends Logging {

var yarnConf: Configuration = _
var hadoopConf: Configuration = _
var appUser: String = _
var keytab: String = _
var amKeytabFileName: Option[String] = _

var engineType: String

Expand All @@ -91,9 +98,35 @@ abstract class EngineYarnModeSubmitter extends Logging {
*/
def engineExtraJars(): Seq[File] = Seq.empty

protected def submitApplication(): Unit = {
def run(): Unit = {
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
appUser = kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).orNull
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this correct for GROUP and SERVER share level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KYUUBI_SESSION_USER_KEY is set to EngineRef#appUser when submit Engines. So it's correct for all share levels.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird. I remember we use this key to distinguish the session user and app user in some cases ...

Copy link
Contributor Author

@zhouyifan279 zhouyifan279 Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is confusing because we currently use KYUUBI_SESSION_USER_KEY in both ProcBuilder and Engine side Operation.
Replacing KYUUBI_SESSION_USER_KEY with another config when used in ProcBuilder can eliminate confusion.
Let's do it in another PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

require(appUser != null, s"$KYUUBI_SESSION_USER_KEY is not set")
keytab = kyuubiConf.get(ENGINE_KEYTAB).orNull
val principal = kyuubiConf.get(ENGINE_PRINCIPAL).orNull
amKeytabFileName =
if (UserGroupInformation.isSecurityEnabled && principal != null && keytab != null) {
info(s"Kerberos credentials: principal = $principal, keytab = $keytab")
UserGroupInformation.loginUserFromKeytab(principal, keytab)
// Generate a file name that can be used for the keytab file, that does not conflict
// with any user file.
Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
} else {
None
}

val ugi = if (UserGroupInformation.getCurrentUser.getShortUserName == appUser) {
UserGroupInformation.getCurrentUser
} else {
UserGroupInformation.createProxyUser(appUser, UserGroupInformation.getCurrentUser)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also supports proxy users, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only happen when using TGT cache with YARN mode, though it does not work ... it's good to keep those code path in case some one modifies the Hive source code to make it workable, but we'd better add a warning message to explicitly tell users this is not likely to work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More precisely speaking, proxy user mechanism works fine during the submitting progress. AM can be started successfully.
But HiveSQLEngine currently only works with principal & keytab provided.
I think we should check this prerequisite in HiveYarnModeProcessBuilder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, exactly

}
ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = submitApplication()
})
}

protected def submitApplication(): Unit = {
try {
yarnClient = YarnClient.createYarnClient()
yarnClient.init(yarnConf)
Expand Down Expand Up @@ -134,6 +167,29 @@ abstract class EngineYarnModeSubmitter extends Logging {
}
}

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
if (UserGroupInformation.isSecurityEnabled) {
val credentials = obtainHadoopFsDelegationToken()
val serializedCreds = KyuubiHadoopUtils.serializeCredentials(credentials)
amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the ApplicationMaster need to login even if the token is set on this block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WithoutUserGroupInformation.loginUserFromKeytab, the login user can only authenticate by tokens.
But HiveSQLEngine requires the login user to authenticate by kerberos tgt to obtain new tokens.

}
}

private def obtainHadoopFsDelegationToken(): Credentials = {
val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
info(s"Delegation token renewer is: $tokenRenewer")

if (tokenRenewer == null || tokenRenewer.isEmpty) {
val errorMessage = "Can't get Master Kerberos principal for use as renewer."
error(errorMessage)
throw new KyuubiException(errorMessage)
}

val credentials = new Credentials()
FileSystem.get(hadoopConf).addDelegationTokens(tokenRenewer, credentials)
credentials
}

private def createContainerLaunchContext(): ContainerLaunchContext = {
info("Setting up container launch context for engine AM")
val env = setupLaunchEnv(kyuubiConf)
Expand Down Expand Up @@ -171,6 +227,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
amContainer.setCommands(printableCommands.asJava)
info(s"Commands: ${printableCommands.mkString(" ")}")

setupSecurityToken(amContainer)
amContainer
}

Expand All @@ -187,6 +244,19 @@ abstract class EngineYarnModeSubmitter extends Logging {

distributeJars(localResources, env)
distributeConf(localResources, env)

// If we passed in a keytab, make sure we copy the keytab to the staging directory on
// HDFS, and setup the relevant environment vars, so the AM can login again.
amKeytabFileName.foreach { kt =>
info("To enable the AM to login from keytab, credentials are being copied over to the AM" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log seems a bit redundant, because in the Kerberos environment, we explicitly require users to configure this configuration item.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keytabs are security-sensitive files. I think it's better to tell user why we upload keytabs in place.

" via the YARN Secure Distributed Cache.")
distribute(
new Path(new File(keytab).toURI),
LocalResourceType.FILE,
kt,
localResources)
}

localResources
}

Expand Down Expand Up @@ -253,6 +323,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
listDistinctFiles(yarnConf.get).foreach(putEntry)

val properties = confToProperties(kyuubiConf)
amKeytabFileName.foreach(kt => properties.put(ENGINE_KEYTAB.key, kt))
writePropertiesToArchive(properties, KYUUBI_CONF_FILE, confStream)
} finally {
confStream.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ object KyuubiHadoopUtils extends Logging {
creds
}

def serializeCredentials(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
creds.writeTokenStorageToStream(dataStream)
byteStream.toByteArray
}

def deserializeCredentials(tokenBytes: Array[Byte]): Credentials = {
val tokensBuf = new ByteArrayInputStream(tokenBytes)

val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(tokensBuf))
creds
}

/**
* Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] is not present before
* Hadoop 3.2.1.
Expand Down
Loading
Loading