Skip to content

Commit

Permalink
fix: fix hummock snapshot release in batch local execution (#5971)
Browse files Browse the repository at this point in the history
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
hzxa21 and mergify[bot] authored Oct 23, 2022
1 parent 5ffeccb commit d72ba12
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ async fn local_execute(session: Arc<SessionImpl>, query: Query) -> Result<LocalQ
query,
front_env.clone(),
"",
pinned_snapshot.snapshot.committed_epoch,
pinned_snapshot,
session.auth_context(),
);

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ impl QueryExecution {
.collect::<Vec<Arc<StageExecution>>>();

let stage_exec = Arc::new(StageExecution::new(
pinned_snapshot.snapshot.committed_epoch,
// TODO: Add support to use current epoch when needed
pinned_snapshot.get_committed_epoch(),
self.query.stage_graph.stages[&stage_id].clone(),
worker_node_manager.clone(),
self.shutdown_tx.clone(),
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/scheduler/hummock_snapshot_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,21 @@ enum EpochOperation {
}

pub struct HummockSnapshotGuard {
pub snapshot: HummockSnapshot,
snapshot: HummockSnapshot,
query_id: QueryId,
unpin_snapshot_sender: UnboundedSender<EpochOperation>,
}

impl HummockSnapshotGuard {
pub fn get_committed_epoch(&self) -> u64 {
self.snapshot.committed_epoch
}

pub fn get_current_epoch(&self) -> u64 {
self.snapshot.current_epoch
}
}

impl Drop for HummockSnapshotGuard {
fn drop(&mut self) {
self.unpin_snapshot_sender
Expand Down
25 changes: 17 additions & 8 deletions src/frontend/src/scheduler/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use tracing::debug;
use uuid::Uuid;

use super::plan_fragmenter::{PartitionInfo, QueryStageRef};
use super::HummockSnapshotGuard;
use crate::optimizer::plan_node::PlanNodeType;
use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
Expand Down Expand Up @@ -69,8 +70,8 @@ pub struct LocalQueryExecution {
sql: String,
query: Query,
front_env: FrontendEnv,
epoch: u64,

// The snapshot will be released when LocalQueryExecution is dropped.
snapshot: HummockSnapshotGuard,
auth_context: Arc<AuthContext>,
}

Expand All @@ -79,14 +80,14 @@ impl LocalQueryExecution {
query: Query,
front_env: FrontendEnv,
sql: S,
epoch: u64,
snapshot: HummockSnapshotGuard,
auth_context: Arc<AuthContext>,
) -> Self {
Self {
sql: sql.into(),
query,
front_env,
epoch,
snapshot,
auth_context,
}
}
Expand All @@ -109,7 +110,13 @@ impl LocalQueryExecution {

let plan_fragment = self.create_plan_fragment()?;
let plan_node = plan_fragment.root.unwrap();
let executor = ExecutorBuilder::new(&plan_node, &task_id, context, self.epoch);
let executor = ExecutorBuilder::new(
&plan_node,
&task_id,
context,
// TODO: Add support to use current epoch when needed
self.snapshot.get_committed_epoch(),
);
let executor = executor.build().await?;

#[for_await]
Expand Down Expand Up @@ -238,7 +245,8 @@ impl LocalQueryExecution {
};
let local_execute_plan = LocalExecutePlan {
plan: Some(second_stage_plan_fragment),
epoch: self.epoch,
// TODO: Add support to use current epoch when needed
epoch: self.snapshot.get_committed_epoch(),
};
let exchange_source = ExchangeSource {
task_output_id: Some(TaskOutputId {
Expand Down Expand Up @@ -266,8 +274,9 @@ impl LocalQueryExecution {
};

let local_execute_plan = LocalExecutePlan {
plan: Some(second_stage_plan_fragment),
epoch: self.epoch,
plan: Some(second_stage_plan_fragment),
// TODO: Add support to use current epoch when needed
epoch: self.snapshot.get_committed_epoch(),
};

let workers = if second_stage.parallelism == 1 {
Expand Down

0 comments on commit d72ba12

Please sign in to comment.