diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSetUtil.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSetUtil.java deleted file mode 100644 index 1e709ed64b6..00000000000 --- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSetUtil.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine.flink.result; - -import java.util.List; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.ResultKind; -import org.apache.flink.table.catalog.Column; -import org.apache.flink.types.Row; - -/** Utility class for flink result sets. */ -public class ResultSetUtil { - - public static ResultSet stringListToResultSet(List strings, String columnName) { - Row[] rows = strings.stream().map(Row::of).toArray(Row[]::new); - return ResultSet.builder() - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .columns(Column.physical(columnName, DataTypes.STRING())) - .data(rows) - .build(); - } - - /** - * Build a simple result with OK message. Returned when SQL commands are executed successfully. - * Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its - * cursor). - * - * @return A simple result with OK message. - */ - public static ResultSet successResultSet() { - return ResultSet.builder() - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .columns(Column.physical("result", DataTypes.STRING())) - .data(new Row[] {Row.of("OK")}) - .build(); - } -} diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 0737b87093b..b38ceefa8cc 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -198,7 +198,7 @@ class ExecuteStatement( // reset all properties executor.resetSessionProperties(sessionId) } - resultSet = ResultSetUtil.successResultSet() + resultSet = ResultSetUtil.successResultSet setState(OperationState.FINISHED) } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala index 4a2506157aa..119a903f253 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala @@ -30,7 +30,7 @@ class GetCatalogs(session: Session) override protected def runInternal(): Unit = { try { val tableEnv = sessionContext.getExecutionContext.getTableEnvironment - val catalogs = tableEnv.listCatalogs.toList.asJava + val catalogs = tableEnv.listCatalogs.toList resultSet = ResultSetUtil.stringListToResultSet(catalogs, TABLE_CAT) } catch onError() } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala index 56247de93ba..19fd8b3578a 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTableTypes.scala @@ -28,7 +28,7 @@ class GetTableTypes(session: Session) extends FlinkOperation(OperationType.GET_TABLE_TYPES, session) { override protected def runInternal(): Unit = { - val tableTypes = Constants.SUPPORTED_TABLE_TYPES.toList.asJava + val tableTypes = Constants.SUPPORTED_TABLE_TYPES.toList resultSet = ResultSetUtil.stringListToResultSet(tableTypes, TABLE_TYPE) } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala index 2d161ba16bb..250a3c0dad0 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala @@ -65,7 +65,7 @@ class GetTables( } } - resultSet = ResultSetUtil.stringListToResultSet(tables.asJava, Constants.SHOW_TABLES_RESULT) + resultSet = ResultSetUtil.stringListToResultSet(tables.toList, Constants.SHOW_TABLES_RESULT) } catch onError() } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala new file mode 100644 index 00000000000..9fd3118336f --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.flink.result; + +import org.apache.flink.table.api.DataTypes +import org.apache.flink.table.api.ResultKind +import org.apache.flink.table.catalog.Column +import org.apache.flink.types.Row + +/** Util for flink result sets. */ +object ResultSetUtil { + + def stringListToResultSet(strings: List[String], columnName: String): ResultSet = { + val rows: Array[Row] = strings.map(s => Row.of(s)).toArray(Array[Row](0)) + ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT).columns(Column.physical( + columnName, + DataTypes.STRING)).data(rows).build + } + + /** + * Build a simple result with OK message. Returned when SQL commands are executed successfully. + * Noted that a new ResultSet is returned each time, because ResultSet is stateful (with its + * cursor). + * + * @return A simple result with OK message. + */ + def successResultSet: ResultSet = + ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT).columns(Column.physical( + "result", + DataTypes.STRING)).data(Array[Row](Row.of("OK"))).build +}