Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fe): refine notice framework #9705

Merged
merged 5 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 12 additions & 21 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub fn gen_create_mv_plan(

let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
if emit_on_window_close {
context.warn("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
}

let mut plan_root = Planner::new(context).plan_query(bound)?;
Expand Down Expand Up @@ -148,12 +148,18 @@ pub async fn handle_create_mv(
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

let has_order_by = !query.order_by.is_empty();

session.check_relation_name_duplicated(name.clone())?;

let (table, graph, mut notices) = {
let (table, graph) = {
let context = OptimizerContext::from_handler_args(handler_args);

let has_order_by = !query.order_by.is_empty();
if has_order_by {
context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_string());
}

let (plan, table) =
gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?;
let context = plan.plan_base().ctx.clone();
Expand All @@ -166,9 +172,7 @@ pub async fn handle_create_mv(
let env = graph.env.as_mut().unwrap();
env.timezone = context.get_session_timezone();

let notices = context.take_warnings();

(table, graph, notices)
(table, graph)
};

let _job_guard =
Expand All @@ -187,15 +191,8 @@ pub async fn handle_create_mv(
.create_materialized_view(table, graph)
.await?;

if has_order_by {
notices.push(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#.to_string());
}

Ok(PgResponse::empty_result_with_notices(
Ok(PgResponse::empty_result(
StatementType::CREATE_MATERIALIZED_VIEW,
notices,
))
}

Expand Down Expand Up @@ -333,11 +330,5 @@ pub mod tests {
let sql = "create materialized view mv2 as select * from t order by x";
let response = frontend.run_sql(sql).await.unwrap();
assert_eq!(response.get_stmt_type(), CREATE_MATERIALIZED_VIEW);
assert_eq!(
response.get_notices()[0],
r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
"#
);
}
}
12 changes: 3 additions & 9 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ pub async fn handle_create_table(
}
}

let (graph, source, table, notices) = {
let (graph, source, table) = {
let context = OptimizerContext::from_handler_args(handler_args);
let source_schema = check_create_table_with_source(context.with_options(), source_schema)?;
let col_id_gen = ColumnIdGenerator::new_initial();
Expand Down Expand Up @@ -681,15 +681,12 @@ pub async fn handle_create_table(
)?,
};

let context = plan.plan_base().ctx.clone();
let notices = context.take_warnings();

let mut graph = build_graph(plan);
graph.parallelism = session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism });
(graph, source, table, notices)
(graph, source, table)
};

tracing::trace!(
Expand All @@ -701,10 +698,7 @@ pub async fn handle_create_table(
let catalog_writer = session.env().catalog_writer();
catalog_writer.create_table(source, table, graph).await?;

Ok(PgResponse::empty_result_with_notices(
StatementType::CREATE_TABLE,
notices,
))
Ok(PgResponse::empty_result(StatementType::CREATE_TABLE))
}

pub fn check_create_table_with_source(
Expand Down
12 changes: 6 additions & 6 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ struct BatchPlanFragmenterResult {
pub(crate) schema: Schema,
pub(crate) stmt_type: StatementType,
pub(crate) _dependent_relations: Vec<TableId>,
pub(crate) warnings: Vec<String>,
}

fn gen_batch_plan_fragmenter(
Expand All @@ -272,7 +271,6 @@ fn gen_batch_plan_fragmenter(
dependent_relations,
} = plan_result;

let context = plan.plan_base().ctx.clone();
tracing::trace!(
"Generated query plan: {:?}, query_mode:{:?}",
plan.explain_to_string()?,
Expand All @@ -288,15 +286,13 @@ fn gen_batch_plan_fragmenter(
session.config().get_batch_parallelism(),
plan,
)?;
let warnings = context.take_warnings();

Ok(BatchPlanFragmenterResult {
plan_fragmenter,
query_mode,
schema,
stmt_type,
_dependent_relations: dependent_relations,
warnings,
})
}

Expand All @@ -310,7 +306,6 @@ async fn execute(
query_mode,
schema,
stmt_type,
warnings,
..
} = plan_fragmenter_result;

Expand Down Expand Up @@ -446,7 +441,12 @@ async fn execute(
};

Ok(PgResponse::new_for_stream_extra(
stmt_type, rows_count, row_stream, pg_descs, warnings, callback,
stmt_type,
rows_count,
row_stream,
pg_descs,
vec![],
callback,
))
}

Expand Down
29 changes: 11 additions & 18 deletions src/frontend/src/optimizer/optimizer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::cell::{RefCell, RefMut};
use std::rc::Rc;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_sqlparser::ast::{ExplainOptions, ExplainType};

use crate::expr::{CorrelatedId, SessionTimezone};
Expand Down Expand Up @@ -51,8 +50,15 @@ pub struct OptimizerContext {
session_timezone: RefCell<SessionTimezone>,
/// Store expr display id.
next_expr_display_id: RefCell<usize>,
/// warning messages
warning_messages: RefCell<Vec<String>>,
}

// Still not sure if we need to introduce "on_optimization_finish" or other common callback methods,
impl Drop for OptimizerContext {
fn drop(&mut self) {
if let Some(warning) = self.session_timezone.borrow().warning() {
self.warn_to_user(warning);
};
}
}

pub type OptimizerContextRef = Rc<OptimizerContext>;
Expand Down Expand Up @@ -81,7 +87,6 @@ impl OptimizerContext {
with_options: handler_args.with_options,
session_timezone,
next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()),
warning_messages: RefCell::new(vec![]),
}
}

Expand All @@ -101,7 +106,6 @@ impl OptimizerContext {
with_options: Default::default(),
session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
next_expr_display_id: RefCell::new(0),
warning_messages: RefCell::new(vec![]),
}
.into()
}
Expand Down Expand Up @@ -165,11 +169,8 @@ impl OptimizerContext {
optimizer_trace.push("\n".to_string());
}

pub fn warn(&self, str: impl Into<String>) {
let mut warnings = self.warning_messages.borrow_mut();
let string = str.into();
tracing::trace!("warn to user:{}", string);
warnings.push(string);
pub fn warn_to_user(&self, str: impl Into<String>) {
self.session_ctx().notice_to_user(str);
}

pub fn store_logical(&self, str: impl Into<String>) {
Expand All @@ -184,14 +185,6 @@ impl OptimizerContext {
self.optimizer_trace.borrow_mut().drain(..).collect()
}

pub fn take_warnings(&self) -> Vec<String> {
let mut warnings = self.warning_messages.borrow_mut().drain(..).collect_vec();
if let Some(warning) = self.session_timezone.borrow().warning() {
warnings.push(warning);
};
warnings
}

pub fn with_options(&self) -> &WithOptions {
&self.with_options
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub fn stream_enforce_eowc_requirement(
.into())
} else {
if n_watermark_cols > 1 {
ctx.warn("There are multiple watermark columns in the query, currently only the first one will be used.");
ctx.warn_to_user("There are multiple watermark columns in the query, currently only the first one will be used.");
}
let watermark_col_idx = watermark_cols.ones().next().unwrap();
Ok(StreamSort::new(plan, watermark_col_idx).into())
Expand Down
21 changes: 21 additions & 0 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ pub struct SessionImpl {
user_authenticator: UserAuthenticator,
/// Stores the value of configurations.
config_map: RwLock<ConfigMap>,
/// buffer the Notices to users,
notices: RwLock<Vec<String>>,

/// Identified by process_id, secret_key. Corresponds to SessionManager.
id: (i32, i32),
Expand All @@ -442,6 +444,7 @@ impl SessionImpl {
config_map: Default::default(),
id,
current_query_cancel_flag: Mutex::new(None),
notices: Default::default(),
}
}

Expand All @@ -459,6 +462,7 @@ impl SessionImpl {
// Mock session use non-sense id.
id: (0, 0),
current_query_cancel_flag: Mutex::new(None),
notices: Default::default(),
}
}

Expand Down Expand Up @@ -601,6 +605,10 @@ impl SessionImpl {
tripwire
}

fn clear_notices(&self) {
*self.notices.write() = vec![];
}

pub fn cancel_current_query(&self) {
let mut flag_guard = self.current_query_cancel_flag.lock().unwrap();
if let Some(trigger) = flag_guard.take() {
Expand All @@ -612,10 +620,12 @@ impl SessionImpl {
info!("Trying to cancel query in distributed mode.");
self.env.query_manager().cancel_queries_in_session(self.id)
}
self.clear_notices()
}

pub fn cancel_current_creating_job(&self) {
self.env.creating_streaming_job_tracker.abort_jobs(self.id);
self.clear_notices()
}

/// This function only used for test now.
Expand Down Expand Up @@ -662,6 +672,12 @@ impl SessionImpl {
.inspect_err(|e| tracing::error!("failed to handle sql:\n{}:\n{}", sql, e))?;
Ok(rsp)
}

pub fn notice_to_user(&self, str: impl Into<String>) {
let notice = str.into();
tracing::trace!("notice to user:{}", notice);
self.notices.write().push(notice);
}
}

pub struct SessionManagerImpl {
Expand Down Expand Up @@ -922,6 +938,11 @@ impl Session<PgResponseStream, PrepareStatement, Portal> for SessionImpl {
Portal::PureStatement(statement) => Ok(infer(None, statement)?),
}
}

fn take_notices(self: Arc<Self>) -> Vec<String> {
let inner = &mut (*self.notices.write());
std::mem::take(inner)
}
}

/// Returns row description of the statement
Expand Down
10 changes: 6 additions & 4 deletions src/utils/pgwire/src/pg_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,12 @@ where
let session = session.clone();

// execute query
let mut res = session
.run_one_query(stmt, Format::Text)
.await
.map_err(|err| PsqlError::QueryError(err))?;
let res = session.clone().run_one_query(stmt, Format::Text).await;
for notice in session.take_notices() {
self.stream
.write_no_flush(&BeMessage::NoticeResponse(&notice))?;
}
let mut res = res.map_err(|err| PsqlError::QueryError(err))?;

for notice in res.get_notices() {
self.stream
Expand Down
8 changes: 8 additions & 0 deletions src/utils/pgwire/src/pg_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ where
params_types: Vec<DataType>,
) -> Result<PS, BoxedError>;

// TODO: maybe this function should be async and return the notice more timely
/// try to take the current notices from the session
fn take_notices(self: Arc<Self>) -> Vec<String>;

fn bind(
self: Arc<Self>,
prepare_statement: PS,
Expand Down Expand Up @@ -304,6 +308,10 @@ mod tests {
fn id(&self) -> SessionId {
(0, 0)
}

fn take_notices(self: Arc<Self>) -> Vec<String> {
vec![]
}
}

#[tokio::test]
Expand Down