From 770400f5db339c2e92513cc572208ccbff9af6ff Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Fri, 6 May 2022 10:37:33 -0700 Subject: [PATCH] Add disks to zettacache live with `zcache add` command (#392) Currently, disks are added to the zettacache implicitly, by specifying new devices on the command line with the `-c PATH` argument. This commit adds a way to add disks to the zettacache without restarting the agent, by running `zcache add PATH`. Additionally, a `zcache sync [--merge]` subcommand is added, to sync a checkpoint (and optionally request an immediate merge of the index). --- cmd/zfs_object_agent/util/src/message.rs | 11 ++ cmd/zfs_object_agent/zcache/src/add.rs | 42 +++++ cmd/zfs_object_agent/zcache/src/hits.rs | 9 +- cmd/zfs_object_agent/zcache/src/main.rs | 19 +- cmd/zfs_object_agent/zcache/src/sync.rs | 40 +++++ .../zettacache/src/base_types.rs | 2 +- .../zettacache/src/block_access.rs | 106 +++++++---- .../zettacache/src/slab_allocator.rs | 61 +++++-- .../zettacache/src/zettacache/mod.rs | 164 ++++++++++++++---- .../zettaobject/src/root_connection.rs | 49 ++++++ 10 files changed, 418 insertions(+), 85 deletions(-) create mode 100644 cmd/zfs_object_agent/zcache/src/add.rs create mode 100644 cmd/zfs_object_agent/zcache/src/sync.rs diff --git a/cmd/zfs_object_agent/util/src/message.rs b/cmd/zfs_object_agent/util/src/message.rs index 42e1ac5e3fb0..158e44577935 100644 --- a/cmd/zfs_object_agent/util/src/message.rs +++ b/cmd/zfs_object_agent/util/src/message.rs @@ -1,8 +1,11 @@ +use std::fmt::Debug; use std::mem::size_of; use std::ptr; use std::slice; use safer_ffi::prelude::*; +use serde::Deserialize; +use serde::Serialize; use tokio::io; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -132,3 +135,11 @@ pub const TYPE_REPORT_HITS: &str = "report hits"; pub const TYPE_LIST_DEVICES: &str = "list devices"; pub const TYPE_ZCACHE_IOSTAT: &str = "zcache iostat"; pub const TYPE_ZCACHE_STATS: &str = "zcache stats"; +pub const TYPE_ADD_DISK: &str = "add disk"; +pub const TYPE_SYNC_CHECKPOINT: &str = "sync checkpoint"; +pub const TYPE_INITIATE_MERGE: &str = "initiate merge"; + +#[derive(Serialize, Deserialize, Debug)] +pub struct AddDiskRequest { + pub path: String, +} diff --git a/cmd/zfs_object_agent/zcache/src/add.rs b/cmd/zfs_object_agent/zcache/src/add.rs new file mode 100644 index 000000000000..47e20da79eb2 --- /dev/null +++ b/cmd/zfs_object_agent/zcache/src/add.rs @@ -0,0 +1,42 @@ +//! 
`zcache add` subcommand + +use anyhow::anyhow; +use anyhow::Result; +use async_trait::async_trait; +use clap::Parser; +use util::message::AddDiskRequest; +use util::message::TYPE_ADD_DISK; +use util::writeln_stdout; + +use crate::remote_channel::RemoteChannel; +use crate::remote_channel::RemoteError; +use crate::subcommand::ZcacheSubCommand; + +#[derive(Parser)] +#[clap(about = "Add a disk to the ZettaCache.")] +pub struct Add { + path: String, +} + +#[async_trait] +impl ZcacheSubCommand for Add { + async fn invoke(&self) -> Result<()> { + let mut remote = RemoteChannel::new(true).await?; + + let request = AddDiskRequest { + path: self.path.clone(), + }; + + match remote + .call(TYPE_ADD_DISK, Some(nvpair::to_nvlist(&request).unwrap())) + .await + { + Ok(_) => { + writeln_stdout!("Disk {} added", self.path); + } + Err(RemoteError::ResultError(_)) => return Err(anyhow!("No cache found")), + Err(RemoteError::Other(e)) => return Err(e), + } + Ok(()) + } +} diff --git a/cmd/zfs_object_agent/zcache/src/hits.rs b/cmd/zfs_object_agent/zcache/src/hits.rs index 01251ba47433..5cb83b763677 100644 --- a/cmd/zfs_object_agent/zcache/src/hits.rs +++ b/cmd/zfs_object_agent/zcache/src/hits.rs @@ -4,6 +4,7 @@ use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use anyhow::anyhow; use anyhow::Result; use async_trait::async_trait; use chrono::DateTime; @@ -184,7 +185,9 @@ impl ZcacheSubCommand for Hits { writeln_stdout!("Hits-by-size data cleared"); } Err(RemoteError::ResultError(_)) => { - writeln_stdout!("No cache found, so no hits-by-size data present"); + return Err(anyhow!( + "No cache found, so no hits-by-size data is available" + )) } Err(RemoteError::Other(e)) => return Err(e), } @@ -207,7 +210,9 @@ impl ZcacheSubCommand for Hits { hits_by_size.print(quantiles, cumulative, ghost); } Err(RemoteError::ResultError(_)) => { - writeln_stdout!("No cache found, so no hits-by-size data is available"); + return Err(anyhow!( + "No cache found, so no hits-by-size data is available" + )); } Err(RemoteError::Other(e)) => return Err(e), } diff --git a/cmd/zfs_object_agent/zcache/src/main.rs b/cmd/zfs_object_agent/zcache/src/main.rs index 3d53fa2b688f..928930aa32ff 100644 --- a/cmd/zfs_object_agent/zcache/src/main.rs +++ b/cmd/zfs_object_agent/zcache/src/main.rs @@ -4,22 +4,21 @@ #![warn(clippy::cast_sign_loss)] #![deny(clippy::print_stdout)] #![deny(clippy::print_stderr)] + +mod add; mod hits; mod iostat; mod list; mod remote_channel; mod stats; mod subcommand; +mod sync; use anyhow::Result; use clap::Parser; use clap::Subcommand; use hits::ClearHitData; -use hits::Hits; -use iostat::Iostat; -use list::List; use log::*; -use stats::Stats; use subcommand::ZcacheSubCommand; fn main() -> Result<()> { @@ -52,10 +51,12 @@ struct Cli { /// 2. Add an entry to the enum here where it will be parsed and instantiated automatically. /// 3. Add a match entry to the match block in `async_main()`. 
 enum Commands {
-    Hits(Hits),
-    Iostat(Iostat),
-    List(List),
-    Stats(Stats),
+    Hits(hits::Hits),
+    Iostat(iostat::Iostat),
+    List(list::List),
+    Stats(stats::Stats),
+    Add(add::Add),
+    Sync(sync::Sync),
 
     // clear_hit_data is deprecated/hidden
     #[clap(rename_all = "snake_case")]
@@ -75,6 +76,8 @@ async fn async_main() -> Result<()> {
         Commands::Iostat(subcommand) => subcommand.invoke().await?,
         Commands::List(subcommand) => subcommand.invoke().await?,
         Commands::Stats(subcommand) => subcommand.invoke().await?,
+        Commands::Add(subcommand) => subcommand.invoke().await?,
+        Commands::Sync(subcommand) => subcommand.invoke().await?,
     }
 
     Ok(())
diff --git a/cmd/zfs_object_agent/zcache/src/sync.rs b/cmd/zfs_object_agent/zcache/src/sync.rs
new file mode 100644
index 000000000000..f56f0b28936f
--- /dev/null
+++ b/cmd/zfs_object_agent/zcache/src/sync.rs
@@ -0,0 +1,40 @@
+//! `zcache sync` subcommand
+
+use anyhow::anyhow;
+use anyhow::Result;
+use async_trait::async_trait;
+use clap::Parser;
+use util::message::TYPE_INITIATE_MERGE;
+use util::message::TYPE_SYNC_CHECKPOINT;
+
+use crate::remote_channel::RemoteChannel;
+use crate::remote_channel::RemoteError;
+use crate::subcommand::ZcacheSubCommand;
+
+#[derive(Parser)]
+#[clap(about = "Wait for changes to be persisted to the ZettaCache.")]
+pub struct Sync {
+    /// Request index merge. If a merge is already in progress, a new merge will be started as
+    /// soon as this one completes.
+    #[clap(long)]
+    merge: bool,
+}
+
+#[async_trait]
+impl ZcacheSubCommand for Sync {
+    async fn invoke(&self) -> Result<()> {
+        let mut remote = RemoteChannel::new(true).await?;
+
+        let result = if self.merge {
+            remote.call(TYPE_INITIATE_MERGE, None).await
+        } else {
+            remote.call(TYPE_SYNC_CHECKPOINT, None).await
+        };
+        match result {
+            Ok(_) => {}
+            Err(RemoteError::ResultError(_)) => return Err(anyhow!("No cache found")),
+            Err(RemoteError::Other(e)) => return Err(e),
+        }
+        Ok(())
+    }
+}
diff --git a/cmd/zfs_object_agent/zettacache/src/base_types.rs b/cmd/zfs_object_agent/zettacache/src/base_types.rs
index 8a95bfa0402c..3ec0b504c55f 100644
--- a/cmd/zfs_object_agent/zettacache/src/base_types.rs
+++ b/cmd/zfs_object_agent/zettacache/src/base_types.rs
@@ -62,7 +62,7 @@ impl DiskId {
         assert_le!(value, Self::MAX_VALUE);
         DiskId(u16::try_from(value).unwrap())
     }
-    pub fn get(self) -> usize {
+    pub fn index(self) -> usize {
         self.0 as usize
     }
 }
diff --git a/cmd/zfs_object_agent/zettacache/src/block_access.rs b/cmd/zfs_object_agent/zettacache/src/block_access.rs
index 7d707a423963..e03cd37c8176 100644
--- a/cmd/zfs_object_agent/zettacache/src/block_access.rs
+++ b/cmd/zfs_object_agent/zettacache/src/block_access.rs
@@ -8,6 +8,7 @@ use std::os::unix::prelude::AsRawFd;
 use std::os::unix::prelude::OpenOptionsExt;
 use std::path::Path;
 use std::sync::atomic::Ordering;
+use std::sync::RwLock;
 use std::thread::sleep;
 use std::time::Duration;
 use std::time::Instant;
@@ -17,6 +18,8 @@ use anyhow::Context;
 use anyhow::Result;
 use bincode::Options;
 use bytesize::ByteSize;
+use derivative::Derivative;
+use futures::Future;
 use libc::c_void;
 use log::*;
 use nix::errno::Errno;
@@ -123,18 +126,18 @@ impl<'a> Drop for OpInProgress<'a> {
 #[derive(Debug)]
 pub struct BlockAccess {
     sector_size: usize,
-    disks: Vec<Disk>,
+    disks: RwLock<Vec<Disk>>,
     readonly: bool,
     timebase: Instant,
 }
 
-#[derive(Debug)]
+#[derive(Derivative)]
+#[derivative(Debug)]
 pub struct Disk {
-    // We want all the reader/writer_threads to share the same file descriptor,
-    // but we don't have a mechanism to ensure that they stop using the fd when
-    // the DiskStruct is dropped and the fd is closed. To solve this we simply
-    // never close the fd. The fd is owned by the File, and we leave a
-    // reference to it here to indicate that it's related to this Disk, even
+    // We want all the reader/writer_threads to share the same file descriptor, but we don't have
+    // a mechanism to ensure that they stop using the fd when the DiskStruct is dropped and the
+    // fd is closed. To solve this we simply never close the fd. The fd is owned by the File,
+    // and we leave a reference to it here to indicate that it's related to this Disk, even
     // though it's only used via the reader/writer_threads.
     #[allow(dead_code)]
     file: &'static File,
@@ -142,9 +145,13 @@ pub struct Disk {
     device_path: String,
     size: u64,
     sector_size: usize,
+    #[derivative(Debug = "ignore")]
    io_stats: &'static DiskIoStats,
+    #[derivative(Debug = "ignore")]
    reader_tx: flume::Sender,
+    #[derivative(Debug = "ignore")]
    writer_txs: Vec>,
+    #[derivative(Debug = "ignore")]
    metadata_writer_txs: Vec>,
 }
 
@@ -322,7 +329,15 @@ impl Disk {
         }
     }
 
-    async fn read(&self, offset: u64, size: usize, io_type: DiskIoType) -> AlignedBytes {
+    // This is a desugared `async fn` so that it can return a Future that does not capture
+    // `&self` (as indicated by the absence of `+ '_`). That way, the caller can drop the
+    // associated RwLock before `await`ing.
+    fn read(
+        &self,
+        offset: u64,
+        size: usize,
+        io_type: DiskIoType,
+    ) -> impl Future<Output = AlignedBytes> {
         self.verify_aligned(offset);
         self.verify_aligned(size);
 
@@ -334,9 +349,9 @@ impl Disk {
             tx,
         };
 
-        self.reader_tx.send_async(message).await.unwrap();
-        let bytes = measure!().fut(rx).await.unwrap();
-        bytes
+        // note: reader_tx is unbounded, so .send() will not block
+        self.reader_tx.send(message).unwrap();
+        async move { measure!().fut(rx).await.unwrap() }
     }
 
     fn aggregating_writer_thread(
@@ -453,7 +468,15 @@ impl Disk {
         }
     }
 
-    async fn write(&self, offset: u64, bytes: AlignedBytes, io_type: DiskIoType) {
+    // This is a desugared `async fn` so that it can return a Future that does not capture
+    // `&self` (as indicated by the absence of `+ '_`). That way, the caller can drop the
+    // associated RwLock before `await`ing.
+    fn write(
+        &self,
+        offset: u64,
+        bytes: AlignedBytes,
+        io_type: DiskIoType,
+    ) -> impl Future<Output = ()> {
         self.verify_aligned(offset);
         self.verify_aligned(bytes.len());
 
@@ -476,7 +499,7 @@ impl Disk {
         txs[writer]
             .send(message)
             .unwrap_or_else(|e| panic!("writer_txs[{}].send: {}", writer, e));
-        measure!().fut(rx).await.unwrap();
+        async move { measure!().fut(rx).await.unwrap() }
     }
 
     fn verify_aligned(&self, n: N) {
@@ -507,23 +530,32 @@ impl BlockAccess {
 
         BlockAccess {
             sector_size,
-            disks,
+            disks: RwLock::new(disks),
             readonly,
             timebase: Instant::now(),
         }
     }
 
+    pub fn add_disk(&self, disk: Disk) -> DiskId {
+        let mut disks = self.disks.write().unwrap();
+        let id = DiskId::new(disks.len());
+        disks.push(disk);
+        id
+    }
+
     /// Note: In the future we'll support device removal in which case the
     /// DiskId's will probably not be sequential. By using this accessor we
     /// need not assume anything about the values inside the DiskId's.
     pub fn disks(&self) -> impl Iterator<Item = DiskId> {
-        (0..self.disks.len()).map(DiskId::new)
+        (0..self.disks.read().unwrap().len()).map(DiskId::new)
     }
 
     // Gather a list of devices for zcache list_devices command.
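+    // The accessors below take the disks RwLock only briefly and drop it before
+    // awaiting any I/O, so add_disk()'s write lock never has to wait behind
+    // in-flight disk operations.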
     pub fn list_devices(&self) -> DeviceList {
         let devices = self
             .disks
+            .read()
+            .unwrap()
             .iter()
             .map(|d| DeviceEntry {
                 name: d.device_path.to_string(),
@@ -533,16 +565,21 @@ impl BlockAccess {
         DeviceList { devices }
     }
 
-    fn disk(&self, disk: DiskId) -> &Disk {
-        &self.disks[disk.get()]
+    pub fn disk_size(&self, disk: DiskId) -> u64 {
+        self.disks.read().unwrap()[disk.index()].size
     }
 
-    pub fn disk_size(&self, disk: DiskId) -> u64 {
-        self.disk(disk).size
+    pub fn disk_extent(&self, disk: DiskId) -> Extent {
+        Extent {
+            location: DiskLocation::new(disk, 0),
+            size: self.disk_size(disk),
+        }
     }
 
     pub fn disk_path(&self, disk: DiskId) -> String {
-        self.disk(disk).device_path.to_string()
+        self.disks.read().unwrap()[disk.index()]
+            .device_path
+            .to_string()
     }
 
     pub fn total_capacity(&self) -> u64 {
@@ -555,13 +592,13 @@ impl BlockAccess {
         self.verify_aligned(extent.location.offset());
         self.verify_aligned(extent.size);
 
-        self.disk(extent.location.disk())
-            .read(
-                extent.location.offset(),
-                usize::from64(extent.size),
-                io_type,
-            )
-            .await
+        let disk = extent.location.disk();
+        let fut = self.disks.read().unwrap()[disk.index()].read(
+            extent.location.offset(),
+            usize::from64(extent.size),
+            io_type,
+        ); // drop disks RwLock before waiting for io
+        fut.await
     }
 
     // The location.offset() and bytes.len() must be sector-aligned. However,
@@ -578,9 +615,10 @@ impl BlockAccess {
         );
         self.verify_aligned(location.offset());
         self.verify_aligned(bytes.len());
-        self.disk(location.disk())
-            .write(location.offset(), bytes, io_type)
-            .await
+        let disk = location.disk();
+        let fut = self.disks.read().unwrap()[disk.index()].write(location.offset(), bytes, io_type);
+        // drop disks RwLock before waiting for io
+        fut.await;
     }
 
     pub fn round_up_to_sector(&self, n: N) -> N {
@@ -739,7 +777,13 @@ impl BlockAccess {
         serde_json::to_string(&IoStatsRef {
             cache_runtime_id: agent_id, // used to detect agent restarts across stat snapshots
             timestamp: self.timebase.elapsed(),
-            disk_stats: self.disks.iter().map(|disk| disk.io_stats).collect(),
+            disk_stats: self
+                .disks
+                .read()
+                .unwrap()
+                .iter()
+                .map(|disk| disk.io_stats)
+                .collect(),
         })
         .unwrap()
     }
diff --git a/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs b/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs
index b2051ebe9f9f..2d3015e9fe88 100644
--- a/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs
+++ b/cmd/zfs_object_agent/zettacache/src/slab_allocator.rs
@@ -14,6 +14,7 @@ use std::ops::Add;
 use std::ops::Bound::*;
 use std::ops::Sub;
 use std::sync::Mutex;
+use std::sync::RwLock;
 
 use bimap::BiBTreeMap;
 use bytesize::ByteSize;
@@ -56,8 +57,13 @@ pub struct SlabAllocatorPhys {
 
 #[derive(Debug)]
 pub struct SlabAccess {
-    capacity: BiBTreeMap,
+    inner: RwLock<SlabAccessInner>,
     slab_size: u64,
+}
+
+#[derive(Debug)]
+struct SlabAccessInner {
+    capacity: BiBTreeMap,
     num_slabs: u64,
 }
 
@@ -114,7 +120,8 @@ impl SlabAllocatorPhys {
 
 impl SlabAccess {
     pub fn slab_id_to_extent(&self, slab_id: SlabId) -> Extent {
-        let (&extent_slab, containing_extent) = self
+        let inner = self.inner.read().unwrap();
+        let (&extent_slab, containing_extent) = inner
             .capacity
             .left_range((Unbounded, Included(slab_id)))
             .next_back()
@@ -125,7 +132,8 @@ impl SlabAccess {
 
     pub fn extent_to_slab_id(&self, extent: Extent) -> SlabId {
         assert_le!(extent.size, self.slab_size);
-        let (&capacity_slab, capacity_extent) = self
+        let inner = self.inner.read().unwrap();
+        let (&capacity_slab, capacity_extent) = inner
             .capacity
             .right_range((Unbounded, Included(extent.location)))
             .next_back()
@@ -135,7 +143,7 @@ impl SlabAccess {
         let slab_id =
             capacity_slab + ((extent.location - capacity_extent.location) / self.slab_size);
-        assert_lt!(slab_id.0, self.num_slabs);
+        assert_lt!(slab_id.0, inner.num_slabs);
         debug_assert!(self.slab_id_to_extent(slab_id).contains(&extent));
         slab_id
     }
@@ -145,11 +153,13 @@ impl SlabAccess {
     }
 
     pub fn num_slabs(&self) -> u64 {
-        self.num_slabs
+        let inner = self.inner.read().unwrap();
+        inner.num_slabs
     }
 
     pub fn capacity(&self) -> u64 {
-        self.num_slabs * self.slab_size
+        let inner = self.inner.read().unwrap();
+        inner.num_slabs * self.slab_size
     }
 }
 
@@ -168,9 +178,11 @@ impl SlabAllocatorBuilder {
         Self {
             allocatable: (0..num_slabs).map(SlabId).collect(),
             access: SlabAccess {
-                capacity,
+                inner: RwLock::new(SlabAccessInner {
+                    capacity,
+                    num_slabs,
+                }),
                 slab_size: phys.slab_size,
-                num_slabs,
             },
         }
     }
@@ -185,7 +197,7 @@ impl SlabAllocatorBuilder {
             inner: Mutex::new(Inner {
                 allocatable: self.allocatable.into_iter().collect(),
                 freeing: Vec::new(),
-                reserved_slabs: RESERVED_SLABS_PCT.apply(self.access.num_slabs),
+                reserved_slabs: RESERVED_SLABS_PCT.apply(self.access.num_slabs()),
             }),
             access: self.access,
         }
@@ -220,11 +232,35 @@ impl SlabAllocatorBuilder {
 }
 
 impl SlabAllocator {
+    pub fn extend(&self, capacity: Extent) {
+        // lock order: SlabAllocator.inner before SlabAccess.inner
+        let mut inner = self.inner.lock().unwrap();
+
+        let mut access_inner = self.access.inner.write().unwrap();
+        let first_new_slab = SlabId(access_inner.num_slabs);
+        // capacity is aligned to be a multiple of slabsize
+        let capacity = capacity.trim_end(capacity.size - capacity.size % self.access.slab_size);
+        access_inner.capacity.insert(first_new_slab, capacity);
+        let new_slabs = capacity.size / self.access.slab_size;
+
+        // We don't want to allocate and write to the new capacity until the next checkpoint
+        // (when the SuperBlockPhys's have been updated to reflect the new capacity). Therefore
+        // we add the new slabs to `freeing`.
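+        // (Illustrative numbers, not from this change: with a 16 MiB slab_size,
+        // extending by 1 GiB would append 64 slabs here, and they only become
+        // allocatable once release_frees() runs at the end of the next checkpoint.)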
+ inner + .freeing + .extend((0..new_slabs).map(|n| SlabId(access_inner.num_slabs + n))); + + access_inner.num_slabs += new_slabs; + } + pub fn get_phys(&self) -> SlabAllocatorPhys { SlabAllocatorPhys { slab_size: self.access.slab_size, capacity: self .access + .inner + .read() + .unwrap() .capacity .iter() .map(|(_, &extent)| extent) @@ -250,13 +286,14 @@ impl SlabAllocator { let mut inner = self.inner.lock().unwrap(); inner.reserved_slabs = max( reserved_space / self.access.slab_size, - RESERVED_SLABS_PCT.apply(self.access.num_slabs), + RESERVED_SLABS_PCT.apply(self.access.num_slabs()), ); } pub fn allocate_reserved(&self) -> SlabId { let mut inner = self.inner.lock().unwrap(); - if inner.allocatable.len() as u64 <= SUPER_RESERVED_SLABS_PCT.apply(self.access.num_slabs) { + if inner.allocatable.len() as u64 <= SUPER_RESERVED_SLABS_PCT.apply(self.access.num_slabs()) + { panic!("Free slabs exhausted."); } inner.allocatable.pop().unwrap() @@ -285,7 +322,7 @@ impl SlabAllocator { pub fn num_slabs_to_evacuate(&self) -> u64 { let inner = self.inner.lock().unwrap(); let target_free_slabs = - inner.reserved_slabs + TARGET_AVAILABLE_SLABS_PCT.apply(self.access.num_slabs); + inner.reserved_slabs + TARGET_AVAILABLE_SLABS_PCT.apply(self.access.num_slabs()); let current_free_slabs = inner.allocatable.len() as u64; target_free_slabs.saturating_sub(current_free_slabs) } diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs index 974e8537ed4d..8186d11f8968 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs @@ -32,11 +32,12 @@ use serde::Deserialize; use serde::Serialize; use sysinfo::System; use sysinfo::SystemExt; +use tokio::select; use tokio::sync::mpsc; +use tokio::sync::watch; use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; use tokio::time::sleep_until; -use tokio::time::timeout_at; use util::concurrent_batch::ConcurrentBatch; use util::lock_non_send; use util::measure; @@ -196,6 +197,9 @@ pub struct Inner { block_access: Arc, slab_allocator: Arc, + checkpoint_synced: watch::Receiver, + checkpoint_wanted: std::sync::Mutex>, + // lock ordering: index first then state old_index: Arc>, new_index: Arc>>, // merging into this @@ -787,6 +791,8 @@ struct ZettaCacheState { atime: Atime, stats: Arc, + checkpoint_synced: watch::Sender, + merge_requested: bool, } pub struct LockedKey(LockedItem); @@ -819,13 +825,7 @@ impl ZettaCache { let new_capacity = block_access .disks() - .map(|disk| { - Extent::new( - disk, - SUPERBLOCK_SIZE, - block_access.disk_size(disk) - SUPERBLOCK_SIZE, - ) - }) + .map(|disk| block_access.disk_extent(disk).trim_start(SUPERBLOCK_SIZE)) .collect::>(); let checkpoint = CheckpointPhys { @@ -941,13 +941,7 @@ impl ZettaCache { let new_capacity = extra_disks .iter() - .map(|&disk| { - Extent::new( - disk, - SUPERBLOCK_SIZE, - block_access.disk_size(disk) - SUPERBLOCK_SIZE, - ) - }) + .map(|&disk| block_access.disk_extent(disk).trim_start(SUPERBLOCK_SIZE)) .collect::>(); primary.disks.extend( extra_disks @@ -1089,6 +1083,9 @@ impl ZettaCache { let stats = Arc::new(CacheStats::default()); + let (checkpoint_synced_tx, checkpoint_synced_rx) = watch::channel(primary.checkpoint_id); + let (checkpoint_wanted_tx, checkpoint_wanted_rx) = watch::channel(primary.checkpoint_id); + let mut state = ZettaCacheState { block_access: block_access.clone(), pending_changes, @@ -1110,6 +1107,8 @@ impl ZettaCache { block_allocator, slab_allocator, 
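+            // The two watch channels created above are split across structs:
+            // ZettaCacheState keeps the checkpoint_synced sender (below), while
+            // Inner holds its receiver together with the checkpoint_wanted sender.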
stats: stats.clone(), + checkpoint_synced: checkpoint_synced_tx, + merge_requested: false, }; // Now that BlockAllocator is open grab its size stats (these will be updated periodically) @@ -1139,6 +1138,8 @@ impl ZettaCache { timebase: Instant::now(), cache_runtime_id: Uuid::new_v4(), pool_guids: PoolGuidMapping::open(checkpoint.pool_guids), + checkpoint_synced: checkpoint_synced_rx, + checkpoint_wanted: std::sync::Mutex::new(checkpoint_wanted_tx), })); let merging = match checkpoint.merge_progress { @@ -1158,7 +1159,9 @@ impl ZettaCache { let my_cache = this.clone(); measure!("checkpoint_task").spawn(async move { - my_cache.checkpoint_task(merging).await; + my_cache + .checkpoint_task(checkpoint_wanted_rx, merging) + .await; }); let state = this.state.clone(); @@ -1246,6 +1249,7 @@ impl ZettaCache { /// for the current progress are passed in. async fn checkpoint_task( &self, + mut checkpoint_wanted: watch::Receiver, mut merging: Option<(mpsc::Receiver, IndexRunPhys)>, ) { let mut next_tick = tokio::time::Instant::now(); @@ -1267,10 +1271,25 @@ impl ZettaCache { let mut state_lock_held = Duration::ZERO; // we have a channel to an active merge task, check it for messages loop { - let result = timeout_at(next_tick, rx.recv()).await; + let result = select! { + result = rx.recv() => result, + _ = sleep_until(next_tick) => break, + _ = checkpoint_wanted.changed() => { + let wanted = *checkpoint_wanted.borrow_and_update(); + let synced = *self.checkpoint_synced.borrow(); + debug!( + "checkpoint_wanted changed, wanted={wanted:?} synced={synced:?}", + ); + if wanted > synced { + break; + } else { + continue; + } + } + }; match result { // capture merge progress: the current next index phys and eviction requests - Ok(Some(MergeMessage::Progress(progress))) => { + Some(MergeMessage::Progress(progress)) => { msg_count += 1; free_count += progress.frees.len(); cache_updates_count += progress.cache_updates.len(); @@ -1311,7 +1330,11 @@ impl ZettaCache { match entry.value.location() { // It's possible the key wasn't already in the cache, so // this may add or update the key. - Some(_) => state.index_cache.put(entry.key, entry.value), + Some(_) => { + with_alloctag("ZettaCacheState::index_cache", || { + state.index_cache.put(entry.key, entry.value) + }) + } // It's possible the key isn't in the cache; .pop() doesn't // fail in that case. None => state.index_cache.pop(&entry.key), @@ -1344,7 +1367,7 @@ impl ZettaCache { } } // merge task complete, replace the current index with the new index - Ok(Some(MergeMessage::Complete(new_index))) => { + Some(MergeMessage::Complete(new_index)) => { let mut old_index = self.old_index.write().await; let mut new_index_opt = self.new_index.write().await; @@ -1354,24 +1377,37 @@ impl ZettaCache { *new_index_opt = None; merging = None; completed_merge = true; + // When a merge completes, we immediately flush a checkpoint, so that + // we can then check if another merge is needed without delay. break; } - Ok(None) => panic!("channel closed before Complete message received"), - Err(_) => break, // timed out + None => panic!("channel closed before Complete message received"), } } debug!( - "processed {} merge messages with {} frees and {} cache updates in {}ms (state lock held for {}ms)", - msg_count, - free_count, - cache_updates_count, + "processed {msg_count} merge messages with {free_count} frees and \ + {cache_updates_count} cache updates in {}ms (state lock held for {}ms)", begin.elapsed().as_millis(), state_lock_held.as_millis(), ); + } else { + loop { + select! 
{ + _ = sleep_until(next_tick) => break, + _ = checkpoint_wanted.changed() => { + let wanted = *checkpoint_wanted.borrow_and_update(); + let synced = *self.checkpoint_synced.borrow(); + debug!( + "checkpoint_wanted changed, wanted={wanted:?} synced={synced:?}", + ); + if wanted > synced { + break; + } + } + } + } } - // flush out a new checkpoint every CHECKPOINT_INTERVAL to capture the current state - sleep_until(next_tick).await; self.flush_checkpoint( merging.as_mut().map(|(_, phys)| (phys.clone())), completed_merge, @@ -1731,8 +1767,7 @@ impl ZettaCache { cache.insert_impl(locked_key, bytes.into(), source).await; // We want to hold onto the insert_permit until the write completes because it // represents the memory that's required to buffer this insertion, which isn't - // released until the io completes. Similarly, the write_permit (roughly) represents - // the disks' capacity to perform i/o. + // released until the io completes. drop(insert_permit); }); } @@ -1823,6 +1858,35 @@ impl ZettaCache { } } + pub async fn add_disk(&self, path: &str) -> Result<()> { + self.state.lock().await.add_disk(path)?; + self.sync_checkpoint().await; + Ok(()) + } + + pub async fn initiate_merge(&self) { + self.state.lock().await.request_merge(); + self.sync_checkpoint().await; + } + + /// Wait for the next checkpoint to be written to disk. Note that a checkpoint may already + /// be in progress, and its completion will count as the "next" checkpoint. If that is not + /// desired, the caller would need to coordinate with the in-progress checkpoint, e.g. by + /// acquiring the ZettaCacheState lock, which is held while writing out a checkpoint. + pub async fn sync_checkpoint(&self) { + let mut watch = self.checkpoint_synced.clone(); + let next = watch.borrow_and_update().next(); + debug!("waiting for {next:?}"); + { + let wanted = self.checkpoint_wanted.lock().unwrap(); + if next > *wanted.borrow() { + debug!("sending checkpoint_wanted {next:?}"); + wanted.send(next).unwrap(); + } + } // drop wanted lock + watch.changed().await.ok(); + } + pub fn sector_size(&self) -> usize { self.block_access.round_up_to_sector(1) } @@ -2163,7 +2227,7 @@ impl ZettaCacheState { self.slab_allocator .free(self.slab_allocator.extent_to_slab_id(extent)); } - self.primary.checkpoint_id = self.primary.checkpoint_id.next(); + self.primary.checkpoint_id = checkpoint.id; self.primary.feature_flags = SUPPORTED_FEATURES.keys().cloned().collect(); // We need to write all the disks' superblocks in case new disks have been added. self.primary @@ -2176,6 +2240,7 @@ impl ZettaCacheState { ), ); self.slab_allocator.release_frees(); + self.checkpoint_synced.send(checkpoint.id).ok(); info!( "completed {:?} in {}ms; flushed {} operations ({}) to log", @@ -2329,6 +2394,11 @@ impl ZettaCacheState { } } + if self.merge_requested { + debug!("starting merge due to user request"); + need_merge = true; + } + need_merge } @@ -2341,6 +2411,8 @@ impl ZettaCacheState { return None; } + self.merge_requested = false; + let reduction = self.space_to_evict(); let eviction_atime = self.atime_histogram.atime_for_eviction_target(reduction); @@ -2530,4 +2602,34 @@ impl ZettaCacheState { old_pending + self.pending_changes.len() as u64, ); } + + fn add_disk(&mut self, path: &str) -> Result<()> { + // We hold the state lock across all these operations to ensure that we're always + // adding the last DiskId to the SlabAllocator and Primary, in the case of concurrent + // calls to add_disk(). 
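+        // (Equivalently: the DiskId returned by BlockAccess::add_disk() below
+        // must still be the highest id when the SlabAllocator and Primary are
+        // updated a few lines later.)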
+ + let disk_id = self.block_access.add_disk(Disk::new(path, false)?); + + self.slab_allocator.extend( + self.block_access + .disk_extent(disk_id) + .trim_start(SUPERBLOCK_SIZE), + ); + + self.primary + .disks + .insert(disk_id, DiskPhys::new(self.block_access.disk_size(disk_id))); + + // The hit data isn't accurate across cache size changes, so clear + // it, which also updates the histogram parameters to reflect the + // new cache size. + self.clear_hit_data(); + + info!("added {path} as {disk_id:?}"); + Ok(()) + } + + fn request_merge(&mut self) { + self.merge_requested = true; + } } diff --git a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs index b47dcf1c440f..00a6581dd3da 100644 --- a/cmd/zfs_object_agent/zettaobject/src/root_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/root_connection.rs @@ -118,6 +118,9 @@ impl RootConnectionState { Box::new(Self::resume_destroy_pool), ); server.register_handler(TYPE_CLEAR_HIT_DATA, Box::new(Self::clear_hit_data)); + server.register_handler(TYPE_ADD_DISK, Box::new(Self::add_disk)); + server.register_handler(TYPE_SYNC_CHECKPOINT, Box::new(Self::sync_checkpoint)); + server.register_handler(TYPE_INITIATE_MERGE, Box::new(Self::initiate_merge)); server.register_struct_handler(MessageType::ReadBlock, Box::new(Self::read_block)); server.register_struct_handler(MessageType::WriteBlock, Box::new(Self::write_block)); } @@ -598,6 +601,52 @@ impl RootConnectionState { return_result(TYPE_CLEAR_HIT_DATA, (), result, true) })) } + + fn add_disk(&mut self, nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + let request: AddDiskRequest = nvpair::from_nvlist(&nvl)?; + debug!("got {:?}", request); + + let result = match cache { + Some(cache) => Ok(cache.add_disk(&request.path).await?), + None => Err(FailureMessage::new("zettacache not present")), + }; + return_result(TYPE_ADD_DISK, (), result, true) + })) + } + + fn sync_checkpoint(&mut self, nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + debug!("got {:?}", nvl); + + let result = match cache { + Some(cache) => { + cache.sync_checkpoint().await; + Ok(()) + } + None => Err(FailureMessage::new("zettacache not present")), + }; + return_result(TYPE_SYNC_CHECKPOINT, (), result, true) + })) + } + + fn initiate_merge(&mut self, nvl: NvList) -> HandlerReturn { + let cache = self.cache.clone(); + Ok(Box::pin(async move { + debug!("got {:?}", nvl); + + let result = match cache { + Some(cache) => { + cache.initiate_merge().await; + Ok(()) + } + None => Err(FailureMessage::new("zettacache not present")), + }; + return_result(TYPE_INITIATE_MERGE, (), result, true) + })) + } } fn return_struct(response: T, debug: bool) -> Result>
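
A note on the "desugared `async fn`" pattern introduced in `Disk::read`/`Disk::write` above: because the returned future owns only its completion channel and does not capture `&self`, a caller can index into the `disks` RwLock, start the I/O, and let the guard drop before awaiting. A minimal self-contained sketch of the same pattern (hypothetical names and types, not the agent's actual API):

use std::future::Future;
use std::sync::RwLock;

use tokio::sync::oneshot;

struct Dev;

impl Dev {
    // Desugared `async fn`: the returned future does not capture `&self`.
    fn read(&self) -> impl Future<Output = Vec<u8>> {
        let (tx, rx) = oneshot::channel();
        // A real implementation would hand `tx` to an I/O thread here; this
        // stand-in completes the request immediately.
        tx.send(vec![0u8; 512]).ok();
        async move { rx.await.unwrap() }
    }
}

async fn caller(devs: &RwLock<Vec<Dev>>) -> Vec<u8> {
    // The temporary read guard is dropped at the end of this statement,
    // so no lock is held while the I/O completes.
    let fut = devs.read().unwrap()[0].read();
    fut.await
}

#[tokio::main]
async fn main() {
    let devs = RwLock::new(vec![Dev]);
    assert_eq!(caller(&devs).await.len(), 512);
}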
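Similarly, the checkpoint-sync handshake (`sync_checkpoint` bumping `checkpoint_wanted`, the checkpoint task flushing and acknowledging through `checkpoint_synced`) boils down to a pair of tokio watch channels. A simplified sketch, in which a bare u64 stands in for the checkpoint id:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // checkpoint_synced: written by the checkpoint task, read by syncers.
    let (synced_tx, synced_rx) = watch::channel(0u64);
    // checkpoint_wanted: written by syncers, read by the checkpoint task.
    let (wanted_tx, mut wanted_rx) = watch::channel(0u64);

    // Stand-in for checkpoint_task(): flush whenever a newer id is wanted.
    tokio::spawn(async move {
        while wanted_rx.changed().await.is_ok() {
            let wanted = *wanted_rx.borrow_and_update();
            // ... write the checkpoint to disk here ...
            synced_tx.send(wanted).ok(); // acknowledge, like flush_checkpoint()
        }
    });

    // Stand-in for sync_checkpoint(): request the next id, then wait for it.
    let mut synced = synced_rx.clone();
    let next = *synced.borrow_and_update() + 1;
    wanted_tx.send(next).unwrap();
    while *synced.borrow_and_update() < next {
        synced.changed().await.unwrap();
    }
    println!("checkpoint {next} synced");
}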