Skip to content

Commit

Permalink
[KYUUBI #1579] Implement basic ability of executing statement in Flin…
Browse files Browse the repository at this point in the history
…k engine

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1603 from yanghua/KYUUBI-1579.

Closes #1579

48db76b [Cheng Pan] cleanup
3670751 [Cheng Pan] Address comments
25ca5ae [yanghua] reduce code
6f18a4a [yanghua] [KYUUBI #1579] Implement basic ability of executing statement

Lead-authored-by: yanghua <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
yanghua and pan3793 committed Dec 22, 2021
1 parent c972139 commit 3673399
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 7 deletions.
6 changes: 6 additions & 0 deletions externals/kyuubi-flink-sql-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import java.util
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import com.google.common.annotations.VisibleForTesting
import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.types.Row

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils

/**
 * Operation that runs a single SQL statement through the Flink SQL client [[Executor]]
 * and collects the produced rows into the operation's `resultSet`.
 *
 * @param session the session this operation belongs to
 * @param statement the SQL text to execute
 * @param shouldRunAsync when true the statement is submitted through the session manager's
 *                       `submitBackgroundOperation`; otherwise it runs on the calling thread
 * @param queryTimeout timeout in seconds; values less than or equal to 0 disable the
 *                     timeout monitor
 */
class ExecuteStatement(
    session: Session,
    override val statement: String,
    override val shouldRunAsync: Boolean,
    queryTimeout: Long)
  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {

  private val operationLog: OperationLog =
    OperationLog.createOperationLog(session, getHandle)

  // Descriptor of the running query, assigned once the statement has been submitted.
  private var resultDescriptor: ResultDescriptor = _

  // Column metadata derived from the result schema of the executed query.
  private var columnInfos: util.List[ColumnInfo] = _

  // Scheduler that fires the timeout action; only defined when queryTimeout > 0.
  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

  override def getOperationLog: Option[OperationLog] = Option(operationLog)

  /** Replaces the executor used to run the statement with the given one. */
  @VisibleForTesting
  override def setExecutor(executor: Executor): Unit = {
    this.executor = executor
  }

  /** Overrides the executor session id that statements are issued against. */
  def setSessionId(sessionId: String): Unit = {
    this.sessionId = sessionId
  }

  override protected def beforeRun(): Unit = {
    OperationLog.setCurrentOperationLog(operationLog)
    setState(OperationState.PENDING)
    setHasResultSet(true)
  }

  override protected def afterRun(): Unit = {
    OperationLog.removeCurrentOperationLog()
  }

  /**
   * Arms the timeout monitor and then executes the statement, either inline or as a
   * background operation depending on `shouldRunAsync`.
   *
   * @throws KyuubiSQLException if the background operation is rejected on submission
   */
  override protected def runInternal(): Unit = {
    addTimeoutMonitor()
    if (shouldRunAsync) {
      val asyncOperation = new Runnable {
        override def run(): Unit = {
          // The background thread needs its own binding to the operation log.
          OperationLog.setCurrentOperationLog(operationLog)
          // The statement must run inside the submitted task; running it before the
          // submission would block the caller and defeat the asynchronous mode.
          executeStatement()
        }
      }

      try {
        val backgroundHandle = session.sessionManager.submitBackgroundOperation(asyncOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          // executeStatement() never runs in this case, so its `finally` cannot release
          // the timeout scheduler; shut it down here instead.
          statementTimeoutCleaner.foreach(_.shutdown())
          setState(OperationState.ERROR)
          val ke =
            KyuubiSQLException("Error submitting query in background, query rejected", rejected)
          setOperationException(ke)
          throw ke
      }
    } else {
      executeStatement()
    }
  }

  /**
   * Parses and executes the statement, polls the executor for result snapshots until the
   * end of the stream is reported, and stores the final rows in `resultSet`.
   * Failures are delegated to `onError(cancel = true)`; the timeout scheduler is always
   * shut down afterwards.
   */
  private def executeStatement(): Unit = {
    // Pause between two snapshot polls, in milliseconds.
    val pollIntervalMs = 50L
    // Passed as the last argument of snapshotResult (presumably the page size -- confirm
    // against the Flink Executor API).
    val snapshotPageSize = 2

    try {
      setState(OperationState.RUNNING)

      columnInfos = new util.ArrayList[ColumnInfo]

      val queryOperation = executor.parseStatement(sessionId, statement) match {
        case query: QueryOperation => query
        case _ =>
          // Only queries can be handed to executeQuery; fail with a readable message
          // instead of a ClassCastException.
          throw new UnsupportedOperationException(
            s"Unsupported statement, only queries can be executed: $statement")
      }
      resultDescriptor = executor.executeQuery(sessionId, queryOperation)
      resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
        columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
      }

      val resultID = resultDescriptor.getResultId

      val rows = new ArrayBuffer[Row]()
      var endOfStream = false
      while (!endOfStream) {
        Thread.sleep(pollIntervalMs) // slow the processing down

        val snapshot = executor.snapshotResult(sessionId, resultID, snapshotPageSize)
        snapshot.getType match {
          case TypedResult.ResultType.PAYLOAD =>
            // Each snapshot replaces the previously collected rows; the payload is used
            // as the number of pages to retrieve.
            rows.clear()
            (1 to snapshot.getPayload).foreach { page =>
              rows ++= executor.retrieveResultPage(resultID, page).asScala
            }
          case TypedResult.ResultType.EOS => endOfStream = true
          case TypedResult.ResultType.EMPTY => // nothing new yet, keep polling
        }
      }

      resultSet = ResultSet.builder
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .columns(columnInfos)
        .data(rows.toArray[Row])
        .build
      setState(OperationState.FINISHED)
    } catch {
      onError(cancel = true)
    } finally {
      statementTimeoutCleaner.foreach(_.shutdown())
    }
  }

  /**
   * Schedules `cleanup(OperationState.TIMEOUT)` to run after `queryTimeout` seconds.
   * Does nothing when `queryTimeout` is not positive.
   */
  private def addTimeoutMonitor(): Unit = {
    if (queryTimeout > 0) {
      val timeoutExecutor =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
      statementTimeoutCleaner = Some(timeoutExecutor)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.IOException

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}

Expand All @@ -40,8 +41,20 @@ abstract class FlinkOperation(
session: Session)
extends AbstractOperation(opType, session) {

protected val sessionContext: SessionContext =
protected val sessionContext: SessionContext = {
session.asInstanceOf[FlinkSessionImpl].getSessionContext
}

protected var executor: Executor = _

// Assigns the executor field from the session's FlinkSessionImpl.
// NOTE(review): the `executor` argument is not used here; the field is always taken from
// `FlinkSessionImpl.getExecutor` -- confirm whether ignoring the parameter is intentional.
protected def setExecutor(executor: Executor): Unit = {
this.executor = session.asInstanceOf[FlinkSessionImpl].getExecutor
}

protected var sessionId: String = {
session.asInstanceOf[FlinkSessionImpl].getSessionId
}

protected var resultSet: ResultSet = _

override protected def beforeRun(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = null
queryTimeout: Long): Operation = {
val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
addOperation(op)
}

override def newGetTypeInfoOperation(session: Session): Operation = null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ object RowSet {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
} else if (logicalType.isInstanceOf[CharType]) {
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
} else {
val tStrValue = new TStringValue
if (row.getField(ordinal) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.kyuubi.engine.flink.session

import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
Expand All @@ -29,13 +31,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false

val operationManager = new FlinkSQLOperationManager()
val executor: Executor = new LocalExecutor(engineContext)

// Runs the parent start-up first, then starts this manager's `executor`.
override def start(): Unit = {
super.start()
executor.start()
}

override def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = null

override def closeSession(sessionHandle: SessionHandle): Unit = {}
conf: Map[String, String]): SessionHandle = {
executor.openSession("")
null
}

// Closes the executor-side session identified by the string form of the given handle.
// NOTE(review): openSession in this file opens the executor session with the id "" while
// this method closes `sessionHandle.toString` -- verify that both sides use the same id.
override def closeSession(sessionHandle: SessionHandle): Unit = {
executor.closeSession(sessionHandle.toString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kyuubi.engine.flink.session

import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.TProtocolVersion

Expand All @@ -36,4 +37,8 @@ class FlinkSessionImpl(

def getSessionContext: SessionContext = sessionContext

// Returns the executor held by the session manager, which is cast to FlinkSQLSessionManager.
def getExecutor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor

// Returns the string form of this session's handle as the session id.
def getSessionId: String = handle.toString

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

package org.apache.kyuubi.engine.flink.operation

import java.net.URL
import java.util
import java.util.Collections

import org.apache.flink.client.cli.DefaultCLI
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.{KyuubiFunSuite, Utils}
Expand All @@ -34,14 +40,52 @@ class FlinkOperationSuite extends KyuubiFunSuite {
val user: String = Utils.currentUser
val password = "anonymous"

val NUM_TMS = 2
val NUM_SLOTS_PER_TM = 2

// Builds the Flink configuration handed to the mini cluster resource: 4m of managed
// memory, NUM_TMS task managers with NUM_SLOTS_PER_TM slots each, web submission disabled.
private def getConfig = {
  val miniClusterConf = new Configuration
  miniClusterConf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
  miniClusterConf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
  miniClusterConf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
  miniClusterConf.setBoolean(WebOptions.SUBMIT_ENABLE, false)
  miniClusterConf
}

val MINI_CLUSTER_RESOURCE =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig)
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)

var clusterClient: ClusterClient[_] = _

var engineContext = new DefaultContext(
Collections.emptyList(),
new Configuration,
Collections.singletonList(new DefaultCLI))
var sessionContext: SessionContext = _
var flinkSession: FlinkSessionImpl = _

// Creates a LocalExecutor with no extra dependencies and a fresh configuration.
private def createLocalExecutor: LocalExecutor =
  createLocalExecutor(Collections.emptyList[URL], new Configuration)

// Creates a LocalExecutor from the given dependencies and configuration. The cluster
// client's Flink configuration is merged into `configuration` before the context is built.
private def createLocalExecutor(
    dependencies: util.List[URL],
    configuration: Configuration): LocalExecutor = {
  configuration.addAll(clusterClient.getFlinkConfiguration)
  new LocalExecutor(
    new DefaultContext(
      dependencies,
      configuration,
      Collections.singletonList(new DefaultCLI)))
}

override def beforeAll(): Unit = {
MINI_CLUSTER_RESOURCE.before()
clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient

sessionContext = SessionContext.create(engineContext, "test-session-id");
val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
flinkSQLSessionManager.initialize(KyuubiConf())
Expand All @@ -66,7 +110,20 @@ class FlinkOperationSuite extends KyuubiFunSuite {

val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog")
assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
}

test("execute statement - select column name with dots") {
  val testSessionId = "test-session"
  val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
  val executor = createLocalExecutor
  executor.openSession(testSessionId)
  try {
    executeStatementOp.setExecutor(executor)
    executeStatementOp.setSessionId(testSessionId)
    executeStatementOp.run()

    val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
    assert(1 == resultSet.getRowsSize)
    assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
  } finally {
    // Release the executor-side session opened above so it does not outlive the test.
    executor.closeSession(testSessionId)
  }
}

}
20 changes: 20 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,26 @@
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down

0 comments on commit 3673399

Please sign in to comment.