Commit

refactor code
yanghua committed Nov 15, 2021
1 parent cf1ad6b commit 92d7e09
Showing 3 changed files with 29 additions and 21 deletions.
1 change: 1 addition & 0 deletions externals/kyuubi-flink-sql-engine/pom.xml
@@ -123,6 +123,7 @@
<scope>provided</scope>
</dependency>

+ <!-- tests -->
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.util.JarUtils

- import org.apache.kyuubi.Logging
+ import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.Utils.{addShutdownHook, DEFAULT_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.countDownLatch
@@ -45,17 +45,14 @@ case class FlinkSQLEngine(engineContext: EngineContext) extends Serverable("Flin
override val backendService = new FlinkSQLBackendService(engineContext)
override val frontendServices = Seq(new FlinkThriftBinaryFrontendService(this))

- override def initialize(conf: KyuubiConf): Unit = super.initialize(conf)
-
- override protected def stopServer(): Unit = {
- countDownLatch.countDown()
- }
-
override def start(): Unit = {
super.start()
backendService.sessionManager.startTerminatingChecker()
}

+ override protected def stopServer(): Unit = {
+ countDownLatch.countDown()
+ }
}

object FlinkSQLEngine extends Logging {
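The six "-" lines above drop an initialize override that only delegated to super.initialize(conf) and move stopServer below start. stopServer's countDownLatch.countDown() is what releases the countDownLatch.await() that main() blocks on in the next hunk, letting the engine process exit. A minimal, self-contained sketch of that latch pattern (demo names only, not project code):

import java.util.concurrent.CountDownLatch

object LatchDemo {
  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(1)
    val stopper = new Thread(() => {
      Thread.sleep(1000)  // stand-in for an external stop request
      latch.countDown()   // what stopServer() does in the hunk above
    })
    stopper.start()
    latch.await()         // what FlinkSQLEngine.main() does after starting the engine
    println("latch released, main thread can exit")
  }
}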
@@ -76,37 +73,44 @@ object FlinkSQLEngine extends Logging {
if (engineOptions.isPrintHelp) {
EngineOptionsParser.printHelp()
} else {
- // kyuubiConf.set(HA_ZK_ENGINE_REF_ID, System.getProperty("kyuubi.ha.engine.ref.id"))
-
val engineEnv = createEngineEnvironment(engineOptions)
val dependencies = FlinkSQLEngine.discoverDependencies(
engineOptions.getJars, engineOptions.getLibraryDirs)

- val defaultContext = new EngineContext(engineEnv, dependencies)
+ val engineContext = new EngineContext(engineEnv, dependencies)

- startEngine(defaultContext)
- // scalastyle:off println
- println("started engine...")
+ startEngine(engineContext)

// blocking main thread
countDownLatch.await()
}
} catch {
case t: Throwable if currentEngine.isDefined =>
- currentEngine.foreach { engine =>
- error(t)
- engine.stop()
- }
+ val engine = currentEngine.get
+ error(t)
+ engine.stop()
case t: Throwable =>
error("Create FlinkSQL Engine Failed", t)
error("Failed to launch Flink SQL Engine process: ", t)
}
}

def startEngine(engineContext: EngineContext): Unit = {
currentEngine = Some(new FlinkSQLEngine(engineContext))
currentEngine.foreach { engine =>
- engine.initialize(kyuubiConf)
- engine.start()
+ try {
+ engine.initialize(kyuubiConf)
+ } catch {
+ case t: Throwable =>
+ throw new KyuubiException(s"Failed to initialize FlinkSQLEngine: ${t.getMessage}", t)
+ }
+
+ try {
+ engine.start()
+ } catch {
+ case t: Throwable =>
+ throw new KyuubiException(s"Failed to start FlinkSQLEngine: ${t.getMessage}", t)
+ }
+
addShutdownHook(() => engine.stop(), DEFAULT_SHUTDOWN_PRIORITY + 1)
}
}
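Read with the markers above, this hunk renames defaultContext to engineContext, drops the scalastyle-exempted println, has main()'s first catch case stop the engine via currentEngine.get, and, as the main change, wraps engine.initialize and engine.start separately so a failure in either phase surfaces as a KyuubiException that names the phase and keeps the original cause. Consolidated from the hunk above (a reconstruction for readability, not a separate file), the refactored startEngine reads roughly:

def startEngine(engineContext: EngineContext): Unit = {
  currentEngine = Some(new FlinkSQLEngine(engineContext))
  currentEngine.foreach { engine =>
    try {
      engine.initialize(kyuubiConf)
    } catch {
      case t: Throwable =>
        throw new KyuubiException(s"Failed to initialize FlinkSQLEngine: ${t.getMessage}", t)
    }

    try {
      engine.start()
    } catch {
      case t: Throwable =>
        throw new KyuubiException(s"Failed to start FlinkSQLEngine: ${t.getMessage}", t)
    }

    // register engine.stop() so the engine is torn down on JVM shutdown
    addShutdownHook(() => engine.stop(), DEFAULT_SHUTDOWN_PRIORITY + 1)
  }
}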
@@ -55,8 +55,11 @@ class FlinkSQLSessionManager(engineContext: EngineContext)
newProperties +=
(EngineEnvironment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_TYPE -> "batch")

+ val executionType = newProperties.getOrElse(
+ "executionType", ExecutionEntry.EXECUTION_TYPE_VALUE_BATCH)
+
// for batch mode we ensure that results are provided in materialized form
if ("executionType".equalsIgnoreCase(ExecutionEntry.EXECUTION_TYPE_VALUE_BATCH)) {
if (executionType.equalsIgnoreCase(ExecutionEntry.EXECUTION_TYPE_VALUE_BATCH)) {
newProperties += (
EngineEnvironment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_RESULT_MODE ->
ExecutionEntry.EXECUTION_RESULT_MODE_VALUE_TABLE)
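The fix here: the old condition compared the string literal "executionType" against ExecutionEntry.EXECUTION_TYPE_VALUE_BATCH, which can never match (that constant is the batch mode name, presumably "batch" as the assignment above suggests), so the materialized table result mode below it was never applied. The new code first reads the actual execution type from the session properties and compares that. A tiny standalone illustration of the difference, with plain strings standing in for the Flink constants:

val batch = "batch"  // stand-in for ExecutionEntry.EXECUTION_TYPE_VALUE_BATCH
println("executionType".equalsIgnoreCase(batch))  // false: the old check compared a literal key name and never fired

val newProperties = Map("executionType" -> "batch")  // stand-in for the session's property map
val executionType = newProperties.getOrElse("executionType", batch)
println(executionType.equalsIgnoreCase(batch))  // true: the new check compares the configured value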
