Skip to content

Commit

Permalink
Support run HiveSQLEngine on kerberized YARN
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyifan279 committed Mar 19, 2024
1 parent 31469fa commit cb06025
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.hive.HiveSQLEngine
Expand All @@ -29,7 +32,16 @@ object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

def main(args: Array[String]): Unit = {
Utils.fromCommandLineArgs(args, kyuubiConf)
submitApplication()

if (UserGroupInformation.isSecurityEnabled) {
require(
kyuubiConf.get(KyuubiConf.ENGINE_DEPLOY_YARN_PRINCIPAL).isDefined
&& kyuubiConf.get(KyuubiConf.ENGINE_DEPLOY_YARN_KEYTAB).isDefined,
s"${KyuubiConf.ENGINE_DEPLOY_YARN_PRINCIPAL.key} and " +
s"${KyuubiConf.ENGINE_DEPLOY_YARN_KEYTAB.key} must be set when submit " +
s"${HiveSQLEngine.getClass.getSimpleName.stripSuffix("$")} to YARN")
}
run()
}

override var engineType: String = "hive"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2822,6 +2822,20 @@ object KyuubiConf {
.stringConf
.createOptional

val ENGINE_DEPLOY_YARN_PRINCIPAL: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.principal")
.doc("Kerberos principal for the kyuubi engine when the engine deploy mode is YARN.")
.version("1.10.0")
.stringConf
.createOptional

val ENGINE_DEPLOY_YARN_KEYTAB: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.keytab")
.doc("Kerberos keytab for the kyuubi engine when the engine deploy mode is YARN.")
.version("1.10.0")
.stringConf
.createOptional

val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@
package org.apache.kyuubi.engine.deploy.yarn

import java.io.{File, IOException}
import java.security.PrivilegedExceptionAction

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.KyuubiHadoopUtils
import org.apache.kyuubi.util.command.CommandLineUtils.confKeyValues
Expand Down Expand Up @@ -71,7 +73,37 @@ object ApplicationMaster extends Logging {
unregister(finalStatus, finalMsg)
}
})
runApplicationMaster()

val ugi = kyuubiConf.get(KyuubiConf.ENGINE_DEPLOY_YARN_PRINCIPAL) match {
case Some(principalName) =>
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
val keytabFilename = kyuubiConf.get(KyuubiConf.ENGINE_DEPLOY_YARN_KEYTAB).orNull
if (!new File(keytabFilename).exists()) {
throw new KyuubiException(s"Keytab file: ${keytabFilename} does not exist")
} else {
info("Attempting to login to Kerberos " +
s"using principal: ${principalName} and keytab: ${keytabFilename}")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}

val newUGI = UserGroupInformation.getCurrentUser()

// Transfer the original user's tokens to the new user, since it may contain needed tokens
// (such as those user to connect to YARN).
// TODO originalCreds may have been expired if Application retries.
newUGI.addCredentials(originalCreds)
newUGI
case _ =>
val appUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
require(appUser.isDefined)
val newUGI = UserGroupInformation.createRemoteUser(appUser.get)
newUGI.addCredentials(UserGroupInformation.getCurrentUser.getCredentials)
newUGI
}

ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = runApplicationMaster()
})
} catch {
case t: Throwable =>
error("Error running ApplicationMaster", t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.kyuubi.engine.deploy.yarn

import java.io._
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.PrivilegedExceptionAction
import java.util
import java.util.{Locale, Properties}
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
Expand All @@ -30,7 +32,8 @@ import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
Expand All @@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter._
import org.apache.kyuubi.util.KyuubiHadoopUtils

Expand Down Expand Up @@ -81,6 +85,9 @@ abstract class EngineYarnModeSubmitter extends Logging {

var yarnConf: Configuration = _
var hadoopConf: Configuration = _
var appUser: String = _
var keytab: String = _
var amKeytabFileName: Option[String] = _

var engineType: String

Expand All @@ -91,9 +98,36 @@ abstract class EngineYarnModeSubmitter extends Logging {
*/
def engineExtraJars(): Seq[File] = Seq.empty

protected def submitApplication(): Unit = {
def run(): Unit = {
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
appUser = kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).orNull
require(appUser != null)
keytab = kyuubiConf.get(ENGINE_DEPLOY_YARN_KEYTAB).orNull
amKeytabFileName = if (keytab != null) {
val principal = kyuubiConf.get(ENGINE_DEPLOY_YARN_PRINCIPAL).orNull
require(
(principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
info(s"Kerberos credentials: principal = $principal, keytab = $keytab")
// Generate a file name that can be used for the keytab file, that does not conflict
// with any user file.
Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
} else {
None
}

val ugi = if (UserGroupInformation.getCurrentUser.getShortUserName == appUser) {
UserGroupInformation.getCurrentUser
} else {
UserGroupInformation.createProxyUser(appUser, UserGroupInformation.getCurrentUser)
}
ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = submitApplication()
})
}

protected def submitApplication(): Unit = {
try {
yarnClient = YarnClient.createYarnClient()
yarnClient.init(yarnConf)
Expand Down Expand Up @@ -134,6 +168,29 @@ abstract class EngineYarnModeSubmitter extends Logging {
}
}

private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
if (UserGroupInformation.isSecurityEnabled) {
val credentials = obtainHadoopFsDelegationToken()
val serializedCreds = KyuubiHadoopUtils.serializeCredentials(credentials)
amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
}
}

private def obtainHadoopFsDelegationToken(): Credentials = {
val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
info("Delegation token renewer is: " + tokenRenewer)

if (tokenRenewer == null || tokenRenewer.isEmpty) {
val errorMessage = "Can't get Master Kerberos principal for use as renewer."
error(errorMessage)
throw new KyuubiException(errorMessage)
}

val credentials = new Credentials()
FileSystem.get(hadoopConf).addDelegationTokens(tokenRenewer, credentials)
credentials
}

private def createContainerLaunchContext(): ContainerLaunchContext = {
info("Setting up container launch context for engine AM")
val env = setupLaunchEnv(kyuubiConf)
Expand Down Expand Up @@ -171,6 +228,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
amContainer.setCommands(printableCommands.asJava)
info(s"Commands: ${printableCommands.mkString(" ")}")

setupSecurityToken(amContainer)
amContainer
}

Expand All @@ -187,6 +245,20 @@ abstract class EngineYarnModeSubmitter extends Logging {

distributeJars(localResources, env)
distributeConf(localResources, env)

// If we passed in a keytab, make sure we copy the keytab to the staging directory on
// HDFS, and setup the relevant environment vars, so the AM can login again.
amKeytabFileName.foreach { kt =>
info("To enable the AM to login from keytab, credentials are being copied over to the AM" +
" via the YARN Secure Distributed Cache.")
// TODO Add credentials to YARN Secure Distributed Cache instead of HDFS.
distribute(
new Path(new File(keytab).toURI),
LocalResourceType.FILE,
kt,
localResources)
}

localResources
}

Expand Down Expand Up @@ -253,6 +325,7 @@ abstract class EngineYarnModeSubmitter extends Logging {
listDistinctFiles(yarnConf.get).foreach(putEntry)

val properties = confToProperties(kyuubiConf)
amKeytabFileName.foreach(kt => properties.put(ENGINE_DEPLOY_YARN_KEYTAB.key, kt))
writePropertiesToArchive(properties, KYUUBI_CONF_FILE, confStream)
} finally {
confStream.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ object KyuubiHadoopUtils extends Logging {
creds
}

def serializeCredentials(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
creds.writeTokenStorageToStream(dataStream)
byteStream.toByteArray
}

def deserializeCredentials(tokenBytes: Array[Byte]): Credentials = {
val tokensBuf = new ByteArrayInputStream(tokenBytes)

val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(tokensBuf))
creds
}

/**
* Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] is not present before
* Hadoop 3.2.1.
Expand Down

0 comments on commit cb06025

Please sign in to comment.