Skip to content

Commit

Permalink
make Delta{Value,Key}Iter Send
Browse files Browse the repository at this point in the history
... by switching the internal RwLock to a OnceCell.

This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`).

See #4462 (comment)
for more context.

fixes #4471
  • Loading branch information
problame committed Jun 12, 2023
1 parent b0286e3 commit 20ebf84
Showing 1 changed file with 47 additions and 77 deletions.
124 changes: 47 additions & 77 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use once_cell::sync::OnceCell;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
Expand All @@ -46,7 +47,6 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::*;

use utils::{
Expand Down Expand Up @@ -184,7 +184,7 @@ pub struct DeltaLayer {

access_stats: LayerAccessStats,

inner: RwLock<DeltaLayerInner>,
inner: OnceCell<DeltaLayerInner>,
}

impl std::fmt::Debug for DeltaLayer {
Expand All @@ -201,21 +201,17 @@ impl std::fmt::Debug for DeltaLayer {
}

pub struct DeltaLayerInner {
/// If false, the fields below have not been loaded into memory yet.
loaded: bool,

// values copied from summary
index_start_blk: u32,
index_root_blk: u32,

/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
file: FileBlockReader<VirtualFile>,
}

impl std::fmt::Debug for DeltaLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DeltaLayerInner")
.field("loaded", &self.loaded)
.field("index_start_blk", &self.index_start_blk)
.field("index_root_blk", &self.index_root_blk)
.finish()
Expand Down Expand Up @@ -246,7 +242,7 @@ impl Layer for DeltaLayer {
inner.index_start_blk, inner.index_root_blk
);

let file = inner.file.as_ref().unwrap();
let file = &inner.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
Expand Down Expand Up @@ -315,7 +311,7 @@ impl Layer for DeltaLayer {
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;

// Scan the page versions backwards, starting from `lsn`.
let file = inner.file.as_ref().unwrap();
let file = &inner.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
Expand Down Expand Up @@ -500,51 +496,22 @@ impl DeltaLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
) -> Result<RwLockReadGuard<DeltaLayerInner>> {
fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> {
self.access_stats
.record_access(access_kind, ctx.task_kind());
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();
if inner.loaded {
return Ok(inner);
}

// Need to open the file and load the metadata. Upgrade our lock to
// a write lock. (Or rather, release and re-lock in write mode.)
drop(inner);
let inner = self.inner.write().unwrap();
if !inner.loaded {
self.load_inner(inner).with_context(|| {
format!("Failed to load delta layer {}", self.path().display())
})?;
} else {
// Another thread loaded it while we were not holding the lock.
}

// We now have the file open and loaded. There's no function to do
// that in the std library RwLock, so we have to release and re-lock
// in read mode. (To be precise, the lock guard was moved in the
// above call to `load_inner`, so it's already been released). And
// while we do that, another thread could unload again, so we have
// to re-check and retry if that happens.
}
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner())
.with_context(|| format!("Failed to load delta layer {}", self.path().display()))
}

fn load_inner(&self, mut inner: RwLockWriteGuard<DeltaLayerInner>) -> Result<()> {
fn load_inner(&self) -> Result<DeltaLayerInner> {
let path = self.path();

// Open the file if it's not open already.
if inner.file.is_none() {
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
inner.file = Some(FileBlockReader::new(file));
}
let file = inner.file.as_mut().unwrap();
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);

let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;

Expand All @@ -571,13 +538,13 @@ impl DeltaLayer {
}
}

inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;

debug!("loaded from {}", &path.display());

inner.loaded = true;
Ok(())
Ok(DeltaLayerInner {
file,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
})
}

/// Create a DeltaLayer struct representing an existing file on disk.
Expand All @@ -599,12 +566,7 @@ impl DeltaLayer {
file_size,
),
access_stats,
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
index_start_blk: 0,
index_root_blk: 0,
}),
inner: once_cell::sync::OnceCell::new(),
}
}

Expand All @@ -631,12 +593,7 @@ impl DeltaLayer {
metadata.len(),
),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
index_start_blk: 0,
index_root_blk: 0,
}),
inner: once_cell::sync::OnceCell::new(),
})
}

Expand Down Expand Up @@ -800,12 +757,7 @@ impl DeltaLayerWriterInner {
metadata.len(),
),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
index_start_blk,
index_root_blk,
}),
inner: once_cell::sync::OnceCell::new(),
};

// fsync the file
Expand Down Expand Up @@ -940,13 +892,13 @@ struct DeltaValueIter<'a> {
reader: BlockCursor<Adapter<'a>>,
}

struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
struct Adapter<'a>(&'a DeltaLayerInner);

impl<'a> BlockReader for Adapter<'a> {
type BlockLease = PageReadGuard<'static>;

fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
self.0.file.as_ref().unwrap().read_blk(blknum)
self.0.file.read_blk(blknum)
}
}

Expand All @@ -959,8 +911,8 @@ impl<'a> Iterator for DeltaValueIter<'a> {
}

impl<'a> DeltaValueIter<'a> {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let file = inner.file.as_ref().unwrap();
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
let file = &inner.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
Expand Down Expand Up @@ -1033,8 +985,8 @@ impl Iterator for DeltaKeyIter {
}

impl<'a> DeltaKeyIter {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let file = inner.file.as_ref().unwrap();
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
let file = &inner.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
Expand Down Expand Up @@ -1074,3 +1026,21 @@ impl<'a> DeltaKeyIter {
Ok(iter)
}
}

#[cfg(test)]
mod test {
use super::DeltaKeyIter;
use super::DeltaLayer;
use super::DeltaValueIter;

// We will soon need the iters to be send in the compactio ncode.
// Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883
// Cf https://github.com/neondatabase/neon/issues/4471
#[test]
fn is_send() {
fn assert_send<T: Send>() {}
assert_send::<DeltaLayer>();
assert_send::<DeltaValueIter>();
assert_send::<DeltaKeyIter>();
}
}

0 comments on commit 20ebf84

Please sign in to comment.