diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 397428d28d5aa..b64083d6b070b 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -101,7 +101,7 @@ pub fn gen_create_mv_plan( let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose); if emit_on_window_close { - context.warn("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution."); + context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution."); } let mut plan_root = Planner::new(context).plan_query(bound)?; @@ -148,12 +148,18 @@ pub async fn handle_create_mv( ) -> Result { let session = handler_args.session.clone(); - let has_order_by = !query.order_by.is_empty(); - session.check_relation_name_duplicated(name.clone())?; - let (table, graph, mut notices) = { + let (table, graph) = { let context = OptimizerContext::from_handler_args(handler_args); + + let has_order_by = !query.order_by.is_empty(); + if has_order_by { + context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order. +It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view. +"#.to_string()); + } + let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?; let context = plan.plan_base().ctx.clone(); @@ -166,9 +172,7 @@ pub async fn handle_create_mv( let env = graph.env.as_mut().unwrap(); env.timezone = context.get_session_timezone(); - let notices = context.take_warnings(); - - (table, graph, notices) + (table, graph) }; let _job_guard = @@ -187,15 +191,8 @@ pub async fn handle_create_mv( .create_materialized_view(table, graph) .await?; - if has_order_by { - notices.push(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order. -It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view. -"#.to_string()); - } - - Ok(PgResponse::empty_result_with_notices( + Ok(PgResponse::empty_result( StatementType::CREATE_MATERIALIZED_VIEW, - notices, )) } @@ -333,11 +330,5 @@ pub mod tests { let sql = "create materialized view mv2 as select * from t order by x"; let response = frontend.run_sql(sql).await.unwrap(); assert_eq!(response.get_stmt_type(), CREATE_MATERIALIZED_VIEW); - assert_eq!( - response.get_notices()[0], - r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order. -It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view. -"# - ); } } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 689720f4ccf8c..cd430cc1d8d17 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -651,7 +651,7 @@ pub async fn handle_create_table( } } - let (graph, source, table, notices) = { + let (graph, source, table) = { let context = OptimizerContext::from_handler_args(handler_args); let source_schema = check_create_table_with_source(context.with_options(), source_schema)?; let col_id_gen = ColumnIdGenerator::new_initial(); @@ -681,15 +681,12 @@ pub async fn handle_create_table( )?, }; - let context = plan.plan_base().ctx.clone(); - let notices = context.take_warnings(); - let mut graph = build_graph(plan); graph.parallelism = session .config() .get_streaming_parallelism() .map(|parallelism| Parallelism { parallelism }); - (graph, source, table, notices) + (graph, source, table) }; tracing::trace!( @@ -701,10 +698,7 @@ pub async fn handle_create_table( let catalog_writer = session.env().catalog_writer(); catalog_writer.create_table(source, table, graph).await?; - Ok(PgResponse::empty_result_with_notices( - StatementType::CREATE_TABLE, - notices, - )) + Ok(PgResponse::empty_result(StatementType::CREATE_TABLE)) } pub fn check_create_table_with_source( diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 6e592b2b7ec99..bd46a0896b5ea 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -257,7 +257,6 @@ struct BatchPlanFragmenterResult { pub(crate) schema: Schema, pub(crate) stmt_type: StatementType, pub(crate) _dependent_relations: Vec, - pub(crate) warnings: Vec, } fn gen_batch_plan_fragmenter( @@ -272,7 +271,6 @@ fn gen_batch_plan_fragmenter( dependent_relations, } = plan_result; - let context = plan.plan_base().ctx.clone(); tracing::trace!( "Generated query plan: {:?}, query_mode:{:?}", plan.explain_to_string()?, @@ -288,7 +286,6 @@ fn gen_batch_plan_fragmenter( session.config().get_batch_parallelism(), plan, )?; - let warnings = context.take_warnings(); Ok(BatchPlanFragmenterResult { plan_fragmenter, @@ -296,7 +293,6 @@ fn gen_batch_plan_fragmenter( schema, stmt_type, _dependent_relations: dependent_relations, - warnings, }) } @@ -310,7 +306,6 @@ async fn execute( query_mode, schema, stmt_type, - warnings, .. } = plan_fragmenter_result; @@ -446,7 +441,12 @@ async fn execute( }; Ok(PgResponse::new_for_stream_extra( - stmt_type, rows_count, row_stream, pg_descs, warnings, callback, + stmt_type, + rows_count, + row_stream, + pg_descs, + vec![], + callback, )) } diff --git a/src/frontend/src/optimizer/optimizer_context.rs b/src/frontend/src/optimizer/optimizer_context.rs index 6c08fd7410a69..95f8f678a3816 100644 --- a/src/frontend/src/optimizer/optimizer_context.rs +++ b/src/frontend/src/optimizer/optimizer_context.rs @@ -18,7 +18,6 @@ use std::cell::{RefCell, RefMut}; use std::rc::Rc; use std::sync::Arc; -use itertools::Itertools; use risingwave_sqlparser::ast::{ExplainOptions, ExplainType}; use crate::expr::{CorrelatedId, SessionTimezone}; @@ -51,8 +50,15 @@ pub struct OptimizerContext { session_timezone: RefCell, /// Store expr display id. next_expr_display_id: RefCell, - /// warning messages - warning_messages: RefCell>, +} + +// Still not sure if we need to introduce "on_optimization_finish" or other common callback methods, +impl Drop for OptimizerContext { + fn drop(&mut self) { + if let Some(warning) = self.session_timezone.borrow().warning() { + self.warn_to_user(warning); + }; + } } pub type OptimizerContextRef = Rc; @@ -81,7 +87,6 @@ impl OptimizerContext { with_options: handler_args.with_options, session_timezone, next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()), - warning_messages: RefCell::new(vec![]), } } @@ -101,7 +106,6 @@ impl OptimizerContext { with_options: Default::default(), session_timezone: RefCell::new(SessionTimezone::new("UTC".into())), next_expr_display_id: RefCell::new(0), - warning_messages: RefCell::new(vec![]), } .into() } @@ -165,11 +169,8 @@ impl OptimizerContext { optimizer_trace.push("\n".to_string()); } - pub fn warn(&self, str: impl Into) { - let mut warnings = self.warning_messages.borrow_mut(); - let string = str.into(); - tracing::trace!("warn to user:{}", string); - warnings.push(string); + pub fn warn_to_user(&self, str: impl Into) { + self.session_ctx().notice_to_user(str); } pub fn store_logical(&self, str: impl Into) { @@ -184,14 +185,6 @@ impl OptimizerContext { self.optimizer_trace.borrow_mut().drain(..).collect() } - pub fn take_warnings(&self) -> Vec { - let mut warnings = self.warning_messages.borrow_mut().drain(..).collect_vec(); - if let Some(warning) = self.session_timezone.borrow().warning() { - warnings.push(warning); - }; - warnings - } - pub fn with_options(&self) -> &WithOptions { &self.with_options } diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index f6399bc7cbba8..8d285a2c4f712 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -75,7 +75,7 @@ pub fn stream_enforce_eowc_requirement( .into()) } else { if n_watermark_cols > 1 { - ctx.warn("There are multiple watermark columns in the query, currently only the first one will be used."); + ctx.warn_to_user("There are multiple watermark columns in the query, currently only the first one will be used."); } let watermark_col_idx = watermark_cols.ones().next().unwrap(); Ok(StreamSort::new(plan, watermark_col_idx).into()) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 02a874d82785f..9e6af21aa01e3 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -418,6 +418,8 @@ pub struct SessionImpl { user_authenticator: UserAuthenticator, /// Stores the value of configurations. config_map: RwLock, + /// buffer the Notices to users, + notices: RwLock>, /// Identified by process_id, secret_key. Corresponds to SessionManager. id: (i32, i32), @@ -442,6 +444,7 @@ impl SessionImpl { config_map: Default::default(), id, current_query_cancel_flag: Mutex::new(None), + notices: Default::default(), } } @@ -459,6 +462,7 @@ impl SessionImpl { // Mock session use non-sense id. id: (0, 0), current_query_cancel_flag: Mutex::new(None), + notices: Default::default(), } } @@ -601,6 +605,10 @@ impl SessionImpl { tripwire } + fn clear_notices(&self) { + *self.notices.write() = vec![]; + } + pub fn cancel_current_query(&self) { let mut flag_guard = self.current_query_cancel_flag.lock().unwrap(); if let Some(trigger) = flag_guard.take() { @@ -612,10 +620,12 @@ impl SessionImpl { info!("Trying to cancel query in distributed mode."); self.env.query_manager().cancel_queries_in_session(self.id) } + self.clear_notices() } pub fn cancel_current_creating_job(&self) { self.env.creating_streaming_job_tracker.abort_jobs(self.id); + self.clear_notices() } /// This function only used for test now. @@ -662,6 +672,12 @@ impl SessionImpl { .inspect_err(|e| tracing::error!("failed to handle sql:\n{}:\n{}", sql, e))?; Ok(rsp) } + + pub fn notice_to_user(&self, str: impl Into) { + let notice = str.into(); + tracing::trace!("notice to user:{}", notice); + self.notices.write().push(notice); + } } pub struct SessionManagerImpl { @@ -922,6 +938,11 @@ impl Session for SessionImpl { Portal::PureStatement(statement) => Ok(infer(None, statement)?), } } + + fn take_notices(self: Arc) -> Vec { + let inner = &mut (*self.notices.write()); + std::mem::take(inner) + } } /// Returns row description of the statement diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index cefe12d6529cc..22d73db6f72da 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -349,10 +349,12 @@ where let session = session.clone(); // execute query - let mut res = session - .run_one_query(stmt, Format::Text) - .await - .map_err(|err| PsqlError::QueryError(err))?; + let res = session.clone().run_one_query(stmt, Format::Text).await; + for notice in session.take_notices() { + self.stream + .write_no_flush(&BeMessage::NoticeResponse(¬ice))?; + } + let mut res = res.map_err(|err| PsqlError::QueryError(err))?; for notice in res.get_notices() { self.stream diff --git a/src/utils/pgwire/src/pg_server.rs b/src/utils/pgwire/src/pg_server.rs index d6497ba4157d3..59b6fdfc11540 100644 --- a/src/utils/pgwire/src/pg_server.rs +++ b/src/utils/pgwire/src/pg_server.rs @@ -74,6 +74,10 @@ where params_types: Vec, ) -> Result; + // TODO: maybe this function should be async and return the notice more timely + /// try to take the current notices from the session + fn take_notices(self: Arc) -> Vec; + fn bind( self: Arc, prepare_statement: PS, @@ -304,6 +308,10 @@ mod tests { fn id(&self) -> SessionId { (0, 0) } + + fn take_notices(self: Arc) -> Vec { + vec![] + } } #[tokio::test]