diff --git a/e2e_test/batch/basic/make_timestamptz.slt.part b/e2e_test/batch/basic/make_timestamptz.slt.part
index 99f5d1369327c..fe03cccd1fbbc 100644
--- a/e2e_test/batch/basic/make_timestamptz.slt.part
+++ b/e2e_test/batch/basic/make_timestamptz.slt.part
@@ -45,6 +45,48 @@ select * from ttz;
 statement ok
 drop table ttz;
 
+statement ok
+CREATE TABLE tint(num int);
+
+statement ok
+CREATE MATERIALIZED VIEW mv1 as SELECT make_timestamptz(num, num, num, num, num, num, 'Asia/Manila') from tint;
+
+statement ok
+CREATE MATERIALIZED VIEW mv2 as SELECT make_timestamptz(num, num, num, num, num, num, 'America/New_York') from tint;
+
+statement ok
+CREATE MATERIALIZED VIEW mv3 as SELECT make_timestamptz(num, num, num, num, num, num) from tint;
+
+statement ok
+insert into tint values(1);
+
+query TT
+select * from mv1;
+----
+0001-01-01 12:00:59-04:56
+
+query TT
+select * from mv2;
+----
+0001-01-01 01:01:01-04:56
+
+query TT
+select * from mv3;
+----
+0001-01-01 01:01:01-04:56
+
+statement ok
+DROP MATERIALIZED VIEW mv1;
+
+statement ok
+DROP MATERIALIZED VIEW mv2;
+
+statement ok
+DROP MATERIALIZED VIEW mv3;
+
+statement ok
+DROP TABLE tint;
+
 query error Invalid parameter time_zone: 'Nehwon/Lankhmar' is not a valid timezone
 SELECT make_timestamptz(1910, 12, 24, 0, 0, 0, 'Nehwon/Lankhmar');
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index fd7988413aeed..e3737e45acfd8 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -810,6 +810,8 @@ message StreamActor {
   common.Buffer vnode_bitmap = 8;
   // The SQL definition of this materialized view. Used for debugging only.
   string mview_definition = 9;
+  // Provides the execution context needed by the actor at runtime, e.g. session info such as the time zone.
+  plan_common.CapturedExecutionContext captured_execution_context = 10;
 }
 
 enum FragmentTypeFlag {
diff --git a/src/expr/macro/src/context.rs b/src/expr/macro/src/context.rs
index 683f873042e79..09b24a23692fd 100644
--- a/src/expr/macro/src/context.rs
+++ b/src/expr/macro/src/context.rs
@@ -14,9 +14,9 @@
 
 use itertools::Itertools;
 use proc_macro2::TokenStream;
-use quote::{quote, quote_spanned, ToTokens};
+use quote::{format_ident, quote, quote_spanned, ToTokens};
 use syn::parse::{Parse, ParseStream};
-use syn::{Error, Expr, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};
+use syn::{Error, Expr, ExprAsync, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};
 
 use crate::utils::extend_vis_with_super;
 
@@ -225,6 +225,7 @@ pub(super) fn generate_captured_function(
     })
 }
 
+/// See [`super::captured_execution_context_scope!`].
 pub(super) struct CapturedExecutionContextScopeInput {
     pub context: Expr,
     pub closure: Expr,
@@ -238,3 +239,37 @@ impl Parse for CapturedExecutionContextScopeInput {
         Ok(Self { context, closure })
     }
 }
+
+impl CapturedExecutionContextScopeInput {
+    pub(super) fn gen(self) -> Result<TokenStream> {
+        let Self { context, closure } = self;
+        let capture_type = if let Expr::Async(ExprAsync { capture, .. }) = closure.clone() {
+            match capture {
+                Some(_) => quote! {move},
+                None => quote! {},
+            }
+        } else {
+            quote! {}
+        };
+
+        let ctx = quote! { let ctx = #context; };
+        let mut body = quote! { #closure };
+        let fields = vec![format_ident!("time_zone")];
+        fields.iter().for_each(|field| {
+            let local_key_name = format_ident!("{}", field.to_string().to_uppercase());
+            body = quote! {
+                async #capture_type {
+                    use risingwave_expr::captured_execution_context::#local_key_name;
+                    #local_key_name::scope(ctx.#field.to_owned(), #body).await
+                }
+            };
+        });
+        body = quote! {
+            async #capture_type {
+                #ctx
+                #body.await
+            }
+        };
+        Ok(body)
+    }
+}
diff --git a/src/expr/macro/src/lib.rs b/src/expr/macro/src/lib.rs
index 0da623001a2ed..34af84e4affd0 100644
--- a/src/expr/macro/src/lib.rs
+++ b/src/expr/macro/src/lib.rs
@@ -23,7 +23,6 @@ use context::{
 };
 use proc_macro::TokenStream;
 use proc_macro2::TokenStream as TokenStream2;
-use quote::{format_ident, quote};
 use syn::{Error, ItemFn, Result};
 
 mod context;
@@ -655,27 +654,8 @@ pub fn capture_context(attr: TokenStream, item: TokenStream) -> TokenStream {
 #[proc_macro]
 pub fn captured_execution_context_scope(input: TokenStream) -> TokenStream {
     fn inner(input: TokenStream) -> Result<TokenStream2> {
-        let CapturedExecutionContextScopeInput { context, closure } = syn::parse(input)?;
-
-        let ctx = quote! { let ctx = #context; };
-        let mut body = quote! { #closure };
-        let fields = vec![format_ident!("time_zone")];
-        fields.iter().for_each(|field| {
-            let local_key_name = format_ident!("{}", field.to_string().to_uppercase());
-            body = quote! {
-                async {
-                    use risingwave_expr::captured_execution_context::#local_key_name;
-                    #local_key_name::scope(ctx.#field.to_owned(), #body).await
-                }
-            };
-        });
-        body = quote! {
-            async {
-                #ctx
-                #body.await
-            }
-        };
-        Ok(body)
+        let input: CapturedExecutionContextScopeInput = syn::parse(input)?;
+        input.gen()
     }
 
     match inner(input) {
diff --git a/src/meta/model_v2/src/actor.rs b/src/meta/model_v2/src/actor.rs
index 6cb9199cc9ae7..d3b94e655b4da 100644
--- a/src/meta/model_v2/src/actor.rs
+++ b/src/meta/model_v2/src/actor.rs
@@ -15,7 +15,10 @@
 use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
 use sea_orm::entity::prelude::*;
 
-use crate::{ActorId, ActorUpstreamActors, ConnectorSplits, FragmentId, VnodeBitmap};
+use crate::{
+    ActorId, ActorUpstreamActors, CapturedExecutionContext, ConnectorSplits, FragmentId,
+    VnodeBitmap,
+};
 
 #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
 #[sea_orm(rs_type = "String", db_type = "String(None)")]
@@ -56,6 +59,7 @@ pub struct Model {
     pub parallel_unit_id: i32,
     pub upstream_actor_ids: ActorUpstreamActors,
     pub vnode_bitmap: Option<VnodeBitmap>,
+    pub captured_execution_context: Option<CapturedExecutionContext>,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs
index adbef41cf9c8c..13bf09fd607f6 100644
--- a/src/meta/model_v2/src/lib.rs
+++ b/src/meta/model_v2/src/lib.rs
@@ -201,6 +201,10 @@ derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode);
 derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
 derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer);
 derive_from_json_struct!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
+derive_from_json_struct!(
+    CapturedExecutionContext,
+    risingwave_pb::plan_common::PbCapturedExecutionContext
+);
 
 derive_from_json_struct!(
     FragmentVnodeMapping,
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index dc36c6466e57c..f47d5a35600a2 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -22,8 +22,9 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node;
 use risingwave_meta_model_v2::actor::ActorStatus;
 use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob};
 use risingwave_meta_model_v2::{
-    actor, actor_dispatcher, fragment, ActorId, ConnectorSplits, FragmentId, FragmentVnodeMapping,
-    I32Array, ObjectId, StreamNode, TableId, VnodeBitmap, WorkerId,
+    actor, actor_dispatcher, fragment, ActorId, CapturedExecutionContext, ConnectorSplits,
+    FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
+    WorkerId,
 };
 use risingwave_pb::common::PbParallelUnit;
 use risingwave_pb::ddl_service::PbTableJobType;
@@ -170,6 +171,7 @@ impl CatalogController {
                 upstream_actor_id: pb_upstream_actor_id,
                 vnode_bitmap: pb_vnode_bitmap,
                 mview_definition: _,
+                captured_execution_context: pb_captured_execution_context,
             } = actor;
 
             let splits = pb_actor_splits.get(&actor_id).cloned().map(ConnectorSplits);
@@ -208,6 +210,8 @@ impl CatalogController {
                 parallel_unit_id,
                 upstream_actor_ids: upstream_actors.into(),
                 vnode_bitmap: pb_vnode_bitmap.map(VnodeBitmap),
+                captured_execution_context: pb_captured_execution_context
+                    .map(CapturedExecutionContext),
             });
             actor_dispatchers.insert(
                 actor_id as ActorId,
@@ -327,7 +331,7 @@ impl CatalogController {
             splits,
             upstream_actor_ids,
             vnode_bitmap,
-            ..
+            captured_execution_context,
         } = actor;
 
         let upstream_fragment_actors = upstream_actor_ids.into_inner();
@@ -349,6 +353,8 @@ impl CatalogController {
         };
 
         let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.into_inner());
+        let pb_captured_execution_context =
+            captured_execution_context.map(|ctx| ctx.into_inner());
 
         let pb_upstream_actor_id = upstream_fragment_actors
             .values()
@@ -388,6 +394,7 @@ impl CatalogController {
             upstream_actor_id: pb_upstream_actor_id,
             vnode_bitmap: pb_vnode_bitmap,
             mview_definition: "".to_string(),
+            captured_execution_context: pb_captured_execution_context,
         })
     }
 
@@ -934,13 +941,15 @@ mod tests {
     use risingwave_meta_model_v2::actor::ActorStatus;
     use risingwave_meta_model_v2::fragment::DistributionType;
     use risingwave_meta_model_v2::{
-        actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits,
-        FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
+        actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, CapturedExecutionContext,
+        ConnectorSplits, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId,
+        VnodeBitmap,
     };
     use risingwave_pb::common::ParallelUnit;
     use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
     use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
     use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
+    use risingwave_pb::plan_common::PbCapturedExecutionContext;
     use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
     use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
     use risingwave_pb::stream_plan::{
@@ -1047,6 +1056,9 @@ mod tests {
                     .cloned()
                     .map(|bitmap| bitmap.to_protobuf()),
                 mview_definition: "".to_string(),
+                captured_execution_context: Some(PbCapturedExecutionContext {
+                    time_zone: String::from("America/New_York"),
+                }),
             }
         })
        .collect_vec();
@@ -1168,6 +1180,11 @@ mod tests {
                 parallel_unit_id: parallel_unit_id as i32,
                 upstream_actor_ids: ActorUpstreamActors(actor_upstream_actor_ids),
                 vnode_bitmap,
+                captured_execution_context: Some(CapturedExecutionContext(
+                    PbCapturedExecutionContext {
+                        time_zone: String::from("America/New_York"),
+                    },
+                )),
             }
         })
         .collect_vec();
@@ -1252,6 +1269,7 @@ mod tests {
                 parallel_unit_id,
                 upstream_actor_ids,
                 vnode_bitmap,
+                captured_execution_context,
             },
             PbStreamActor {
                 actor_id: pb_actor_id,
@@ -1261,6 +1279,7 @@ mod tests {
                 upstream_actor_id: pb_upstream_actor_id,
                 vnode_bitmap: pb_vnode_bitmap,
                 mview_definition,
+                captured_execution_context: pb_captured_execution_context,
             },
         ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
         {
@@ -1316,6 +1335,11 @@ mod tests {
                 .cloned()
                 .map(ConnectorSplits)
         );
+
+        assert_eq!(
+            captured_execution_context.map(|ctx| ctx.into_inner()),
+            pb_captured_execution_context
+        );
     }
 }
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 5f9b303c77a5c..b6c8f6650bf16 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -23,6 +23,7 @@ use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping};
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
 use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
 use risingwave_pb::meta::PbTableFragments;
+use risingwave_pb::plan_common::PbCapturedExecutionContext;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
     FragmentTypeFlag, PbStreamEnvironment, StreamActor, StreamNode, StreamSource,
@@ -74,6 +75,13 @@ impl StreamEnvironment {
         }
     }
 
+    pub fn to_captured_execution_context(&self) -> PbCapturedExecutionContext {
+        PbCapturedExecutionContext {
+            // `self.timezone` must always be set; fall back to an obviously invalid value to aid debugging if it is not.
+            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
+        }
+    }
+
     pub fn from_protobuf(prost: &PbStreamEnvironment) -> Self {
         Self {
             timezone: if prost.get_timezone().is_empty() {
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 98360eee83cb2..b489b72c0569c 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -747,6 +747,7 @@ impl DdlController {
         let id = stream_job.id();
         let default_parallelism = fragment_graph.default_parallelism();
         let internal_tables = fragment_graph.internal_tables();
+        let captured_execution_context = env.to_captured_execution_context();
 
         // 1. Resolve the upstream fragments, extend the fragment graph to a complete graph that
         // contains all information needed for building the actor graph.
@@ -790,7 +791,11 @@ impl DdlController {
             dispatchers,
             merge_updates,
         } = actor_graph_builder
-            .generate_graph(self.env.id_gen_manager_ref(), stream_job)
+            .generate_graph(
+                self.env.id_gen_manager_ref(),
+                stream_job,
+                captured_execution_context,
+            )
             .await?;
         assert!(merge_updates.is_empty());
 
@@ -1071,6 +1076,7 @@ impl DdlController {
     ) -> MetaResult<(ReplaceTableContext, TableFragments)> {
         let id = stream_job.id();
         let default_parallelism = fragment_graph.default_parallelism();
+        let captured_execution_context = env.to_captured_execution_context();
 
         let old_table_fragments = self
             .fragment_manager
@@ -1123,7 +1129,11 @@ impl DdlController {
             dispatchers,
             merge_updates,
         } = actor_graph_builder
-            .generate_graph(self.env.id_gen_manager_ref(), stream_job)
+            .generate_graph(
+                self.env.id_gen_manager_ref(),
+                stream_job,
+                captured_execution_context,
+            )
             .await?;
         assert!(dispatchers.is_empty());
 
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index 4a5339431f0aa..762b245bba73b 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -23,6 +23,7 @@ use risingwave_common::buffer::Bitmap;
 use risingwave_common::hash::{ActorId, ActorMapping, ParallelUnitId};
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_pb::meta::table_fragments::Fragment;
+use risingwave_pb::plan_common::CapturedExecutionContext;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::{
@@ -258,7 +259,11 @@ impl ActorBuilder {
     }
 
     /// Build an actor after all the upstreams and downstreams are processed.
-    fn build(self, job: &StreamingJob) -> MetaResult<StreamActor> {
+    fn build(
+        self,
+        job: &StreamingJob,
+        captured_execution_context: CapturedExecutionContext,
+    ) -> MetaResult<StreamActor> {
         let rewritten_nodes = self.rewrite()?;
 
         // TODO: store each upstream separately
@@ -281,6 +286,7 @@ impl ActorBuilder {
             upstream_actor_id,
             vnode_bitmap: self.vnode_bitmap.map(|b| b.to_protobuf()),
             mview_definition,
+            captured_execution_context: Some(captured_execution_context),
         })
     }
 }
@@ -703,6 +709,7 @@ impl ActorGraphBuilder {
         self,
         id_gen_manager: IdGeneratorManagerRef,
         job: &StreamingJob,
+        captured_execution_context: CapturedExecutionContext,
     ) -> MetaResult<ActorGraphBuildResult> {
         // Pre-generate IDs for all actors.
         let actor_len = self
@@ -741,7 +748,7 @@ impl ActorGraphBuilder {
         // and `Chain` are rewritten.
         for builder in actor_builders.into_values() {
             let fragment_id = builder.fragment_id();
-            let actor = builder.build(job)?;
+            let actor = builder.build(job, captured_execution_context.clone())?;
             actors.entry(fragment_id).or_default().push(actor);
         }
 
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index 1ad03e1967d1a..ef956ed64b134 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -29,7 +29,7 @@ use risingwave_pb::expr::agg_call::Type;
 use risingwave_pb::expr::expr_node::RexNode;
 use risingwave_pb::expr::expr_node::Type::{Add, GreaterThan};
 use risingwave_pb::expr::{AggCall, ExprNode, FunctionCall, PbInputRef};
-use risingwave_pb::plan_common::{ColumnCatalog, ColumnDesc, Field};
+use risingwave_pb::plan_common::{CapturedExecutionContext, ColumnCatalog, ColumnDesc, Field};
 use risingwave_pb::stream_plan::stream_fragment_graph::{StreamFragment, StreamFragmentEdge};
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
@@ -456,6 +456,9 @@ async fn test_graph_builder() -> MetaResult<()> {
     let job = StreamingJob::Table(None, make_materialize_table(888), TableJobType::General);
 
     let graph = make_stream_graph();
+    let captured_execution_context = CapturedExecutionContext {
+        time_zone: graph.env.as_ref().unwrap().timezone.clone(),
+    };
     let fragment_graph = StreamFragmentGraph::new(graph, env.id_gen_manager_ref(), &job).await?;
     let internal_tables = fragment_graph.internal_tables();
 
@@ -465,7 +468,7 @@ async fn test_graph_builder() -> MetaResult<()> {
         NonZeroUsize::new(parallel_degree).unwrap(),
     )?;
     let ActorGraphBuildResult { graph, .. } = actor_graph_builder
-        .generate_graph(env.id_gen_manager_ref(), &job)
+        .generate_graph(env.id_gen_manager_ref(), &job, captured_execution_context)
         .await?;
 
     let table_fragments = TableFragments::for_test(TableId::default(), graph);
diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index 3933efcbcaf30..a1a9b1948e7b3 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -23,7 +23,8 @@ use parking_lot::Mutex;
 use risingwave_common::error::ErrorSuppressor;
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_common::util::epoch::EpochPair;
-use risingwave_expr::ExprError;
+use risingwave_expr::{captured_execution_context_scope, ExprError};
+use risingwave_pb::plan_common::CapturedExecutionContext;
 use tokio_stream::StreamExt;
 use tracing::Instrument;
 
@@ -128,6 +129,7 @@ pub struct Actor<C> {
     context: Arc<SharedContext>,
     _metrics: Arc<StreamingMetrics>,
     actor_context: ActorContextRef,
+    captured_execution_context: CapturedExecutionContext,
 }
 
 impl<C> Actor<C>
@@ -140,6 +142,7 @@ where
         context: Arc<SharedContext>,
         metrics: Arc<StreamingMetrics>,
         actor_context: ActorContextRef,
+        captured_execution_context: CapturedExecutionContext,
     ) -> Self {
         Self {
             consumer,
@@ -147,17 +150,21 @@ where
             context,
             _metrics: metrics,
             actor_context,
+            captured_execution_context,
         }
     }
 
     #[inline(always)]
     pub async fn run(mut self) -> StreamResult<()> {
-        tokio::join!(
-            // Drive the subtasks concurrently.
-            join_all(std::mem::take(&mut self.subtasks)),
-            self.run_consumer(),
-        )
-        .1
+        captured_execution_context_scope!(self.captured_execution_context.clone(), async move {
+            tokio::join!(
+                // Drive the subtasks concurrently.
+                join_all(std::mem::take(&mut self.subtasks)),
+                self.run_consumer(),
+            )
+            .1
+        })
+        .await
     }
 
     async fn run_consumer(self) -> StreamResult<()> {
diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs
index dd2407d2efece..b094f35b3d0c5 100644
--- a/src/stream/src/executor/integration_tests.rs
+++ b/src/stream/src/executor/integration_tests.rs
@@ -22,6 +22,7 @@ use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::types::*;
 use risingwave_expr::aggregate::AggCall;
 use risingwave_expr::expr::*;
+use risingwave_pb::plan_common::CapturedExecutionContext;
 use risingwave_storage::memory::MemoryStateStore;
 
 use super::exchange::permit::channel_for_test;
@@ -78,9 +79,13 @@ async fn test_merger_sum_aggr() {
             context,
             StreamingMetrics::unused().into(),
             actor_ctx.clone(),
+            captured_execution_context.clone(),
         );
         (actor, rx)
     };
+    let captured_execution_context = CapturedExecutionContext {
+        time_zone: String::from("UTC"),
+    };
 
     // join handles of all actors
     let mut handles = vec![];
@@ -129,6 +134,7 @@ async fn test_merger_sum_aggr() {
         context,
         StreamingMetrics::unused().into(),
         actor_ctx.clone(),
+        captured_execution_context.clone(),
     );
     handles.push(tokio::spawn(actor.run()));
 
@@ -184,6 +190,7 @@ async fn test_merger_sum_aggr() {
         context,
         StreamingMetrics::unused().into(),
         actor_ctx.clone(),
+        captured_execution_context.clone(),
     );
     handles.push(tokio::spawn(actor.run()));
 
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index 7f0bcc03e2186..2ff12229209e9 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -667,6 +667,7 @@ impl LocalStreamManagerCore {
             .map(|b| b.try_into())
             .transpose()
             .context("failed to decode vnode bitmap")?;
+        let captured_execution_context = actor.captured_execution_context.clone().unwrap();
 
         let (executor, subtasks) = self
             .create_nodes(
@@ -688,6 +689,7 @@ impl LocalStreamManagerCore {
                 self.context.clone(),
                 self.streaming_metrics.clone(),
                 actor_context.clone(),
+                captured_execution_context,
             );
 
             let monitor = tokio_metrics::TaskMonitor::new();
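Note: as an illustration of the macro introduced above, an invocation like the one added in `Actor::run` expands to roughly the following, per the `quote!` templates in `CapturedExecutionContextScopeInput::gen`. This is a sketch only: `ctx_expr` stands for the context expression passed to the macro, the wrapped-future body is a placeholder, and `TIME_ZONE` is assumed to be the task-local key generated for the `time_zone` field in `risingwave_expr::captured_execution_context`.

    // Approximate expansion of:
    //   captured_execution_context_scope!(ctx_expr, async move { /* wrapped future */ })
    async move {
        // `#ctx`: bind the context expression once.
        let ctx = ctx_expr;
        async move {
            use risingwave_expr::captured_execution_context::TIME_ZONE;
            // Scope the captured time zone around the wrapped future so that expressions
            // evaluated while this actor runs can read it as a task-local value,
            // without threading it through every executor.
            TIME_ZONE::scope(ctx.time_zone.to_owned(), async move { /* wrapped future */ }).await
        }
        .await
    }

The caller then awaits the resulting future, as `Actor::run` does right after the macro call.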