Skip to content

Commit

Permalink
[KYUUBI apache#1608] Introduce FlinkSQLEngine and FlinkProcessBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Dec 23, 2021
1 parent 3673399 commit 15e67a5
Show file tree
Hide file tree
Showing 7 changed files with 512 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.result;

import java.util.Arrays;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility class for reading engine environment file. */
public class EngineEnvironmentUtil {

private static final Logger LOG = LoggerFactory.getLogger(EngineEnvironmentUtil.class);

public static final String MODE_EMBEDDED = "embedded";

public static void checkFlinkVersion() {
String flinkVersion = EnvironmentInformation.getVersion();
if (!flinkVersion.startsWith("1.14")) {
LOG.error("Only Flink-1.14 is supported now!");
throw new RuntimeException("Only Flink-1.14 is supported now!");
}
}

public static CliOptions parseCliOptions(String[] args) {
final String mode;
final String[] modeArgs;
if (args.length < 1 || args[0].startsWith("-")) {
// mode is not specified, use the default `embedded` mode
mode = MODE_EMBEDDED;
modeArgs = args;
} else {
// mode is specified, extract the mode value and reaming args
mode = args[0];
// remove mode
modeArgs = Arrays.copyOfRange(args, 1, args.length);
}

final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);

switch (mode) {
case MODE_EMBEDDED:
if (options.isPrintHelp()) {
CliOptionsParser.printHelpEmbeddedModeClient();
}
break;

default:
throw new SqlClientException("Other mode is not supported yet.");
}

return options;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink

import org.apache.flink.table.client.gateway.context.DefaultContext

import org.apache.kyuubi.engine.flink.session.FlinkSQLSessionManager
import org.apache.kyuubi.service.AbstractBackendService
import org.apache.kyuubi.session.SessionManager

class FlinkSQLBackendService(engineContext: DefaultContext)
extends AbstractBackendService("FlinkSQLBackendService") {

override val sessionManager: SessionManager = new FlinkSQLSessionManager(engineContext)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink

import java.util.concurrent.CountDownLatch

import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalContextUtils

import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.Utils.{addShutdownHook, DEFAULT_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.flink.result.EngineEnvironmentUtil
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister

/**
* A flink sql engine just like an instance of Flink SQL Engine.
*/
case class FlinkSQLEngine(engineContext: DefaultContext) extends Serverable("FlinkSQLEngine") {

override val backendService = new FlinkSQLBackendService(engineContext)
override val frontendServices = Seq(new FlinkThriftBinaryFrontendService(this))

override def start(): Unit = {
super.start()
backendService.sessionManager.startTerminatingChecker(() => {
assert(currentEngine.isDefined)
currentEngine.get.stop()
})
}

override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
}

object FlinkSQLEngine extends Logging {

val kyuubiConf: KyuubiConf = KyuubiConf()
var currentEngine: Option[FlinkSQLEngine] = None

private val countDownLatch = new CountDownLatch(1)

def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)

EngineEnvironmentUtil.checkFlinkVersion()

try {
val cliOptions = EngineEnvironmentUtil.parseCliOptions(args)

val defaultContext = LocalContextUtils.buildDefaultContext(cliOptions)

kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)

startEngine(defaultContext)

// blocking main thread
countDownLatch.await()
} catch {
case t: Throwable if currentEngine.isDefined =>
val engine = currentEngine.get
error(t)
engine.stop()
case t: Throwable =>
error("Failed to launch Flink SQL Engine process: ", t)
}
}

def startEngine(engineContext: DefaultContext): Unit = {
currentEngine = Some(new FlinkSQLEngine(engineContext))
currentEngine.foreach { engine =>
try {
engine.initialize(kyuubiConf)
} catch {
case t: Throwable =>
throw new KyuubiException(s"Failed to initialize FlinkSQLEngine: ${t.getMessage}", t)
}

try {
engine.start()
} catch {
case t: Throwable =>
throw new KyuubiException(s"Failed to start FlinkSQLEngine: ${t.getMessage}", t)
}

addShutdownHook(() => engine.stop(), DEFAULT_SHUTDOWN_PRIORITY + 1)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink

import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
import org.apache.flink.table.client.cli.{CliOptions, CliOptionsParser}
import org.apache.flink.table.client.gateway.local.LocalContextUtils

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf

trait WithFlinkSQLEngine extends KyuubiFunSuite {

protected val flinkConfig = new Configuration()
protected var miniCluster: MiniCluster = _
protected var engine: FlinkSQLEngine = _
// conf will be loaded until start flink engine
def withKyuubiConf: Map[String, String]
val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf

protected var connectionUrl: String = _

override def beforeAll(): Unit = {
super.beforeAll()
startMiniCluster()
}

override def afterAll(): Unit = {
super.afterAll()
miniCluster.close()
}

def startFlinkEngine(): Unit = {
withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
kyuubiConf.set(k, v)
}

val cliOptions: CliOptions = CliOptionsParser.parseEmbeddedModeClient(Array())
val engineContext = LocalContextUtils.buildDefaultContext(cliOptions)

FlinkSQLEngine.startEngine(engineContext)
engine = FlinkSQLEngine.currentEngine.get
connectionUrl = engine.frontendServices.head.connectionUrl
}

def stopFlinkEngine(): Unit = {
if (engine != null) {
engine.stop()
engine = null
}
}

private def startMiniCluster(): Unit = {
val cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(flinkConfig)
.setNumSlotsPerTaskManager(1)
.build
miniCluster = new MiniCluster(cfg)
miniCluster.start()
flinkConfig.setString(RestOptions.ADDRESS, miniCluster.getRestAddress.get().getHost)
flinkConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress.get().getPort)
}

protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"

}
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,14 @@ object KyuubiConf {
.stringConf
.createOptional

val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("session.engine.flink.main.resource")
.doc("The package used to create Flink SQL engine remote application. If it is undefined," +
" Kyuubi will use the default")
.version("1.5.0")
.stringConf
.createOptional

val ENGINE_LOGIN_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.login.timeout")
.doc("The timeout of creating the connection to remote sql query engine")
.version("1.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineType.{EngineType, SPARK_SQL}
import org.apache.kyuubi.engine.EngineType.{EngineType, FLINK_SQL, SPARK_SQL}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
Expand Down Expand Up @@ -184,6 +185,8 @@ private[kyuubi] class EngineRef(
SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
new SparkProcessBuilder(appUser, conf, extraEngineLog)
case FLINK_SQL =>
new FlinkProcessBuilder(appUser, conf)
case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}")
}
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
Expand Down
Loading

0 comments on commit 15e67a5

Please sign in to comment.