From 9d05fa7ad9f292d29dcff4a7b5b1ada40bb77a26 Mon Sep 17 00:00:00 2001
From: zhouyifan279
Date: Thu, 21 Mar 2024 13:42:48 +0800
Subject: [PATCH] Use principal & keytab in Local mode

HiveProcessBuilder.apply now calls a new private helper, checkKeytab,
before it selects the deploy mode. The helper reads ENGINE_PRINCIPAL and
ENGINE_KEYTAB and requires that either both or neither are defined. When
both are defined it logs in through
UserGroupInformation.loginUserFromKeytabAndReturnUGI and requires that
the resulting short user name equals the proxy user. An IOException
raised by that login is logged and not rethrown. The first parameter of
apply is renamed from appUser to proxyUser.

ApplicationMaster no longer copies all of the original user's
credentials to the UGI obtained after the keytab login. It copies only
the token whose kind is AMRMTokenIdentifier.KIND_NAME; the in-code
comment explains that other tokens (such as HDFS_DELEGATION_TOKEN) are
left out to avoid a RemoteException when the engine obtains new tokens.
Two interpolated messages drop redundant braces without changing their
text.

HiveSQLEngine only re-wraps an existing require(...) call; its condition
and message are unchanged.

---
Reviewer notes (this section is not part of the commit message):

* checkKeytab: the catch branch ends with a bare `None`, but the method
  returns Unit, so the value is discarded. Consider dropping it in a
  follow-up.
* checkKeytab: a failed keytab login is only logged, after which apply
  continues to build the process builder. Confirm that continuing is the
  intended behaviour.
* ApplicationMaster: keytabFilename comes from `.orNull` and is passed
  straight to `new File(...)`, which throws NullPointerException for a
  null path. Presumably the keytab is always set whenever the principal
  is; the guard, if any, is outside this hunk - TODO confirm.
* ApplicationMaster: KyuubiHadoopUtils is used by the new code but no
  import for it is added here; presumably it is already in scope in that
  file - TODO confirm.

 .../kyuubi/engine/hive/HiveSQLEngine.scala    |  5 +--
 .../deploy/yarn/ApplicationMaster.scala       | 18 +++++++---
 .../engine/hive/HiveProcessBuilder.scala      | 35 ++++++++++++++++---
 3 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
index c3f4785242f..f8813f1c621 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -145,9 +145,10 @@ object HiveSQLEngine extends Logging {
         case DeployMode.LOCAL if principal.isDefined && keytab.isDefined =>
           UserGroupInformation.loginUserFromKeytab(principal.get, keytab.get)
           val newUGI = UserGroupInformation.getCurrentUser
-          require(newUGI.getShortUserName == proxyUser.get,
+          require(
+            newUGI.getShortUserName == proxyUser.get,
             s"Engine proxy user: ${proxyUser.get} is not same with " +
-            s"engine principal: ${newUGI.getShortUserName}. ")
+              s"engine principal: ${newUGI.getShortUserName}. ")
           newUGI
         case DeployMode.LOCAL if proxyUser.get != realUser.getShortUserName =>
           val newUGI = UserGroupInformation.createProxyUser(proxyUser.get, realUser)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
index ab1bb6afc55..3421cf9867e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 
 import org.apache.kyuubi.{KyuubiException, Logging, Utils}
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
@@ -79,18 +80,25 @@ object ApplicationMaster extends Logging {
         val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
         val keytabFilename = kyuubiConf.get(KyuubiConf.ENGINE_KEYTAB).orNull
         if (!new File(keytabFilename).exists()) {
-          throw new KyuubiException(s"Keytab file: ${keytabFilename} does not exist")
+          throw new KyuubiException(s"Keytab file: $keytabFilename does not exist")
         } else {
           info("Attempting to login to Kerberos " +
-            s"using principal: ${principalName} and keytab: ${keytabFilename}")
+            s"using principal: $principalName and keytab: $keytabFilename")
           UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
         }
 
         val newUGI = UserGroupInformation.getCurrentUser()
 
-        // Transfer the original user's tokens to the new user, since it may contain needed tokens
-        // (such as those user to connect to YARN).
-        newUGI.addCredentials(originalCreds)
+        // Transfer YARN_AM_RM_TOKEN to the new user.
+        // Not transfer other tokens, such as HDFS_DELEGATION_TOKEN,
+        // to avoid "org.apache.hadoop.ipc.RemoteException(java.io.IOException):
+        // Delegation Token can be issued only with kerberos or web authentication"
+        // when engine tries to obtain new tokens.
+        KyuubiHadoopUtils.getTokenMap(originalCreds).values
+          .find(_.getKind == AMRMTokenIdentifier.KIND_NAME)
+          .foreach { token =>
+            newUGI.addToken(token)
+          }
         newUGI
       case _ =>
         val appUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index 903e06575cc..be4da552cc4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -17,16 +17,17 @@
 
 package org.apache.kyuubi.engine.hive
 
-import java.io.File
+import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
 
 import scala.collection.mutable
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_YARN_MODE_APP_NAME, ENGINE_HIVE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_YARN_MODE_APP_NAME, ENGINE_HIVE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY, ENGINE_KEYTAB, ENGINE_PRINCIPAL}
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
 import org.apache.kyuubi.engine.deploy.DeployMode
@@ -121,19 +122,43 @@ object HiveProcessBuilder extends Logging {
   final val HIVE_ENGINE_NAME = "hive.engine.name"
 
   def apply(
-      appUser: String,
+      proxyUser: String,
       doAsEnabled: Boolean,
       conf: KyuubiConf,
       engineRefId: String,
       extraEngineLog: Option[OperationLog],
       defaultEngineName: String): HiveProcessBuilder = {
+    checkKeytab(proxyUser, conf)
     DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
-      case LOCAL => new HiveProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
+      case LOCAL =>
+        new HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
       case YARN =>
         warn(s"Hive on YARN model is experimental.")
         conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName))
-        new HiveYarnModeProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
+        new HiveYarnModeProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
       case other => throw new KyuubiException(s"Unsupported deploy mode: $other")
     }
   }
+
+  private def checkKeytab(proxyUser: String, conf: KyuubiConf): Unit = {
+    val principal = conf.get(ENGINE_PRINCIPAL)
+    val keytab = conf.get(ENGINE_KEYTAB)
+    require(
+      principal.isDefined == keytab.isDefined,
+      "Both principal and keytab must be defined, or neither.")
+    if (principal.isDefined && keytab.isDefined) {
+      try {
+        val ugi = UserGroupInformation
+          .loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
+        require(
+          ugi.getShortUserName == proxyUser,
+          s"Proxy user: $proxyUser is not same with " +
+            s"engine principal: ${ugi.getShortUserName}.")
+      } catch {
+        case e: IOException =>
+          error(s"Failed to login for ${principal.get}", e)
+          None
+      }
+    }
+  }
 }