Skip to content

Commit

Permalink
[KYUUBI #1340] Refactor ProcBuilder creation based on EngineType
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1341 from yanghua/KYUUBI-1340.

Closes #1340

aa0df59 [yanghua] Fixed test failure
ee2cd07 [yanghua] [KYUUBI #1340] Refactor ProcBuilder creation based on EngineType

Authored-by: yanghua <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
  • Loading branch information
yanghua authored and yaooqinn committed Nov 6, 2021
1 parent 04d4179 commit f9b933a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineType.EngineType
import org.apache.kyuubi.engine.EngineType.{EngineType, SPARK_SQL}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
Expand Down Expand Up @@ -179,13 +179,17 @@ private[kyuubi] class EngineRef(
var engineRef = getServerHost(zkClient, engineSpace)
if (engineRef.nonEmpty) return engineRef.get

conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
// tag is a seq type with comma-separated
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
val builder = new SparkProcessBuilder(appUser, conf)
val builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
// tag is a seq type with comma-separated
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
new SparkProcessBuilder(appUser, conf)
case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}")
}
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
info(s"Launching engine:\n$builder")
Expand All @@ -209,7 +213,7 @@ private[kyuubi] class EngineRef(
process.destroyForcibly()
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
s"Timeout($timeout ms) to launched Spark with $builder. $killMessage",
s"Timeout($timeout ms) to launched $engineType engine with $builder. $killMessage",
builder.getError)
}
engineRef = getEngineByRefId(zkClient, engineSpace, engineRefId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class EngineRefSuite extends KyuubiFunSuite {
test("start and get engine address with lock") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test")
conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
Expand Down

0 comments on commit f9b933a

Please sign in to comment.