Skip to content

Commit

Permalink
Revert "run Layer::get_value_reconstruct_data in spawn_blocking (#…
Browse files Browse the repository at this point in the history
…4498)"

This reverts commit 1faf69a.
  • Loading branch information
shanyp committed Jun 27, 2023
1 parent 00d1cfa commit bc98db9
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 118 deletions.
51 changes: 10 additions & 41 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::{Context, Result};
use anyhow::Result;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
Expand Down Expand Up @@ -343,8 +343,7 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
/// are used in (timeline).
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
pub trait Layer: std::fmt::Debug + Send + Sync {
/// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>;

Expand Down Expand Up @@ -374,42 +373,13 @@ pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)>;

/// CANCEL SAFETY: if the returned future is dropped,
/// the wrapped closure still run to completion and the return value discarded.
/// For the case of get_value_reconstruct_data, we expect the closure to not
/// have any side effects, as it only attempts to read a layer (and stuff like
/// page cache isn't considered a real side effect).
/// But, ...
/// TRACING:
/// If the returned future is cancelled, the spawn_blocking span can outlive
/// the caller's span.
/// So, technically, we should be using `parent: None` and `follows_from: current`
/// instead. However, in practice, the advantage of maintaining the span stack
/// in logs outweighs the disadvantage of having a dangling span in a case that
/// is not expected to happen because in pageserver we generally don't drop pending futures.
async fn get_value_reconstruct_data(
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
tokio::task::spawn_blocking(move || {
let _enter = span.enter();
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
})
.await
.context("spawn_blocking")?
}
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;

/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
Expand Down Expand Up @@ -529,7 +499,6 @@ impl LayerDescriptor {
}
}

#[async_trait::async_trait]
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
Expand All @@ -543,13 +512,13 @@ impl Layer for LayerDescriptor {
self.is_incremental
}

fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}

Expand Down
15 changes: 7 additions & 8 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}

#[async_trait::async_trait]
impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
Expand Down Expand Up @@ -295,21 +294,21 @@ impl Layer for DeltaLayer {
Ok(())
}

fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
let mut need_image = true;

ensure!(self.desc.key_range.contains(&key));

{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;

// Scan the page versions backwards, starting from `lsn`.
let file = &inner.file;
Expand Down Expand Up @@ -375,9 +374,9 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
Ok(ValueReconstructResult::Continue)
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
}
}

Expand Down
15 changes: 7 additions & 8 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ impl std::fmt::Debug for ImageLayerInner {
}
}

#[async_trait::async_trait]
impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
Expand Down Expand Up @@ -182,18 +181,18 @@ impl Layer for ImageLayer {
}

/// Look up given page in the file
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);

let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;

let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
Expand All @@ -211,9 +210,9 @@ impl Layer for ImageLayer {
let value = Bytes::from(blob);

reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
Ok(ValueReconstructResult::Missing)
}
}

Expand Down
15 changes: 7 additions & 8 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ impl InMemoryLayer {
}
}

#[async_trait::async_trait]
impl Layer for InMemoryLayer {
fn get_key_range(&self) -> Range<Key> {
Key::MIN..Key::MAX
Expand Down Expand Up @@ -191,13 +190,13 @@ impl Layer for InMemoryLayer {
}

/// Look up given value in the layer.
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;

Expand All @@ -214,7 +213,7 @@ impl Layer for InMemoryLayer {
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
Expand All @@ -234,9 +233,9 @@ impl Layer for InMemoryLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
Ok(ValueReconstructResult::Continue)
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions pageserver/src/tenant/storage_layer/remote_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
Expand All @@ -21,7 +21,7 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName};
use super::{
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
};

/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
Expand Down Expand Up @@ -63,15 +63,14 @@ impl std::fmt::Debug for RemoteLayer {
}
}

#[async_trait::async_trait]
impl Layer for RemoteLayer {
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()
Expand Down
73 changes: 27 additions & 46 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,13 @@ impl Timeline {
None => None,
};

let reconstruct_state = ValueReconstructState {
let mut reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};

let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
let reconstruct_state = self
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
timer.stop_and_record();

Expand Down Expand Up @@ -2353,9 +2352,9 @@ impl Timeline {
&self,
key: Key,
request_lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructState, PageReconstructError> {
) -> Result<(), PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
Expand Down Expand Up @@ -2385,12 +2384,12 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(reconstruct_state),
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(reconstruct_state);
return Ok(());
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
Expand Down Expand Up @@ -2494,19 +2493,13 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match Arc::clone(open_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
Expand All @@ -2527,19 +2520,13 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match Arc::clone(frozen_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
Expand Down Expand Up @@ -2568,19 +2555,13 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match Arc::clone(&layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
Expand Down

0 comments on commit bc98db9

Please sign in to comment.