diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 420fc79f02e..e872dc6287f 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -114,6 +114,8 @@ class ExecuteStatement( resultSet = OperationUtils.runAddJarOperation(addJarOperation, executor, sessionId) case removeJarOperation: RemoveJarOperation => resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation, executor, sessionId) + case showJarsOperation: ShowJarsOperation => + resultSet = OperationUtils.runShowJarOperation(showJarsOperation, executor, sessionId) case operation: Operation => runOperation(operation) } setState(OperationState.FINISHED) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala index b532558123b..eb11a51d52d 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala @@ -19,15 +19,16 @@ package org.apache.kyuubi.engine.flink.operation import java.util +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.flink.table.api.{DataTypes, ResultKind} import org.apache.flink.table.catalog.Column import org.apache.flink.table.client.gateway.Executor -import org.apache.flink.table.operations.command.{AddJarOperation, RemoveJarOperation, ResetOperation, SetOperation} +import org.apache.flink.table.operations.command._ import org.apache.flink.types.Row -import org.apache.kyuubi.engine.flink.result.ResultSet +import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil} import org.apache.kyuubi.engine.flink.result.ResultSetUtil.successResultSet object OperationUtils { @@ -111,12 +112,13 @@ object OperationUtils { } /** - * Runs a AddJarOperation with executor. Returns when AddJarOperation is executed successfully. + * Runs a AddJarOperation with the executor. Currently only jars on local filesystem + * are supported. * * @param addJarOperation Add-jar operation. * @param executor A gateway for communicating with Flink and other external systems. * @param sessionId Id of the session. - * @return A ResultSet of ResetOperation execution. + * @return A ResultSet of AddJarOperation execution. */ def runAddJarOperation( addJarOperation: AddJarOperation, @@ -127,13 +129,13 @@ object OperationUtils { } /** - * Runs a RemoveJarOperation with executor. Returns when RemoveJarOperation is executed - * successfully. + * Runs a RemoveJarOperation with the executor. Only jars added by AddJarOperation could + * be removed. * * @param removeJarOperation Add-jar operation. * @param executor A gateway for communicating with Flink and other external systems. * @param sessionId Id of the session. - * @return A ResultSet of ResetOperation execution. + * @return A ResultSet of RemoveJarOperation execution. */ def runRemoveJarOperation( removeJarOperation: RemoveJarOperation, @@ -142,4 +144,20 @@ object OperationUtils { executor.removeJar(sessionId, removeJarOperation.getPath) successResultSet } + + /** + * Runs a ShowJarsOperation with the executor. Returns the jars of the current session. + * + * @param showJarsOperation Show-jar operation. + * @param executor A gateway for communicating with Flink and other external systems. + * @param sessionId Id of the session. + * @return A ResultSet of ShowJarsOperation execution. + */ + def runShowJarOperation( + showJarsOperation: ShowJarsOperation, + executor: Executor, + sessionId: String): ResultSet = { + val jars = executor.listJars(sessionId) + ResultSetUtil.stringListToResultSet(jars.asScala.toList, "jar") + } } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 666b09a8b7f..126d203120c 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.flink.operation import java.nio.file.Files import java.sql.DatabaseMetaData +import java.util.UUID import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE @@ -778,38 +779,35 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { } } - test("execute statement - add/remove jar") { - val jarName = "newly-added.jar" + test("execute statement - add/remove/show jar") { + val jarName = s"newly-added-${UUID.randomUUID()}.jar" val newJar = TestUserClassLoaderJar.createJarFile( Files.createTempDirectory("add-jar-test").toFile, jarName, GENERATED_UDF_CLASS, GENERATED_UDF_CODE).toPath - withMultipleConnectionJdbcStatement()( - { statement => - statement.execute(s"add jar '$newJar'") - val addJarResult = statement.executeQuery("set") - var success = false - while (addJarResult.next()) { - if (addJarResult.getString(1) == "pipeline.jars" && - addJarResult.getString(2).contains(jarName)) { - success = true - } + withMultipleConnectionJdbcStatement()({ statement => + statement.execute(s"add jar '$newJar'") + + val showJarsResultAdded = statement.executeQuery("show jars") + var exists = false + while (showJarsResultAdded.next()) { + if (showJarsResultAdded.getString(1).contains(jarName)) { + exists = true } - assert(success) - }, - { statement => - statement.execute(s"remove jar '$newJar'") - val removeJarResult = statement.executeQuery("set") - var success = false - while (removeJarResult.next()) { - if (removeJarResult.getString(1) == "pipeline.jars" && - !removeJarResult.getString(2).contains(jarName)) { - success = true - } + } + assert(exists) + + statement.execute(s"remove jar '$newJar'") + val showJarsResultRemoved = statement.executeQuery("show jars") + exists = false + while (showJarsResultRemoved.next()) { + if (showJarsResultRemoved.getString(1).contains(jarName)) { + exists = true } - assert(success) - }) + } + assert(!exists) + }) } }