diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 2820f226fed..713a6d008d5 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -110,6 +110,7 @@ jobs:
name: unit-tests-log
path: |
**/target/unit-tests.log
+ **/kyuubi-flink-sql-engine.log*
**/kyuubi-spark-sql-engine.log*
**/target/scalastyle-output.xml
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 501d93425fb..d44ab6eec9c 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -54,4 +54,5 @@ jobs:
name: unit-tests-log
path: |
**/target/unit-tests.log
+ **/kyuubi-flink-sql-engine.log*
**/kyuubi-spark-sql-engine.log*
diff --git a/build/dist b/build/dist
index 5ab4b04f44c..7f585c2ac01 100755
--- a/build/dist
+++ b/build/dist
@@ -30,7 +30,8 @@ set -e
KYUUBI_HOME="$(cd "`dirname "$0"`/.."; pwd)"
DISTDIR="$KYUUBI_HOME/dist"
MAKE_TGZ=false
-# TODO: add FLINK_PROVIDED option
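+# If true (via --flink-provided), the Flink binary dist is expected to be provided by the deployment and is not bundled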
+FLINK_PROVIDED=false
SPARK_PROVIDED=false
NAME=none
MVN="$KYUUBI_HOME/build/mvn"
@@ -62,6 +62,9 @@ while (( "$#" )); do
--tgz)
MAKE_TGZ=true
;;
+ --flink-provided)
+ FLINK_PROVIDED=true
+ ;;
--spark-provided)
SPARK_PROVIDED=true
;;
@@ -124,6 +127,11 @@ SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ 2>/dev
| grep -v "WARNING"\
| tail -n 1)
+FLINK_VERSION=$("$MVN" help:evaluate -Dexpression=flink.version $@ 2>/dev/null\
+ | grep -v "INFO"\
+ | grep -v "WARNING"\
+ | tail -n 1)
+
SPARK_VERSION=$("$MVN" help:evaluate -Dexpression=spark.version $@ 2>/dev/null\
| grep -v "INFO"\
| grep -v "WARNING"\
@@ -144,7 +152,7 @@ HIVE_VERSION=$("$MVN" help:evaluate -Dexpression=hive.version $@ 2>/dev/null\
| grep -v "WARNING"\
| tail -n 1)
-echo "Building Kyuubi package of version $VERSION against Spark version - $SPARK_VERSION"
+echo "Building Kyuubi package of version $VERSION against Flink $FLINK_VERSION, Spark $SPARK_VERSION"
SUFFIX="-$NAME"
if [[ "$NAME" == "none" ]]; then
@@ -163,7 +171,10 @@ fi
MVN_DIST_OPT="-DskipTests"
-if [[ "$SPARK_PROVIDED" == "true" ]]; then
- MVN_DIST_OPT="$MVN_DIST_OPT -Pspark-provided"
-fi
+if [[ "$FLINK_PROVIDED" == "true" ]]; then
+ MVN_DIST_OPT="$MVN_DIST_OPT -Pflink-provided"
+fi
+if [[ "$SPARK_PROVIDED" == "true" ]]; then
+ MVN_DIST_OPT="$MVN_DIST_OPT -Pspark-provided"
+fi
BUILD_COMMAND=("$MVN" clean install $MVN_DIST_OPT $@)
@@ -178,6 +186,8 @@ rm -rf "$DISTDIR"
mkdir -p "$DISTDIR/pid"
mkdir -p "$DISTDIR/logs"
mkdir -p "$DISTDIR/work"
+mkdir -p "$DISTDIR/externals/engines/flink"
+mkdir -p "$DISTDIR/externals/engines/flink/lib"
mkdir -p "$DISTDIR/externals/engines/spark"
mkdir -p "$DISTDIR/beeline-jars"
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
@@ -204,6 +214,12 @@ for jar in $(ls "$DISTDIR/jars/"); do
done
cd -
+# Copy flink engines
+cp -r "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/bin/" "$DISTDIR/externals/engines/flink/bin/"
+chmod a+x "$DISTDIR/externals/engines/flink/bin/flink-sql-engine.sh"
+cp -r "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/conf/" "$DISTDIR/externals/engines/flink/conf/"
+cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink/lib"
+
# Copy spark engines
cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/spark"
@@ -224,6 +240,12 @@ for SPARK_EXTENSION_VERSION in ${SPARK_EXTENSION_VERSIONS[@]}; do
fi
done
+if [[ "$FLINK_PROVIDED" != "true" ]]; then
+ # Copy flink binary dist
+ cp -r "$KYUUBI_HOME/externals/kyuubi-download/target/flink-$FLINK_VERSION/" \
+ "$DISTDIR/externals/flink-$FLINK_VERSION/"
+fi
+
if [[ "$SPARK_PROVIDED" != "true" ]]; then
# Copy spark binary dist
cp -r "$KYUUBI_HOME/externals/kyuubi-download/target/spark-$SPARK_VERSION-bin-hadoop${SPARK_HADOOP_VERSION}$HIVE_VERSION_SUFFIX/" \
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 10310a53c93..f47fb5320d0 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -300,6 +300,7 @@ kyuubi\.session\.check\.interval
kyuubi\.session\.conf\.ignore\.list||A comma separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.|seq|1.2.0
kyuubi\.session\.conf\.restrict\.list||A comma separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.|seq|1.2.0
kyuubi\.session\.engine\.check\.interval|PT1M|The check interval for engine timeout|duration|1.0.0
+kyuubi\.session\.engine\.flink\.main\.resource|<undefined>|The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default|string|1.4.0
kyuubi\.session\.engine\.idle\.timeout|PT30M|engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate.|duration|1.0.0
kyuubi\.session\.engine\.initialize\.timeout|PT3M|Timeout for starting the background engine, e.g. SparkSQLEngine.|duration|1.0.0
kyuubi\.session\.engine\.launch\.async|true|When opening kyuubi session, whether to launch backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously.|boolean|1.4.0
diff --git a/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh b/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh
new file mode 100755
index 00000000000..0cdcd11ae66
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+################################################################################
+# Adopted from "flink" bash script
+################################################################################
+
+if [[ -z "$FLINK_HOME" || ! -d "$FLINK_HOME" ]]; then
+ (>&2 echo "Invalid FLINK_HOME: ${FLINK_HOME:-unset}")
+ exit 1
+fi
+
+FLINK_SQL_ENGINE_HOME="$(cd `dirname $0`/..; pwd)"
+if [[ "$FLINK_SQL_ENGINE_HOME" == "$KYUUBI_HOME/externals/engines/flink" ]]; then
+ FLINK_SQL_ENGINE_CONF_DIR="$FLINK_SQL_ENGINE_HOME/conf"
+ FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/lib"
+ FLINK_SQL_ENGINE_LOG_DIR="$KYUUBI_LOG_DIR"
+ FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex ".*/kyuubi-flink-sql-engine_.*\.jar")
+ FLINK_HADOOP_CLASSPATH="$INTERNAL_HADOOP_CLASSPATHS"
+else
+ echo -e "\nFLINK_SQL_ENGINE_HOME $FLINK_SQL_ENGINE_HOME doesn't match the production directory, assuming a development environment..."
+ FLINK_SQL_ENGINE_CONF_DIR="$FLINK_SQL_ENGINE_HOME/conf"
+ FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/target"
+ FLINK_SQL_ENGINE_LOG_DIR="$FLINK_SQL_ENGINE_HOME/target"
+ FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex '.*/kyuubi-flink-sql-engine_.*\.jar$' | grep -v '\-javadoc.jar$' | grep -v '\-tests.jar$')
+ _FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS=$(find $FLINK_SQL_ENGINE_LIB_DIR -regex '.*/hadoop-client-.*\.jar$' | tr '\n' ':')
+ FLINK_HADOOP_CLASSPATH="${_FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS%:}"
+fi
+
+# do NOT let config.sh detect FLINK_HOME
+_FLINK_HOME_DETERMINED=1 . "$FLINK_HOME/bin/config.sh"
+
+FLINK_IDENT_STRING=${FLINK_IDENT_STRING:-"$USER"}
+FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")
+CC_CLASSPATH=`constructFlinkClassPath`
+
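+# Classpath order matters: the engine jar comes first so its classes take precedence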
+FULL_CLASSPATH="$FLINK_SQL_ENGINE_JAR:$FLINK_SQL_CLIENT_JAR:$CC_CLASSPATH:$FLINK_HADOOP_CLASSPATH"
+
+log_file="$FLINK_SQL_ENGINE_LOG_DIR/kyuubi-flink-sql-engine-$FLINK_IDENT_STRING-$HOSTNAME.log"
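+# Both log4j and logback configs are passed; whichever logging backend is on the classpath reads its own property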
+log_setting=(
+ -Dlog.file="$log_file"
+ -Dlog4j.configurationFile=file:"$FLINK_SQL_ENGINE_CONF_DIR/log4j.properties"
+ -Dlog4j.configuration=file:"$FLINK_SQL_ENGINE_CONF_DIR/log4j.properties"
+ -Dlogback.configurationFile=file:"$FLINK_SQL_ENGINE_CONF_DIR/logback.xml"
+)
+
+if [ -n "$FLINK_SQL_ENGINE_JAR" ]; then
+ exec $JAVA_RUN ${FLINK_SQL_ENGINE_DYNAMIC_ARGS} "${log_setting[@]}" -cp ${FULL_CLASSPATH} \
+ org.apache.kyuubi.engine.flink.FlinkSQLEngine "$@"
+else
+ (>&2 echo "[ERROR] Flink SQL Engine JAR file 'kyuubi-flink-sql-engine*.jar' should be located in $FLINK_SQL_ENGINE_LIB_DIR.")
+ exit 1
+fi
diff --git a/externals/kyuubi-flink-sql-engine/conf/log4j.properties b/externals/kyuubi-flink-sql-engine/conf/log4j.properties
new file mode 100644
index 00000000000..3e7cd9ac049
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/conf/log4j.properties
@@ -0,0 +1,43 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This affects logging for both kyuubi-flink-sql-engine and Flink
+log4j.rootLogger=INFO, CA, file
+
+#Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = FATAL
+
+# Log all infos in the given file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.file}
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+#File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
+
+# Set the logger level of File Appender to DEBUG
+log4j.appender.FA.Threshold = DEBUG
diff --git a/externals/kyuubi-flink-sql-engine/conf/logback.xml b/externals/kyuubi-flink-sql-engine/conf/logback.xml
new file mode 100644
index 00000000000..8652fd35e3c
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/conf/logback.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+  <appender name="file" class="ch.qos.logback.core.FileAppender">
+    <file>${log.file}</file>
+    <append>false</append>
+    <encoder>
+      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="file"/>
+  </root>
+</configuration>
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index fd6f8b19963..2f684cad274 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -147,4 +147,116 @@
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <artifactSet>
+            <includes>
+              <include>org.apache.kyuubi:kyuubi-common_${scala.binary.version}</include>
+              <include>org.apache.kyuubi:kyuubi-ha_${scala.binary.version}</include>
+              <include>com.fasterxml.jackson.core:*</include>
+              <include>com.fasterxml.jackson.module:*</include>
+              <include>com.google.guava:failureaccess</include>
+              <include>com.google.guava:guava</include>
+              <include>commons-codec:commons-codec</include>
+              <include>io.netty:netty-all</include>
+              <include>org.apache.commons:commons-lang3</include>
+              <include>org.apache.curator:curator-client</include>
+              <include>org.apache.curator:curator-framework</include>
+              <include>org.apache.curator:curator-recipes</include>
+              <include>org.apache.hive:hive-service-rpc</include>
+              <include>org.apache.thrift:*</include>
+              <include>org.apache.zookeeper:*</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>com.fasterxml.jackson</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.com.fasterxml.jackson</shadedPattern>
+              <includes>
+                <include>com.fasterxml.jackson.**</include>
+              </includes>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.curator</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.org.apache.curator</shadedPattern>
+              <includes>
+                <include>org.apache.curator.**</include>
+              </includes>
+            </relocation>
+            <relocation>
+              <pattern>com.google.common</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
+              <includes>
+                <include>com.google.common.**</include>
+              </includes>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.org.apache.commons</shadedPattern>
+              <includes>
+                <include>org.apache.commons.**</include>
+              </includes>
+            </relocation>
+            <relocation>
+              <pattern>io.netty</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.io.netty</shadedPattern>
+              <includes>
+                <include>io.netty.**</include>
+              </includes>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.hive.service.rpc.thrift</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.org.apache.hive.service.rpc.thrift</shadedPattern>
+              <includes>
+                <include>org.apache.hive.service.rpc.thrift.**</include>
+              </includes>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.thrift</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.org.apache.thrift</shadedPattern>
+              <includes>
+                <include>org.apache.thrift.**</include>
+              </includes>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.zookeeper</pattern>
+              <shadedPattern>${kyuubi.shade.packageName}.org.apache.zookeeper</shadedPattern>
+              <includes>
+                <include>org.apache.zookeeper.**</include>
+              </includes>
+            </relocation>
+          </relocations>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>prepare-test-jar</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
index 6496d847f3a..617b9f0e42b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
@@ -21,5 +21,40 @@
/** Constant column names. */
public class Constants {
+ // for statement execution
+ public static final String JOB_ID = "job_id";
+
+ // for results with SUCCESS result kind
+ public static final String RESULT = "result";
+ public static final String OK = "OK";
+
+ public static final String SHOW_MODULES_RESULT = "modules";
+
+ public static final String SHOW_CURRENT_CATALOG_RESULT = "catalog";
+
public static final String SHOW_CATALOGS_RESULT = "catalogs";
+
+ public static final String SHOW_CURRENT_DATABASE_RESULT = "database";
+
+ public static final String SHOW_DATABASES_RESULT = "databases";
+
+ public static final String SHOW_FUNCTIONS_RESULT = "functions";
+
+ public static final String EXPLAIN_RESULT = "explanation";
+
+ public static final String DESCRIBE_NAME = "name";
+ public static final String DESCRIBE_TYPE = "type";
+ public static final String DESCRIBE_NULL = "null";
+ public static final String DESCRIBE_KEY = "key";
+ public static final String DESCRIBE_COMPUTED_COLUMN = "computed_column";
+ public static final String DESCRIBE_WATERMARK = "watermark";
+
+ public static final String SHOW_TABLES_RESULT = "tables";
+
+ public static final String SHOW_VIEWS_RESULT = "views";
+
+ public static final String SET_KEY = "key";
+ public static final String SET_VALUE = "value";
+
+ public static final String[] SUPPORTED_TABLE_TYPES = new String[] {"TABLE", "VIEW"};
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java
index 8fb51c28a82..401bd6d36ed 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java
@@ -20,6 +20,9 @@
/** ResultKind defines the types of the result. */
public enum ResultKind {
+ // for DDL, DCL and statements with a simple "OK"
+ SUCCESS,
+
// rows with important content are available (DML, DQL)
SUCCESS_WITH_CONTENT
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
new file mode 100644
index 00000000000..fa122b16228
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.table.client.SqlClientException
+import org.apache.flink.table.client.cli.{CliOptions, CliOptionsParser}
+import org.apache.flink.table.client.gateway.context.SessionContext
+import org.apache.flink.table.client.gateway.local.LocalExecutor
+
+import org.apache.kyuubi.Logging
+
+object FlinkEngineUtils extends Logging {
+
+ val MODE_EMBEDDED = "embedded"
+
+ def checkFlinkVersion(): Unit = {
+ val flinkVersion = EnvironmentInformation.getVersion
+ if (!flinkVersion.startsWith("1.14")) {
+ throw new RuntimeException("Only Flink-1.14.x is supported now!")
+ }
+ }
+
+ def parseCliOptions(args: Array[String]): CliOptions = {
+ val (mode, modeArgs) =
+ if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
+ else (args(0), args.drop(1))
+ // TODO remove requirement of flink-python
+ val options = CliOptionsParser.parseEmbeddedModeClient(modeArgs)
+ mode match {
+ case MODE_EMBEDDED if options.isPrintHelp => CliOptionsParser.printHelpEmbeddedModeClient()
+ case MODE_EMBEDDED =>
+ case _ => throw new SqlClientException("Other mode is not supported yet.")
+ }
+ options
+ }
+
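+ // LocalExecutor does not expose its SessionContext, so it is fetched via reflection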
+ def getSessionContext(localExecutor: LocalExecutor, sessionId: String): SessionContext = {
+ val method = classOf[LocalExecutor].getDeclaredMethod("getSessionContext", classOf[String])
+ method.setAccessible(true)
+ method.invoke(localExecutor, sessionId).asInstanceOf[SessionContext]
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
new file mode 100644
index 00000000000..d049e3c80bf
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import org.apache.flink.table.client.gateway.context.DefaultContext
+
+import org.apache.kyuubi.engine.flink.session.FlinkSQLSessionManager
+import org.apache.kyuubi.service.AbstractBackendService
+import org.apache.kyuubi.session.SessionManager
+
+class FlinkSQLBackendService(engineContext: DefaultContext)
+ extends AbstractBackendService("FlinkSQLBackendService") {
+
+ override val sessionManager: SessionManager = new FlinkSQLSessionManager(engineContext)
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
new file mode 100644
index 00000000000..48699b4ab20
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.JavaConverters._
+
+import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI}
+import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.table.client.gateway.context.DefaultContext
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.Utils.{addShutdownHook, FLINK_ENGINE_SHUTDOWN_PRIORITY}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
+import org.apache.kyuubi.service.Serverable
+import org.apache.kyuubi.util.SignalRegister
+
+case class FlinkSQLEngine(engineContext: DefaultContext) extends Serverable("FlinkSQLEngine") {
+
+ override val backendService = new FlinkSQLBackendService(engineContext)
+ override val frontendServices = Seq(new FlinkThriftBinaryFrontendService(this))
+
+ override def initialize(conf: KyuubiConf): Unit = super.initialize(conf)
+
+ override protected def stopServer(): Unit = {
+ countDownLatch.countDown()
+ }
+
+ override def start(): Unit = {
+ super.start()
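+ // stop the whole engine when the session manager's terminating checker fires (e.g. idle timeout)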
+ backendService.sessionManager.startTerminatingChecker { () =>
+ assert(currentEngine.isDefined)
+ currentEngine.get.stop()
+ }
+ }
+
+ override def stop(): Unit = {
+ super.stop()
+ }
+
+}
+
+object FlinkSQLEngine extends Logging {
+
+ val kyuubiConf: KyuubiConf = KyuubiConf()
+ var currentEngine: Option[FlinkSQLEngine] = None
+
+ private val countDownLatch = new CountDownLatch(1)
+
+ def main(args: Array[String]): Unit = {
+ SignalRegister.registerLogger(logger)
+
+ FlinkEngineUtils.checkFlinkVersion()
+
+ try {
+ val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
+ val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
+ val engineContext = new DefaultContext(
+ List.empty.asJava,
+ flinkConf,
+ List[CustomCommandLine](new DefaultCLI).asJava)
+
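+ // port 0 lets the thrift frontend bind to a random free port unless one is configured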
+ kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+
+ startEngine(engineContext)
+ info("started engine...")
+
+ // blocking main thread
+ countDownLatch.await()
+ } catch {
+ case t: Throwable if currentEngine.isDefined =>
+ currentEngine.foreach { engine =>
+ error("Fatal error in FlinkSQL Engine, stopping...", t)
+ engine.stop()
+ }
+ case t: Throwable =>
+ error("Create FlinkSQL Engine Failed", t)
+ }
+ }
+
+ def startEngine(engineContext: DefaultContext): Unit = {
+ currentEngine = Some(new FlinkSQLEngine(engineContext))
+ currentEngine.foreach { engine =>
+ engine.initialize(kyuubiConf)
+ engine.start()
+ addShutdownHook(() => engine.stop(), FLINK_ENGINE_SHUTDOWN_PRIORITY + 1)
+ }
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 818d1684347..0313e435157 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -54,9 +54,7 @@ class ExecuteStatement(
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@VisibleForTesting
- override def setExecutor(executor: Executor): Unit = {
- this.executor = executor
- }
+ override def setExecutor(executor: Executor): Unit = super.setExecutor(executor)
def setSessionId(sessionId: String): Unit = {
this.sessionId = sessionId
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index a3565362e3b..8821bea0655 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -42,17 +42,15 @@ abstract class FlinkOperation(
extends AbstractOperation(opType, session) {
protected val sessionContext: SessionContext = {
- session.asInstanceOf[FlinkSessionImpl].getSessionContext
+ session.asInstanceOf[FlinkSessionImpl].sessionContext
}
- protected var executor: Executor = _
+ protected var executor: Executor = session.asInstanceOf[FlinkSessionImpl].executor
- protected def setExecutor(executor: Executor): Unit = {
- this.executor = session.asInstanceOf[FlinkSessionImpl].getExecutor
- }
+ protected def setExecutor(executor: Executor): Unit = this.executor = executor
protected var sessionId: String = {
- session.asInstanceOf[FlinkSessionImpl].getSessionId
+ session.asInstanceOf[FlinkSessionImpl].sessionId
}
protected var resultSet: ResultSet = _
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
index 3439805c0b0..fd7c79e0143 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -17,11 +17,11 @@
package org.apache.kyuubi.engine.flink.schema
+import java.{lang, util}
import java.nio.ByteBuffer
-import java.util
import java.util.Collections
-import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.flink.table.types.logical._
@@ -71,60 +71,61 @@ object RowSet {
val logicalType = resultSet.getColumns.get(ordinal).getLogicalType
- if (logicalType.isInstanceOf[BooleanType]) {
- val boolValue = new TBoolValue
- if (row.getField(ordinal) != null) {
- boolValue.setValue(row.getField(ordinal).asInstanceOf[Boolean])
- }
- TColumnValue.boolVal(boolValue)
- } else if (logicalType.isInstanceOf[TinyIntType]) {
- val tI16Value = new TI16Value
- if (row.getField(ordinal) != null) {
- tI16Value.setValue(row.getField(ordinal).asInstanceOf[Short])
- }
- TColumnValue.i16Val(tI16Value)
- } else if (logicalType.isInstanceOf[IntType]) {
- val tI32Value = new TI32Value
- if (row.getField(ordinal) != null) {
- tI32Value.setValue(row.getField(ordinal).asInstanceOf[Short])
- }
- TColumnValue.i32Val(tI32Value)
- } else if (logicalType.isInstanceOf[BigIntType]) {
- val tI64Value = new TI64Value
- if (row.getField(ordinal) != null) {
- tI64Value.setValue(row.getField(ordinal).asInstanceOf[Long])
- }
- TColumnValue.i64Val(tI64Value)
- } else if (logicalType.isInstanceOf[FloatType]) {
- val tDoubleValue = new TDoubleValue
- if (row.getField(ordinal) != null) {
- tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
- }
- TColumnValue.doubleVal(tDoubleValue)
- } else if (logicalType.isInstanceOf[DoubleType]) {
- val tDoubleValue = new TDoubleValue
- if (row.getField(ordinal) != null) {
- tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Double])
- }
- TColumnValue.doubleVal(tDoubleValue)
- } else if (logicalType.isInstanceOf[VarCharType]) {
- val tStringValue = new TStringValue
- if (row.getField(ordinal) != null) {
- tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
- }
- TColumnValue.stringVal(tStringValue)
- } else if (logicalType.isInstanceOf[CharType]) {
- val tStringValue = new TStringValue
- if (row.getField(ordinal) != null) {
- tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
- }
- TColumnValue.stringVal(tStringValue)
- } else {
- val tStrValue = new TStringValue
- if (row.getField(ordinal) != null) {
- // TODO to be done
- }
- TColumnValue.stringVal(tStrValue)
+ logicalType match {
+ case _: BooleanType =>
+ val boolValue = new TBoolValue
+ if (row.getField(ordinal) != null) {
+ boolValue.setValue(row.getField(ordinal).asInstanceOf[Boolean])
+ }
+ TColumnValue.boolVal(boolValue)
+ case _: TinyIntType =>
+ val tI16Value = new TI16Value
+ if (row.getField(ordinal) != null) {
+ tI16Value.setValue(row.getField(ordinal).asInstanceOf[Short])
+ }
+ TColumnValue.i16Val(tI16Value)
+ case _: IntType =>
+ val tI32Value = new TI32Value
+ if (row.getField(ordinal) != null) {
+ tI32Value.setValue(row.getField(ordinal).asInstanceOf[Int])
+ }
+ TColumnValue.i32Val(tI32Value)
+ case _: BigIntType =>
+ val tI64Value = new TI64Value
+ if (row.getField(ordinal) != null) {
+ tI64Value.setValue(row.getField(ordinal).asInstanceOf[Long])
+ }
+ TColumnValue.i64Val(tI64Value)
+ case _: FloatType =>
+ val tDoubleValue = new TDoubleValue
+ if (row.getField(ordinal) != null) {
+ tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
+ }
+ TColumnValue.doubleVal(tDoubleValue)
+ case _: DoubleType =>
+ val tDoubleValue = new TDoubleValue
+ if (row.getField(ordinal) != null) {
+ tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Double])
+ }
+ TColumnValue.doubleVal(tDoubleValue)
+ case _: VarCharType =>
+ val tStringValue = new TStringValue
+ if (row.getField(ordinal) != null) {
+ tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+ }
+ TColumnValue.stringVal(tStringValue)
+ case _: CharType =>
+ val tStringValue = new TStringValue
+ if (row.getField(ordinal) != null) {
+ tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+ }
+ TColumnValue.stringVal(tStringValue)
+ case _ =>
+ val tStrValue = new TStringValue
+ if (row.getField(ordinal) != null) {
+ // TODO to be done
+ }
+ TColumnValue.stringVal(tStrValue)
}
}
@@ -132,41 +133,23 @@ object RowSet {
ByteBuffer.wrap(bitSet.toByteArray)
}
- private def toTColumn(
- rows: Seq[Row],
- ordinal: Int,
- logicalType: LogicalType): TColumn = {
+ private def toTColumn(rows: Seq[Row], ordinal: Int, logicalType: LogicalType): TColumn = {
val nulls = new java.util.BitSet()
- if (logicalType.isInstanceOf[BooleanType]) {
- val values = getOrSetAsNull[java.lang.Boolean](
- rows,
- ordinal,
- nulls,
- true)
- TColumn.boolVal(new TBoolColumn(values, nulls))
- } else if (logicalType.isInstanceOf[TinyIntType]) {
- val values = getOrSetAsNull[java.lang.Short](
- rows,
- ordinal,
- nulls,
- 0.toShort)
- TColumn.i16Val(new TI16Column(values, nulls))
- } else if (logicalType.isInstanceOf[VarCharType]) {
- val values = getOrSetAsNull[java.lang.String](
- rows,
- ordinal,
- nulls,
- "")
- TColumn.stringVal(new TStringColumn(values, nulls))
- } else if (logicalType.isInstanceOf[CharType]) {
- val values = getOrSetAsNull[java.lang.String](
- rows,
- ordinal,
- nulls,
- "")
- TColumn.stringVal(new TStringColumn(values, nulls))
- } else {
- null
+ logicalType match {
+ case _: BooleanType =>
+ val values = getOrSetAsNull[lang.Boolean](rows, ordinal, nulls, true)
+ TColumn.boolVal(new TBoolColumn(values, nulls))
+ case _: TinyIntType =>
+ val values = getOrSetAsNull[lang.Short](rows, ordinal, nulls, 0.toShort)
+ TColumn.i16Val(new TI16Column(values, nulls))
+ case _: VarCharType =>
+ val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ case _: CharType =>
+ val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ case _ =>
+ null
}
}
@@ -222,21 +205,13 @@ object RowSet {
ret
}
- def toTTypeId(typ: LogicalType): TTypeId =
- if (typ.isInstanceOf[NullType]) {
- TTypeId.NULL_TYPE
- } else if (typ.isInstanceOf[BooleanType]) {
- TTypeId.BOOLEAN_TYPE
- } else if (typ.isInstanceOf[FloatType]) {
- TTypeId.FLOAT_TYPE
- } else if (typ.isInstanceOf[DoubleType]) {
- TTypeId.DOUBLE_TYPE
- } else if (typ.isInstanceOf[VarCharType]) {
- TTypeId.STRING_TYPE
- } else if (typ.isInstanceOf[CharType]) {
- TTypeId.STRING_TYPE
- } else {
- null
- }
-
+ def toTTypeId(typ: LogicalType): TTypeId = typ match {
+ case _: NullType => TTypeId.NULL_TYPE
+ case _: BooleanType => TTypeId.BOOLEAN_TYPE
+ case _: FloatType => TTypeId.FLOAT_TYPE
+ case _: DoubleType => TTypeId.DOUBLE_TYPE
+ case _: VarCharType => TTypeId.STRING_TYPE
+ case _: CharType => TTypeId.STRING_TYPE
+ case _ => null
+ }
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 1ffc311386a..62a1e5ec9ef 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,11 +17,11 @@
package org.apache.kyuubi.engine.flink.session
-import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.session.{SessionHandle, SessionManager}
@@ -31,7 +31,7 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false
val operationManager = new FlinkSQLOperationManager()
- val executor: Executor = new LocalExecutor(engineContext)
+ val executor = new LocalExecutor(engineContext)
override def start(): Unit = {
super.start()
@@ -44,8 +44,26 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
- executor.openSession("")
- null
+
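+ // derive the Flink session id from the Kyuubi session handle's UUID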
+ val sessionHandle = SessionHandle(protocol)
+ val sessionId = sessionHandle.identifier.toString
+
+ executor.openSession(sessionId)
+ val sessionContext = FlinkEngineUtils.getSessionContext(executor, sessionId)
+
+ val sessionImpl = new FlinkSessionImpl(
+ protocol,
+ user,
+ password,
+ ipAddress,
+ conf,
+ this,
+ sessionHandle,
+ sessionContext)
+
+ setSession(sessionHandle, sessionImpl)
+ sessionHandle
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index fe97cda51af..ae6b9d52d08 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -30,15 +30,11 @@ class FlinkSessionImpl(
ipAddress: String,
conf: Map[String, String],
sessionManager: SessionManager,
- sessionContext: SessionContext)
+ val handle: SessionHandle,
+ val sessionContext: SessionContext)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
- override val handle: SessionHandle = SessionHandle(protocol)
-
- def getSessionContext: SessionContext = sessionContext
-
- def getExecutor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
-
- def getSessionId: String = handle.toString
+ def executor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
+ def sessionId: String = handle.identifier.toString
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/resources/log4j.properties b/externals/kyuubi-flink-sql-engine/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..f9785ef6ca4
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/resources/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+# Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = FATAL
+
+# File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
+
+# Set the threshold of the File Appender to DEBUG
+log4j.appender.FA.Threshold = DEBUG
+
+# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
+log4j.appender.CA.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.CA.filter.1.StringToMatch=Thrift error occurred during processing of message
+log4j.appender.CA.filter.1.AcceptOnMatch=false
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala
new file mode 100644
index 00000000000..2ef43cdd8c8
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import scala.collection.JavaConverters._
+
+import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI}
+import org.apache.flink.configuration.{Configuration, RestOptions}
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
+import org.apache.flink.table.client.gateway.context.DefaultContext
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
+trait WithFlinkSQLEngine extends KyuubiFunSuite {
+
+ protected val flinkConfig = new Configuration()
+ protected var miniCluster: MiniCluster = _
+ protected var engine: FlinkSQLEngine = _
+ // conf entries are applied when the Flink engine starts
+ def withKyuubiConf: Map[String, String]
+ val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+
+ protected var connectionUrl: String = _
+
+ override def beforeAll(): Unit = {
+ startMiniCluster()
+ startFlinkEngine()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ stopFlinkEngine()
+ miniCluster.close()
+ }
+
+ def startFlinkEngine(): Unit = {
+ withKyuubiConf.foreach { case (k, v) =>
+ System.setProperty(k, v)
+ kyuubiConf.set(k, v)
+ }
+ val engineContext = new DefaultContext(
+ List.empty.asJava,
+ flinkConfig,
+ List[CustomCommandLine](new DefaultCLI).asJava)
+ FlinkSQLEngine.startEngine(engineContext)
+ engine = FlinkSQLEngine.currentEngine.get
+ connectionUrl = engine.frontendServices.head.connectionUrl
+ }
+
+ def stopFlinkEngine(): Unit = {
+ if (engine != null) {
+ engine.stop()
+ engine = null
+ }
+ }
+
+ private def startMiniCluster(): Unit = {
+ val cfg = new MiniClusterConfiguration.Builder()
+ .setConfiguration(flinkConfig)
+ .setNumSlotsPerTaskManager(1)
+ .build
+ miniCluster = new MiniCluster(cfg)
+ miniCluster.start()
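+ // expose the mini cluster's REST endpoint to the Flink config used by the engine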
+ flinkConfig.setString(RestOptions.ADDRESS, miniCluster.getRestAddress.get().getHost)
+ flinkConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress.get().getPort)
+ }
+
+ protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index f58104319b9..3d0cce438d3 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -17,113 +17,46 @@
package org.apache.kyuubi.engine.flink.operation
-import java.net.URL
-import java.util
-import java.util.Collections
-
-import org.apache.flink.client.cli.DefaultCLI
-import org.apache.flink.client.program.ClusterClient
-import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
-import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
-import org.apache.flink.table.client.gateway.local.LocalExecutor
-import org.apache.flink.test.util.MiniClusterWithClientResource
-import org.apache.hive.service.rpc.thrift.TProtocolVersion
-
-import org.apache.kyuubi.{KyuubiFunSuite, Utils}
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.flink.session.{FlinkSessionImpl, FlinkSQLSessionManager}
-import org.apache.kyuubi.operation.FetchOrientation
-
-class FlinkOperationSuite extends KyuubiFunSuite {
-
- val user: String = Utils.currentUser
- val password = "anonymous"
-
- val NUM_TMS = 2
- val NUM_SLOTS_PER_TM = 2
-
- private def getConfig = {
- val config = new Configuration
- config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
- config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
- config.setBoolean(WebOptions.SUBMIT_ENABLE, false)
- config
- }
-
- val MINI_CLUSTER_RESOURCE =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfig)
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)
-
- var clusterClient: ClusterClient[_] = _
-
- var engineContext = new DefaultContext(
- Collections.emptyList(),
- new Configuration,
- Collections.singletonList(new DefaultCLI))
- var sessionContext: SessionContext = _
- var flinkSession: FlinkSessionImpl = _
-
- private def createLocalExecutor: LocalExecutor =
- createLocalExecutor(Collections.emptyList[URL], new Configuration)
-
- private def createLocalExecutor(
- dependencies: util.List[URL],
- configuration: Configuration): LocalExecutor = {
- configuration.addAll(clusterClient.getFlinkConfiguration)
- val defaultContext: DefaultContext = new DefaultContext(
- dependencies,
- configuration,
- Collections.singletonList(new DefaultCLI))
- new LocalExecutor(defaultContext)
- }
-
- override def beforeAll(): Unit = {
- MINI_CLUSTER_RESOURCE.before()
- clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
-
- sessionContext = SessionContext.create(engineContext, "test-session-id");
- val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
- flinkSQLSessionManager.initialize(KyuubiConf())
- flinkSession = new FlinkSessionImpl(
- TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
- user,
- password,
- "localhost",
- Map(),
- flinkSQLSessionManager,
- sessionContext)
- super.beforeAll()
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.service.ServiceState._
+
+class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
+ override def withKyuubiConf: Map[String, String] = Map()
+
+ override protected def jdbcUrl: String =
+ s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/"
+
+ ignore("release session if shared level is CONNECTION") {
+ logger.info(s"jdbc url is $jdbcUrl")
+ assert(engine.getServiceState == STARTED)
+ withJdbcStatement() { _ => }
+ eventually(Timeout(20.seconds)) {
+ assert(engine.getServiceState == STOPPED)
+ }
}
test("get catalogs for flink sql") {
- val getCatalogOperation = new GetCatalogs(flinkSession)
- getCatalogOperation.run()
-
- val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
- assert(1 == resultSet.getRowsSize)
- assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
+ withJdbcStatement() { statement =>
+ val meta = statement.getConnection.getMetaData
+ val catalogs = meta.getCatalogs
+ val expected = Set("default_catalog").toIterator
+ while (catalogs.next()) {
+ assert(catalogs.getString("catalogs") === expected.next())
+ }
+ assert(!expected.hasNext)
+ assert(!catalogs.next())
+ }
}
- test("execute statement - select column name with dots") {
- val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
- val executor = createLocalExecutor
- executor.openSession("test-session")
- executeStatementOp.setExecutor(executor)
- executeStatementOp.setSessionId("test-session")
- executeStatementOp.run()
-
- val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
- assert(1 == resultSet.getRowsSize)
- assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
+ test("execute statement - select column name with dots") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("select 'tmp.hello'")
+ assert(resultSet.next())
+ assert(resultSet.getString(1) === "tmp.hello")
+ }
}
-
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
new file mode 100644
index 00000000000..7a0ff8f6c1e
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/LegacyFlinkOperationSuite.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.net.URL
+import java.util
+import java.util.Collections
+
+import org.apache.flink.client.cli.DefaultCLI
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
+import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.test.util.MiniClusterWithClientResource
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.flink.session.{FlinkSessionImpl, FlinkSQLSessionManager}
+import org.apache.kyuubi.operation.FetchOrientation
+import org.apache.kyuubi.session.SessionHandle
+
+class LegacyFlinkOperationSuite extends KyuubiFunSuite {
+
+ val user: String = Utils.currentUser
+ val password = "anonymous"
+
+ val NUM_TMS = 2
+ val NUM_SLOTS_PER_TM = 2
+
+ private def getConfig = {
+ val config = new Configuration
+ config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
+ config.setBoolean(WebOptions.SUBMIT_ENABLE, false)
+ config
+ }
+
+ val MINI_CLUSTER_RESOURCE =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig)
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)
+
+ var clusterClient: ClusterClient[_] = _
+
+ var engineContext = new DefaultContext(
+ Collections.emptyList(),
+ new Configuration,
+ Collections.singletonList(new DefaultCLI))
+ var sessionContext: SessionContext = _
+ var flinkSession: FlinkSessionImpl = _
+
+ private def createLocalExecutor: LocalExecutor =
+ createLocalExecutor(Collections.emptyList[URL], new Configuration)
+
+ private def createLocalExecutor(
+ dependencies: util.List[URL],
+ configuration: Configuration): LocalExecutor = {
+ configuration.addAll(clusterClient.getFlinkConfiguration)
+ val defaultContext: DefaultContext = new DefaultContext(
+ dependencies,
+ configuration,
+ Collections.singletonList(new DefaultCLI))
+ new LocalExecutor(defaultContext)
+ }
+
+ override def beforeAll(): Unit = {
+ MINI_CLUSTER_RESOURCE.before()
+ clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
+
+ sessionContext = SessionContext.create(engineContext, "test-session-id");
+ val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
+ flinkSQLSessionManager.initialize(KyuubiConf())
+ flinkSession = new FlinkSessionImpl(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
+ user,
+ password,
+ "localhost",
+ Map(),
+ flinkSQLSessionManager,
+ SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6),
+ sessionContext)
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ }
+
+ test("get catalogs for flink sql") {
+ val getCatalogOperation = new GetCatalogs(flinkSession)
+ getCatalogOperation.run()
+
+ val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
+ assert(1 == resultSet.getRowsSize)
+ assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
+ }
+
+ test("execute statement - select column name with dots") {
+ val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
+ val executor = createLocalExecutor
+ executor.openSession("test-session")
+ executeStatementOp.setExecutor(executor)
+ executeStatementOp.setSessionId("test-session")
+ executeStatementOp.run()
+
+ val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
+ assert(1 == resultSet.getRowsSize)
+ assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
+ }
+
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 36e13cf92e1..0b78163e51f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -201,6 +201,8 @@ object Utils extends Logging {
// The value follows org.apache.spark.util.ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY
// Hooks need to be invoked before the SparkContext stopped shall use a higher priority.
val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
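+ // Keep the Flink engine shutdown hook at the same priority as the Spark value above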
+ val FLINK_ENGINE_SHUTDOWN_PRIORITY = 50
/**
* Add some operations that you want into ShutdownHook
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 0f5784825ed..41842221680 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -577,6 +577,14 @@ object KyuubiConf {
.stringConf
.createOptional
+ val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
+ buildConf("session.engine.flink.main.resource")
+ .doc("The package used to create Flink SQL engine remote job. If it is undefined," +
+ " Kyuubi will use the default")
+ .version("1.4.0")
+ .stringConf
+ .createOptional
+
val ENGINE_LOGIN_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.login.timeout")
.doc("The timeout of creating the connection to remote sql query engine")
.version("1.0.0")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
index b4102638453..e21d82062a5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
@@ -105,6 +105,7 @@ abstract class ThriftBinaryFrontendService(name: String)
s" [$minThreads, $maxThreads] worker threads")
} catch {
case e: Throwable =>
+ error(e)
throw new KyuubiException(
s"Failed to initialize frontend service on $serverAddr:$portNum.",
e)
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 169da2c6907..a2a8a63ff0f 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -267,6 +267,13 @@
      <scope>test</scope>
    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-flink-sql-engine_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
    <dependency>
      <groupId>org.apache.kyuubi</groupId>
      <artifactId>kyuubi-spark-sql-engine_${scala.binary.version}</artifactId>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 24e32c4012a..6f8dd7406bf 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -32,8 +32,9 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.EngineType.{EngineType, SPARK_SQL}
+import org.apache.kyuubi.engine.EngineType.{EngineType, FLINK_SQL, SPARK_SQL}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel}
+import org.apache.kyuubi.engine.flink.FlinkEngineProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
@@ -184,8 +185,17 @@ private[kyuubi] class EngineRef(
SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
new SparkProcessBuilder(appUser, conf, extraEngineLog)
- case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}")
+ case FLINK_SQL =>
+ conf.setIfMissing(FlinkEngineProcessBuilder.APP_KEY, defaultEngineName)
+ // tags are stored as a comma-separated sequence
+ conf.set(
+ FlinkEngineProcessBuilder.TAG_KEY,
+ conf.getOption(FlinkEngineProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
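+ // propagate the engine's service discovery info so the Flink engine can register itself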
+ conf.set(HA_ZK_NAMESPACE, engineSpace)
+ conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
+ new FlinkEngineProcessBuilder(appUser, conf, extraEngineLog)
}
+
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
info(s"Launching engine:\n$builder")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index bf1ca92c1b6..55990e05c5c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -53,6 +53,8 @@ trait ProcBuilder {
protected def env: Map[String, String] = conf.getEnvs
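+ // The environment actually passed to the spawned engine process; engine-specific
+ // builders may override this to inject extra variables.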
+ protected def childProcEnv: Map[String, String] = env
+
protected val extraEngineLog: Option[OperationLog]
protected val workingDir: Path
@@ -61,7 +63,7 @@ trait ProcBuilder {
val pb = new ProcessBuilder(commands: _*)
val envs = pb.environment()
- envs.putAll(env.asJava)
+ envs.putAll(childProcEnv.asJava)
pb.directory(workingDir.toFile)
pb.redirectError(engineLog)
pb.redirectOutput(engineLog)
@@ -169,7 +171,7 @@ trait ProcBuilder {
case Some(kyuubiHome) =>
val pb = new ProcessBuilder("/bin/sh", s"$kyuubiHome/bin/stop-application.sh", appId)
pb.environment()
- .putAll(env.asJava)
+ .putAll(childProcEnv.asJava)
pb.redirectError(Redirect.appendTo(engineLog))
pb.redirectOutput(Redirect.appendTo(engineLog))
val process = pb.start()
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineProcessBuilder.scala
new file mode 100644
index 00000000000..ca9fc831d5f
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineProcessBuilder.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import java.io.{File, FilenameFilter}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+
+import com.google.common.annotations.VisibleForTesting
+
+import org.apache.kyuubi._
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_MAIN_RESOURCE
+import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.engine.flink.FlinkEngineProcessBuilder.FLINK_ENGINE_BINARY_FILE
+import org.apache.kyuubi.operation.log.OperationLog
+
+/**
+ * A builder to build the Flink SQL engine process.
+ */
+class FlinkEngineProcessBuilder(
+ override val proxyUser: String,
+ override val conf: KyuubiConf,
+ val extraEngineLog: Option[OperationLog] = None)
+ extends ProcBuilder with Logging {
+
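+ // Path to the engine launcher script, resolved from FLINK_ENGINE_HOME or from the
+ // kyuubi-flink-sql-engine module located next to the kyuubi-server classes.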
+ override protected def executable: String = {
+ val flinkEngineHomeOpt = env.get("FLINK_ENGINE_HOME").orElse {
+ val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
+ .split("kyuubi-server")
+ assert(cwd.length > 1)
+ Option(
+ Paths.get(cwd.head)
+ .resolve("externals")
+ .resolve("kyuubi-flink-sql-engine")
+ .toFile)
+ .map(_.getAbsolutePath)
+ }
+
+ flinkEngineHomeOpt.map { dir =>
+ Paths.get(dir, "bin", FLINK_ENGINE_BINARY_FILE).toAbsolutePath.toFile.getCanonicalPath
+ } getOrElse {
+ throw KyuubiSQLException("FLINK_ENGINE_HOME is not set! " +
+ "For more detail information on installing and configuring Flink, please visit " +
+ "https://kyuubi.apache.org/docs/stable/deployment/settings.html#environments")
+ }
+ }
+
+ override protected def mainResource: Option[String] = {
+ val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
+ // 1. prefer the main resource jar specified in the user config
+ conf.get(ENGINE_FLINK_MAIN_RESOURCE).filter { userSpecified =>
+ // skip the existence check if the resource is not a local file
+ val uri = new URI(userSpecified)
+ val scheme = if (uri.getScheme != null) uri.getScheme else "file"
+ scheme match {
+ case "file" => Files.exists(Paths.get(userSpecified))
+ case _ => true
+ }
+ }.orElse {
+ // 2. fall back to the jar packaged in the Kyuubi distribution
+ env.get(KyuubiConf.KYUUBI_HOME)
+ .map { Paths.get(_, "externals", "engines", "flink", jarName) }
+ .filter(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
+ }.orElse {
+ // 3. fall back to the jar built in the local dev tree
+ Option(Paths.get("externals", module, "target", jarName))
+ .filter(Files.exists(_)).orElse {
+ Some(Paths.get("..", "externals", module, "target", jarName))
+ }.map(_.toAbsolutePath.toFile.getCanonicalPath)
+ }
+ }
+
+ override protected def module: String = "kyuubi-flink-sql-engine"
+
+ override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+
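+ // Expose the Flink distribution and engine settings to the child process through
+ // environment variables picked up by the flink-sql-engine.sh launcher.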
+ override protected def childProcEnv: Map[String, String] = conf.getEnvs +
+ ("FLINK_HOME" -> FLINK_HOME) +
+ ("FLINK_CONF_DIR" -> s"$FLINK_HOME/conf") +
+ ("FLINK_SQL_ENGINE_JAR" -> mainResource.get) +
+ ("FLINK_SQL_ENGINE_DYNAMIC_ARGS" ->
+ conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" "))
+
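+ // No command-line arguments: everything is passed via the environment above.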
+ override protected def commands: Array[String] = Array(executable)
+
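+ // Working directory for the engine process: KYUUBI_WORK_DIR_ROOT/<user> when available,
+ // otherwise a per-user temporary directory.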
+ override protected val workingDir: Path = {
+ env.get("KYUUBI_WORK_DIR_ROOT").map { root =>
+ val workingRoot = Paths.get(root).toAbsolutePath
+ if (!Files.exists(workingRoot)) {
+ debug(s"Creating KYUUBI_WORK_DIR_ROOT at $workingRoot")
+ Files.createDirectories(workingRoot)
+ }
+ if (Files.isDirectory(workingRoot)) {
+ workingRoot.toString
+ } else null
+ }.map { rootAbs =>
+ val working = Paths.get(rootAbs, proxyUser)
+ if (!Files.exists(working)) {
+ debug(s"Creating $proxyUser's working directory at $working")
+ Files.createDirectories(working)
+ }
+ if (Files.isDirectory(working)) {
+ working
+ } else {
+ Utils.createTempDir(rootAbs, proxyUser)
+ }
+ }.getOrElse {
+ Utils.createTempDir(namePrefix = proxyUser)
+ }
+ }
+
+ override def toString: String = commands.map {
+ case arg if arg.startsWith("--") => s"\\\n\t$arg"
+ case arg => arg
+ }.mkString(" ")
+
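+ // Locate the Flink distribution: FLINK_HOME from the environment, or the flink-*
+ // directory downloaded by the kyuubi-download module when running from the source tree.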
+ @VisibleForTesting
+ def FLINK_HOME: String = {
+ // prepare FLINK_HOME
+ val flinkHomeOpt = env.get("FLINK_HOME").orElse {
+ val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
+ .split("kyuubi-server")
+ assert(cwd.length > 1)
+ Option(
+ Paths.get(cwd.head)
+ .resolve("externals")
+ .resolve("kyuubi-download")
+ .resolve("target")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ dir.isDirectory && name.startsWith("flink-")
+ }
+ }))
+ .flatMap(_.headOption)
+ .map(_.getAbsolutePath)
+ }
+
+ flinkHomeOpt.getOrElse {
+ throw KyuubiSQLException("FLINK_HOME is not set! " +
+ "For more detailed information on installing and configuring Flink, please visit " +
+ "https://kyuubi.apache.org/docs/stable/deployment/settings.html#environments")
+ }
+ }
+
+ private def useKeytab(): Boolean = false
+}
+
+object FlinkEngineProcessBuilder {
+ final val APP_KEY = "yarn.application.name"
+ final val TAG_KEY = "yarn.tags"
+
+ final private val FLINK_ENGINE_BINARY_FILE = "flink-sql-engine.sh"
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndFlinkLocalCluster.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndFlinkLocalCluster.scala
new file mode 100644
index 00000000000..445f6fd6994
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndFlinkLocalCluster.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+import scala.sys.process._
+
+import org.apache.kyuubi.engine.flink.FlinkEngineProcessBuilder
+
+trait WithKyuubiServerAndFlinkLocalCluster extends WithKyuubiServer {
+
+ private lazy val FLINK_HOME: String =
+ new FlinkEngineProcessBuilder(Utils.currentUser, conf).FLINK_HOME
+
+ override def beforeAll(): Unit = {
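+ // bring up a local standalone Flink cluster before the Kyuubi server starts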
+ s"$FLINK_HOME/bin/start-cluster.sh".!
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
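+ // stop the local Flink cluster only after the server has shut down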
+ s"$FLINK_HOME/bin/stop-cluster.sh".!
+ }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkEngineProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkEngineProcessBuilderSuite.scala
new file mode 100644
index 00000000000..c111e0009c4
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkEngineProcessBuilderSuite.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
+class FlinkEngineProcessBuilderSuite extends KyuubiFunSuite {
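+ // "kyuubi.on" is just a placeholder entry; the builder only needs a KyuubiConf instance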
+ private def conf = KyuubiConf().set("kyuubi.on", "off")
+
+ test("flink engine process builder") {
+ val builder = new FlinkEngineProcessBuilder("vinoyang", conf)
+ val commands = builder.toString.split(' ')
+ assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
+ }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/flink/FlinkOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/flink/FlinkOperationSuite.scala
new file mode 100644
index 00000000000..06202db7094
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/flink/FlinkOperationSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation.flink
+
+import org.apache.kyuubi.WithKyuubiServerAndFlinkLocalCluster
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, FRONTEND_THRIFT_BINARY_BIND_PORT}
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class FlinkOperationSuite extends WithKyuubiServerAndFlinkLocalCluster with HiveJDBCTestHelper {
+ override val conf: KyuubiConf = KyuubiConf()
+ .set(ENGINE_TYPE, "FLINK_SQL")
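+ // bind to a non-default Thrift port to avoid clashing with other test suites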
+ .set(FRONTEND_THRIFT_BINARY_BIND_PORT, 10019)
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ test("get catalogs for flink sql") {
+ withJdbcStatement() { statement =>
+ val meta = statement.getConnection.getMetaData
+ val catalogs = meta.getCatalogs
+ val expected = Set("default_catalog").toIterator
+ while (catalogs.next()) {
+ assert(catalogs.getString("catalogs") === expected.next())
+ }
+ assert(!expected.hasNext)
+ assert(!catalogs.next())
+ }
+ }
+
+ test("execute statement - select column name with dots") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("select 'tmp.hello'")
+ assert(resultSet.next())
+ assert(resultSet.getString(1) === "tmp.hello")
+ }
+ }
+}