From d72ba124a1f4e49c749b4a5158def5e2d8e8096e Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Sun, 23 Oct 2022 14:47:35 +0800 Subject: [PATCH] fix: fix hummock snapshot release in batch local execution (#5971) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- src/frontend/src/handler/query.rs | 2 +- .../src/scheduler/distributed/query.rs | 3 ++- .../src/scheduler/hummock_snapshot_manager.rs | 12 ++++++++- src/frontend/src/scheduler/local.rs | 25 +++++++++++++------ 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 23f0984a44f9b..2430e87aa8b71 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -225,7 +225,7 @@ async fn local_execute(session: Arc, query: Query) -> Result>>(); let stage_exec = Arc::new(StageExecution::new( - pinned_snapshot.snapshot.committed_epoch, + // TODO: Add support to use current epoch when needed + pinned_snapshot.get_committed_epoch(), self.query.stage_graph.stages[&stage_id].clone(), worker_node_manager.clone(), self.shutdown_tx.clone(), diff --git a/src/frontend/src/scheduler/hummock_snapshot_manager.rs b/src/frontend/src/scheduler/hummock_snapshot_manager.rs index 5760bd02b500d..7b13b6b529651 100644 --- a/src/frontend/src/scheduler/hummock_snapshot_manager.rs +++ b/src/frontend/src/scheduler/hummock_snapshot_manager.rs @@ -63,11 +63,21 @@ enum EpochOperation { } pub struct HummockSnapshotGuard { - pub snapshot: HummockSnapshot, + snapshot: HummockSnapshot, query_id: QueryId, unpin_snapshot_sender: UnboundedSender, } +impl HummockSnapshotGuard { + pub fn get_committed_epoch(&self) -> u64 { + self.snapshot.committed_epoch + } + + pub fn get_current_epoch(&self) -> u64 { + self.snapshot.current_epoch + } +} + impl Drop for HummockSnapshotGuard { fn drop(&mut self) { self.unpin_snapshot_sender diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index ac26d221b1983..fa0619664c268 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -38,6 +38,7 @@ use tracing::debug; use uuid::Uuid; use super::plan_fragmenter::{PartitionInfo, QueryStageRef}; +use super::HummockSnapshotGuard; use crate::optimizer::plan_node::PlanNodeType; use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId}; use crate::scheduler::task_context::FrontendBatchTaskContext; @@ -69,8 +70,8 @@ pub struct LocalQueryExecution { sql: String, query: Query, front_env: FrontendEnv, - epoch: u64, - + // The snapshot will be released when LocalQueryExecution is dropped. + snapshot: HummockSnapshotGuard, auth_context: Arc, } @@ -79,14 +80,14 @@ impl LocalQueryExecution { query: Query, front_env: FrontendEnv, sql: S, - epoch: u64, + snapshot: HummockSnapshotGuard, auth_context: Arc, ) -> Self { Self { sql: sql.into(), query, front_env, - epoch, + snapshot, auth_context, } } @@ -109,7 +110,13 @@ impl LocalQueryExecution { let plan_fragment = self.create_plan_fragment()?; let plan_node = plan_fragment.root.unwrap(); - let executor = ExecutorBuilder::new(&plan_node, &task_id, context, self.epoch); + let executor = ExecutorBuilder::new( + &plan_node, + &task_id, + context, + // TODO: Add support to use current epoch when needed + self.snapshot.get_committed_epoch(), + ); let executor = executor.build().await?; #[for_await] @@ -238,7 +245,8 @@ impl LocalQueryExecution { }; let local_execute_plan = LocalExecutePlan { plan: Some(second_stage_plan_fragment), - epoch: self.epoch, + // TODO: Add support to use current epoch when needed + epoch: self.snapshot.get_committed_epoch(), }; let exchange_source = ExchangeSource { task_output_id: Some(TaskOutputId { @@ -266,8 +274,9 @@ impl LocalQueryExecution { }; let local_execute_plan = LocalExecutePlan { - plan: Some(second_stage_plan_fragment), - epoch: self.epoch, + plan: Some(second_stage_plan_fragment), + // TODO: Add support to use current epoch when needed + epoch: self.snapshot.get_committed_epoch(), }; let workers = if second_stage.parallelism == 1 {