Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run Layer::get_value_reconstruct_data in spawn_blocking #4498

51 changes: 41 additions & 10 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use anyhow::{Context, Result};
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
Expand Down Expand Up @@ -343,7 +343,8 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are already known from the context in which
/// the layers are used (the timeline).
pub trait Layer: std::fmt::Debug + Send + Sync {
#[async_trait::async_trait]
problame marked this conversation as resolved.
Show resolved Hide resolved
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
/// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>;

Expand Down Expand Up @@ -373,13 +374,42 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)>;

/// CANCEL SAFETY: if the returned future is dropped,
/// the wrapped closure still runs to completion and its return value is discarded.
/// For the case of get_value_reconstruct_data, we expect the closure to not
/// have any side effects, as it only attempts to read a layer (and stuff like
/// page cache isn't considered a real side effect).
/// But, ...
/// TRACING:
/// If the returned future is cancelled, the spawn_blocking span can outlive
/// the caller's span.
/// So, technically, we should be using `parent: None` and `follows_from: current`
/// instead. However, in practice, the advantage of maintaining the span stack
/// in logs outweighs the disadvantage of having a dangling span in a case that
/// is not expected to happen because in pageserver we generally don't drop pending futures.
async fn get_value_reconstruct_data(
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
problame marked this conversation as resolved.
Show resolved Hide resolved
tokio::task::spawn_blocking(move || {
let _enter = span.enter();
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
})
.await
.context("spawn_blocking")?
}

/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
Expand Down Expand Up @@ -499,6 +529,7 @@ impl LayerDescriptor {
}
}

#[async_trait::async_trait]
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
Expand All @@ -512,13 +543,13 @@ impl Layer for LayerDescriptor {
self.is_incremental
}

fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
todo!("This method shouldn't be part of the Layer trait")
}

Expand Down
19 changes: 10 additions & 9 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};

use tracing::*;

use utils::{
Expand Down Expand Up @@ -218,6 +219,7 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}

#[async_trait::async_trait]
impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
Expand Down Expand Up @@ -294,21 +296,20 @@ impl Layer for DeltaLayer {
Ok(())
}

fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
let mut need_image = true;

ensure!(self.desc.key_range.contains(&key));

{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;

// Scan the page versions backwards, starting from `lsn`.
let file = &inner.file;
Expand Down Expand Up @@ -365,7 +366,7 @@ impl Layer for DeltaLayer {
need_image = false;
break;
}
}
} // release metadata lock and close the file
}
}
// release metadata lock and close the file
Expand All @@ -374,9 +375,9 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok(ValueReconstructResult::Complete)
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
}

Expand Down
15 changes: 8 additions & 7 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner {
}
}

#[async_trait::async_trait]
impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
Expand Down Expand Up @@ -181,18 +182,18 @@ impl Layer for ImageLayer {
}

/// Look up given page in the file
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
problame marked this conversation as resolved.
Show resolved Hide resolved
assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);

let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;

let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
Expand All @@ -210,9 +211,9 @@ impl Layer for ImageLayer {
let value = Bytes::from(blob);

reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
Ok((reconstruct_state, ValueReconstructResult::Complete))
} else {
Ok(ValueReconstructResult::Missing)
Ok((reconstruct_state, ValueReconstructResult::Missing))
}
}

Expand Down
15 changes: 8 additions & 7 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl InMemoryLayer {
}
}

#[async_trait::async_trait]
impl Layer for InMemoryLayer {
fn get_key_range(&self) -> Range<Key> {
Key::MIN..Key::MAX
Expand Down Expand Up @@ -190,13 +191,13 @@ impl Layer for InMemoryLayer {
}

/// Look up given value in the layer.
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
problame marked this conversation as resolved.
Show resolved Hide resolved
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;

Expand All @@ -213,7 +214,7 @@ impl Layer for InMemoryLayer {
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
return Ok((reconstruct_state, ValueReconstructResult::Complete));
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
Expand All @@ -233,9 +234,9 @@ impl Layer for InMemoryLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok(ValueReconstructResult::Complete)
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions pageserver/src/tenant/storage_layer/remote_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
Expand All @@ -21,7 +21,7 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName};
use super::{
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
};

/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
Expand Down Expand Up @@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer {
}
}

#[async_trait::async_trait]
impl Layer for RemoteLayer {
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
Expand Down
Loading