diff --git a/docs/pages/product/apis-integrations/queries.mdx b/docs/pages/product/apis-integrations/queries.mdx index 17ae53d60e620..6f171ce22eb88 100644 --- a/docs/pages/product/apis-integrations/queries.mdx +++ b/docs/pages/product/apis-integrations/queries.mdx @@ -142,7 +142,7 @@ The same query using the REST API syntax looks as follows: ### Query with post-processing **Queries with post-processing are specific to the [SQL API][ref-sql-api].** -They are structured in such a way that a [regular query](#regular-query) is +Generally, they are structured in such a way that a [regular query](#regular-query) is part of a `FROM` clause or a common table expression (CTE): ```sql @@ -178,8 +178,17 @@ limited set of SQL functions and operators. #### Example -See an example of a query with post-processing. In this query, we derive new -dimensions, post-aggregate measures, and perform additional filtering: +The simplest example of a query with post-processing: + +```sql +SELECT VERSION(); +``` + +This query invokes a function that is implemented by the SQL API and executed without +querying the upstream data source. + +Now, see a more complex example of a query with post-processing. In this query, we derive +new dimensions, post-aggregate measures, and perform additional filtering: ```sql SELECT diff --git a/docs/pages/product/apis-integrations/rest-api.mdx b/docs/pages/product/apis-integrations/rest-api.mdx index bd9a544882fc1..3ccf3d7267b12 100644 --- a/docs/pages/product/apis-integrations/rest-api.mdx +++ b/docs/pages/product/apis-integrations/rest-api.mdx @@ -130,13 +130,7 @@ accessible for everyone. | `data` | [`/v1/load`][ref-ref-load], [`/v1/sql`][ref-ref-sql] | ✅ Yes | | `graphql` | `/graphql` | ✅ Yes | | `jobs` | [`/v1/pre-aggregations/jobs`][ref-ref-paj] | ❌ No | - - - -Exception: `/livez` and `/readyz` endpoints don't belong to any scope. Access to -these endpoints can't be controlled using API scopes. 
-
-
+| No scope | `/livez`, `/readyz` | ✅ Yes, always |
 
 You can set accessible API scopes _for all requests_ using the
 `CUBEJS_DEFAULT_API_SCOPES` environment variable. For example, to disallow
@@ -282,10 +276,10 @@ example, the following query will retrieve rows 101-200 from the `Orders` cube:
 [ref-conf-basepath]: /reference/configuration/config#basepath
 [ref-conf-contexttoapiscopes]: /reference/configuration/config#contexttoapiscopes
-[ref-ref-load]: /product/apis-integrations/rest-api/reference#v1load
-[ref-ref-meta]: /product/apis-integrations/rest-api/reference#v1meta
-[ref-ref-sql]: /product/apis-integrations/rest-api/reference#v1sql
-[ref-ref-paj]: /product/apis-integrations/rest-api/reference#v1pre-aggregationsjobs
+[ref-ref-load]: /product/apis-integrations/rest-api/reference#base_pathv1load
+[ref-ref-meta]: /product/apis-integrations/rest-api/reference#base_pathv1meta
+[ref-ref-sql]: /product/apis-integrations/rest-api/reference#base_pathv1sql
+[ref-ref-paj]: /product/apis-integrations/rest-api/reference#base_pathv1pre-aggregationsjobs
 [ref-security-context]: /product/auth/context
 [ref-graphql-api]: /product/apis-integrations/graphql-api
 [ref-orchestration-api]: /product/apis-integrations/orchestration-api
diff --git a/docs/pages/product/apis-integrations/rest-api/reference.mdx b/docs/pages/product/apis-integrations/rest-api/reference.mdx
index 9cf58e8ac7405..ea935aa175c31 100644
--- a/docs/pages/product/apis-integrations/rest-api/reference.mdx
+++ b/docs/pages/product/apis-integrations/rest-api/reference.mdx
@@ -99,21 +99,57 @@ values.
 
 ## `{base_path}/v1/sql`
 
-Get the SQL Code generated by Cube to be executed in the database.
+Takes an API query and returns the SQL query, generated by Cube, that can be
+executed against the data source. This endpoint is useful for debugging,
+understanding how Cube translates API queries into SQL queries, and providing
+transparency to SQL-savvy end users.
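A hedged sketch of how a client might call this endpoint, assuming the `query` and `format` request parameters documented in this reference and the default `/cubejs-api` base path from the curl example; `buildSqlUrl` is a made-up helper name, not part of Cube's API:

```typescript
// Illustrative only: compose a GET request URL for `{base_path}/v1/sql`.
// REST API queries are passed as a URL-encoded JSON object; SQL API queries
// are passed as plain SQL text with `format=sql`.
function buildSqlUrl(
  baseUrl: string,
  query: object | string,
  format: "rest" | "sql" = "rest"
): string {
  const params = new URLSearchParams();
  params.set("query", typeof query === "string" ? query : JSON.stringify(query));
  if (format !== "rest") {
    // `rest` is the default, so it can be omitted
    params.set("format", format);
  }
  return `${baseUrl}/v1/sql?${params.toString()}`;
}

const url = buildSqlUrl("http://localhost:4000/cubejs-api", { measures: ["Orders.count"] });
```

Either query flavor ends up URL-encoded in the `query` parameter, which is what the curl `--data-urlencode` example in this reference does as well.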
-| Parameter | Description |
-| --------- | ------------------------------------------------------------------------- |
-| query | URLencoded Cube [Query](/product/apis-integrations/rest-api/query-format) |
+Taking the SQL query from this endpoint and executing it against the data source
+directly is not recommended, as it bypasses Cube's caching layer and other
+optimizations.
 
-Response
+Request parameters:
 
-- `sql` - JSON Object with the following properties
-  - `sql` - Formatted SQL query with parameters
-  - `order` - Order fields and direction used in SQL query
-  - `cacheKeyQueries` - Key names and TTL of Cube data cache
-  - `preAggregations` - SQL queries used to build pre-aggregation tables
+| Parameter, type | Description | Required |
+| --- | --- | --- |
+| `format`, `string` | Query format:<br/> `sql` for [SQL API][ref-sql-api] queries,<br/> `rest` for [REST API][ref-rest-api] queries (default) | ❌ No |
+| `query`, `string` | Query as a URL-encoded JSON object or SQL query | ✅ Yes |
+| `disable_post_processing`, `boolean` | Flag that affects query planning, `true` or `false` | ❌ No |
 
-Example request:
+If `disable_post_processing` is set to `true`, Cube will try to generate the SQL
+as if the query were run without [post-processing][ref-query-wpp], i.e., as if it
+were run as a query with [pushdown][ref-query-wpd].
+
+
+
+Currently, the `disable_post_processing` parameter is not supported yet.
+
+
+
+The response will contain a JSON object with the following properties under the `sql` key:
+
+| Property, type | Description |
+| --- | --- |
+| `status`, `string` | Query planning status, `ok` or `error` |
+| `sql`, `array` | Two-element array (see below) |
+| `sql[0]`, `string` | Generated query with parameter placeholders |
+| `sql[1]`, `array` or `object` | Generated query parameters |
+
+For queries with the `sql` format, the response will also include the following additional
+properties under the `sql` key:
+
+| Property, type | Description |
+| --- | --- |
+| `query_type`, `string` | `regular` for [regular][ref-regular-queries] queries,<br/> `post_processing` for queries with [post-processing][ref-query-wpp],<br/> `pushdown` for queries with [pushdown][ref-query-wpd] |
+
+For queries with the `sql` format, in case of an error, the response will only contain
+the `status`, `query_type`, and `error` properties.
+
+For example, an error will be returned if `disable_post_processing` was set to `true` but
+the query can't be run without post-processing.
+
+### Example
+
+Request:
 
 ```bash{outputLines: 2-6}
 curl \
@@ -124,7 +160,7 @@ curl \
   http://localhost:4000/cubejs-api/v1/sql
 ```
 
-Example response:
+Response:
 
 ```json
 {
@@ -464,4 +500,10 @@ Keep-Alive: timeout=5
 [ref-recipes-data-blending]: /product/data-modeling/concepts/data-blending#data-blending
 [ref-rest-api]: /product/apis-integrations/rest-api
 [ref-basepath]: /product/apis-integrations/rest-api#base-path
-[ref-datasources]: /product/configuration/advanced/multiple-data-sources
\ No newline at end of file
+[ref-datasources]: /product/configuration/advanced/multiple-data-sources
+[ref-sql-api]: /product/apis-integrations/sql-api
+[ref-rest-api]: /product/apis-integrations/rest-api
+[ref-data-sources]: /product/configuration/advanced/multiple-data-sources
+[ref-regular-queries]: /product/apis-integrations/queries#regular-query
+[ref-query-wpp]: /product/apis-integrations/queries#query-with-post-processing
+[ref-query-wpd]: /product/apis-integrations/queries#query-with-pushdown
diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts
index 72acd827b0571..c180e0151296c 100644
--- a/packages/cubejs-api-gateway/src/gateway.ts
+++ b/packages/cubejs-api-gateway/src/gateway.ts
@@ -33,6 +33,7 @@ import { QueryType as QueryTypeEnum, ResultType } from './types/enums';
 import {
+  BaseRequest,
   RequestContext,
   ExtendedRequestContext,
   Request,
@@ -324,6 +325,17 @@ class ApiGateway {
     }));
 
     app.get(`${this.basePath}/v1/sql`, userMiddlewares, userAsyncHandler(async (req: any, res) => {
+      // TODO parse req.query with zod/joi/...
+
+      if (req.query.format === 'sql') {
+        await this.sql4sql({
+          query: req.query.query,
+          context: req.context,
+          res: this.resToResultFn(res)
+        });
+        return;
+      }
+
       await this.sql({
         query: req.query.query,
         context: req.context,
@@ -332,6 +344,17 @@ class ApiGateway {
     }));
 
     app.post(`${this.basePath}/v1/sql`, jsonParser, userMiddlewares, userAsyncHandler(async (req, res) => {
+      // TODO parse req.body with zod/joi/...
+
+      if (req.body.format === 'sql') {
+        await this.sql4sql({
+          query: req.body.query,
+          context: req.context,
+          res: this.resToResultFn(res)
+        });
+        return;
+      }
+
       await this.sql({
         query: req.body.query,
         context: req.context,
@@ -1281,6 +1304,26 @@ class ApiGateway {
     return [queryType, normalizedQueries, queryNormalizationResult.map((it) => remapToQueryAdapterFormat(it.normalizedQuery))];
   }
 
+  protected async sql4sql({
+    query,
+    context,
+    res,
+  }: {query: string} & BaseRequest) {
+    try {
+      await this.assertApiScope('data', context.securityContext);
+
+      const result = await this.sqlServer.sql4sql(query, context.securityContext);
+      res({ sql: result });
+    } catch (e: any) {
+      this.handleError({
+        e,
+        context,
+        query,
+        res,
+      });
+    }
+  }
+
   public async sql({
     query,
     context,
diff --git a/packages/cubejs-api-gateway/src/sql-server.ts b/packages/cubejs-api-gateway/src/sql-server.ts
index 32e7d37e1e19c..b40c83f3edf11 100644
--- a/packages/cubejs-api-gateway/src/sql-server.ts
+++ b/packages/cubejs-api-gateway/src/sql-server.ts
@@ -3,9 +3,11 @@ import {
   registerInterface,
   shutdownInterface,
   execSql,
+  sql4sql,
   SqlInterfaceInstance,
   Request as NativeRequest,
   LoadRequestMeta,
+  Sql4SqlResponse,
 } from '@cubejs-backend/native';
 import type { ShutdownMode } from '@cubejs-backend/native';
 import { displayCLIWarning, getEnv } from '@cubejs-backend/shared';
@@ -62,6 +64,10 @@ export class SQLServer {
     await execSql(this.sqlInterfaceInstance!, sqlQuery, stream, securityContext);
   }
 
+  public async sql4sql(sqlQuery: string, securityContext?: any): Promise<Sql4SqlResponse> {
+    return sql4sql(this.sqlInterfaceInstance!, sqlQuery, securityContext);
+  }
+
   protected buildCheckSqlAuth(options: SQLServerOptions): CheckSQLAuthFn {
     return (options.checkSqlAuth && this.wrapCheckSqlAuthFn(options.checkSqlAuth))
       || this.createDefaultCheckSqlAuthFn(options);
diff --git a/packages/cubejs-backend-native/js/index.ts b/packages/cubejs-backend-native/js/index.ts
index eec5c7370dd33..f32c824669916 100644
--- a/packages/cubejs-backend-native/js/index.ts
+++ b/packages/cubejs-backend-native/js/index.ts
@@ -124,6 +124,21 @@ export type DBResponsePrimitive = number | string;
 
+// TODO type this better, to make it proper disjoint union
+export type Sql4SqlOk = {
+  sql: string,
+  values: Array<string | null>,
+};
+export type Sql4SqlError = { error: string };
+export type Sql4SqlCommon = {
+  query_type: {
+    regular: boolean;
+    post_processing: boolean;
+    pushdown: boolean;
+  }
+};
+export type Sql4SqlResponse = Sql4SqlCommon & (Sql4SqlOk | Sql4SqlError);
+
 let loadedNative: any = null;
 
 export function loadNative() {
@@ -389,6 +404,13 @@ export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string,
   await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null);
 };
 
+// TODO parse result from native code
+export const sql4sql = async (instance: SqlInterfaceInstance, sqlQuery: string, securityContext?: any): Promise<Sql4SqlResponse> => {
+  const native = loadNative();
+
+  return native.sql4sql(instance, sqlQuery, securityContext ? JSON.stringify(securityContext) : null);
+};
+
 export const buildSqlAndParams = (cubeEvaluator: any): String => {
   const native = loadNative();
 
diff --git a/packages/cubejs-backend-native/src/cubesql_utils.rs b/packages/cubejs-backend-native/src/cubesql_utils.rs
new file mode 100644
index 0000000000000..36a7e7fafa007
--- /dev/null
+++ b/packages/cubejs-backend-native/src/cubesql_utils.rs
@@ -0,0 +1,76 @@
+use std::future::Future;
+use std::net::SocketAddr;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use cubesql::compile::DatabaseProtocol;
+use cubesql::config::ConfigObj;
+use cubesql::sql::{Session, SessionManager};
+use cubesql::CubeError;
+
+use crate::auth::NativeAuthContext;
+use crate::config::NodeCubeServices;
+
+pub async fn create_session(
+    services: &NodeCubeServices,
+    native_auth_ctx: Arc<NativeAuthContext>,
+) -> Result<Arc<Session>, CubeError> {
+    let config = services
+        .injector()
+        .get_service_typed::<dyn ConfigObj>()
+        .await;
+
+    let session_manager = services
+        .injector()
+        .get_service_typed::<SessionManager>()
+        .await;
+
+    let (host, port) = match SocketAddr::from_str(
+        config
+            .postgres_bind_address()
+            .as_deref()
+            .unwrap_or("127.0.0.1:15432"),
+    ) {
+        Ok(addr) => (addr.ip().to_string(), addr.port()),
+        Err(e) => {
+            return Err(CubeError::internal(format!(
+                "Failed to parse postgres_bind_address: {}",
+                e
+            )))
+        }
+    };
+
+    let session = session_manager
+        .create_session(DatabaseProtocol::PostgreSQL, host, port, None)
+        .await?;
+
+    session
+        .state
+        .set_auth_context(Some(native_auth_ctx.clone()));
+
+    Ok(session)
+}
+
+pub async fn with_session<T, F, Fut>(
+    services: &NodeCubeServices,
+    native_auth_ctx: Arc<NativeAuthContext>,
+    f: F,
+) -> Result<T, CubeError>
+where
+    F: FnOnce(Arc<Session>) -> Fut,
+    Fut: Future<Output = Result<T, CubeError>>,
+{
+    let session_manager = services
+        .injector()
+        .get_service_typed::<SessionManager>()
+        .await;
+    let session = create_session(services, native_auth_ctx).await?;
+    let connection_id = session.state.connection_id;
+
+    // From this point on there's a session that we should close before returning, as in a `finally` block
+    let result = { f(session).await };
+
+    session_manager.drop_session(connection_id).await;
+
+    result
+}
diff --git a/packages/cubejs-backend-native/src/lib.rs b/packages/cubejs-backend-native/src/lib.rs
index 60104692c697f..eb475af8857b6 100644
--- a/packages/cubejs-backend-native/src/lib.rs
+++ b/packages/cubejs-backend-native/src/lib.rs
@@ -7,6 +7,7 @@ pub mod auth;
 pub mod channel;
 pub mod config;
 pub mod cross;
+pub mod cubesql_utils;
 pub mod gateway;
 pub mod logger;
 pub mod node_export;
@@ -15,6 +16,7 @@ pub mod node_obj_serializer;
 pub mod orchestrator;
 #[cfg(feature = "python")]
 pub mod python;
+pub mod sql4sql;
 pub mod stream;
 pub mod template;
 pub mod transport;
diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs
index b4e811444da09..23f8fff450693 100644
--- a/packages/cubejs-backend-native/src/node_export.rs
+++ b/packages/cubejs-backend-native/src/node_export.rs
@@ -1,8 +1,5 @@
-use cubesql::compile::DatabaseProtocol;
 use cubesql::compile::{convert_sql_to_cube_query, get_df_batches};
 use cubesql::config::processing_loop::ShutdownMode;
-use cubesql::config::ConfigObj;
-use cubesql::sql::SessionManager;
 use cubesql::transport::TransportService;
 use futures::StreamExt;
 
@@ -13,7 +10,9 @@ use crate::auth::{NativeAuthContext, NodeBridgeAuthService};
 use crate::channel::call_js_fn;
 use crate::config::{NodeConfiguration, NodeConfigurationFactoryOptions, NodeCubeServices};
 use crate::cross::CLRepr;
+use crate::cubesql_utils::with_session;
 use crate::logger::NodeBridgeLogger;
+use crate::sql4sql::sql4sql;
 use crate::stream::OnDrainHandler;
 use crate::tokio_runtime_node;
 use crate::transport::NodeBridgeTransport;
@@ -26,9 +25,7 @@ use cubenativeutils::wrappers::serializer::NativeDeserialize;
 use cubenativeutils::wrappers::NativeContextHolder;
 use cubesqlplanner::cube_bridge::base_query_options::NativeBaseQueryOptions;
 use cubesqlplanner::planner::base_query::BaseQuery;
-use std::net::SocketAddr;
 use std::rc::Rc;
-use std::str::FromStr;
 use
std::sync::Arc;
 use std::time::SystemTime;
 
@@ -36,8 +33,8 @@ use cubesql::{telemetry::ReportingLogger, CubeError};
 
 use neon::prelude::*;
 
-struct SQLInterface {
-    services: Arc<NodeCubeServices>,
+pub(crate) struct SQLInterface {
+    pub(crate) services: Arc<NodeCubeServices>,
 }
 
 impl Finalize for SQLInterface {}
@@ -184,107 +181,102 @@ async fn handle_sql_query(
     native_auth_ctx: Arc<NativeAuthContext>,
     channel: Arc<Channel>,
     stream_methods: WritableStreamMethods,
-    sql_query: &String,
+    sql_query: &str,
 ) -> Result<(), CubeError> {
     let start_time = SystemTime::now();
 
-    let config = services
-        .injector()
-        .get_service_typed::<dyn ConfigObj>()
-        .await;
-
     let transport_service = services
         .injector()
         .get_service_typed::<dyn TransportService>()
        .await;
 
-    let session_manager = services
-        .injector()
-        .get_service_typed::<SessionManager>()
-        .await;
-
-    let (host, port) = match SocketAddr::from_str(
-        &config
-            .postgres_bind_address()
-            .clone()
-            .unwrap_or("127.0.0.1:15432".into()),
-    ) {
-        Ok(addr) => (addr.ip().to_string(), addr.port()),
-        Err(e) => {
-            return Err(CubeError::internal(format!(
-                "Failed to parse postgres_bind_address: {}",
-                e
-            )))
+    with_session(&services, native_auth_ctx.clone(), |session| async move {
+        if let Some(auth_context) = session.state.auth_context() {
+            session
+                .session_manager
+                .server
+                .transport
+                .log_load_state(
+                    None,
+                    auth_context,
+                    session.state.get_load_request_meta("sql"),
+                    "Load Request".to_string(),
+                    serde_json::json!({
+                        "query": {
+                            "sql": sql_query,
+                        }
+                    }),
+                )
+                .await?;
+        }
-        }
-    };
-
-    let session = session_manager
-        .create_session(DatabaseProtocol::PostgreSQL, host, port, None)
-        .await?;
-
-    session
-        .state
-        .set_auth_context(Some(native_auth_ctx.clone()));
-
-    if let Some(auth_context) = session.state.auth_context() {
-        session
-            .session_manager
-            .server
-            .transport
-            .log_load_state(
-                None,
-                auth_context,
-                session.state.get_load_request_meta("sql"),
-                "Load Request".to_string(),
-                serde_json::json!({
-                    "query": {
-                        "sql": sql_query,
-                    }
-                }),
-            )
-            .await?;
-    }
 
+        let session_clone = Arc::clone(&session);
 
-    let
session_clone = Arc::clone(&session);
+        let execute = || async move {
+            // todo: can we use compiler_cache?
+            let meta_context = transport_service
+                .meta(native_auth_ctx)
+                .await
+                .map_err(|err| {
+                    CubeError::internal(format!("Failed to get meta context: {}", err))
+                })?;
+            let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?;
 
-    let execute = || async move {
-        // todo: can we use compiler_cache?
-        let meta_context = transport_service
-            .meta(native_auth_ctx)
-            .await
-            .map_err(|err| CubeError::internal(format!("Failed to get meta context: {}", err)))?;
-        let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?;
+            let mut stream = get_df_batches(&query_plan).await?;
 
-        let mut stream = get_df_batches(&query_plan).await?;
+            let semaphore = Arc::new(Semaphore::new(0));
 
-        let semaphore = Arc::new(Semaphore::new(0));
+            let drain_handler = OnDrainHandler::new(
+                channel.clone(),
+                stream_methods.stream.clone(),
+                semaphore.clone(),
+            );
 
-        let drain_handler = OnDrainHandler::new(
-            channel.clone(),
-            stream_methods.stream.clone(),
-            semaphore.clone(),
-        );
+            drain_handler.handle(stream_methods.on.clone()).await?;
 
-        drain_handler.handle(stream_methods.on.clone()).await?;
+            let mut is_first_batch = true;
+            while let Some(batch) = stream.next().await {
+                let (columns, data) = batch_to_rows(batch?)?;
 
-        let mut is_first_batch = true;
-        while let Some(batch) = stream.next().await {
-            let (columns, data) = batch_to_rows(batch?)?;
+                if is_first_batch {
+                    let mut schema = Map::new();
+                    schema.insert("schema".into(), columns);
+                    let columns = format!(
+                        "{}{}",
+                        serde_json::to_string(&serde_json::Value::Object(schema))?,
+                        CHUNK_DELIM
+                    );
+                    is_first_batch = false;
+
+                    call_js_fn(
+                        channel.clone(),
+                        stream_methods.write.clone(),
+                        Box::new(|cx| {
+                            let arg = cx.string(columns).upcast::<JsValue>();
+
+                            Ok(vec![arg.upcast::<JsValue>()])
+                        }),
+                        Box::new(|cx, v| match v.downcast_or_throw::<JsBoolean, _>(cx) {
+                            Ok(v) => Ok(v.value(cx)),
+                            Err(_) => Err(CubeError::internal(
+                                "Failed to downcast write response".to_string(),
+                            )),
+                        }),
+                        stream_methods.stream.clone(),
+                    )
+                    .await?;
+                }
 
-            if is_first_batch {
-                let mut schema = Map::new();
-                schema.insert("schema".into(), columns);
-                let columns = format!(
-                    "{}{}",
-                    serde_json::to_string(&serde_json::Value::Object(schema))?,
-                    CHUNK_DELIM
-                );
-                is_first_batch = false;
+                let mut rows = Map::new();
+                rows.insert("data".into(), serde_json::Value::Array(data));
+                let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM);
+                let js_stream_write_fn = stream_methods.write.clone();
 
-                call_js_fn(
+                let should_pause = !call_js_fn(
                     channel.clone(),
-                    stream_methods.write.clone(),
+                    js_stream_write_fn,
                     Box::new(|cx| {
-                        let arg = cx.string(columns).upcast::<JsValue>();
+                        let arg = cx.string(data).upcast::<JsValue>();
 
                         Ok(vec![arg.upcast::<JsValue>()])
                     }),
@@ -297,93 +289,67 @@ async fn handle_sql_query(
                     stream_methods.stream.clone(),
                 )
                 .await?;
-            }
-
-            let mut rows = Map::new();
-            rows.insert("data".into(), serde_json::Value::Array(data));
-            let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM);
-            let js_stream_write_fn = stream_methods.write.clone();
 
-            let should_pause = !call_js_fn(
-                channel.clone(),
-                js_stream_write_fn,
-                Box::new(|cx| {
-                    let arg = cx.string(data).upcast::<JsValue>();
-
-                    Ok(vec![arg.upcast::<JsValue>()])
-                }),
-                Box::new(|cx, v| match v.downcast_or_throw::<JsBoolean, _>(cx) {
-                    Ok(v) => Ok(v.value(cx)),
-                    Err(_) => Err(CubeError::internal(
-                        "Failed to downcast write response".to_string(),
-                    )),
-                }),
-                stream_methods.stream.clone(),
-            )
-            .await?;
-
-            if should_pause {
-                let permit = semaphore.acquire().await?;
-                permit.forget();
+                if should_pause {
+                    let permit = semaphore.acquire().await?;
+                    permit.forget();
+                }
             }
-        }
 
-        Ok::<(), CubeError>(())
-    };
+            Ok::<(), CubeError>(())
+        };
 
-    let result = execute().await;
-    let duration = start_time.elapsed().unwrap().as_millis() as u64;
+        let result = execute().await;
+        let duration = start_time.elapsed().unwrap().as_millis() as u64;
- match &result { - Ok(_) => { - session_clone - .session_manager - .server - .transport - .log_load_state( - None, - session_clone.state.auth_context().unwrap(), - session_clone.state.get_load_request_meta("sql"), - "Load Request Success".to_string(), - serde_json::json!({ - "query": { - "sql": sql_query, - }, - "apiType": "sql", - "duration": duration, - "isDataQuery": true - }), - ) - .await?; - } - Err(err) => { - session_clone - .session_manager - .server - .transport - .log_load_state( - None, - session_clone.state.auth_context().unwrap(), - session_clone.state.get_load_request_meta("sql"), - "Cube SQL Error".to_string(), - serde_json::json!({ - "query": { - "sql": sql_query - }, - "apiType": "sql", - "duration": duration, - "error": err.message, - }), - ) - .await?; + match &result { + Ok(_) => { + session_clone + .session_manager + .server + .transport + .log_load_state( + None, + session_clone.state.auth_context().unwrap(), + session_clone.state.get_load_request_meta("sql"), + "Load Request Success".to_string(), + serde_json::json!({ + "query": { + "sql": sql_query, + }, + "apiType": "sql", + "duration": duration, + "isDataQuery": true + }), + ) + .await?; + } + Err(err) => { + session_clone + .session_manager + .server + .transport + .log_load_state( + None, + session_clone.state.auth_context().unwrap(), + session_clone.state.get_load_request_meta("sql"), + "Cube SQL Error".to_string(), + serde_json::json!({ + "query": { + "sql": sql_query + }, + "apiType": "sql", + "duration": duration, + "error": err.message, + }), + ) + .await?; + } } - } - - session_manager - .drop_session(session_clone.state.connection_id) - .await; - result + result + }) + .await } struct WritableStreamMethods { @@ -581,6 +547,7 @@ pub fn register_module_exports( cx.export_function("registerInterface", register_interface::)?; cx.export_function("shutdownInterface", shutdown_interface)?; cx.export_function("execSql", exec_sql)?; + cx.export_function("sql4sql", sql4sql)?; 
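As an illustrative aside on the object that the `sql4sql` export registered above settles its JS promise with (per the `Sql4SqlResponse::to_js` serializer and the test snapshots elsewhere in this patch), a hypothetical TypeScript consumer might narrow it like this — the local type names here are made up, since the provisional types in `js/index.ts` are still marked TODO:

```typescript
// Hypothetical client-side view of the wire shape: `status` discriminates
// success from failure, `sql` is a [query, parameters] tuple, and `query_type`
// is one of the three planning outcomes.
type SqlOk = { status: "ok"; sql: [string, Array<string | null>] };
type SqlErr = { status: "error"; error: string };
type Sql4SqlWire = (SqlOk | SqlErr) & {
  query_type: "regular" | "post_processing" | "pushdown";
};

function describeSql4Sql(response: Sql4SqlWire): string {
  if (response.status === "ok") {
    const [sql, values] = response.sql;
    return `${response.query_type}: ${sql} (${values.length} params)`;
  }
  return `${response.query_type} error: ${response.error}`;
}
```

The `status` field is what the native serializer sets first in both branches, which makes it a natural discriminant for a proper disjoint union on the JS side.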
cx.export_function("isFallbackBuild", is_fallback_build)?;
 
     cx.export_function("__js_to_clrepr_to_js", debug_js_to_clrepr_to_js)?;
 
diff --git a/packages/cubejs-backend-native/src/sql4sql.rs b/packages/cubejs-backend-native/src/sql4sql.rs
new file mode 100644
index 0000000000000..fdc627dede12b
--- /dev/null
+++ b/packages/cubejs-backend-native/src/sql4sql.rs
@@ -0,0 +1,228 @@
+use std::sync::Arc;
+
+use neon::prelude::*;
+
+use cubesql::compile::convert_sql_to_cube_query;
+use cubesql::compile::datafusion::logical_plan::LogicalPlan;
+use cubesql::compile::engine::df::scan::CubeScanNode;
+use cubesql::compile::engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode};
+use cubesql::sql::Session;
+use cubesql::transport::MetaContext;
+use cubesql::CubeError;
+
+use crate::auth::NativeAuthContext;
+use crate::config::NodeCubeServices;
+use crate::cubesql_utils::with_session;
+use crate::node_export::SQLInterface;
+use crate::tokio_runtime_node;
+
+enum Sql4SqlQueryType {
+    Regular,
+    PostProcessing,
+    Pushdown,
+}
+
+impl Sql4SqlQueryType {
+    pub fn to_js<'ctx>(&self, cx: &mut impl Context<'ctx>) -> JsResult<'ctx, JsString> {
+        let self_str = match self {
+            Self::Regular => "regular",
+            Self::PostProcessing => "post_processing",
+            Self::Pushdown => "pushdown",
+        };
+
+        Ok(cx.string(self_str))
+    }
+}
+
+enum Sql4SqlResponseResult {
+    Ok {
+        sql: String,
+        values: Vec<Option<String>>,
+    },
+    Error {
+        error: String,
+    },
+}
+
+struct Sql4SqlResponse {
+    result: Sql4SqlResponseResult,
+    query_type: Sql4SqlQueryType,
+}
+
+impl Sql4SqlResponse {
+    pub fn to_js<'ctx>(&self, cx: &mut impl Context<'ctx>) -> JsResult<'ctx, JsObject> {
+        let obj = cx.empty_object();
+
+        match &self.result {
+            Sql4SqlResponseResult::Ok { sql, values } => {
+                let status = cx.string("ok");
+                obj.set(cx, "status", status)?;
+
+                let sql_tuple = cx.empty_array();
+                let sql = cx.string(sql);
+                sql_tuple.set(cx, 0, sql)?;
+                let js_values = cx.empty_array();
+                for (i, v) in values.iter().enumerate() {
+                    use std::convert::TryFrom;
+                    let i = u32::try_from(i).unwrap();
+                    let v: Handle<JsValue> = v
+                        .as_ref()
+                        .map(|v| cx.string(v).upcast())
+                        .unwrap_or_else(|| cx.null().upcast());
+                    js_values.set(cx, i, v)?;
+                }
+                sql_tuple.set(cx, 1, js_values)?;
+                obj.set(cx, "sql", sql_tuple)?;
+            }
+            Sql4SqlResponseResult::Error { error } => {
+                let status = cx.string("error");
+                obj.set(cx, "status", status)?;
+
+                let error = cx.string(error);
+                obj.set(cx, "error", error)?;
+            }
+        }
+
+        let query_type = self.query_type.to_js(cx)?;
+        obj.set(cx, "query_type", query_type)?;
+
+        Ok(obj)
+    }
+}
+
+async fn get_sql(
+    session: &Session,
+    meta_context: Arc<MetaContext>,
+    plan: Arc<LogicalPlan>,
+) -> Result<Sql4SqlResponse, CubeError> {
+    let auth_context = session
+        .state
+        .auth_context()
+        .ok_or_else(|| CubeError::internal("Unexpected missing auth context".to_string()))?;
+
+    match plan.as_ref() {
+        LogicalPlan::Extension(extension) => {
+            let cube_scan_wrapped_sql = extension
+                .node
+                .as_any()
+                .downcast_ref::<CubeScanWrappedSqlNode>();
+
+            if let Some(cube_scan_wrapped_sql) = cube_scan_wrapped_sql {
+                return Ok(Sql4SqlResponse {
+                    result: Sql4SqlResponseResult::Ok {
+                        sql: cube_scan_wrapped_sql.wrapped_sql.sql.clone(),
+                        values: cube_scan_wrapped_sql.wrapped_sql.values.clone(),
+                    },
+                    query_type: Sql4SqlQueryType::Pushdown,
+                });
+            }
+
+            if extension.node.as_any().is::<CubeScanNode>() {
+                let cube_scan_wrapper = CubeScanWrapperNode::new(
+                    plan,
+                    meta_context,
+                    auth_context,
+                    None,
+                    session.server.config_obj.clone(),
+                );
+                let wrapped_sql = cube_scan_wrapper
+                    .generate_sql(
+                        session.server.transport.clone(),
+                        Arc::new(session.state.get_load_request_meta("sql")),
+                    )
+                    .await?;
+
+                return Ok(Sql4SqlResponse {
+                    result: Sql4SqlResponseResult::Ok {
+                        sql: wrapped_sql.wrapped_sql.sql.clone(),
+                        values: wrapped_sql.wrapped_sql.values.clone(),
+                    },
+                    query_type: Sql4SqlQueryType::Regular,
+                });
+            }
+
+            Err(CubeError::internal(
+                "Unexpected extension in logical plan root".to_string(),
+            ))
+        }
+        _ => Ok(Sql4SqlResponse {
+            result: Sql4SqlResponseResult::Error {
+                error: "Provided query can not be executed without post-processing.".to_string(),
+            },
+            query_type: Sql4SqlQueryType::PostProcessing,
+        }),
+    }
+}
+
+async fn handle_sql4sql_query(
+    services: Arc<NodeCubeServices>,
+    native_auth_ctx: Arc<NativeAuthContext>,
+    sql_query: &str,
+) -> Result<Sql4SqlResponse, CubeError> {
+    with_session(&services, native_auth_ctx.clone(), |session| async move {
+        let transport = session.server.transport.clone();
+        // todo: can we use compiler_cache?
+        let meta_context = transport
+            .meta(native_auth_ctx)
+            .await
+            .map_err(|err| CubeError::internal(format!("Failed to get meta context: {err}")))?;
+        let query_plan =
+            convert_sql_to_cube_query(sql_query, meta_context.clone(), session.clone()).await?;
+        let logical_plan = query_plan.try_as_logical_plan()?;
+        get_sql(&session, meta_context, Arc::new(logical_plan.clone())).await
+    })
+    .await
+}
+
+pub fn sql4sql(mut cx: FunctionContext) -> JsResult<JsValue> {
+    let interface = cx.argument::<JsBox<SQLInterface>>(0)?;
+    let sql_query = cx.argument::<JsString>(1)?.value(&mut cx);
+
+    let security_context: Option<serde_json::Value> = match cx.argument::<JsValue>(2) {
+        Ok(string) => match string.downcast::<JsString, _>(&mut cx) {
+            Ok(v) => v.value(&mut cx).parse::<serde_json::Value>().ok(),
+            Err(_) => None,
+        },
+        Err(_) => None,
+    };
+
+    let services = interface.services.clone();
+    let runtime = tokio_runtime_node(&mut cx)?;
+
+    let channel = cx.channel();
+
+    let native_auth_ctx = Arc::new(NativeAuthContext {
+        user: Some(String::from("unknown")),
+        superuser: false,
+        security_context,
+    });
+
+    let (deferred, promise) = cx.promise();
+
+    // In case spawned task panics or gets aborted before settle call it will leave permanently pending Promise in JS land
+    // We don't want to just waste whole thread (doesn't really matter main or worker or libuv thread pool)
+    // just busy waiting that JoinHandle
+    // TODO handle JoinError
+    //  keep JoinHandle alive in JS thread
+    //  check join handle from JS thread periodically, reject promise on JoinError
+    //  maybe register something like uv_check handle (libuv itself does not have ABI stability of N-API)
+    //  can do it relatively rare, and
in a single loop for all JoinHandles
+    // this is just a watchdog for a Very Bad case, so latency requirement can be quite relaxed
+    runtime.spawn(async move {
+        let result = handle_sql4sql_query(services, native_auth_ctx, &sql_query).await;
+
+        if let Err(err) = deferred.try_settle_with(&channel, move |mut cx| {
+            // `neon::result::ResultExt` is implemented only for Result, even though Ok variant is not touched
+            let response = result.or_else(|err| cx.throw_error(err.to_string()))?;
+            let response = response.to_js(&mut cx)?;
+            Ok(response)
+        }) {
+            // There is not much we can do at this point
+            // TODO lift this error to task => JoinHandle => JS watchdog
+            log::error!(
+                "Unable to settle JS promise from tokio task, try_settle_with failed, err: {err}"
+            );
+        }
+    });
+
+    Ok(promise.upcast::<JsValue>())
+}
diff --git a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap
index 4401d87097314..6e8034d70056c 100644
--- a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap
+++ b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap
@@ -1,5 +1,361 @@
 // Jest Snapshot v1, https://goo.gl/fbAQLP
 
+exports[`SQL API Cube SQL over HTTP sql4sql double aggregation post-processing 1`] = `
+Object {
+  "body": Object {
+    "sql": Object {
+      "error": "Provided query can not be executed without post-processing.",
+      "query_type": "post_processing",
+      "status": "error",
+    },
+  },
+  "headers": Headers {
+    Symbol(map): Object {
+      "access-control-allow-origin": Array [
+        "*",
+      ],
+      "connection": Array [
+        "keep-alive",
+      ],
+      "content-length": Array [
+        "127",
+      ],
+      "content-type": Array [
+        "application/json; charset=utf-8",
+      ],
+      "keep-alive": Array [
+        "timeout=5",
+      ],
+      "x-powered-by": Array [
+        "Express",
+      ],
+    },
+  },
+  "status": 200,
+  "statusText": "OK",
+}
+`;
+
+exports[`SQL API Cube SQL over HTTP sql4sql regular query 1`] = `
+Object {
+  "body": Object {
+    "sql":
Object { + "query_type": "regular", + "sql": Array [ + "SELECT + sum(\\"orders\\".amount) \\"total\\" + FROM + ( + select 1 as id, 100 as amount, 'new' status, '2024-01-01'::timestamptz created_at + UNION ALL + select 2 as id, 200 as amount, 'new' status, '2024-01-02'::timestamptz created_at + UNION ALL + select 3 as id, 300 as amount, 'processed' status, '2024-01-03'::timestamptz created_at + UNION ALL + select 4 as id, 500 as amount, 'processed' status, '2024-01-04'::timestamptz created_at + UNION ALL + select 5 as id, 600 as amount, 'shipped' status, '2024-01-05'::timestamptz created_at + ) AS \\"orders\\" ", + Array [], + ], + "status": "ok", + }, + }, + "headers": Headers { + Symbol(map): Object { + "access-control-allow-origin": Array [ + "*", + ], + "connection": Array [ + "keep-alive", + ], + "content-length": Array [ + "638", + ], + "content-type": Array [ + "application/json; charset=utf-8", + ], + "keep-alive": Array [ + "timeout=5", + ], + "x-powered-by": Array [ + "Express", + ], + }, + }, + "status": 200, + "statusText": "OK", +} +`; + +exports[`SQL API Cube SQL over HTTP sql4sql regular query with missing column 1`] = ` +Object { + "body": Object { + "error": "Error: SQLCompilationError: Internal: Initial planning error: Error during planning: Invalid identifier '#foobar' for schema fields:[Orders.count, Orders.orderCount, Orders.netCollectionCompleted, Orders.arpu, Orders.refundRate, Orders.refundOrdersCount, Orders.overallOrders, Orders.totalAmount, Orders.toRemove, Orders.numberTotal, Orders.amountRank, Orders.amountReducedByStatus, Orders.statusPercentageOfTotal, Orders.amountRankView, Orders.amountRankDateMax, Orders.amountRankDate, Orders.countAndTotalAmount, Orders.createdAtMax, Orders.createdAtMaxProxy, Orders.id, Orders.status, Orders.createdAt, Orders.__user, Orders.__cubeJoinField], metadata:{}", + "stack": "Error: SQLCompilationError: Internal: Initial planning error: Error during planning: Invalid identifier '#foobar' for schema 
fields:[Orders.count, Orders.orderCount, Orders.netCollectionCompleted, Orders.arpu, Orders.refundRate, Orders.refundOrdersCount, Orders.overallOrders, Orders.totalAmount, Orders.toRemove, Orders.numberTotal, Orders.amountRank, Orders.amountReducedByStatus, Orders.statusPercentageOfTotal, Orders.amountRankView, Orders.amountRankDateMax, Orders.amountRankDate, Orders.countAndTotalAmount, Orders.createdAtMax, Orders.createdAtMaxProxy, Orders.id, Orders.status, Orders.createdAt, Orders.__user, Orders.__cubeJoinField], metadata:{}", + }, + "headers": Headers { + Symbol(map): Object { + "access-control-allow-origin": Array [ + "*", + ], + "connection": Array [ + "keep-alive", + ], + "content-length": Array [ + "1395", + ], + "content-type": Array [ + "application/json; charset=utf-8", + ], + "keep-alive": Array [ + "timeout=5", + ], + "x-powered-by": Array [ + "Express", + ], + }, + }, + "status": 500, + "statusText": "Internal Server Error", +} +`; + +exports[`SQL API Cube SQL over HTTP sql4sql regular query with parameters 1`] = ` +Object { + "body": Object { + "sql": Object { + "query_type": "regular", + "sql": Array [ + "SELECT + sum(\\"orders\\".amount) \\"total\\" + FROM + ( + select 1 as id, 100 as amount, 'new' status, '2024-01-01'::timestamptz created_at + UNION ALL + select 2 as id, 200 as amount, 'new' status, '2024-01-02'::timestamptz created_at + UNION ALL + select 3 as id, 300 as amount, 'processed' status, '2024-01-03'::timestamptz created_at + UNION ALL + select 4 as id, 500 as amount, 'processed' status, '2024-01-04'::timestamptz created_at + UNION ALL + select 5 as id, 600 as amount, 'shipped' status, '2024-01-05'::timestamptz created_at + ) AS \\"orders\\" WHERE (\\"orders\\".status = $1)", + Array [ + "foo", + ], + ], + "status": "ok", + }, + }, + "headers": Headers { + Symbol(map): Object { + "access-control-allow-origin": Array [ + "*", + ], + "connection": Array [ + "keep-alive", + ], + "content-length": Array [ + "674", + ], + "content-type": 
Array [ + "application/json; charset=utf-8", + ], + "keep-alive": Array [ + "timeout=5", + ], + "x-powered-by": Array [ + "Express", + ], + }, + }, + "status": 200, + "statusText": "OK", +} +`; + +exports[`SQL API Cube SQL over HTTP sql4sql set variable 1`] = ` +Object { + "body": Object { + "error": "Error: This query doesnt have a plan, because it already has values for response", + "stack": "Error: This query doesnt have a plan, because it already has values for response", + }, + "headers": Headers { + Symbol(map): Object { + "access-control-allow-origin": Array [ + "*", + ], + "connection": Array [ + "keep-alive", + ], + "content-length": Array [ + "241", + ], + "content-type": Array [ + "application/json; charset=utf-8", + ], + "keep-alive": Array [ + "timeout=5", + ], + "x-powered-by": Array [ + "Express", + ], + }, + }, + "status": 500, + "statusText": "Internal Server Error", +} +`; + +exports[`SQL API Cube SQL over HTTP sql4sql strictly post-processing 1`] = ` +Object { + "body": Object { + "sql": Object { + "error": "Provided query can not be executed without post-processing.", + "query_type": "post_processing", + "status": "error", + }, + }, + "headers": Headers { + Symbol(map): Object { + "access-control-allow-origin": Array [ + "*", + ], + "connection": Array [ + "keep-alive", + ], + "content-length": Array [ + "127", + ], + "content-type": Array [ + "application/json; charset=utf-8", + ], + "keep-alive": Array [ + "timeout=5", + ], + "x-powered-by": Array [ + "Express", + ], + }, + }, + "status": 200, + "statusText": "OK", +} +`; + +exports[`SQL API Cube SQL over HTTP sql4sql wrapper 1`] = ` +Object { + "body": Object { + "sql": Object { + "query_type": "pushdown", + "sql": Array [ + "SELECT \\"Orders\\".\\"sum_orders_total\\" \\"total\\" +FROM ( + SELECT + sum(\\"orders\\".amount) \\"sum_orders_total\\" + FROM + ( + select 1 as id, 100 as amount, 'new' status, '2024-01-01'::timestamptz created_at + UNION ALL + select 2 as id, 200 as amount, 'new' 
status, '2024-01-02'::timestamptz created_at + UNION ALL + select 3 as id, 300 as amount, 'processed' status, '2024-01-03'::timestamptz created_at + UNION ALL + select 4 as id, 500 as amount, 'processed' status, '2024-01-04'::timestamptz created_at + UNION ALL + select 5 as id, 600 as amount, 'shipped' status, '2024-01-05'::timestamptz created_at + ) AS \\"orders\\" WHERE ((LOWER(\\"orders\\".status) = UPPER(\\"orders\\".status))) +) AS \\"Orders\\"", + Array [], + ], + "status": "ok", + }, + }, + "headers": Headers { + Symbol(map): Object { + "access-control-allow-origin": Array [ + "*", + ], + "connection": Array [ + "keep-alive", + ], + "content-length": Array [ + "816", + ], + "content-type": Array [ + "application/json; charset=utf-8", + ], + "keep-alive": Array [ + "timeout=5", + ], + "x-powered-by": Array [ + "Express", + ], + }, + }, + "status": 200, + "statusText": "OK", +} +`; + +exports[`SQL API Cube SQL over HTTP sql4sql wrapper with parameters 1`] = ` +Object { + "body": Object { + "sql": Object { + "query_type": "pushdown", + "sql": Array [ + "SELECT \\"Orders\\".\\"sum_orders_total\\" \\"total\\" +FROM ( + SELECT + sum(\\"orders\\".amount) \\"sum_orders_total\\" + FROM + ( + select 1 as id, 100 as amount, 'new' status, '2024-01-01'::timestamptz created_at + UNION ALL + select 2 as id, 200 as amount, 'new' status, '2024-01-02'::timestamptz created_at + UNION ALL + select 3 as id, 300 as amount, 'processed' status, '2024-01-03'::timestamptz created_at + UNION ALL + select 4 as id, 500 as amount, 'processed' status, '2024-01-04'::timestamptz created_at + UNION ALL + select 5 as id, 600 as amount, 'shipped' status, '2024-01-05'::timestamptz created_at + ) AS \\"orders\\" WHERE ((LOWER(\\"orders\\".status) = $1)) +) AS \\"Orders\\"", + Array [ + "foo", + ], + ], + "status": "ok", + }, + }, + "headers": Headers { + Symbol(map): Object { + "access-control-allow-origin": Array [ + "*", + ], + "connection": Array [ + "keep-alive", + ], + "content-length": 
Array [ + "799", + ], + "content-type": Array [ + "application/json; charset=utf-8", + ], + "keep-alive": Array [ + "timeout=5", + ], + "x-powered-by": Array [ + "Express", + ], + }, + }, + "status": 200, + "statusText": "OK", +} +`; + exports[`SQL API Postgres (Data) SELECT COUNT(*) as cn, "status" FROM Orders GROUP BY 2 ORDER BY cn DESC: sql_orders 1`] = ` Array [ Object { diff --git a/packages/cubejs-testing/test/smoke-cubesql.test.ts b/packages/cubejs-testing/test/smoke-cubesql.test.ts index 7a780766b963a..53052bdf3bc1c 100644 --- a/packages/cubejs-testing/test/smoke-cubesql.test.ts +++ b/packages/cubejs-testing/test/smoke-cubesql.test.ts @@ -147,6 +147,89 @@ describe('SQL API', () => { expect(rows).toBe(ROWS_LIMIT); }); + + describe('sql4sql', () => { + async function generateSql(query: string) { + const response = await fetch(`${birdbox.configuration.apiUrl}/sql`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: token, + }, + body: JSON.stringify({ + query, + format: 'sql', + }), + }); + const { status, statusText, headers } = response; + const body = await response.json(); + + // To stabilize responses + delete body.requestId; + headers.delete('date'); + headers.delete('etag'); + + return { + status, + statusText, + headers, + body, + }; + } + + it('regular query', async () => { + expect(await generateSql(`SELECT SUM(totalAmount) AS total FROM Orders;`)).toMatchSnapshot(); + }); + + it('regular query with missing column', async () => { + expect(await generateSql(`SELECT SUM(foobar) AS total FROM Orders;`)).toMatchSnapshot(); + }); + + it('regular query with parameters', async () => { + expect(await generateSql(`SELECT SUM(totalAmount) AS total FROM Orders WHERE status = 'foo';`)).toMatchSnapshot(); + }); + + it('strictly post-processing', async () => { + expect(await generateSql(`SELECT version();`)).toMatchSnapshot(); + }); + + it('double aggregation post-processing', async () => { + expect(await generateSql(` + SELECT 
AVG(total)
+          FROM (
+            SELECT
+              status,
+              SUM(totalAmount) AS total
+            FROM Orders
+            GROUP BY 1
+          ) t
+        `)).toMatchSnapshot();
+      });
+
+      it('wrapper', async () => {
+        expect(await generateSql(`
+          SELECT
+            SUM(totalAmount) AS total
+          FROM Orders
+          WHERE LOWER(status) = UPPER(status)
+        `)).toMatchSnapshot();
+      });
+
+      it('wrapper with parameters', async () => {
+        expect(await generateSql(`
+          SELECT
+            SUM(totalAmount) AS total
+          FROM Orders
+          WHERE LOWER(status) = 'foo'
+        `)).toMatchSnapshot();
+      });
+
+      it('set variable', async () => {
+        expect(await generateSql(`
+          SET MyVariable = 'Foo'
+        `)).toMatchSnapshot();
+      });
+    });
   });
 
   describe('Postgres (Auth)', () => {
diff --git a/rust/cubesql/cubesql/src/compile/parser.rs b/rust/cubesql/cubesql/src/compile/parser.rs
index d22f35cf8d9b0..4177e7f66ccec 100644
--- a/rust/cubesql/cubesql/src/compile/parser.rs
+++ b/rust/cubesql/cubesql/src/compile/parser.rs
@@ -41,16 +41,16 @@ static SIGMA_WORKAROUND: LazyLock<Regex> = LazyLock::new(|| {
 });
 
 pub fn parse_sql_to_statements(
-    query: &String,
+    query: &str,
     protocol: DatabaseProtocol,
     qtrace: &mut Option<Qtrace>,
 ) -> CompilationResult<Vec<Statement>> {
-    let original_query = query.clone();
+    let original_query = query;
 
     log::debug!("Parsing SQL: {}", query);
 
     // @todo Support without workarounds
     // metabase
-    let query = query.clone().replace("IF(TABLE_TYPE='BASE TABLE' or TABLE_TYPE='SYSTEM VERSIONED', 'TABLE', TABLE_TYPE) as TABLE_TYPE", "TABLE_TYPE");
+    let query = query.replace("IF(TABLE_TYPE='BASE TABLE' or TABLE_TYPE='SYSTEM VERSIONED', 'TABLE', TABLE_TYPE) as TABLE_TYPE", "TABLE_TYPE");
     let query = query.replace("ORDER BY TABLE_TYPE, TABLE_SCHEMA, TABLE_NAME", "");
     // @todo Implement CONVERT function
     let query = query.replace("CONVERT (CASE DATA_TYPE WHEN 'year' THEN NUMERIC_SCALE WHEN 'tinyint' THEN 0 ELSE NUMERIC_SCALE END, UNSIGNED INTEGER)", "0");
@@ -247,13 +247,14 @@ pub fn parse_sql_to_statements(
     };
 
     parse_result.map_err(|err| {
-        CompilationError::user(format!("Unable to parse: {:?}", err))
-            
.with_meta(Some(HashMap::from([("query".to_string(), original_query)])))
+        CompilationError::user(format!("Unable to parse: {:?}", err)).with_meta(Some(
+            HashMap::from([("query".to_string(), original_query.to_string())]),
+        ))
     })
 }
 
 pub fn parse_sql_to_statement(
-    query: &String,
+    query: &str,
     protocol: DatabaseProtocol,
     qtrace: &mut Option<Qtrace>,
 ) -> CompilationResult<Statement> {
@@ -274,7 +275,10 @@ pub fn parse_sql_to_statement(
             ))
         };
 
-        Err(err.with_meta(Some(HashMap::from([("query".to_string(), query.clone())])))))
+        Err(err.with_meta(Some(HashMap::from([(
+            "query".to_string(),
+            query.to_string(),
+        )])))))
        }
    }
}
diff --git a/rust/cubesql/cubesql/src/compile/plan.rs b/rust/cubesql/cubesql/src/compile/plan.rs
index 6b5926167d2bf..c9bc39d9ffda1 100644
--- a/rust/cubesql/cubesql/src/compile/plan.rs
+++ b/rust/cubesql/cubesql/src/compile/plan.rs
@@ -78,17 +78,22 @@ impl fmt::Debug for QueryPlan {
 }
 
 impl QueryPlan {
-    pub fn as_logical_plan(&self) -> LogicalPlan {
+    pub fn try_as_logical_plan(&self) -> Result<&LogicalPlan, CubeError> {
         match self {
             QueryPlan::DataFusionSelect(plan, _) | QueryPlan::CreateTempTable(plan, _, _, _) => {
-                plan.clone()
-            }
-            QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => {
-                panic!("This query doesnt have a plan, because it already has values for response")
+                Ok(plan)
             }
+            QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => Err(CubeError::internal(
+                "This query doesnt have a plan, because it already has values for response"
+                    .to_string(),
+            )),
         }
     }
 
+    pub fn as_logical_plan(&self) -> LogicalPlan {
+        self.try_as_logical_plan().cloned().unwrap()
+    }
+
     pub async fn as_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
         match self {
             QueryPlan::DataFusionSelect(plan, ctx)
diff --git a/rust/cubesql/cubesql/src/compile/router.rs b/rust/cubesql/cubesql/src/compile/router.rs
index b4d933a1fc1de..5aba3b8447967 100644
--- a/rust/cubesql/cubesql/src/compile/router.rs
+++ b/rust/cubesql/cubesql/src/compile/router.rs
@@ -637,7 +637,7 @@ pub async fn 
convert_statement_to_cube_query(
 }
 
 pub async fn convert_sql_to_cube_query(
-    query: &String,
+    query: &str,
     meta: Arc<MetaContext>,
     session: Arc<Session>,
 ) -> CompilationResult<QueryPlan> {
diff --git a/rust/cubesql/cubesql/src/sql/postgres/shim.rs b/rust/cubesql/cubesql/src/sql/postgres/shim.rs
index 35de490d4277f..88aa84373d983 100644
--- a/rust/cubesql/cubesql/src/sql/postgres/shim.rs
+++ b/rust/cubesql/cubesql/src/sql/postgres/shim.rs
@@ -1792,8 +1792,7 @@ impl AsyncPostgresShim {
         let cache_entry = self.get_cache_entry().await?;
         let meta = self.session.server.compiler_cache.meta(cache_entry).await?;
 
-        let statements =
-            parse_sql_to_statements(&query.to_string(), DatabaseProtocol::PostgreSQL, qtrace)?;
+        let statements = parse_sql_to_statements(query, DatabaseProtocol::PostgreSQL, qtrace)?;
 
         if statements.len() == 0 {
             self.write(protocol::EmptyQuery::new()).await?;