Skip to content

Commit

Permalink
[KYUUBI #2002] Support show jars operations
Browse files Browse the repository at this point in the history
  • Loading branch information
link3280 committed Mar 13, 2022
1 parent fbdb009 commit 56cdd12
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class ExecuteStatement(
resultSet = OperationUtils.runAddJarOperation(addJarOperation, executor, sessionId)
case removeJarOperation: RemoveJarOperation =>
resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation, executor, sessionId)
case showJarsOperation: ShowJarsOperation =>
resultSet = OperationUtils.runShowJarOperation(showJarsOperation, executor, sessionId)
case operation: Operation => runOperation(operation)
}
setState(OperationState.FINISHED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package org.apache.kyuubi.engine.flink.operation

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.operations.command.{AddJarOperation, RemoveJarOperation, ResetOperation, SetOperation}
import org.apache.flink.table.operations.command._
import org.apache.flink.types.Row

import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil}
import org.apache.kyuubi.engine.flink.result.ResultSetUtil.successResultSet

object OperationUtils {
Expand Down Expand Up @@ -111,12 +112,13 @@ object OperationUtils {
}

/**
* Runs a AddJarOperation with executor. Returns when AddJarOperation is executed successfully.
* Runs a AddJarOperation with the executor. Currently only jars on local filesystem
* are supported.
*
* @param addJarOperation Add-jar operation.
* @param executor A gateway for communicating with Flink and other external systems.
* @param sessionId Id of the session.
* @return A ResultSet of ResetOperation execution.
* @return A ResultSet of AddJarOperation execution.
*/
def runAddJarOperation(
addJarOperation: AddJarOperation,
Expand All @@ -127,13 +129,13 @@ object OperationUtils {
}

/**
* Runs a RemoveJarOperation with executor. Returns when RemoveJarOperation is executed
* successfully.
* Runs a RemoveJarOperation with the executor. Only jars added by AddJarOperation could
* be removed.
*
* @param removeJarOperation Add-jar operation.
* @param executor A gateway for communicating with Flink and other external systems.
* @param sessionId Id of the session.
* @return A ResultSet of ResetOperation execution.
* @return A ResultSet of RemoveJarOperation execution.
*/
def runRemoveJarOperation(
removeJarOperation: RemoveJarOperation,
Expand All @@ -142,4 +144,20 @@ object OperationUtils {
executor.removeJar(sessionId, removeJarOperation.getPath)
successResultSet
}

/**
* Runs a ShowJarsOperation with the executor. Returns the jars of the current session.
*
* @param showJarsOperation Show-jar operation.
* @param executor A gateway for communicating with Flink and other external systems.
* @param sessionId Id of the session.
* @return A ResultSet of ShowJarsOperation execution.
*/
def runShowJarOperation(
showJarsOperation: ShowJarsOperation,
executor: Executor,
sessionId: String): ResultSet = {
val jars = executor.listJars(sessionId)
ResultSetUtil.stringListToResultSet(jars.asScala.toList, "jar")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.flink.operation

import java.nio.file.Files
import java.sql.DatabaseMetaData
import java.util.UUID

import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
Expand Down Expand Up @@ -778,38 +779,35 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}

test("execute statement - add/remove jar") {
val jarName = "newly-added.jar"
test("execute statement - add/remove/show jar") {
val jarName = s"newly-added-${UUID.randomUUID()}.jar"
val newJar = TestUserClassLoaderJar.createJarFile(
Files.createTempDirectory("add-jar-test").toFile,
jarName,
GENERATED_UDF_CLASS,
GENERATED_UDF_CODE).toPath

withMultipleConnectionJdbcStatement()(
{ statement =>
statement.execute(s"add jar '$newJar'")
val addJarResult = statement.executeQuery("set")
var success = false
while (addJarResult.next()) {
if (addJarResult.getString(1) == "pipeline.jars" &&
addJarResult.getString(2).contains(jarName)) {
success = true
}
withMultipleConnectionJdbcStatement()({ statement =>
statement.execute(s"add jar '$newJar'")

val showJarsResultAdded = statement.executeQuery("show jars")
var exists = false
while (showJarsResultAdded.next()) {
if (showJarsResultAdded.getString(1).contains(jarName)) {
exists = true
}
assert(success)
},
{ statement =>
statement.execute(s"remove jar '$newJar'")
val removeJarResult = statement.executeQuery("set")
var success = false
while (removeJarResult.next()) {
if (removeJarResult.getString(1) == "pipeline.jars" &&
!removeJarResult.getString(2).contains(jarName)) {
success = true
}
}
assert(exists)

statement.execute(s"remove jar '$newJar'")
val showJarsResultRemoved = statement.executeQuery("show jars")
exists = false
while (showJarsResultRemoved.next()) {
if (showJarsResultRemoved.getString(1).contains(jarName)) {
exists = true
}
assert(success)
})
}
assert(!exists)
})
}
}

0 comments on commit 56cdd12

Please sign in to comment.