Skip to content

Commit

Permalink
feat(expr): support streaming make_timestamptz
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Dec 4, 2023
1 parent ba4b196 commit 1219100
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 43 deletions.
42 changes: 42 additions & 0 deletions e2e_test/batch/basic/make_timestamptz.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,48 @@ select * from ttz;
statement ok
drop table ttz;

statement ok
CREATE TABLE tint(num int);

statement ok
CREATE MATERIALIZED VIEW mv1 as SELECT make_timestamptz(num, num, num, num, num, num, 'Asia/Manila') from tint;

statement ok
CREATE MATERIALIZED VIEW mv2 as SELECT make_timestamptz(num, num, num, num, num, num, 'America/New_York') from tint;

statement ok
CREATE MATERIALIZED VIEW mv3 as SELECT make_timestamptz(num, num, num, num, num, num) from tint;

statement ok
insert into tint values(1);

query TT
select * from mv1;
----
0001-01-01 12:00:59-04:56

query TT
select * from mv2;
----
0001-01-01 01:01:01-04:56

query TT
select * from mv3;
----
0001-01-01 01:01:01-04:56

statement ok
DROP MATERIALIZED VIEW mv1;

statement ok
DROP MATERIALIZED VIEW mv2;

statement ok
DROP MATERIALIZED VIEW mv3;

statement ok
DROP TABLE tint;

query error Invalid parameter time_zone: 'Nehwon/Lankhmar' is not a valid timezone
SELECT make_timestamptz(1910, 12, 24, 0, 0, 0, 'Nehwon/Lankhmar');

Expand Down
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ message StreamActor {
common.Buffer vnode_bitmap = 8;
// The SQL definition of this materialized view. Used for debugging only.
string mview_definition = 9;
// Provide the necessary context, e.g. session info like time zone, for the actor.
plan_common.CapturedExecutionContext captured_execution_context = 10;
}

enum FragmentTypeFlag {
Expand Down
39 changes: 37 additions & 2 deletions src/expr/macro/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use itertools::Itertools;
use proc_macro2::TokenStream;
use quote::{quote, quote_spanned, ToTokens};
use quote::{format_ident, quote, quote_spanned, ToTokens};
use syn::parse::{Parse, ParseStream};
use syn::{Error, Expr, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};
use syn::{Error, Expr, ExprAsync, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};

use crate::utils::extend_vis_with_super;

Expand Down Expand Up @@ -225,6 +225,7 @@ pub(super) fn generate_captured_function(
})
}

/// See [`super::captured_execution_context_scope!`].
pub(super) struct CapturedExecutionContextScopeInput {
pub context: Expr,
pub closure: Expr,
Expand All @@ -238,3 +239,37 @@ impl Parse for CapturedExecutionContextScopeInput {
Ok(Self { context, closure })
}
}

impl CapturedExecutionContextScopeInput {
pub(super) fn gen(self) -> Result<TokenStream> {
let Self { context, closure } = self;
let capture_type = if let Expr::Async(ExprAsync { capture, .. }) = closure.clone() {
match capture {
Some(_) => quote! {move},
None => quote! {},
}
} else {
quote! {}
};

let ctx = quote! { let ctx = #context; };
let mut body = quote! { #closure };
let fields = vec![format_ident!("time_zone")];
fields.iter().for_each(|field| {
let local_key_name = format_ident!("{}", field.to_string().to_uppercase());
body = quote! {
async #capture_type {
use risingwave_expr::captured_execution_context::#local_key_name;
#local_key_name::scope(ctx.#field.to_owned(), #body).await
}
};
});
body = quote! {
async #capture_type {
#ctx
#body.await
}
};
Ok(body)
}
}
24 changes: 2 additions & 22 deletions src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use context::{
};
use proc_macro::TokenStream;
use proc_macro2::TokenStream as TokenStream2;
use quote::{format_ident, quote};
use syn::{Error, ItemFn, Result};

mod context;
Expand Down Expand Up @@ -655,27 +654,8 @@ pub fn capture_context(attr: TokenStream, item: TokenStream) -> TokenStream {
#[proc_macro]
pub fn captured_execution_context_scope(input: TokenStream) -> TokenStream {
fn inner(input: TokenStream) -> Result<TokenStream2> {
let CapturedExecutionContextScopeInput { context, closure } = syn::parse(input)?;

let ctx = quote! { let ctx = #context; };
let mut body = quote! { #closure };
let fields = vec![format_ident!("time_zone")];
fields.iter().for_each(|field| {
let local_key_name = format_ident!("{}", field.to_string().to_uppercase());
body = quote! {
async {
use risingwave_expr::captured_execution_context::#local_key_name;
#local_key_name::scope(ctx.#field.to_owned(), #body).await
}
};
});
body = quote! {
async {
#ctx
#body.await
}
};
Ok(body)
let input: CapturedExecutionContextScopeInput = syn::parse(input)?;
input.gen()
}

match inner(input) {
Expand Down
6 changes: 5 additions & 1 deletion src/meta/model_v2/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use sea_orm::entity::prelude::*;

use crate::{ActorId, ActorUpstreamActors, ConnectorSplits, FragmentId, VnodeBitmap};
use crate::{
ActorId, ActorUpstreamActors, CapturedExecutionContext, ConnectorSplits, FragmentId,
VnodeBitmap,
};

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
Expand Down Expand Up @@ -56,6 +59,7 @@ pub struct Model {
pub parallel_unit_id: i32,
pub upstream_actor_ids: ActorUpstreamActors,
pub vnode_bitmap: Option<VnodeBitmap>,
pub captured_execution_context: Option<CapturedExecutionContext>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
4 changes: 4 additions & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode);
derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer);
derive_from_json_struct!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
derive_from_json_struct!(
CapturedExecutionContext,
risingwave_pb::plan_common::PbCapturedExecutionContext
);

derive_from_json_struct!(
FragmentVnodeMapping,
Expand Down
34 changes: 29 additions & 5 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, ActorId, ConnectorSplits, FragmentId, FragmentVnodeMapping,
I32Array, ObjectId, StreamNode, TableId, VnodeBitmap, WorkerId,
actor, actor_dispatcher, fragment, ActorId, CapturedExecutionContext, ConnectorSplits,
FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::ddl_service::PbTableJobType;
Expand Down Expand Up @@ -170,6 +171,7 @@ impl CatalogController {
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition: _,
captured_execution_context: pb_captured_execution_context,
} = actor;

let splits = pb_actor_splits.get(&actor_id).cloned().map(ConnectorSplits);
Expand Down Expand Up @@ -208,6 +210,8 @@ impl CatalogController {
parallel_unit_id,
upstream_actor_ids: upstream_actors.into(),
vnode_bitmap: pb_vnode_bitmap.map(VnodeBitmap),
captured_execution_context: pb_captured_execution_context
.map(CapturedExecutionContext),
});
actor_dispatchers.insert(
actor_id as ActorId,
Expand Down Expand Up @@ -327,7 +331,7 @@ impl CatalogController {
splits,
upstream_actor_ids,
vnode_bitmap,
..
captured_execution_context,
} = actor;

let upstream_fragment_actors = upstream_actor_ids.into_inner();
Expand All @@ -349,6 +353,8 @@ impl CatalogController {
};

let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.into_inner());
let pb_captured_execution_context =
captured_execution_context.map(|ctx| ctx.into_inner());

let pb_upstream_actor_id = upstream_fragment_actors
.values()
Expand Down Expand Up @@ -388,6 +394,7 @@ impl CatalogController {
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition: "".to_string(),
captured_execution_context: pb_captured_execution_context,
})
}

Expand Down Expand Up @@ -934,13 +941,15 @@ mod tests {
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::DistributionType;
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits,
FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, CapturedExecutionContext,
ConnectorSplits, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId,
VnodeBitmap,
};
use risingwave_pb::common::ParallelUnit;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
use risingwave_pb::plan_common::PbCapturedExecutionContext;
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{
Expand Down Expand Up @@ -1047,6 +1056,9 @@ mod tests {
.cloned()
.map(|bitmap| bitmap.to_protobuf()),
mview_definition: "".to_string(),
captured_execution_context: Some(PbCapturedExecutionContext {
time_zone: String::from("America/New_York"),
}),
}
})
.collect_vec();
Expand Down Expand Up @@ -1168,6 +1180,11 @@ mod tests {
parallel_unit_id: parallel_unit_id as i32,
upstream_actor_ids: ActorUpstreamActors(actor_upstream_actor_ids),
vnode_bitmap,
captured_execution_context: Some(CapturedExecutionContext(
PbCapturedExecutionContext {
time_zone: String::from("America/New_York"),
},
)),
}
})
.collect_vec();
Expand Down Expand Up @@ -1252,6 +1269,7 @@ mod tests {
parallel_unit_id,
upstream_actor_ids,
vnode_bitmap,
captured_execution_context,
},
PbStreamActor {
actor_id: pb_actor_id,
Expand All @@ -1261,6 +1279,7 @@ mod tests {
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition,
captured_execution_context: pb_captured_execution_context,
},
) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
{
Expand Down Expand Up @@ -1316,6 +1335,11 @@ mod tests {
.cloned()
.map(ConnectorSplits)
);

assert_eq!(
captured_execution_context.map(|ctx| ctx.into_inner()),
pb_captured_execution_context
);
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
use risingwave_pb::meta::PbTableFragments;
use risingwave_pb::plan_common::PbCapturedExecutionContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
FragmentTypeFlag, PbStreamEnvironment, StreamActor, StreamNode, StreamSource,
Expand Down Expand Up @@ -74,6 +75,13 @@ impl StreamEnvironment {
}
}

pub fn to_captured_execution_context(&self) -> PbCapturedExecutionContext {
PbCapturedExecutionContext {
// `self.timezone` must always be set; an invalid value is used here for debugging if it's not.
time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
}
}

pub fn from_protobuf(prost: &PbStreamEnvironment) -> Self {
Self {
timezone: if prost.get_timezone().is_empty() {
Expand Down
14 changes: 12 additions & 2 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ impl DdlController {
let id = stream_job.id();
let default_parallelism = fragment_graph.default_parallelism();
let internal_tables = fragment_graph.internal_tables();
let captured_execution_context = env.to_captured_execution_context();

// 1. Resolve the upstream fragments, extend the fragment graph to a complete graph that
// contains all information needed for building the actor graph.
Expand Down Expand Up @@ -790,7 +791,11 @@ impl DdlController {
dispatchers,
merge_updates,
} = actor_graph_builder
.generate_graph(self.env.id_gen_manager_ref(), stream_job)
.generate_graph(
self.env.id_gen_manager_ref(),
stream_job,
captured_execution_context,
)
.await?;
assert!(merge_updates.is_empty());

Expand Down Expand Up @@ -1071,6 +1076,7 @@ impl DdlController {
) -> MetaResult<(ReplaceTableContext, TableFragments)> {
let id = stream_job.id();
let default_parallelism = fragment_graph.default_parallelism();
let captured_execution_context = env.to_captured_execution_context();

let old_table_fragments = self
.fragment_manager
Expand Down Expand Up @@ -1123,7 +1129,11 @@ impl DdlController {
dispatchers,
merge_updates,
} = actor_graph_builder
.generate_graph(self.env.id_gen_manager_ref(), stream_job)
.generate_graph(
self.env.id_gen_manager_ref(),
stream_job,
captured_execution_context,
)
.await?;
assert!(dispatchers.is_empty());

Expand Down
Loading

0 comments on commit 1219100

Please sign in to comment.