Skip to content

Commit

Permalink
move CapturedContext to stream_plan
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Nov 28, 2023
1 parent 02f4358 commit 42e6765
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 24 deletions.
3 changes: 0 additions & 3 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,3 @@ message PlanFragment {
ExchangeInfo exchange_info = 2;
}

message CapturedContext {
optional string time_zone = 1;
}
5 changes: 5 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -878,3 +878,8 @@ message StreamFragmentGraph {
// If none, default parallelism will be applied.
Parallelism parallelism = 6;
}

// Provide statement-local context, e.g. session info like time zone, for runtime execution.
message CapturedContext {
string time_zone = 1;
}
4 changes: 2 additions & 2 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ message CreateTaskRequest {
batch_plan.PlanFragment plan = 2;
common.BatchQueryEpoch epoch = 3;
map<string, string> tracing_context = 4;
batch_plan.CapturedContext captured_context = 5;
stream_plan.CapturedContext captured_context = 5;
}

message CancelTaskRequest {
Expand All @@ -64,7 +64,7 @@ message ExecuteRequest {
batch_plan.PlanFragment plan = 2;
common.BatchQueryEpoch epoch = 3;
map<string, string> tracing_context = 4;
batch_plan.CapturedContext captured_context = 5;
stream_plan.CapturedContext captured_context = 5;
}

service TaskService {
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/execution/grpc_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl GrpcExchangeSource {
plan: plan.plan,
epoch: plan.epoch,
tracing_context: plan.tracing_context,
captured_context: Some(capture_context()),
captured_context: Some(capture_context()?),
};
client.execute(execute_request).await?
}
Expand Down
3 changes: 2 additions & 1 deletion src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use risingwave_common::util::panic::FutureCatchUnwindExt;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tracing::TracingContext;
use risingwave_expr::captured_context_scope;
use risingwave_pb::batch_plan::{CapturedContext, PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::stream_plan::CapturedContext;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use risingwave_pb::PbFieldNotFound;
Expand Down
3 changes: 2 additions & 1 deletion src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use risingwave_common::config::BatchConfig;
use risingwave_common::memory::MemoryContext;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::batch_plan::{CapturedContext, PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::stream_plan::CapturedContext;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use tokio::sync::mpsc::Sender;
Expand Down
15 changes: 7 additions & 8 deletions src/expr/core/src/captured_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_expr::define_context;
use risingwave_pb::batch_plan::CapturedContext;
use risingwave_expr::{define_context, Result as ExprResult};
use risingwave_pb::stream_plan::CapturedContext;

// For all execution mode.
define_context! {
pub TIME_ZONE: String,
}

pub fn capture_context() -> CapturedContext {
let mut ctx = CapturedContext::default();
let _ = TIME_ZONE::try_with(|time_zone| {
ctx.time_zone = Some(time_zone.to_string());
});
ctx
pub fn capture_context() -> ExprResult<CapturedContext> {
let ctx = TIME_ZONE::try_with(|time_zone| CapturedContext {
time_zone: time_zone.to_owned(),
})?;
Ok(ctx)
}
3 changes: 1 addition & 2 deletions src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,7 @@ pub fn captured_context_scope(input: TokenStream) -> TokenStream {
body = quote! {
async {
use risingwave_expr::captured_context::#local_key_name;
let #field = ctx.#field.unwrap_or_default();
#local_key_name::scope(#field.to_owned(), #body).await
#local_key_name::scope(ctx.#field.to_owned(), #body).await
}
};
});
Expand Down
9 changes: 4 additions & 5 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ use risingwave_connector::source::SplitMetaData;
use risingwave_expr::captured_context_scope;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
CapturedContext, DistributedLookupJoinNode, ExchangeNode, ExchangeSource,
MergeSortExchangeNode, PlanFragment, PlanNode as PlanNodePb, PlanNode, TaskId as TaskIdPb,
TaskOutputId,
DistributedLookupJoinNode, ExchangeNode, ExchangeSource, MergeSortExchangeNode, PlanFragment,
PlanNode as PlanNodePb, PlanNode, TaskId as TaskIdPb, TaskOutputId,
};
use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
use risingwave_pb::stream_plan::CapturedContext;
use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse};
use risingwave_rpc_client::ComputeClientPoolRef;
use tokio::spawn;
Expand Down Expand Up @@ -633,8 +633,7 @@ impl StageRunner {

async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> {
let captured_context = CapturedContext {
// TODO(kexiang): We go through the plan to make sure only the neccessay context is captured.
time_zone: Some(self.ctx.session().config().timezone().to_owned()),
time_zone: self.ctx.session().config().timezone().to_owned(),
};
// If root, we execute it locally.
if !self.is_root_stage() {
Expand Down
3 changes: 2 additions & 1 deletion src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE};
use risingwave_common::monitor::connection::{EndpointExt, TcpConfig};
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::batch_plan::{CapturedContext, PlanFragment, TaskId, TaskOutputId};
use risingwave_pb::batch_plan::{PlanFragment, TaskId, TaskOutputId};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::compute::config_service_client::ConfigServiceClient;
use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
Expand All @@ -31,6 +31,7 @@ use risingwave_pb::monitor_service::{
ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
StackTraceRequest, StackTraceResponse,
};
use risingwave_pb::stream_plan::CapturedContext;
use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient;
use risingwave_pb::task_service::task_service_client::TaskServiceClient;
use risingwave_pb::task_service::{
Expand Down

0 comments on commit 42e6765

Please sign in to comment.