+
\ No newline at end of file
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 3d4177e86a7..6d9e5b57406 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -120,108 +120,111 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
### Engine
-| Key | Default | Meaning | Type | Since |
-|----------------------------------------------------------|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------|
-| kyuubi.engine.chat.ernie.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the ernie bot server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 |
-| kyuubi.engine.chat.ernie.http.proxy | <undefined> | HTTP proxy url for API calling in ernie bot engine. e.g. http://127.0.0.1:1088 | string | 1.9.0 |
-| kyuubi.engine.chat.ernie.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after ernie bot server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 |
-| kyuubi.engine.chat.ernie.model | completions | ID of the model used in ernie bot. Available models are completions_pro, ernie_bot_8k, completions and eb-instant[Model overview](https://cloud.baidu.com/doc/WENXINWORKSHOP/s/6lp69is2a). | string | 1.9.0 |
-| kyuubi.engine.chat.ernie.token | <undefined> | The token to access ernie bot open API, which could be got at https://cloud.baidu.com/doc/WENXINWORKSHOP/s/Ilkkrb0i5 | string | 1.9.0 |
-| kyuubi.engine.chat.extra.classpath | <undefined> | The extra classpath for the Chat engine, for configuring the location of the SDK and etc. | string | 1.8.0 |
-| kyuubi.engine.chat.gpt.apiKey | <undefined> | The key to access OpenAI open API, which could be got at https://platform.openai.com/account/api-keys | string | 1.8.0 |
-| kyuubi.engine.chat.gpt.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the Chat GPT server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 |
-| kyuubi.engine.chat.gpt.http.proxy | <undefined> | HTTP proxy url for API calling in Chat GPT engine. e.g. http://127.0.0.1:1087 | string | 1.8.0 |
-| kyuubi.engine.chat.gpt.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after Chat GPT server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 |
-| kyuubi.engine.chat.gpt.model | gpt-3.5-turbo | ID of the model used in ChatGPT. Available models refer to OpenAI's [Model overview](https://platform.openai.com/docs/models/overview). | string | 1.8.0 |
-| kyuubi.engine.chat.java.options | <undefined> | The extra Java options for the Chat engine | string | 1.8.0 |
-| kyuubi.engine.chat.memory | 1g | The heap memory for the Chat engine | string | 1.8.0 |
-| kyuubi.engine.chat.provider | ECHO | The provider for the Chat engine. Candidates:
ECHO: simply replies a welcome message.
GPT: a.k.a ChatGPT, powered by OpenAI.
ERNIE: ErnieBot, powered by Baidu.
| string | 1.8.0 |
-| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 |
-| kyuubi.engine.deregister.exception.classes || A comma-separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself. | set | 1.2.0 |
-| kyuubi.engine.deregister.exception.messages || A comma-separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself. | set | 1.2.0 |
-| kyuubi.engine.deregister.exception.ttl | PT30M | Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures. | duration | 1.2.0 |
-| kyuubi.engine.deregister.job.max.failures | 4 | Number of failures of job before deregistering the engine. | int | 1.2.0 |
-| kyuubi.engine.doAs.enabled | true | Whether to enable user impersonation on launching engine. When enabled, for engines which supports user impersonation, e.g. SPARK, depends on the `kyuubi.engine.share.level`, different users will be used to launch the engine. Otherwise, Kyuubi Server's user will always be used to launch the engine. | boolean | 1.9.0 |
-| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger.
Local Path: start with 'file://'
HDFS Path: start with 'hdfs://'
| string | 1.3.0 |
-| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
SPARK: the events will be written to the Spark listener bus.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 |
-| kyuubi.engine.flink.application.jars | <undefined> | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. | string | 1.8.0 |
-| kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. | string | 1.6.0 |
-| kyuubi.engine.flink.initialize.sql | SHOW DATABASES | The initialize sql for Flink engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 |
-| kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 |
-| kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 |
-| kyuubi.engine.hive.deploy.mode | LOCAL | Configures the hive engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.9.0 |
-| kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: to be done.
| seq | 1.7.0 |
-| kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. | string | 1.6.0 |
-| kyuubi.engine.hive.java.options | <undefined> | The extra Java options for the Hive query engine | string | 1.6.0 |
-| kyuubi.engine.hive.memory | 1g | The heap memory for the Hive query engine | string | 1.6.0 |
-| kyuubi.engine.initialize.sql | SHOW DATABASES | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SHOW DATABASES` to eagerly active HiveClient. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.2.0 |
-| kyuubi.engine.jdbc.connection.password | <undefined> | The password is used for connecting to server | string | 1.6.0 |
-| kyuubi.engine.jdbc.connection.propagateCredential | false | Whether to use the session's user and password to connect to database | boolean | 1.8.0 |
-| kyuubi.engine.jdbc.connection.properties || The additional properties are used for connecting to server | seq | 1.6.0 |
-| kyuubi.engine.jdbc.connection.provider | <undefined> | A JDBC connection provider plugin for the Kyuubi Server to establish a connection to the JDBC URL. The configuration value should be a subclass of `org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider`. Kyuubi provides the following built-in implementations:
doris: For establishing Doris connections.
mysql: For establishing MySQL connections.
phoenix: For establishing Phoenix connections.
postgresql: For establishing PostgreSQL connections.
starrocks: For establishing StarRocks connections.
impala: For establishing Impala connections.
clickhouse: For establishing clickhouse connections.
| string | 1.6.0 |
-| kyuubi.engine.jdbc.connection.url | <undefined> | The server url that engine will connect to | string | 1.6.0 |
-| kyuubi.engine.jdbc.connection.user | <undefined> | The user is used for connecting to server | string | 1.6.0 |
-| kyuubi.engine.jdbc.deploy.mode | LOCAL | Configures the jdbc engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.10.0 |
-| kyuubi.engine.jdbc.driver.class | <undefined> | The driver class for JDBC engine connection | string | 1.6.0 |
-| kyuubi.engine.jdbc.extra.classpath | <undefined> | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 |
-| kyuubi.engine.jdbc.fetch.size | 1000 | The fetch size of JDBC engine | int | 1.9.0 |
-| kyuubi.engine.jdbc.initialize.sql | SELECT 1 | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SELECT 1` to eagerly active JDBCClient. | seq | 1.8.0 |
-| kyuubi.engine.jdbc.java.options | <undefined> | The extra Java options for the JDBC query engine | string | 1.6.0 |
-| kyuubi.engine.jdbc.memory | 1g | The heap memory for the JDBC query engine | string | 1.6.0 |
-| kyuubi.engine.jdbc.operation.incremental.collect | false | When true, the result will be sequentially calculated and returned to the JDBC engine. It fallback to `kyuubi.operation.incremental.collect` | boolean | 1.10.0 |
-| kyuubi.engine.jdbc.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. | seq | 1.8.0 |
-| kyuubi.engine.jdbc.type | <undefined> | The short name of JDBC type | string | 1.6.0 |
-| kyuubi.engine.keytab | <undefined> | Kerberos keytab for the kyuubi engine. | string | 1.10.0 |
-| kyuubi.engine.kubernetes.submit.timeout | PT30S | The engine submit timeout for Kubernetes application. | duration | 1.7.2 |
-| kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 |
-| kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 |
-| kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 |
-| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session.
RANDOM - Randomly use the engine in the pool
POLLING - Polling use the engine in the pool
| string | 1.7.0 |
-| kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 |
-| kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 |
-| kyuubi.engine.principal | <undefined> | Kerberos principal for the kyuubi engine. | string | 1.10.0 |
-| kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 |
-| kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are:
CONNECTION: the engine will not be shared but only used by the current client connection, and the engine will be launched by session user.
USER: the engine will be shared by all sessions created by a unique username, and the engine will be launched by session user.
GROUP: the engine will be shared by all sessions created by all users belong to the same primary group name. The engine will be launched by the primary group name as the effective username, so here the group name is in value of special user who is able to visit the computing resources/data of the team. It follows the [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map user to a primary group. If the primary group is not found, it fallback to the USER level.
SERVER: the engine will be shared by Kyuubi servers, and the engine will be launched by Server's user.
See also `kyuubi.engine.share.level.subdomain` and `kyuubi.engine.doAs.enabled`. | string | 1.2.0 |
-| kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 |
-| kyuubi.engine.share.level.subdomain | <undefined> | Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper subpath. For example, for the `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level. When disable engine pool, use 'default' if absent. | string | 1.4.0 |
-| kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 |
-| kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go.
SPARK: the events will be written to the Spark listener bus.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: to be done.
| seq | 1.7.0 |
-| kyuubi.engine.spark.initialize.sql | SHOW DATABASES | The initialize sql for Spark engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 |
-| kyuubi.engine.spark.operation.incremental.collect | false | When true, the result will be sequentially calculated and returned to the Spark driver. Note that, kyuubi.operation.result.max.rows will be ignored on incremental collect mode. It fallback to `kyuubi.operation.incremental.collect` | boolean | 1.10.0 |
-| kyuubi.engine.spark.output.mode | AUTO | The output mode of Spark engine:
AUTO: For PySpark, the extracted `text/plain` from python response as output.
NOTEBOOK: For PySpark, the original python response as output.
| string | 1.9.0 |
-| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 |
-| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 |
-| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 |
-| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. | duration | 1.7.1 |
-| kyuubi.engine.trino.connection.keystore.password | <undefined> | The keystore password used for connecting to trino cluster | string | 1.8.0 |
-| kyuubi.engine.trino.connection.keystore.path | <undefined> | The keystore path used for connecting to trino cluster | string | 1.8.0 |
-| kyuubi.engine.trino.connection.keystore.type | <undefined> | The keystore type used for connecting to trino cluster | string | 1.8.0 |
-| kyuubi.engine.trino.connection.password | <undefined> | The password used for connecting to trino cluster | string | 1.8.0 |
-| kyuubi.engine.trino.connection.truststore.password | <undefined> | The truststore password used for connecting to trino cluster | string | 1.8.0 |
-| kyuubi.engine.trino.connection.truststore.path | <undefined> | The truststore path used for connecting to trino cluster | string | 1.8.0 |
-| kyuubi.engine.trino.connection.truststore.type | <undefined> | The truststore type used for connecting to trino cluster | string | 1.8.0 |
-| kyuubi.engine.trino.connection.user | <undefined> | The user used for connecting to trino cluster | string | 1.9.0 |
-| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: to be done.
| seq | 1.7.0 |
-| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 |
-| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 |
-| kyuubi.engine.trino.memory | 1g | The heap memory for the Trino query engine | string | 1.6.0 |
-| kyuubi.engine.trino.operation.incremental.collect | false | When true, the result will be sequentially calculated and returned to the trino. It fallback to `kyuubi.operation.incremental.collect` | boolean | 1.10.0 |
-| kyuubi.engine.type | SPARK_SQL | Specify the detailed engine supported by Kyuubi. The engine type bindings to SESSION scope. This configuration is experimental. Currently, available configs are:
SPARK_SQL: specify this engine type will launch a Spark engine which can provide all the capacity of the Apache Spark. Note, it's a default engine type.
FLINK_SQL: specify this engine type will launch a Flink engine which can provide all the capacity of the Apache Flink.
TRINO: specify this engine type will launch a Trino engine which can provide all the capacity of the Trino.
HIVE_SQL: specify this engine type will launch a Hive engine which can provide all the capacity of the Hive Server2.
JDBC: specify this engine type will launch a JDBC engine which can forward queries to the database system through the certain JDBC driver, for now, it supports Doris, MySQL, Phoenix, PostgreSQL, StarRocks, Impala and ClickHouse.
CHAT: specify this engine type will launch a Chat engine.
| string | 1.4.0 |
-| kyuubi.engine.ui.retainedSessions | 200 | The number of SQL client sessions kept in the Kyuubi Query Engine web UI. | int | 1.4.0 |
-| kyuubi.engine.ui.retainedStatements | 200 | The number of statements kept in the Kyuubi Query Engine web UI. | int | 1.4.0 |
-| kyuubi.engine.ui.stop.enabled | true | When true, allows Kyuubi engine to be killed from the Spark Web UI. | boolean | 1.3.0 |
-| kyuubi.engine.user.isolated.spark.session | true | When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including the temporary views, function registries, SQL configuration, and the current database. Note that, it does not affect if the share level is connection or user. | boolean | 1.6.0 |
-| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M | The interval to check if the user-isolated Spark session is timeout. | duration | 1.6.0 |
-| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H | If kyuubi.engine.user.isolated.spark.session is false, we will release the Spark session if its corresponding user is inactive after this configured timeout. | duration | 1.6.0 |
-| kyuubi.engine.yarn.app.name | <undefined> | The YARN app name when the engine deploy mode is YARN. | string | 1.9.0 |
-| kyuubi.engine.yarn.cores | 1 | kyuubi engine container core number when the engine deploy mode is YARN. | int | 1.9.0 |
-| kyuubi.engine.yarn.java.options | <undefined> | The extra Java options for the AM when the engine deploy mode is YARN. | string | 1.9.0 |
-| kyuubi.engine.yarn.memory | 1024 | kyuubi engine container memory in mb when the engine deploy mode is YARN. | int | 1.9.0 |
-| kyuubi.engine.yarn.priority | <undefined> | kyuubi engine yarn priority when the engine deploy mode is YARN. | int | 1.9.0 |
-| kyuubi.engine.yarn.queue | default | kyuubi engine yarn queue when the engine deploy mode is YARN. | string | 1.9.0 |
-| kyuubi.engine.yarn.stagingDir | <undefined> | Staging directory used while submitting kyuubi engine to YARN, It should be a absolute path in HDFS. | string | 1.9.0 |
-| kyuubi.engine.yarn.submit.timeout | PT30S | The engine submit timeout for YARN application. | duration | 1.7.2 |
-| kyuubi.engine.yarn.tags | <undefined> | kyuubi engine yarn tags when the engine deploy mode is YARN. | seq | 1.9.0 |
+| Key | Default | Meaning | Type | Since |
+|-----------------------------------------------------------|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------|
+| kyuubi.engine.chat.ernie.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the ernie bot server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 |
+| kyuubi.engine.chat.ernie.http.proxy | <undefined> | HTTP proxy url for API calling in ernie bot engine. e.g. http://127.0.0.1:1088 | string | 1.9.0 |
+| kyuubi.engine.chat.ernie.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after ernie bot server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.9.0 |
+| kyuubi.engine.chat.ernie.model | completions | ID of the model used in ernie bot. Available models are completions_pro, ernie_bot_8k, completions and eb-instant[Model overview](https://cloud.baidu.com/doc/WENXINWORKSHOP/s/6lp69is2a). | string | 1.9.0 |
+| kyuubi.engine.chat.ernie.token | <undefined> | The token to access ernie bot open API, which could be got at https://cloud.baidu.com/doc/WENXINWORKSHOP/s/Ilkkrb0i5 | string | 1.9.0 |
+| kyuubi.engine.chat.extra.classpath | <undefined> | The extra classpath for the Chat engine, for configuring the location of the SDK and etc. | string | 1.8.0 |
+| kyuubi.engine.chat.gpt.apiKey | <undefined> | The key to access OpenAI open API, which could be got at https://platform.openai.com/account/api-keys | string | 1.8.0 |
+| kyuubi.engine.chat.gpt.http.connect.timeout | PT2M | The timeout[ms] for establishing the connection with the Chat GPT server. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 |
+| kyuubi.engine.chat.gpt.http.proxy | <undefined> | HTTP proxy url for API calling in Chat GPT engine. e.g. http://127.0.0.1:1087 | string | 1.8.0 |
+| kyuubi.engine.chat.gpt.http.socket.timeout | PT2M | The timeout[ms] for waiting for data packets after Chat GPT server connection is established. A timeout value of zero is interpreted as an infinite timeout. | duration | 1.8.0 |
+| kyuubi.engine.chat.gpt.model | gpt-3.5-turbo | ID of the model used in ChatGPT. Available models refer to OpenAI's [Model overview](https://platform.openai.com/docs/models/overview). | string | 1.8.0 |
+| kyuubi.engine.chat.java.options | <undefined> | The extra Java options for the Chat engine | string | 1.8.0 |
+| kyuubi.engine.chat.memory | 1g | The heap memory for the Chat engine | string | 1.8.0 |
+| kyuubi.engine.chat.provider | ECHO | The provider for the Chat engine. Candidates:
ECHO: simply replies a welcome message.
GPT: a.k.a ChatGPT, powered by OpenAI.
ERNIE: ErnieBot, powered by Baidu.
| string | 1.8.0 |
+| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 |
+| kyuubi.engine.deregister.exception.classes || A comma-separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself. | set | 1.2.0 |
+| kyuubi.engine.deregister.exception.messages || A comma-separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself. | set | 1.2.0 |
+| kyuubi.engine.deregister.exception.ttl | PT30M | Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures. | duration | 1.2.0 |
+| kyuubi.engine.deregister.job.max.failures | 4 | Number of failures of job before deregistering the engine. | int | 1.2.0 |
+| kyuubi.engine.doAs.enabled | true | Whether to enable user impersonation on launching engine. When enabled, for engines which supports user impersonation, e.g. SPARK, depends on the `kyuubi.engine.share.level`, different users will be used to launch the engine. Otherwise, Kyuubi Server's user will always be used to launch the engine. | boolean | 1.9.0 |
+| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger.
Local Path: start with 'file://'
HDFS Path: start with 'hdfs://'
| string | 1.3.0 |
+| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
SPARK: the events will be written to the Spark listener bus.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 |
+| kyuubi.engine.flink.application.jars | <undefined> | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. | string | 1.8.0 |
+| kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. | string | 1.6.0 |
+| kyuubi.engine.flink.initialize.sql | SHOW DATABASES | The initialize sql for Flink engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 |
+| kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 |
+| kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. | string | 1.6.0 |
+| kyuubi.engine.hive.deploy.mode | LOCAL | Configures the hive engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.9.0 |
+| kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: to be done.
| seq | 1.7.0 |
+| kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. | string | 1.6.0 |
+| kyuubi.engine.hive.java.options | <undefined> | The extra Java options for the Hive query engine | string | 1.6.0 |
+| kyuubi.engine.hive.memory | 1g | The heap memory for the Hive query engine | string | 1.6.0 |
+| kyuubi.engine.initialize.sql | SHOW DATABASES | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SHOW DATABASES` to eagerly active HiveClient. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.2.0 |
+| kyuubi.engine.jdbc.connection.password | <undefined> | The password is used for connecting to server | string | 1.6.0 |
+| kyuubi.engine.jdbc.connection.propagateCredential | false | Whether to use the session's user and password to connect to database | boolean | 1.8.0 |
+| kyuubi.engine.jdbc.connection.properties || The additional properties are used for connecting to server | seq | 1.6.0 |
+| kyuubi.engine.jdbc.connection.provider | <undefined> | A JDBC connection provider plugin for the Kyuubi Server to establish a connection to the JDBC URL. The configuration value should be a subclass of `org.apache.kyuubi.engine.jdbc.connection.JdbcConnectionProvider`. Kyuubi provides the following built-in implementations:
doris: For establishing Doris connections.
mysql: For establishing MySQL connections.
phoenix: For establishing Phoenix connections.
postgresql: For establishing PostgreSQL connections.
starrocks: For establishing StarRocks connections.
impala: For establishing Impala connections.
clickhouse: For establishing clickhouse connections.
| string | 1.6.0 |
+| kyuubi.engine.jdbc.connection.url | <undefined> | The server url that engine will connect to | string | 1.6.0 |
+| kyuubi.engine.jdbc.connection.user | <undefined> | The user is used for connecting to server | string | 1.6.0 |
+| kyuubi.engine.jdbc.deploy.mode | LOCAL | Configures the jdbc engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. | string | 1.10.0 |
+| kyuubi.engine.jdbc.driver.class | <undefined> | The driver class for JDBC engine connection | string | 1.6.0 |
+| kyuubi.engine.jdbc.extra.classpath | <undefined> | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 |
+| kyuubi.engine.jdbc.fetch.size | 1000 | The fetch size of JDBC engine | int | 1.9.0 |
+| kyuubi.engine.jdbc.initialize.sql | SELECT 1 | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SELECT 1` to eagerly active JDBCClient. | seq | 1.8.0 |
+| kyuubi.engine.jdbc.java.options | <undefined> | The extra Java options for the JDBC query engine | string | 1.6.0 |
+| kyuubi.engine.jdbc.memory | 1g | The heap memory for the JDBC query engine | string | 1.6.0 |
+| kyuubi.engine.jdbc.operation.incremental.collect | false | When true, the result will be sequentially calculated and returned to the JDBC engine. It fallback to `kyuubi.operation.incremental.collect` | boolean | 1.10.0 |
+| kyuubi.engine.jdbc.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. | seq | 1.8.0 |
+| kyuubi.engine.jdbc.type | <undefined> | The short name of JDBC type | string | 1.6.0 |
+| kyuubi.engine.keytab | <undefined> | Kerberos keytab for the kyuubi engine. | string | 1.10.0 |
+| kyuubi.engine.kubernetes.submit.timeout | PT30S | The engine submit timeout for Kubernetes application. | duration | 1.7.2 |
+| kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 |
+| kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 |
+| kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 |
+| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session.
RANDOM - Randomly use the engine in the pool
POLLING - Polling use the engine in the pool
| string | 1.7.0 |
+| kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 |
+| kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 |
+| kyuubi.engine.principal | <undefined> | Kerberos principal for the kyuubi engine. | string | 1.10.0 |
+| kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 |
+| kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are:
CONNECTION: the engine will not be shared but only used by the current client connection, and the engine will be launched by session user.
USER: the engine will be shared by all sessions created by a unique username, and the engine will be launched by session user.
GROUP: the engine will be shared by all sessions created by all users belong to the same primary group name. The engine will be launched by the primary group name as the effective username, so here the group name is in value of special user who is able to visit the computing resources/data of the team. It follows the [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map user to a primary group. If the primary group is not found, it fallback to the USER level.
SERVER: the engine will be shared by Kyuubi servers, and the engine will be launched by Server's user.
See also `kyuubi.engine.share.level.subdomain` and `kyuubi.engine.doAs.enabled`. | string | 1.2.0 |
+| kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 |
+| kyuubi.engine.share.level.subdomain | <undefined> | Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper subpath. For example, for the `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level. When disable engine pool, use 'default' if absent. | string | 1.4.0 |
+| kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 |
+| kyuubi.engine.spark.connect.grpc.bind.host | <undefined> | Hostname or IP of the machine on which to run the grpc server in frontend service | string | 1.9.0 |
+| kyuubi.engine.spark.connect.grpc.bind.port | 15002 | The port is used in spark connect frontendService start GrpcServer | int | 1.9.0 |
+| kyuubi.engine.spark.connect.grpc.max.inbound.message.size | 134217728 | Sets the maximum inbound message in bytes size for the gRPC requests.Requests with a larger payload will fail. | int | 1.9.0 |
+| kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go.
SPARK: the events will be written to the Spark listener bus.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: to be done.
| seq | 1.7.0 |
+| kyuubi.engine.spark.initialize.sql | SHOW DATABASES | The initialize sql for Spark engine. It fallback to `kyuubi.engine.initialize.sql`. | seq | 1.8.1 |
+| kyuubi.engine.spark.operation.incremental.collect | false | When true, the result will be sequentially calculated and returned to the Spark driver. Note that, kyuubi.operation.result.max.rows will be ignored on incremental collect mode. It fallback to `kyuubi.operation.incremental.collect` | boolean | 1.10.0 |
+| kyuubi.engine.spark.output.mode | AUTO | The output mode of Spark engine:
AUTO: For PySpark, the extracted `text/plain` from python response as output.
NOTEBOOK: For PySpark, the original python response as output.
| string | 1.9.0 |
+| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 |
+| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 |
+| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 |
+| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. | duration | 1.7.1 |
+| kyuubi.engine.trino.connection.keystore.password | <undefined> | The keystore password used for connecting to trino cluster | string | 1.8.0 |
+| kyuubi.engine.trino.connection.keystore.path | <undefined> | The keystore path used for connecting to trino cluster | string | 1.8.0 |
+| kyuubi.engine.trino.connection.keystore.type | <undefined> | The keystore type used for connecting to trino cluster | string | 1.8.0 |
+| kyuubi.engine.trino.connection.password | <undefined> | The password used for connecting to trino cluster | string | 1.8.0 |
+| kyuubi.engine.trino.connection.truststore.password | <undefined> | The truststore password used for connecting to trino cluster | string | 1.8.0 |
+| kyuubi.engine.trino.connection.truststore.path | <undefined> | The truststore path used for connecting to trino cluster | string | 1.8.0 |
+| kyuubi.engine.trino.connection.truststore.type | <undefined> | The truststore type used for connecting to trino cluster | string | 1.8.0 |
+| kyuubi.engine.trino.connection.user | <undefined> | The user used for connecting to trino cluster | string | 1.9.0 |
+| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
JDBC: to be done
CUSTOM: to be done.
| seq | 1.7.0 |
+| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 |
+| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 |
+| kyuubi.engine.trino.memory | 1g | The heap memory for the Trino query engine | string | 1.6.0 |
+| kyuubi.engine.trino.operation.incremental.collect | false | When true, the result will be sequentially calculated and returned to the trino. It fallback to `kyuubi.operation.incremental.collect` | boolean | 1.10.0 |
+| kyuubi.engine.type | SPARK_SQL | Specify the detailed engine supported by Kyuubi. The engine type bindings to SESSION scope. This configuration is experimental. Currently, available configs are:
SPARK_SQL: specify this engine type will launch a Spark engine which can provide all the capacity of the Apache Spark. Note, it's a default engine type.
FLINK_SQL: specify this engine type will launch a Flink engine which can provide all the capacity of the Apache Flink.
TRINO: specify this engine type will launch a Trino engine which can provide all the capacity of the Trino.
HIVE_SQL: specify this engine type will launch a Hive engine which can provide all the capacity of the Hive Server2.
JDBC: specify this engine type will launch a JDBC engine which can forward queries to the database system through the certain JDBC driver, for now, it supports Doris, MySQL, Phoenix, PostgreSQL, StarRocks, Impala and ClickHouse.
CHAT: specify this engine type will launch a Chat engine.
| string | 1.4.0 |
+| kyuubi.engine.ui.retainedSessions | 200 | The number of SQL client sessions kept in the Kyuubi Query Engine web UI. | int | 1.4.0 |
+| kyuubi.engine.ui.retainedStatements | 200 | The number of statements kept in the Kyuubi Query Engine web UI. | int | 1.4.0 |
+| kyuubi.engine.ui.stop.enabled | true | When true, allows Kyuubi engine to be killed from the Spark Web UI. | boolean | 1.3.0 |
+| kyuubi.engine.user.isolated.spark.session | true | When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including the temporary views, function registries, SQL configuration, and the current database. Note that, it does not affect if the share level is connection or user. | boolean | 1.6.0 |
+| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M | The interval to check if the user-isolated Spark session is timeout. | duration | 1.6.0 |
+| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H | If kyuubi.engine.user.isolated.spark.session is false, we will release the Spark session if its corresponding user is inactive after this configured timeout. | duration | 1.6.0 |
+| kyuubi.engine.yarn.app.name | <undefined> | The YARN app name when the engine deploy mode is YARN. | string | 1.9.0 |
+| kyuubi.engine.yarn.cores | 1 | kyuubi engine container core number when the engine deploy mode is YARN. | int | 1.9.0 |
+| kyuubi.engine.yarn.java.options | <undefined> | The extra Java options for the AM when the engine deploy mode is YARN. | string | 1.9.0 |
+| kyuubi.engine.yarn.memory | 1024 | kyuubi engine container memory in mb when the engine deploy mode is YARN. | int | 1.9.0 |
+| kyuubi.engine.yarn.priority | <undefined> | kyuubi engine yarn priority when the engine deploy mode is YARN. | int | 1.9.0 |
+| kyuubi.engine.yarn.queue | default | kyuubi engine yarn queue when the engine deploy mode is YARN. | string | 1.9.0 |
+| kyuubi.engine.yarn.stagingDir | <undefined> | Staging directory used while submitting kyuubi engine to YARN, It should be a absolute path in HDFS. | string | 1.9.0 |
+| kyuubi.engine.yarn.submit.timeout | PT30S | The engine submit timeout for YARN application. | duration | 1.7.2 |
+| kyuubi.engine.yarn.tags | <undefined> | kyuubi engine yarn tags when the engine deploy mode is YARN. | seq | 1.9.0 |
### Event
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 344806dc943..b01f5d44a45 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1064,6 +1064,21 @@ object KyuubiConf {
.transformToLowerCase
.createWithDefault(SaslQOP.AUTH.toString)
+ val GRPC_FRONTEND_BIND_HOST: OptionalConfigEntry[String] =
+ buildConf("kyuubi.grpc.frontend.bind.host")
+ .doc("Hostname or IP of the machine on which to run the grpc frontend services.")
+ .version("1.0.0")
+ .serverOnly
+ .stringConf
+ .createOptional
+
+ val GRPC_FRONTEND_SPARK_CONNECT_HOST: ConfigEntry[Option[String]] =
+ buildConf("kyuubi.grpc.frontend.spark.connect.host")
+ .doc("Hostname or IP of the machine on which to run the Spark Connect Grpc frontend service.")
+ .version("1.4.0")
+ .serverOnly
+ .fallbackConf(FRONTEND_BIND_HOST)
+
val FRONTEND_REST_BIND_HOST: ConfigEntry[Option[String]] =
buildConf("kyuubi.frontend.rest.bind.host")
.doc("Hostname or IP of the machine on which to run the REST frontend service.")
@@ -2731,6 +2746,28 @@ object KyuubiConf {
.stringConf
.createWithDefault("ENGINE")
+ val ENGINE_SPARK_CONNECT_GRPC_BINDING_PORT: ConfigEntry[Int] =
+ buildConf("kyuubi.engine.spark.connect.grpc.bind.port")
+ .doc("The port is used in spark connect frontendService start GrpcServer")
+ .version("1.9.0")
+ .intConf
+ .createWithDefault(15002)
+
+ val ENGINE_SPARK_CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE: ConfigEntry[Int] =
+ buildConf("kyuubi.engine.spark.connect.grpc.max.inbound.message.size")
+ .doc("Sets the maximum inbound message in bytes size for the gRPC requests." +
+ "Requests with a larger payload will fail.")
+ .version("1.9.0")
+ .intConf
+ .createWithDefault(128 * 1024 * 1024)
+
+ val ENGINE_SPARK_CONNECT_GRPC_BINDING_HOST: ConfigEntry[Option[String]] =
+ buildConf("kyuubi.engine.spark.connect.grpc.bind.host")
+ .doc("Hostname or IP of the machine on which to run the grpc server in frontend service ")
+ .version("1.9.0")
+ .serverOnly
+ .fallbackConf(FRONTEND_BIND_HOST)
+
val ENGINE_SPARK_SHOW_PROGRESS: ConfigEntry[Boolean] =
buildConf("kyuubi.session.engine.spark.showProgress")
.doc("When true, show the progress bar in the Spark's engine log.")
diff --git a/kyuubi-grpc-server/pom.xml b/kyuubi-grpc-server/pom.xml
new file mode 100644
index 00000000000..03e84b8e444
--- /dev/null
+++ b/kyuubi-grpc-server/pom.xml
@@ -0,0 +1,243 @@
+
+
+
+ 4.0.0
+
+ org.apache.kyuubi
+ kyuubi-parent
+ 1.10.0-SNAPSHOT
+ ../pom.xml
+
+
+ kyuubi-grpc-server_${scala.binary.version}
+ jar
+ Kyuubi Project Grpc Server
+ https://kyuubi.apache.org/
+
+
+
+
+ org.apache.spark
+ spark-connect_2.13
+ 3.5.1
+
+
+ com.google.common
+ guava
+
+
+
+
+
+
+ com.google.guava
+ failureaccess
+ 1.0.1
+
+
+
+
+ org.apache.spark
+ spark-network-common_2.13
+ 3.5.1
+
+
+
+ org.apache.kyuubi
+ kyuubi-grpc_${scala.binary.version}
+ ${project.version}
+
+
+
+ org.apache.kyuubi
+ kyuubi-common_${scala.binary.version}
+ ${project.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+ org.slf4j
+ jul-to-slf4j
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+
+
+
+
+
+ org.apache.kyuubi
+ kyuubi-common_${scala.binary.version}
+ ${project.version}
+ test-jar
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+ org.slf4j
+ jul-to-slf4j
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+
+
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+
+
+
+ io.grpc
+ grpc-core
+
+
+ io.grpc
+ grpc-stub
+
+
+
+ javax.servlet
+ javax.servlet-api
+
+
+
+ org.apache.kyuubi
+ kyuubi-grpc-shade
+ ${project.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-patch-plugin
+ 1.2
+
+
+ mylittle.patch
+
+
+
+
+ patch
+ process-sources
+
+ apply
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ add-scala-sources
+
+ add-source
+
+ generate-sources
+
+
+ src/main/scala-${scala.binary.version}
+
+
+
+
+ add-scala-test-sources
+
+ add-test-source
+
+ generate-test-sources
+
+
+ src/test/scala-${scala.binary.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ prepare-test-jar
+
+ test-jar
+
+ test-compile
+
+
+
+
+ target/scala-${scala.binary.version}/classes
+ target/scala-${scala.binary.version}/test-classes
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.2
+
+
+
+
diff --git a/kyuubi-grpc-server/src/main/resources/log4j2.xml b/kyuubi-grpc-server/src/main/resources/log4j2.xml
new file mode 100644
index 00000000000..6fa1ea32f63
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/resources/log4j2.xml
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/event/KyuubiGrpcOperationEventsManager.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/event/KyuubiGrpcOperationEventsManager.scala
new file mode 100644
index 00000000000..f8f88ccf434
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/event/KyuubiGrpcOperationEventsManager.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.event
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.grpc.events.OperationEventsManager
+import org.apache.kyuubi.grpc.operation.GrpcOperation
+import org.apache.kyuubi.grpc.utils.Clock
+
+class KyuubiGrpcOperationEventsManager(operation: GrpcOperation, clock: Clock)
+ extends OperationEventsManager(operation, clock) with Logging {
+
+ override def postStarted(): Unit = {
+ super.postStarted()
+ info("Operation Event: post Started")
+ }
+
+ override def postClosed(): Unit = {
+ info("Operation Event: post Closed")
+ super.postClosed()
+ }
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/event/KyuubiGrpcSessionEventsManager.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/event/KyuubiGrpcSessionEventsManager.scala
new file mode 100644
index 00000000000..a47f5035fb2
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/event/KyuubiGrpcSessionEventsManager.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.event
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.grpc.events.SessionEventsManager
+import org.apache.kyuubi.grpc.session.KyuubiGrpcSession
+import org.apache.kyuubi.grpc.utils.Clock
+
+class KyuubiGrpcSessionEventsManager(session: KyuubiGrpcSession, clock: Clock)
+ extends SessionEventsManager(session, clock) with Logging {
+
+ override def postStarted(): Unit = {
+ super.postStarted()
+ info("Session Event: post Started")
+ }
+
+ override def postClosed(): Unit = {
+ info("Session Event: post Closed")
+ super.postClosed()
+ }
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/KyuubiGrpcOperation.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/KyuubiGrpcOperation.scala
new file mode 100644
index 00000000000..e30da905a27
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/KyuubiGrpcOperation.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation
+
+import org.apache.kyuubi.grpc.events.OperationEventsManager
+import org.apache.kyuubi.grpc.session.KyuubiGrpcSession
+import org.apache.kyuubi.operation.log.OperationLog
+
+abstract class KyuubiGrpcOperation(session: KyuubiGrpcSession)
+ extends AbstractGrpcOperation[KyuubiGrpcSession](session) {
+
+ override def key: OperationKey = OperationKey(session.sessionKey)
+
+ override def beforeRun(): Unit = {
+ info("beforeRun, currently empty")
+ }
+
+ override def afterRun(): Unit = {
+ info("afterRun, currently empty")
+ }
+
+ override def close(): Unit = {
+ info("close, currently empty")
+ }
+
+ override def operationEventsManager: OperationEventsManager = {
+ null
+ }
+
+ override def interrupt(): Unit = {
+ info("interrupt, currently empty")
+ }
+
+ override def isTimedOut: Boolean = false
+
+ override def getOperationLog: Option[OperationLog] = {
+ Some(null)
+ }
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/KyuubiGrpcOperationManager.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/KyuubiGrpcOperationManager.scala
new file mode 100644
index 00000000000..5622ac15c32
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/KyuubiGrpcOperationManager.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.kyuubi.grpc.operation.spark.Config
+import org.apache.kyuubi.grpc.session.KyuubiGrpcSession
+import org.apache.kyuubi.shade.org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse}
+
+class KyuubiGrpcOperationManager(name: String) extends GrpcOperationManager(name) {
+ def this() = this(classOf[KyuubiGrpcOperationManager].getSimpleName)
+
+ def newSparkConfigOperation(
+ channel: ManagedChannel,
+ session: KyuubiGrpcSession,
+ request: ConfigRequest,
+ responseObserver: StreamObserver[ConfigResponse]): GrpcOperation = {
+ val operation =
+ new Config(channel, session, request, responseObserver)
+ addOperation(operation)
+ }
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/spark/Config.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/spark/Config.scala
new file mode 100644
index 00000000000..6b6ac4a8abd
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/spark/Config.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation.spark
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.kyuubi.grpc.session.KyuubiGrpcSession
+import org.apache.kyuubi.shade.org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse}
+
+class Config(
+ channel: ManagedChannel,
+ session: KyuubiGrpcSession,
+ request: ConfigRequest,
+ responseObserver: StreamObserver[ConfigResponse])
+ extends KyuubiSparkGrpcOperation(channel, session) {
+ override protected def runInternal(): Unit = {
+ stub.config(request, responseObserver)
+ }
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/spark/KyuubiSparkGrpcOperation.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/spark/KyuubiSparkGrpcOperation.scala
new file mode 100644
index 00000000000..44a7fd4fc93
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/operation/spark/KyuubiSparkGrpcOperation.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation.spark
+
+import io.grpc.ManagedChannel
+
+import org.apache.kyuubi.grpc.operation.KyuubiGrpcOperation
+import org.apache.kyuubi.grpc.session.KyuubiGrpcSession
+import org.apache.kyuubi.shade.org.apache.spark.connect.proto.SparkConnectServiceGrpc
+import org.apache.kyuubi.shade.org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceStub
+
+abstract class KyuubiSparkGrpcOperation(channel: ManagedChannel, session: KyuubiGrpcSession)
+ extends KyuubiGrpcOperation(session) {
+
+ def stub: SparkConnectServiceStub = SparkConnectServiceGrpc.newStub(channel)
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/AbstractKyuubiGrpcFrontendService.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/AbstractKyuubiGrpcFrontendService.scala
new file mode 100644
index 00000000000..47452c4e2d6
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/AbstractKyuubiGrpcFrontendService.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.server
+
+import io.grpc.ManagedChannel
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.grpc.service.AbstractGrpcFrontendService
+
+abstract class AbstractKyuubiGrpcFrontendService(grpcSeverable: KyuubiGrpcSeverable, name: String)
+ extends AbstractGrpcFrontendService(name) {
+
+ var host = ""
+ var port = 0
+
+ def channel: ManagedChannel
+
+ def startEngine(): (String, Int)
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ val serverInfo = startEngine()
+ host = serverInfo._1
+ port = serverInfo._2
+ super.initialize(conf)
+ }
+
+ override val serverable: KyuubiGrpcSeverable = grpcSeverable
+
+ def grpcBe: KyuubiGrpcBackendService = be.asInstanceOf[KyuubiGrpcBackendService]
+
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcBackendService.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcBackendService.scala
new file mode 100644
index 00000000000..68030a14a2b
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcBackendService.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.server
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.kyuubi.grpc.service.AbstractGrpcBackendService
+import org.apache.kyuubi.grpc.session.{GrpcSessionManager, KyuubiGrpcSession, KyuubiGrpcSessionManager, SessionKey}
+import org.apache.kyuubi.shade.org.apache.spark.connect.proto._
+
+class KyuubiGrpcBackendService(name: String) extends AbstractGrpcBackendService(name) {
+
+ def config(
+ sessionKey: SessionKey,
+ request: ConfigRequest,
+ responseObserver: StreamObserver[ConfigResponse],
+ channel: ManagedChannel): Unit = {
+ grpcSessionManager.openSession(sessionKey)
+ .config(request, responseObserver, channel)
+ }
+
+ def this() = this(classOf[KyuubiGrpcBackendService].getSimpleName)
+
+ override def grpcSessionManager: GrpcSessionManager[KyuubiGrpcSession] =
+ new KyuubiGrpcSessionManager()
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcServer.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcServer.scala
new file mode 100644
index 00000000000..f65f2fae4ec
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcServer.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.server
+
+class KyuubiGrpcServer(name: String)
+ extends KyuubiGrpcSeverable(name) {
+ override val backendService: KyuubiGrpcBackendService = new KyuubiGrpcBackendService()
+ override val frontendServices: Seq[AbstractKyuubiGrpcFrontendService] = Seq.empty
+
+ def frontendInfo(): (String, Int) = {
+ (frontendServices.head.host, frontendServices.head.port)
+ }
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcSeverable.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcSeverable.scala
new file mode 100644
index 00000000000..a23b6f92427
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiGrpcSeverable.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.server
+
+import org.apache.kyuubi.grpc.service.GrpcSeverable
+
+abstract class KyuubiGrpcSeverable(name: String)
+ extends GrpcSeverable[KyuubiGrpcBackendService, AbstractKyuubiGrpcFrontendService](name) {
+
+ override protected def stopServer(): Unit = {
+ info("stop")
+ }
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiSparkConnectFrontendService.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiSparkConnectFrontendService.scala
new file mode 100644
index 00000000000..8cbf30177a0
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/server/KyuubiSparkConnectFrontendService.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.server
+
+import scala.jdk.CollectionConverters._
+
+import com.google.protobuf.MessageLite
+import io.grpc._
+import io.grpc.stub.StreamObserver
+
+import org.apache.kyuubi.grpc.session.SessionKey
+import org.apache.kyuubi.service.Service
+import org.apache.kyuubi.shade.org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, SparkConnectServiceGrpc}
+
+abstract class KyuubiSparkConnectFrontendService(grpcSeverable: KyuubiGrpcSeverable, name: String)
+ extends AbstractKyuubiGrpcFrontendService(grpcSeverable, name)
+ with SparkConnectServiceGrpc.AsyncService {
+
+ override def channel: ManagedChannel = {
+ Grpc.newChannelBuilderForAddress(
+ host,
+ port,
+ InsecureChannelCredentials.create()).build()
+ }
+
+ override protected def serverHost: Option[String] = Some("localhost")
+
+ override def bindService(): ServerServiceDefinition = {
+ val serviceDef = SparkConnectServiceGrpc.bindService(this)
+ val builder = ServerServiceDefinition.builder(serviceDef.getServiceDescriptor.getName)
+ serviceDef.getMethods.asScala
+ .asInstanceOf[Iterable[ServerMethodDefinition[MessageLite, MessageLite]]]
+ .foreach(method =>
+ builder.addMethod(
+ methodWithCustomMarshallers(method.getMethodDescriptor),
+ method.getServerCallHandler))
+ builder.build()
+ }
+
+ override val discoveryService: Option[Service] = None
+
+ override def config(
+ request: ConfigRequest,
+ responseObserver: StreamObserver[ConfigResponse]): Unit = {
+ val sessionKey = new SessionKey(request.getUserContext.getUserId, request.getSessionId)
+ grpcBe.config(sessionKey, request, responseObserver, channel)
+ }
+
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/session/KyuubiGrpcSession.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/session/KyuubiGrpcSession.scala
new file mode 100644
index 00000000000..93baa33b39e
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/session/KyuubiGrpcSession.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.session
+
+import scala.util.Random
+
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.kyuubi.grpc.event.KyuubiGrpcSessionEventsManager
+import org.apache.kyuubi.grpc.events.SessionEventsManager
+import org.apache.kyuubi.grpc.operation.{GrpcOperation, OperationKey}
+import org.apache.kyuubi.grpc.utils.SystemClock
+import org.apache.kyuubi.shade.org.apache.spark.connect.proto._
+
+class KyuubiGrpcSession(
+ userId: String,
+ sessionManager: KyuubiGrpcSessionManager)
+ extends AbstractGrpcSession(userId) {
+
+ def config(
+ request: ConfigRequest,
+ responseObserver: StreamObserver[ConfigResponse],
+ channel: ManagedChannel): Unit = {
+ val operation = sessionManager.grpcOperationManager
+ .newSparkConfigOperation(channel, this, request, responseObserver)
+ runGrpcOperation(operation)
+ }
+
+ override def name: Option[String] = Some("KyuubiGrpcSessionImpl")
+
+ override def serverSessionId: String = Random.nextString(10)
+
+ override def sessionManager: GrpcSessionManager[KyuubiGrpcSession] = sessionManager
+
+ override def sessionEventsManager: SessionEventsManager =
+ new KyuubiGrpcSessionEventsManager(this, new SystemClock())
+
+ override def getOperation(operationKey: OperationKey): GrpcOperation =
+ sessionManager.grpcOperationManager.getOperation(operationKey)
+}
diff --git a/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/session/KyuubiGrpcSessionManager.scala b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/session/KyuubiGrpcSessionManager.scala
new file mode 100644
index 00000000000..2d06d1329bb
--- /dev/null
+++ b/kyuubi-grpc-server/src/main/scala/org/apache/kyuubi/grpc/session/KyuubiGrpcSessionManager.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.session
+
+import org.apache.kyuubi.grpc.operation.KyuubiGrpcOperationManager
+
+class KyuubiGrpcSessionManager
+ extends GrpcSessionManager[KyuubiGrpcSession]("KyuubiGrpcSessionManager") {
+
+ override def isServer: Boolean = true
+
+ override def grpcOperationManager: KyuubiGrpcOperationManager = new KyuubiGrpcOperationManager()
+
+ override def getOrCreateSession(key: SessionKey): KyuubiGrpcSession = {
+ val session = new KyuubiGrpcSession(key.userId, this)
+ session
+ }
+}
diff --git a/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/SparkConnectSuite.scala b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/SparkConnectSuite.scala
new file mode 100644
index 00000000000..42c6517276e
--- /dev/null
+++ b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/SparkConnectSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import org.apache.spark.connect.proto.ConfigRequest
+import org.apache.spark.sql.connect.client.SparkConnectClient
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
+class SparkConnectSuite extends KyuubiFunSuite {
+ val server: TestServer = new TestServer("testServer")
+ var client: SparkConnectClient = _
+ override def beforeAll(): Unit = {
+ val conf = KyuubiConf().set(KyuubiConf.ENGINE_SPARK_CONNECT_GRPC_BINDING_PORT, 10023)
+ server.initialize(conf)
+ server.start()
+ client = SparkConnectClient.builder().port(server.frontendInfo()._2).build()
+ super.beforeAll()
+
+ }
+
+ test("test config") {
+ val request = ConfigRequest.newBuilder()
+ .setSessionId("abc123")
+ .build()
+
+ val response = client.config(request.getOperation)
+ assert(response.getSessionId === "abc123")
+ }
+}
diff --git a/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/SparkConnectUtil.scala b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/SparkConnectUtil.scala
new file mode 100644
index 00000000000..740707156b7
--- /dev/null
+++ b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/SparkConnectUtil.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import java.net.InetSocketAddress
+import java.time.Instant
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connect.service._
+
+object SparkConnectUtil {
+ def startSparkConnectServer(): (String, Int) = {
+ val _sparkConf = new SparkConf()
+ _sparkConf.setIfMissing("spark.sql.binaryOutputStyle", "UTF8")
+ _sparkConf.setIfMissing("spark.master", "local")
+ val appName = s"kyuubi_test_spark_${Instant.now}"
+ _sparkConf.setIfMissing("spark.app.name", appName)
+ val session = SparkSession.builder().config(_sparkConf).getOrCreate()
+ var isa: InetSocketAddress = null
+ SparkConnectService.start(session.sparkContext)
+ ("localhost", 15002)
+ }
+}
diff --git a/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/TestFrontendService.scala b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/TestFrontendService.scala
new file mode 100644
index 00000000000..2020f8945eb
--- /dev/null
+++ b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/TestFrontendService.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import org.apache.kyuubi.grpc.server.{KyuubiGrpcSeverable, KyuubiSparkConnectFrontendService}
+
+class TestFrontendService(grpcSeverable: KyuubiGrpcSeverable, name: String)
+ extends KyuubiSparkConnectFrontendService(grpcSeverable, name) {
+ override def startEngine(): (String, Int) = {
+ SparkConnectUtil.startSparkConnectServer()
+ }
+}
diff --git a/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/TestServer.scala b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/TestServer.scala
new file mode 100644
index 00000000000..4c1031771a8
--- /dev/null
+++ b/kyuubi-grpc-server/src/test/scala/org/apache/spark/sql/connect/TestServer.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect
+
+import org.apache.kyuubi.grpc.server.{AbstractKyuubiGrpcFrontendService, KyuubiGrpcServer}
+
+class TestServer(name: String) extends KyuubiGrpcServer(name) {
+
+ override val frontendServices: Seq[AbstractKyuubiGrpcFrontendService] = Seq(
+ new TestFrontendService(this, "TestFrontendService"))
+}
diff --git a/kyuubi-grpc-shade/pom.xml b/kyuubi-grpc-shade/pom.xml
new file mode 100644
index 00000000000..dc6c03c2601
--- /dev/null
+++ b/kyuubi-grpc-shade/pom.xml
@@ -0,0 +1,123 @@
+
+
+
+ 4.0.0
+
+ org.apache.kyuubi
+ kyuubi-parent
+ 1.10.0-SNAPSHOT
+
+
+ kyuubi-grpc-shade
+ jar
+ Kyuubi Project Grpc Shade(Only for test before merge)
+ https://kyuubi.apache.org/
+
+
+
+ org.apache.spark
+ spark-connect-common_2.13
+ 3.5.1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+ false
+ true
+ true
+ false
+
+
+ org.apache.spark:spark-connect-common_2.13
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ META-INF/DEPENDENCIES
+ META-INF/LICENSE.txt
+ META-INF/NOTICE.txt
+ META-INF/maven/**
+ META-INF/native-image/**
+ LICENSE.txt
+ NOTICE.txt
+ mozilla/**
+ arrow-git.properties
+
+
+
+
+
+ org.apache.spark.connect
+ ${kyuubi.shade.packageName}.org.apache.spark.connect
+
+
+
+
+
+
+
+ shade
+
+ package
+
+
+
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ compile
+
+ attach-artifact
+
+ package
+
+
+
+ ${basedir}/target/${project.artifactId}-${project.version}.jar
+ jar
+ optional
+
+
+
+
+
+
+
+ target/scala-${scala.binary.version}/classes
+ target/scala-${scala.binary.version}/test-classes
+
+
diff --git a/kyuubi-grpc-shade/src/main/resources/META-INF/LICENSE b/kyuubi-grpc-shade/src/main/resources/META-INF/LICENSE
new file mode 100644
index 00000000000..9e1d666a8f2
--- /dev/null
+++ b/kyuubi-grpc-shade/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,226 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+------------------------------------------------------------------------------------
+
+This project bundles some components that are licensed under the
+
+Apache License Version 2.0
+--------------------------
+org.apache.hive:hive-service-rpc
+com.google.guava:failureaccess
+com.google.guava:guava
+commons-codec:commons-codec
+org.apache.commons:commons-lang3
+org.apache.curator:curator-framework
+org.apache.curator:curator-client
+org.apache.httpcomponents:httpclient
+org.apache.httpcomponents:httpcore
+org.apache.thrift:fb303
+org.apache.thrift:libthrift
+org.apache.zookeeper:zookeeper
+
+MIT License
+-----------
+org.slf4j:slf4j-api
+org.slf4j:jcl-over-slf4j
diff --git a/kyuubi-grpc-shade/src/main/resources/META-INF/NOTICE b/kyuubi-grpc-shade/src/main/resources/META-INF/NOTICE
new file mode 100644
index 00000000000..cf2047d5ee2
--- /dev/null
+++ b/kyuubi-grpc-shade/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,105 @@
+Apache Kyuubi
+Copyright 2021-2022 The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (https://www.apache.org/).
+
+Apache Iceberg
+Copyright 2017-2022 The Apache Software Foundation
+
+Apache Parquet MR
+Copyright 2014-2024 The Apache Software Foundation
+
+This project includes code from Kite, developed at Cloudera, Inc. with
+the following copyright notice:
+
+| Copyright 2013 Cloudera Inc.
+|
+| Licensed under the Apache License, Version 2.0 (the "License");
+| you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at
+|
+| http://www.apache.org/licenses/LICENSE-2.0
+|
+| Unless required by applicable law or agreed to in writing, software
+| distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and
+| limitations under the License.
+
+Apache Spark
+Copyright 2014 and onwards The Apache Software Foundation.
+
+--------------------------------------------------------------------------------
+
+This binary artifact contains Apache Commons Codec
+
+Apache Commons Codec
+Copyright 2002-2020 The Apache Software Foundation
+
+| src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+| contains test data from http://aspell.net/test/orig/batch0.tab.
+| Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+|
+| ===============================================================================
+|
+| The content of package org.apache.commons.codec.language.bm has been translated
+| from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+| with permission from the original authors.
+| Original source copyright:
+| Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+--------------------------------------------------------------------------------
+
+This binary artifact contains
+
+Guava
+License: Apache License, 2.0
+Copyright (C) 2009 The Guava Authors
+
+Apache Commons Lang
+Copyright 2001-2020 The Apache Software Foundation
+
+Curator Client
+Copyright 2011-2017 The Apache Software Foundation
+
+Curator Framework
+Copyright 2011-2017 The Apache Software Foundation
+
+Hive Service RPC
+Copyright 2022 The Apache Software Foundation
+
+Apache HttpClient
+Copyright 1999-2020 The Apache Software Foundation
+
+Apache HttpCore
+Copyright 2005-2020 The Apache Software Foundation
+
+Apache Thrift
+Copyright 2006-2010 The Apache Software Foundation.
+
+Apache Zookeeper
+Copyright 2009-2019 The Apache Software Foundation.
+
+Apache Arrow
+Copyright 2016-2019 The Apache Software Foundation
+
+--------------------------------------------------------------------------------
+
+This binary artifact contains Jackson JSON processor
+
+Jackson is a high-performance, Free/Open Source JSON processing library.
+It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+been in development since 2007.
+It is currently developed by a community of developers.
+
+## Licensing
+
+Jackson 2.x core and extension components are licensed under Apache License 2.0
+To find the details that apply to this artifact see the accompanying LICENSE file.
+
+## Credits
+
+A list of contributors may be found from CREDITS(-2.x) file, which is included
+in some artifacts (usually source distributions); but is always available
+from the source code management (SCM) system project uses.
diff --git a/kyuubi-grpc/pom.xml b/kyuubi-grpc/pom.xml
new file mode 100644
index 00000000000..0e8d1764696
--- /dev/null
+++ b/kyuubi-grpc/pom.xml
@@ -0,0 +1,407 @@
+
+
+
+ 4.0.0
+
+ org.apache.kyuubi
+ kyuubi-parent
+ 1.10.0-SNAPSHOT
+
+
+ kyuubi-grpc_${scala.binary.version}
+ jar
+ Kyuubi Project Grpc
+ https://kyuubi.apache.org/
+
+
+
+ org.apache.kyuubi
+ kyuubi-common_${scala.binary.version}
+ ${project.version}
+
+
+
+ org.apache.kyuubi
+ kyuubi-ha_${scala.binary.version}
+ ${project.version}
+
+
+
+ io.grpc
+ grpc-core
+
+
+
+ io.grpc
+ grpc-protobuf
+
+
+ io.grpc
+ grpc-util
+
+
+
+ io.grpc
+ grpc-stub
+
+
+
+ com.google.protobuf
+ protobuf-java
+ ${protobuf.version}
+ compile
+
+
+
+ org.scala-lang
+ scala-compiler
+ provided
+
+
+
+ org.scala-lang
+ scala-reflect
+ provided
+
+
+
+ org.apache.kyuubi
+ kyuubi-common_${scala.binary.version}
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.scala-lang.modules
+ scala-collection-compat_${scala.binary.version}
+
+
+
+ commons-collections
+ commons-collections
+ test
+
+
+
+ commons-io
+ commons-io
+ test
+
+
+
+ com.dimafeng
+ testcontainers-scala-scalatest_${scala.binary.version}
+ test
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ add-scala-sources
+
+ add-source
+
+ generate-sources
+
+
+ src/main/scala-${scala.binary.version}
+
+
+
+
+ add-scala-test-sources
+
+ add-test-source
+
+ generate-test-sources
+
+
+ src/test/scala-${scala.binary.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+ false
+
+
+ com.google.android:annotations
+ com.google.api.grpc:proto-google-common-protos
+ com.google.code.gson:gson
+ com.google.errorprone:error_prone_annotations
+ com.google.guava:*
+ com.google.j2objc:j2objc-annotations
+ com.google.protobuf:*
+ dev.failsafe:failsafe
+ io.etcd:*
+ io.grpc:*
+ io.netty:*
+ io.perfmark:perfmark-api
+ io.vertx:*
+ org.apache.kyuubi:*
+ org.checkerframework:checker-qual
+ org.codehaus.mojo:animal-sniffer-annotations
+
+
+
+
+ *:*
+
+ **/*.proto
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ META-INF/DEPENDENCIES
+ META-INF/LICENSE.txt
+ META-INF/NOTICE.txt
+ META-INF/maven/**
+ LICENSE.txt
+ NOTICE.txt
+ mozilla/**
+ **/module-info.class
+
+
+
+
+
+ dev.failsafe
+ ${kyuubi.shade.packageName}.dev.failsafe
+
+ dev.failsafe.**
+
+
+
+ io.etcd
+ ${kyuubi.shade.packageName}.io.etcd
+
+ io.etcd.**
+
+
+
+ io.grpc
+ ${kyuubi.shade.packageName}.io.grpc
+
+
+ io.netty
+ ${kyuubi.shade.packageName}.io.netty
+
+ io.netty.**
+
+
+
+ io.perfmark
+ ${kyuubi.shade.packageName}.io.perfmark
+
+
+ io.vertx
+ ${kyuubi.shade.packageName}.io.vertx
+
+ io.vertx.**
+
+
+
+ android.annotation
+ ${kyuubi.shade.packageName}.android.annotation
+
+
+ com.google.common
+ ${kyuubi.shade.packageName}.com.google.common
+
+ com.google.common.**
+
+
+
+ com.google.thirdparty
+ ${kyuubi.shade.packageName}.com.google.thirdparty
+
+ com.google.thirdparty.**
+
+
+
+ com.google.protobuf
+ ${kyuubi.shade.packageName}.com.google.protobuf
+
+ com.google.protobuf.**
+
+
+
+ org.codehaus.mojo.animal_sniffer
+ ${kyuubi.shade.packageName}.org.codehaus.mojo.animal_sniffer
+
+
+ com.google.j2objc.annotations
+ ${kyuubi.shade.packageName}.com.google.j2objc.annotations
+
+
+ com.google.errorprone.annotations
+ ${kyuubi.shade.packageName}.com.google.errorprone.annotations
+
+
+ org.checkerframework
+ ${kyuubi.shade.packageName}.org.checkerframework
+
+
+ com.google.gson
+ ${kyuubi.shade.packageName}.com.google.gson
+
+
+
+ com.google.api
+ ${kyuubi.shade.packageName}.com.google.api
+
+
+ com.google.cloud
+ ${kyuubi.shade.packageName}.com.google.cloud
+
+
+ com.google.geo
+ ${kyuubi.shade.packageName}.com.google.geo
+
+
+ com.google.logging
+ ${kyuubi.shade.packageName}.com.google.logging
+
+
+ com.google.longrunning
+ ${kyuubi.shade.packageName}.com.google.longrunning
+
+
+ com.google.rpc
+ ${kyuubi.shade.packageName}.com.google.rpc
+
+
+ com.google.type
+ ${kyuubi.shade.packageName}.com.google.type
+
+
+
+
+
+
+
+
+
+ shade
+
+ package
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ rename-native-library
+
+ run
+
+ package
+
+
+ unpacking netty jar
+
+ renaming netty native libraries
+
+
+
+
+ deleting META-INF/native-image folder
+
+ repackaging netty jar
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ prepare-test-jar
+
+ test-jar
+
+ test-compile
+
+
+
+
+ target/scala-${scala.binary.version}/classes
+ target/scala-${scala.binary.version}/test-classes
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.2
+
+
+
+
+
+ default-protoc
+
+ true
+
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+ com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ src/test/resources/protobuf
+
+
+
+
+ compile
+ compile-custom
+ test-compile
+
+
+
+
+
+
+
+
+
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/events/OperationEventsManager.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/events/OperationEventsManager.scala
new file mode 100644
index 00000000000..e427dbc20d6
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/events/OperationEventsManager.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.events
+
+import org.apache.kyuubi.grpc.operation.GrpcOperation
+import org.apache.kyuubi.grpc.session.GrpcSession
+import org.apache.kyuubi.grpc.utils.Clock
+
+object OperationEventsManager {
+ // TODO: make this configurable
+ val MAX_STATEMENT_TEXT_SIZE = 65535
+}
+
+sealed abstract class OperationStatus(value: Int)
+
+object OperationStatus {
+ case object Pending extends OperationStatus(0)
+ case object Started extends OperationStatus(1)
+ case object Analyzed extends OperationStatus(2)
+ case object ReadyForExecution extends OperationStatus(3)
+ case object Finished extends OperationStatus(4)
+ case object Failed extends OperationStatus(5)
+ case object Canceled extends OperationStatus(6)
+ case object Closed extends OperationStatus(7)
+}
+abstract class OperationEventsManager(operation: GrpcOperation, clock: Clock) {
+ private def operationId: String = operation.operationKey.operationId
+
+ private def session: GrpcSession = operation.grpcSession
+
+ private def sessionId: String = session.sessionKey.sessionId
+
+ private def sessionStatus = session.sessionEventsManager.status
+
+ protected var _status: OperationStatus = OperationStatus.Pending
+
+ private var error = Option.empty[Boolean]
+
+ private var canceled = Option.empty[Boolean]
+
+ private var producedRowCount = Option.empty[Long]
+
+ private def status: OperationStatus = _status
+
+ private def hasCanceled: Option[Boolean] = canceled
+
+ private def hasError: Option[Boolean] = error
+
+ private def getProduceRowCount: Option[Long] = producedRowCount
+
+ def postStarted(): Unit = {
+ assertStatus(List(OperationStatus.Pending), OperationStatus.Started)
+ }
+
+ def postAnalyzed(analyzedPlan: Option[Any] = None): Unit = {
+ assertStatus(List(OperationStatus.Started, OperationStatus.Analyzed), OperationStatus.Analyzed)
+ }
+
+ def postReadyForExecution(): Unit = {
+ assertStatus(List(OperationStatus.Analyzed), OperationStatus.ReadyForExecution)
+ }
+
+ def postCanceled(): Unit = {
+ assertStatus(
+ List(
+ OperationStatus.Started,
+ OperationStatus.Analyzed,
+ OperationStatus.ReadyForExecution,
+ OperationStatus.Finished,
+ OperationStatus.Failed),
+ OperationStatus.Canceled)
+ canceled = Some(true)
+ }
+
+ def postFailed(errorMessage: String): Unit = {
+ assertStatus(
+ List(
+ OperationStatus.Started,
+ OperationStatus.Analyzed,
+ OperationStatus.ReadyForExecution,
+ OperationStatus.Finished),
+ OperationStatus.Failed)
+ error = Some(true)
+ }
+
+ def postFinished(producedRowCountOpt: Option[Long] = None): Unit = {
+ assertStatus(
+ List(
+ OperationStatus.Started,
+ OperationStatus.ReadyForExecution),
+ OperationStatus.Finished)
+ producedRowCount = producedRowCountOpt
+ }
+
+ def postClosed(): Unit = {
+ assertStatus(
+ List(
+ OperationStatus.Finished,
+ OperationStatus.Failed,
+ OperationStatus.Canceled),
+ OperationStatus.Closed)
+ }
+
+ def status_(operationStatus: OperationStatus): Unit = {
+ _status = operationStatus
+ }
+
+ private def assertStatus(
+ validStatuses: List[OperationStatus],
+ eventStatus: OperationStatus): Unit = {
+ if (!validStatuses.contains(status)) {
+ throw new IllegalStateException(
+ s"""
+ |operationId: $operationId with status ${status}
+ |is not within statuses $validStatuses for event $eventStatus
+ |""".stripMargin)
+ }
+// if (sessionStatus != SessionStatus.Started) {
+// throw new IllegalStateException(
+// s"""
+// |sessionId: $sessionId with status $sessionStatus
+// |is not Started for event $eventStatus
+// |""".stripMargin)
+// }
+ _status = eventStatus
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/events/SessionEventsManager.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/events/SessionEventsManager.scala
new file mode 100644
index 00000000000..e182a62c783
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/events/SessionEventsManager.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.events
+
+import org.apache.kyuubi.grpc.session.GrpcSession
+import org.apache.kyuubi.grpc.utils.Clock
+
+sealed abstract class SessionStatus(value: Int)
+
+object SessionStatus {
+ case object Pending extends SessionStatus(0)
+ case object Started extends SessionStatus(1)
+ case object Closed extends SessionStatus(2)
+}
+
+abstract class SessionEventsManager(session: GrpcSession, clock: Clock) {
+ private def sessionId: String = session.sessionKey.sessionId
+
+ private var _status: SessionStatus = SessionStatus.Pending
+
+ protected def status_(sessionStatus: SessionStatus): Unit = {
+ _status = sessionStatus
+ }
+
+ def status: SessionStatus = _status
+
+ def postStarted(): Unit = {
+ assertStatus(List(SessionStatus.Pending), SessionStatus.Started)
+ status_(SessionStatus.Started)
+ }
+
+ def postClosed(): Unit = {
+ assertStatus(List(SessionStatus.Started), SessionStatus.Closed)
+ status_(SessionStatus.Closed)
+ }
+
+ private def assertStatus(validStatuses: List[SessionStatus], eventStatus: SessionStatus): Unit = {
+ if (!validStatuses.contains(status)) {
+ throw new IllegalStateException(
+ s"""
+ |sessionId: $sessionId with status ${status}
+ |is not within statuses $validStatuses for event $eventStatus
+ |""".stripMargin)
+ }
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/AbstractGrpcOperation.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/AbstractGrpcOperation.scala
new file mode 100644
index 00000000000..3502802e7bc
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/AbstractGrpcOperation.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation
+
+import java.util.concurrent.locks.ReentrantLock
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
+import org.apache.kyuubi.grpc.session.GrpcSession
+
+abstract class AbstractGrpcOperation[S <: GrpcSession](session: S) extends GrpcOperation
+ with Logging {
+ final protected val opType: String = getClass.getSimpleName
+ final protected val createTime = System.currentTimeMillis()
+ protected def key: OperationKey
+ final private val operationTimeout: Long = 1000
+ private var lock: ReentrantLock = new ReentrantLock()
+
+ protected def withLockRequired[T](block: => T): T = Utils.withLockRequired(lock)(block)
+
+ @volatile protected var startTime: Long = _
+ @volatile protected var completedTime: Long = _
+ @volatile protected var lastAccessTime: Long = createTime
+
+ @volatile protected var operationException: KyuubiSQLException = _
+
+ protected def setOperationException(ex: KyuubiSQLException): Unit = {
+ this.operationException = ex
+ }
+
+ protected def runInternal(): Unit
+
+ protected def beforeRun(): Unit
+
+ protected def afterRun(): Unit
+
+ override def run(): Unit = {
+ beforeRun()
+ try {
+ runInternal()
+ } finally {
+ afterRun()
+ }
+ }
+
+ override def close(): Unit
+
+ override def operationKey: OperationKey = key
+
+ override def grpcSession: S = session
+
+}
+
+object OperationJobTag {
+ def apply(prefix: String, operationKey: OperationKey): String = {
+ s"${prefix}_" +
+ s"User_${operationKey.userId}_" +
+ s"Session_${operationKey.sessionId}_" +
+ s"Operation_${operationKey.operationId}"
+ }
+
+ def unapply(jobTag: String, prefix: String): Option[String] = {
+ if (jobTag.startsWith(prefix)) Some(jobTag) else None
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/GrpcOperation.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/GrpcOperation.scala
new file mode 100644
index 00000000000..c0d3715c176
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/GrpcOperation.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation
+
+import org.apache.kyuubi.grpc.events.OperationEventsManager
+import org.apache.kyuubi.grpc.session.GrpcSession
+import org.apache.kyuubi.operation.log.OperationLog
+
+trait GrpcOperation {
+ def run(): Unit
+ def interrupt(): Unit
+ def close(): Unit
+
+ def getOperationLog: Option[OperationLog]
+ def isTimedOut: Boolean
+ def grpcSession: GrpcSession
+ def operationKey: OperationKey
+ def operationEventsManager: OperationEventsManager
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/GrpcOperationManager.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/GrpcOperationManager.scala
new file mode 100644
index 00000000000..964158ee4d9
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/GrpcOperationManager.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation
+
+import java.util.concurrent._
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.log.LogDivertAppender
+import org.apache.kyuubi.service.AbstractService
+
+/**
+ * The [[GrpcOperationManager]] manages all the grpc operations during their lifecycle
+ */
+abstract class GrpcOperationManager(name: String)
+ extends AbstractService(name) {
+
+ private val keyToOperations = new ConcurrentHashMap[OperationKey, GrpcOperation]
+
+ protected def skipOperationLog: Boolean = false
+ def getOperationCount: Int = keyToOperations.size()
+
+ def allOperations(): Iterable[GrpcOperation] = keyToOperations.values().asScala
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ LogDivertAppender.initialize(skipOperationLog)
+ super.initialize(conf)
+ }
+
+ def close(operationKey: OperationKey): Unit = {
+ val operation = keyToOperations.get(operationKey)
+ if (operation == null) throw KyuubiSQLException(s"Invalid $operationKey")
+ operation.close()
+ }
+
+ final def addOperation(grpcOperation: GrpcOperation): GrpcOperation = synchronized {
+ keyToOperations.put(grpcOperation.operationKey, grpcOperation)
+ grpcOperation
+ }
+
+ @throws[KyuubiSQLException]
+ final def getOperation(operationKey: OperationKey): GrpcOperation = {
+ val operation = synchronized { keyToOperations.get(operationKey) }
+ if (operation == null) throw KyuubiSQLException(s"Invalid $operationKey")
+ operation
+ }
+
+ @throws[KyuubiSQLException]
+ final def removeOperation(operationKey: OperationKey): GrpcOperation = synchronized {
+ val operation = keyToOperations.remove(operationKey)
+ if (operation == null) throw KyuubiSQLException(s"Invalid $operationKey")
+ operation
+ }
+
+ @throws[KyuubiSQLException]
+ final def closeOperation(operationKey: OperationKey): Unit = {
+ val operation = removeOperation(operationKey)
+ operation.close()
+ }
+
+ @throws[KyuubiSQLException]
+ final def interruptOperation(operationKey: OperationKey): Unit = {
+ val operation = getOperation(operationKey)
+ operation.interrupt()
+ }
+
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/OperationKey.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/OperationKey.scala
new file mode 100644
index 00000000000..c1cc2e83474
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/operation/OperationKey.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.operation
+
+import java.util.UUID
+
+import org.apache.kyuubi.grpc.session.SessionKey
+
+case class OperationKey(userId: String, sessionId: String, operationId: String) {
+ override def toString: String =
+ s"Session: [{$userId}_{$sessionId}], OperationId: [$operationId]"
+}
+
+object OperationKey {
+ def apply(key: SessionKey): OperationKey =
+ new OperationKey(key.userId, key.sessionId, UUID.randomUUID().toString)
+
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/AbstractGrpcBackendService.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/AbstractGrpcBackendService.scala
new file mode 100644
index 00000000000..502ad3d057d
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/AbstractGrpcBackendService.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.service
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.CompositeService
+
+abstract class AbstractGrpcBackendService(name: String)
+ extends CompositeService(name) with GrpcBackendService {
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ addService(grpcSessionManager)
+ super.initialize(conf)
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/AbstractGrpcFrontendService.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/AbstractGrpcFrontendService.scala
new file mode 100644
index 00000000000..7d057220b65
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/AbstractGrpcFrontendService.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.service
+
+import java.net.{InetAddress, InetSocketAddress}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.protobuf.MessageLite
+import io.grpc._
+import io.grpc.MethodDescriptor.PrototypeMarshaller
+import io.grpc.netty.NettyServerBuilder
+import io.grpc.protobuf.lite.ProtoLiteUtils
+
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_CONNECT_GRPC_BINDING_PORT, FRONTEND_ADVERTISED_HOST}
+import org.apache.kyuubi.service.CompositeService
+import org.apache.kyuubi.util.NamedThreadFactory
+
+abstract class AbstractGrpcFrontendService(name: String)
+ extends CompositeService(name) with GrpcFrontendService with Runnable
+ with BindableService with Logging {
+
+ private val started = new AtomicBoolean(false)
+ protected var server: Server = _
+ protected def portNum: Int = conf.get(ENGINE_SPARK_CONNECT_GRPC_BINDING_PORT)
+ protected def maxInboundMessageSize: Int = 1024
+
+ protected def serverHost: Option[String]
+ protected lazy val serverAddr: InetAddress =
+ serverHost.map(InetAddress.getByName).getOrElse(Utils.findLocalInetAddress)
+
+ private lazy val serverThread = new NamedThreadFactory(getName, false).newThread(this)
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ this.conf = conf
+ try {
+ val socketAddress = new InetSocketAddress(serverAddr.getHostName, portNum)
+ val nettyServerBuilder = NettyServerBuilder
+ .forAddress(socketAddress)
+ .maxInboundMessageSize(maxInboundMessageSize)
+ .addService(this)
+ server = nettyServerBuilder.build()
+ } catch {
+ case e: Throwable =>
+ error(e)
+ throw new KyuubiException(
+ s"Failed to initialize grpc frontend service on $portNum",
+ e)
+ }
+ super.initialize(conf)
+ }
+
+ override def bindService(): ServerServiceDefinition
+
+ protected def methodWithCustomMarshallers(methodDesc: MethodDescriptor[MessageLite, MessageLite])
+ : MethodDescriptor[MessageLite, MessageLite] = {
+ // default 1024
+ val recursionLimit = 1024
+ val requestMarshaller =
+ ProtoLiteUtils.marshallerWithRecursionLimit(
+ methodDesc.getRequestMarshaller
+ .asInstanceOf[PrototypeMarshaller[MessageLite]]
+ .getMessagePrototype,
+ recursionLimit)
+ val responseMarshaller =
+ ProtoLiteUtils.marshallerWithRecursionLimit(
+ methodDesc.getResponseMarshaller
+ .asInstanceOf[PrototypeMarshaller[MessageLite]]
+ .getMessagePrototype,
+ recursionLimit)
+ methodDesc.toBuilder
+ .setRequestMarshaller(requestMarshaller)
+ .setResponseMarshaller(responseMarshaller)
+ .build()
+ }
+
+ override def start(): Unit = {
+ try {
+ if (started.compareAndSet(false, true)) {
+ serverThread.start()
+ }
+ super.start()
+ } catch {
+ case e: Throwable =>
+ stopInternal()
+ throw e
+ }
+ }
+
+ private def stopInternal(): Unit = {
+ if (started.compareAndSet(true, false)) {
+ serverThread.interrupt()
+ stopServer(Some(10L), Some(TimeUnit.SECONDS))
+ info(getName + " has stoppped")
+ }
+ }
+
+ override def stop(): Unit = {
+ super.stop()
+ stopInternal()
+ }
+
+ def stopServer(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit = {
+ if (server != null) {
+ if (timeout.isDefined && unit.isDefined) {
+ server.shutdown()
+ server.awaitTermination(timeout.get, unit.get)
+ } else {
+ server.shutdown()
+ }
+ }
+ }
+
+ override def run(): Unit = {
+ try {
+ server.start()
+ info("Grpc Server Start Success")
+ } catch {
+ case _: InterruptedException => error(s"$getName is interrupted")
+ case t: Throwable =>
+ error(s"Error starting $getName", t)
+ System.exit(-1)
+ }
+ }
+
+ override def connectionUrl: String = {
+ val host = (conf.get(FRONTEND_ADVERTISED_HOST), serverHost) match {
+ case (Some(advertisedHost), _) => advertisedHost
+ case (None, Some(h)) => h
+ case (None, None) => serverAddr.getHostAddress
+ }
+ host + ":" + portNum
+ }
+
+ protected def isServer(): Boolean = false
+
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcBackendService.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcBackendService.scala
new file mode 100644
index 00000000000..9dbdebcd1d0
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcBackendService.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.service
+
+import org.apache.kyuubi.grpc.session.{GrpcSession, GrpcSessionManager}
+
+trait GrpcBackendService {
+
+ def grpcSessionManager: GrpcSessionManager[_ <: GrpcSession]
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcFrontendService.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcFrontendService.scala
new file mode 100644
index 00000000000..7feebb067fd
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcFrontendService.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.service
+
+import org.apache.kyuubi.service.Service
+
+trait GrpcFrontendService {
+
+ def connectionUrl: String
+
+ val serverable: GrpcSeverable[_ <: GrpcBackendService, _ <: GrpcFrontendService]
+
+ final def be: GrpcBackendService = serverable.backendService
+
+ val discoveryService: Option[Service]
+
+ def attributes: Map[String, String] = Map.empty
+
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcSeverable.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcSeverable.scala
new file mode 100644
index 00000000000..64ea8347607
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/service/GrpcSeverable.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.service
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.CompositeService
+
+abstract class GrpcSeverable[B <: AbstractGrpcBackendService, F <: AbstractGrpcFrontendService](
+ name: String) extends CompositeService(name) {
+ private val started = new AtomicBoolean(false)
+
+ var selfExited = false
+
+ val backendService: B
+
+ val frontendServices: Seq[F]
+
+ override def initialize(conf: KyuubiConf): Unit = synchronized {
+ this.conf = conf
+ addService(backendService)
+ frontendServices.foreach(addService)
+ super.initialize(conf)
+ }
+
+ override def start(): Unit = synchronized {
+ if (!started.getAndSet(true)) {
+ super.start()
+ }
+ }
+
+ protected def stopServer(): Unit
+
+ override def stop(): Unit = synchronized {
+ try {
+ if (started.getAndSet(false)) {
+ super.stop()
+ }
+ } catch {
+ case t: Throwable =>
+ warn(s"Error stopping $name ${t.getMessage}", t)
+ } finally {
+ try {
+ stopServer()
+ } catch {
+ case t: Throwable =>
+ warn(s"Error stopping $name ${t.getMessage}", t)
+ }
+ }
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/AbstractGrpcSession.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/AbstractGrpcSession.scala
new file mode 100644
index 00000000000..1035a5e9116
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/AbstractGrpcSession.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.session
+
+import java.util
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.grpc.operation.{GrpcOperation, OperationKey}
+import org.apache.kyuubi.grpc.utils.ProtoUtils
+
+abstract class AbstractGrpcSession(
+ val userId: String) extends GrpcSession with Logging {
+ override val sessionKey: SessionKey = SessionKey(userId)
+
+ final private val _createTime: Long = System.currentTimeMillis()
+ override def createTime: Long = _createTime
+
+ @volatile private var _lastAccessTime: Long = _createTime
+ override def lastAccessTime: Long = _lastAccessTime
+
+ @volatile private var closedTimeMs: Option[Long] = None
+
+ @volatile private var _lastIdleTime: Long = _createTime
+ override def lastIdleTime: Long = _lastIdleTime
+
+ final private val opKeySet = new util.HashSet[OperationKey]
+
+ protected def runGrpcOperation(operation: GrpcOperation): OperationKey = {
+ if (closedTimeMs.isDefined) {
+ throw KyuubiSQLException("Cannot build operation because the session is closed")
+ }
+ try {
+ val opKey = operation.operationKey
+ opKeySet.add(opKey)
+ operation.run()
+ opKey
+ } catch {
+ case e: KyuubiSQLException =>
+ opKeySet.remove(operation.operationKey)
+ sessionManager.grpcOperationManager.close(operation.operationKey)
+ throw e
+ }
+ }
+
+ override def removeOperation(operationKey: OperationKey): Unit = {
+ sessionManager.grpcOperationManager.removeOperation(operationKey)
+ }
+
+ override def closeOperation(operationKey: OperationKey): Unit = {
+ sessionManager.grpcOperationManager.closeOperation(operationKey)
+ }
+
+ override def interruptOperation(operationKey: OperationKey): Unit = {
+ sessionManager.grpcOperationManager.interruptOperation(operationKey)
+ }
+
+ override def open(): Unit = {
+ sessionEventsManager.postStarted()
+ }
+
+ override def close(): Unit = {
+ if (closedTimeMs.isDefined) {
+ throw KyuubiSQLException(s"Session ${sessionKey.sessionId} is already closed.")
+ }
+ closedTimeMs = Some(System.currentTimeMillis())
+ sessionEventsManager.postClosed()
+ }
+
+}
+
+object SessionTag {
+ def apply(sessionKey: SessionKey, tag: String, prefix: String): String = {
+ ProtoUtils.throwIfInvalidTag(tag)
+ s"${prefix}_" +
+ s"User_${sessionKey.userId}_" +
+ s"Session_${sessionKey.sessionId}_" +
+ s"Tag_${tag}"
+ }
+
+ def unapply(sessionTag: String, prefix: String): Option[String] = {
+ if (sessionTag.startsWith(prefix)) Some(sessionTag) else None
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/GrpcSession.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/GrpcSession.scala
new file mode 100644
index 00000000000..00e3666cca7
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/GrpcSession.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.session
+
+import org.apache.kyuubi.grpc.events.SessionEventsManager
+import org.apache.kyuubi.grpc.operation.{GrpcOperation, OperationKey}
+
+trait GrpcSession {
+ def sessionKey: SessionKey
+ def name: Option[String]
+
+ def serverSessionId: String
+
+ def createTime: Long
+ def lastAccessTime: Long
+ def lastIdleTime: Long
+
+ def sessionManager: GrpcSessionManager[_ <: GrpcSession]
+
+ def sessionEventsManager: SessionEventsManager
+
+ def open()
+ def close()
+
+ def removeOperation(operationKey: OperationKey): Unit
+
+ def getOperation(operationKey: OperationKey): GrpcOperation
+ def closeOperation(operationKey: OperationKey): Unit
+
+ def interruptOperation(operationKey: OperationKey): Unit
+
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/GrpcSessionManager.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/GrpcSessionManager.scala
new file mode 100644
index 00000000000..2708e542fb1
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/GrpcSessionManager.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.session
+
+import java.io.IOException
+import java.nio.file.{Files, Paths}
+import java.util.concurrent._
+
+import scala.concurrent.duration.Duration
+
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.grpc.operation.GrpcOperationManager
+import org.apache.kyuubi.service.CompositeService
+import org.apache.kyuubi.util.ThreadUtils
+
+abstract class GrpcSessionManager[S <: AbstractGrpcSession](name: String)
+ extends CompositeService(name) {
+
+ @volatile private var shutdown = false
+
+ protected var _operationLogRoot: Option[String] = None
+
+ def operationLogRoot: Option[String] = _operationLogRoot
+
+ private def initOperationLogRootDir(): Unit = {
+ try {
+ val logRoot = {
+ if (isServer) {
+ conf.get(SERVER_OPERATION_LOG_DIR_ROOT)
+ } else {
+ conf.get(ENGINE_OPERATION_LOG_DIR_ROOT)
+ }
+ }
+ val logPath = Files.createDirectories(Utils.getAbsolutePathFromWork(logRoot))
+ _operationLogRoot = Some(logPath.toString)
+ } catch {
+ case e: IOException =>
+ error(s"Failed to initialize operation log root directory: ${_operationLogRoot}", e)
+ _operationLogRoot = None
+ }
+ }
+
+ private val sessionKeyToSession = new ConcurrentHashMap[SessionKey, S]
+
+ @volatile private var _latestLogoutTime: Long = System.currentTimeMillis()
+ def latestLogoutTime: Long = _latestLogoutTime
+
+ private val timeoutChecker =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
+
+ protected def isServer: Boolean
+
+ private var execPool: ThreadPoolExecutor = _
+
+ def grpcOperationManager: GrpcOperationManager
+
+ protected def getOrCreateSession(
+ key: SessionKey): S
+
+ def getSession(key: SessionKey): S = {
+ getSessionOption(key).getOrElse(throw KyuubiSQLException(s"Invalid key $key"))
+ }
+
+ private def getSessionOption(key: SessionKey): Option[S] = {
+ Option(sessionKeyToSession.get(key))
+ }
+ def openSession(
+ key: SessionKey): S = {
+ info(s"Opening grpc session for ${key.userId}")
+ val session = getOrCreateSession(key)
+ try {
+ val key = session.sessionKey
+ session.open()
+ setSession(key, session)
+ logSessionCountInfo(session, "opened")
+ session
+ } catch {
+ case e: Exception =>
+ try {
+ session.close()
+ } catch {
+ case t: Throwable =>
+ warn(s"Error closing session for ${key.userId}", t)
+ }
+ throw KyuubiSQLException(e)
+ }
+ }
+
+ protected def removeSession(key: SessionKey): Option[S] = {
+ val session = sessionKeyToSession.remove(key)
+ Some(session)
+ }
+
+ protected def shutdownSession(session: S): Unit = {
+ session.close()
+ }
+
+ protected def closeSession(key: SessionKey): Unit = {
+ _latestLogoutTime = System.currentTimeMillis()
+ val session = sessionKeyToSession.remove(key)
+ if (session == null) {
+ throw KyuubiSQLException(s"Invalid $key")
+ }
+ logSessionCountInfo(session, "closed")
+ try {
+ session.close()
+ } finally {
+ deleteOperationLogSessionDir(key)
+ }
+ }
+
+ private def deleteOperationLogSessionDir(key: SessionKey): Unit = {
+ _operationLogRoot.foreach(logRoot => {
+ val rootPath = Paths.get(logRoot, key.toString)
+ try {
+ Utils.deleteDirectoryRecursively(rootPath.toFile)
+ } catch {
+ case e: IOException =>
+ error(s"Failed to delete session operation log directory ${rootPath.toString}", e)
+ }
+ })
+ }
+
+ final protected def setSession(key: SessionKey, session: S): Unit = {
+ sessionKeyToSession.put(key, session)
+ }
+
+ protected def logSessionCountInfo(session: S, action: String): Unit = {
+ info(s"${session.sessionKey.userId}'s ${session.getClass.getSimpleName} with" +
+ s" ${session.sessionKey.sessionId}${session.name.map("/" + _).getOrElse("")} is $action," +
+ s" current opening sessions $getOpenSessionCount")
+ }
+
+ def getOpenSessionCount: Int = sessionKeyToSession.size()
+
+ def getExecPoolSize: Int = {
+ assert(execPool != null)
+ execPool.getPoolSize
+ }
+
+ def getActiveCount: Int = {
+ assert(execPool != null)
+ execPool.getActiveCount
+ }
+
+ def getWorkQueueSize: Int = {
+ assert(execPool != null)
+ execPool.getQueue.size()
+ }
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ this.conf = conf
+ addService(grpcOperationManager)
+ initOperationLogRootDir()
+
+ val poolSize: Int = {
+ if (isServer) {
+ conf.get(SERVER_EXEC_POOL_SIZE)
+ } else {
+ conf.get(ENGINE_EXEC_POOL_SIZE)
+ }
+ }
+
+ val waitQueueSize: Int = {
+ if (isServer) {
+ conf.get(SERVER_EXEC_WAIT_QUEUE_SIZE)
+ } else {
+ conf.get(ENGINE_EXEC_WAIT_QUEUE_SIZE)
+ }
+ }
+ val keepAliveMs: Long = {
+ if (isServer) {
+ conf.get(SERVER_EXEC_KEEPALIVE_TIME)
+ } else {
+ conf.get(ENGINE_EXEC_KEEPALIVE_TIME)
+ }
+ }
+
+ execPool = ThreadUtils.newDaemonQueuedThreadPool(
+ poolSize,
+ waitQueueSize,
+ keepAliveMs,
+ s"$name-exec-pool")
+ super.initialize(conf)
+ }
+
+ override def stop(): Unit = synchronized {
+ super.stop()
+ shutdown = true
+ val shutdownTimeout: Long = {
+ if (isServer) {
+ conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
+ } else {
+ conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
+ }
+ }
+
+ ThreadUtils.shutdown(execPool, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/SessionKey.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/SessionKey.scala
new file mode 100644
index 00000000000..e02582de331
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/session/SessionKey.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.session
+
+import java.util.UUID
+
+case class SessionKey(userId: String, sessionId: String) {
+ override def toString: String = s"UserId: [$userId], SessionId: [$sessionId]"
+}
+
+object SessionKey {
+ def apply(userId: String): SessionKey = new SessionKey(userId, UUID.randomUUID().toString)
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/utils/Clock.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/utils/Clock.scala
new file mode 100644
index 00000000000..9a471443d59
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/utils/Clock.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.utils
+
+trait Clock {
+ def getTimeMillis(): Long
+
+ def nanoTime(): Long
+
+ def waitTillTime(targetTime: Long): Long
+}
+
+class SystemClock extends Clock {
+ val minPollTime = 25L
+
+ override def getTimeMillis(): Long = System.currentTimeMillis()
+
+ override def nanoTime(): Long = System.nanoTime()
+
+ override def waitTillTime(targetTime: Long): Long = {
+ var currentTime = System.currentTimeMillis()
+
+ var waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+
+ val pollTime = math.max(waitTime / 10.0, minPollTime).toLong
+
+ while (true) {
+ currentTime = System.currentTimeMillis()
+ waitTime = targetTime - currentTime
+ if (waitTime <= 0) {
+ return currentTime
+ }
+ val sleepTime = math.min(waitTime, pollTime)
+ Thread.sleep(sleepTime)
+ }
+ -1
+ }
+}
diff --git a/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/utils/ProtoUtils.scala b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/utils/ProtoUtils.scala
new file mode 100644
index 00000000000..08528eaacad
--- /dev/null
+++ b/kyuubi-grpc/src/main/scala/org/apache/kyuubi/grpc/utils/ProtoUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.grpc.utils
+
+object ProtoUtils {
+
+ private val JOB_TAGS_SEP = ','
+ def throwIfInvalidTag(tag: String): Unit = {
+ if (tag == null) {
+ throw new IllegalArgumentException("Tag cannot be null.")
+ }
+ if (tag.contains(JOB_TAGS_SEP)) {
+ throw new IllegalArgumentException(
+ s"Tag cannot contain '$JOB_TAGS_SEP'.")
+ }
+ if (tag.isEmpty) {
+ throw new IllegalArgumentException("Tag cannot be an empty string.")
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 513271d0d3c..cea4b623d47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,9 @@
kyuubi-utilkyuubi-util-scalakyuubi-zookeeper
+ kyuubi-grpc
+ kyuubi-grpc-shade
+ kyuubi-grpc-server
@@ -1279,6 +1282,7 @@
true
+ alwaysfalse
@@ -1294,6 +1298,7 @@
false
+ alwayscentralMaven Repository