From 428354d3471351b75c1483c7b404fa7490bca78a Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Mon, 13 Mar 2023 19:08:46 +0800 Subject: [PATCH] feat(backup): support mutating backup config (#8505) Co-authored-by: Zhidong Guo <52783948+Gun9niR@users.noreply.github.com> --- src/common/src/system_param/mod.rs | 10 ++ src/compute/src/server.rs | 20 ++- src/meta/src/backup_restore/backup_manager.rs | 149 ++++++++++++++++-- src/meta/src/rpc/server.rs | 29 +--- .../backup/integration_tests/run_all.sh | 1 + .../integration_tests/test_set_config.sh | 79 ++++++++++ src/storage/backup/src/storage.rs | 1 + src/storage/src/hummock/backup_reader.rs | 98 +++++++++--- src/storage/src/hummock/mod.rs | 17 +- src/storage/src/store_impl.rs | 15 +- .../src/delete_range_runner.rs | 2 - 11 files changed, 341 insertions(+), 80 deletions(-) create mode 100644 src/storage/backup/integration_tests/test_set_config.sh diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index eaf51308d5e56..a0759c9a1d7d6 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -281,6 +281,16 @@ impl ValidateOnSet for OverrideValidateOnSet { fn checkpoint_frequency(v: &u64) -> Result<()> { Self::expect_range(*v, 1..) } + + fn backup_storage_directory(_v: &String) -> Result<()> { + // TODO + Ok(()) + } + + fn backup_storage_url(_v: &String) -> Result<()> { + // TODO + Ok(()) + } } for_all_undeprecated_params!(impl_default_from_other_params); diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c972951180cae..8c27dd8749672 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -170,6 +170,13 @@ pub async fn compute_node_serve( .await .unwrap(); + // Initialize observer manager. + let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone())); + let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone()); + let observer_manager = + ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await; + observer_manager.start().await; + let mut extra_info_sources: Vec = vec![]; if let Some(storage) = state_store.as_hummock_trait() { extra_info_sources.push(storage.sstable_id_manager().clone()); @@ -206,6 +213,12 @@ pub async fn compute_node_serve( memory_limiter, )); monitor_cache(memory_collector, ®istry).unwrap(); + let backup_reader = storage.backup_reader(); + tokio::spawn(async move { + backup_reader + .watch_config_change(system_params_manager.watch_params()) + .await; + }); } sub_tasks.push(MetaClient::start_heartbeat_loop( @@ -253,13 +266,6 @@ pub async fn compute_node_serve( // of lru manager. stream_mgr.set_watermark_epoch(watermark_epoch).await; - // Initialize observer manager. - let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params)); - let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone()); - let observer_manager = - ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await; - observer_manager.start().await; - let grpc_await_tree_reg = await_tree_config .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into())); let dml_mgr = Arc::new(DmlManager::default()); diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index edcd57b73bfce..93e342776223e 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -15,13 +15,16 @@ use std::sync::Arc; use std::time::Instant; +use arc_swap::ArcSwap; use itertools::Itertools; use prometheus::Registry; use risingwave_backup::error::BackupError; -use risingwave_backup::storage::MetaSnapshotStorageRef; +use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage}; use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest}; use risingwave_common::bail; use risingwave_hummock_sdk::HummockSstableId; +use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; +use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::task::JoinHandle; @@ -29,7 +32,7 @@ use tokio::task::JoinHandle; use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder; use crate::backup_restore::metrics::BackupManagerMetrics; use crate::hummock::{HummockManagerRef, HummockVersionSafePoint}; -use crate::manager::{IdCategory, MetaSrvEnv}; +use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv}; use crate::storage::MetaStore; use crate::MetaResult; @@ -57,40 +60,118 @@ impl BackupJobHandle { } pub type BackupManagerRef = Arc>; +/// (url, dir) +type StoreConfig = (String, String); /// `BackupManager` manages lifecycle of all existent backups and the running backup job. pub struct BackupManager { env: MetaSrvEnv, hummock_manager: HummockManagerRef, - backup_store: MetaSnapshotStorageRef, + backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>, /// Tracks the running backup job. Concurrent jobs is not supported. running_backup_job: tokio::sync::Mutex>, metrics: BackupManagerMetrics, } impl BackupManager { - pub fn new( + pub async fn new( env: MetaSrvEnv, hummock_manager: HummockManagerRef, - backup_store: MetaSnapshotStorageRef, registry: Registry, + store_url: &str, + store_dir: &str, + ) -> MetaResult> { + let store_config = (store_url.to_string(), store_dir.to_string()); + let store = create_snapshot_store(&store_config).await?; + tracing::info!( + "backup manager initialized: url={}, dir={}", + store_config.0, + store_config.1 + ); + let instance = Arc::new(Self::with_store( + env.clone(), + hummock_manager, + registry, + (store, store_config), + )); + let (local_notification_tx, mut local_notification_rx) = + tokio::sync::mpsc::unbounded_channel(); + env.notification_manager() + .insert_local_sender(local_notification_tx) + .await; + let this = instance.clone(); + tokio::spawn(async move { + loop { + match local_notification_rx.recv().await { + Some(notification) => { + if let LocalNotification::SystemParamsChange(p) = notification { + let new_config = ( + p.backup_storage_url().to_string(), + p.backup_storage_directory().to_string(), + ); + this.handle_new_config(new_config).await; + } + } + None => { + return; + } + } + } + }); + Ok(instance) + } + + async fn handle_new_config(&self, new_config: StoreConfig) { + if self.backup_store.load().1 == new_config { + return; + } + if let Err(e) = self.set_store(new_config.clone()).await { + // Retry is driven by periodic system params notification. + tracing::warn!( + "failed to apply new backup config: url={}, dir={}, {:#?}", + new_config.0, + new_config.1, + e + ); + } + } + + fn with_store( + env: MetaSrvEnv, + hummock_manager: HummockManagerRef, + registry: Registry, + backup_store: (BoxedMetaSnapshotStorage, StoreConfig), ) -> Self { Self { env, hummock_manager, - backup_store, + backup_store: ArcSwap::from_pointee(backup_store), running_backup_job: tokio::sync::Mutex::new(None), metrics: BackupManagerMetrics::new(registry), } } + pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> { + let new_store = create_snapshot_store(&config).await?; + tracing::info!( + "new backup config is applied: url={}, dir={}", + config.0, + config.1 + ); + self.backup_store.store(Arc::new((new_store, config))); + Ok(()) + } + #[cfg(test)] pub fn for_test(env: MetaSrvEnv, hummock_manager: HummockManagerRef) -> Self { - Self::new( + Self::with_store( env, hummock_manager, - Arc::new(risingwave_backup::storage::DummyMetaSnapshotStorage::default()), Registry::new(), + ( + Box::::default(), + StoreConfig::default(), + ), ) } @@ -104,6 +185,26 @@ impl BackupManager { job.job_id )); } + // The reasons to limit number of meta snapshot are: + // 1. limit size of `MetaSnapshotManifest`, which is kept in memory by + // `ObjectStoreMetaSnapshotStorage`. + // 2. limit number of pinned SSTs returned by + // `list_pinned_ssts`, which subsequently is used by GC. + const MAX_META_SNAPSHOT_NUM: usize = 100; + let current_number = self + .backup_store + .load() + .0 + .manifest() + .snapshot_metadata + .len(); + if current_number > MAX_META_SNAPSHOT_NUM { + bail!(format!( + "too many existent meta snapshots, expect at most {}", + MAX_META_SNAPSHOT_NUM + )) + } + let job_id = self .env .id_gen_manager() @@ -134,6 +235,8 @@ impl BackupManager { } if self .backup_store + .load() + .0 .manifest() .snapshot_metadata .iter() @@ -160,7 +263,7 @@ impl BackupManager { .notify_hummock_without_version( Operation::Update, Info::MetaBackupManifestId(MetaBackupManifestId { - id: self.backup_store.manifest().manifest_id, + id: self.backup_store.load().0.manifest().manifest_id, }), ); } @@ -188,13 +291,13 @@ impl BackupManager { /// Deletes existent backups from backup storage. pub async fn delete_backups(&self, ids: &[MetaSnapshotId]) -> MetaResult<()> { - self.backup_store.delete(ids).await?; + self.backup_store.load().0.delete(ids).await?; self.env .notification_manager() .notify_hummock_without_version( Operation::Update, Info::MetaBackupManifestId(MetaBackupManifestId { - id: self.backup_store.manifest().manifest_id, + id: self.backup_store.load().0.manifest().manifest_id, }), ); Ok(()) @@ -203,6 +306,8 @@ impl BackupManager { /// List all `SSTables` required by backups. pub fn list_pinned_ssts(&self) -> Vec { self.backup_store + .load() + .0 .manifest() .snapshot_metadata .iter() @@ -212,7 +317,7 @@ impl BackupManager { } pub fn manifest(&self) -> Arc { - self.backup_store.manifest() + self.backup_store.load().0.manifest() } } @@ -234,7 +339,12 @@ impl BackupWorker { // Reuse job id as snapshot id. snapshot_builder.build(job_id).await?; let snapshot = snapshot_builder.finish()?; - backup_manager_clone.backup_store.create(&snapshot).await?; + backup_manager_clone + .backup_store + .load() + .0 + .create(&snapshot) + .await?; Ok(BackupJobResult::Succeeded) }; tokio::spawn(async move { @@ -245,3 +355,16 @@ impl BackupWorker { }) } } + +async fn create_snapshot_store(config: &StoreConfig) -> MetaResult { + let object_store = Arc::new( + parse_remote_object_store( + &config.0, + Arc::new(ObjectStoreMetrics::unused()), + "Meta Backup", + ) + .await, + ); + let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?; + Ok(Box::new(store)) +} diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 63d8e8f8640d9..2ec0ef07eb93c 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -18,11 +18,8 @@ use std::time::Duration; use either::Either; use etcd_client::ConnectOptions; -use risingwave_backup::storage::ObjectStoreMetaSnapshotStorage; use risingwave_common::monitor::process_linux::monitor_process; use risingwave_common_service::metrics_manager::MetricsManager; -use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; -use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; @@ -426,31 +423,17 @@ pub async fn start_service_as_election_leader( .await .expect("list_table_fragments"), ) - .await - .unwrap(); + .await?; // Initialize services. - let backup_object_store = Arc::new( - parse_remote_object_store( - system_params_reader.backup_storage_url(), - Arc::new(ObjectStoreMetrics::unused()), - "Meta Backup", - ) - .await, - ); - let backup_storage = Arc::new( - ObjectStoreMetaSnapshotStorage::new( - system_params_reader.backup_storage_directory(), - backup_object_store, - ) - .await?, - ); - let backup_manager = Arc::new(BackupManager::new( + let backup_manager = BackupManager::new( env.clone(), hummock_manager.clone(), - backup_storage, meta_metrics.registry().clone(), - )); + system_params_reader.backup_storage_url(), + system_params_reader.backup_storage_directory(), + ) + .await?; let vacuum_manager = Arc::new(hummock::VacuumManager::new( env.clone(), hummock_manager.clone(), diff --git a/src/storage/backup/integration_tests/run_all.sh b/src/storage/backup/integration_tests/run_all.sh index b89e2a77a0040..41dce2d51a0e0 100644 --- a/src/storage/backup/integration_tests/run_all.sh +++ b/src/storage/backup/integration_tests/run_all.sh @@ -6,6 +6,7 @@ tests=( \ "test_basic.sh" \ "test_pin_sst.sh" \ "test_query_backup.sh" \ +"test_set_config.sh" \ ) for t in "${tests[@]}" do diff --git a/src/storage/backup/integration_tests/test_set_config.sh b/src/storage/backup/integration_tests/test_set_config.sh new file mode 100644 index 0000000000000..2539df02882e0 --- /dev/null +++ b/src/storage/backup/integration_tests/test_set_config.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +. "${DIR}/common.sh" + +stop_cluster +clean_all_data +start_cluster + +execute_sql_and_expect \ +"SHOW parameters;" \ +"backup_storage_directory | backup" + +execute_sql_and_expect \ +"SHOW parameters;" \ +"backup_storage_url | minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001" + +backup + +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"1 row" + +execute_sql_and_expect \ +"alter system set backup_storage_directory to backup_1;" \ +"ALTER_SYSTEM" +# system params application is async. +sleep 5 + +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"0 row" + +backup +backup +backup +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"3 row" + +execute_sql_and_expect \ +"alter system set backup_storage_directory to backup;" \ +"ALTER_SYSTEM" +sleep 5 + +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"1 row" + +execute_sql_and_expect \ +"alter system set backup_storage_url to memory;" \ +"ALTER_SYSTEM" +sleep 5 + +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"0 row" + +backup +backup +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"2 row" + +execute_sql_and_expect \ +"alter system set backup_storage_url to \"minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001\"" \ +"ALTER_SYSTEM" +sleep 5 + +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"1 row" + +backup +execute_sql_and_expect \ +"SELECT meta_snapshot_id FROM rw_catalog.rw_meta_snapshot;" \ +"2 row" + +echo "test succeeded" \ No newline at end of file diff --git a/src/storage/backup/src/storage.rs b/src/storage/backup/src/storage.rs index bc9108bdd2e57..ff8ef286d632b 100644 --- a/src/storage/backup/src/storage.rs +++ b/src/storage/backup/src/storage.rs @@ -24,6 +24,7 @@ use crate::{ }; pub type MetaSnapshotStorageRef = Arc; +pub type BoxedMetaSnapshotStorage = Box; #[async_trait::async_trait] pub trait MetaSnapshotStorage: 'static + Sync + Send { diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs index 1bcfe9bc1a72a..1a79a87e9d5ec 100644 --- a/src/storage/src/hummock/backup_reader.rs +++ b/src/storage/src/hummock/backup_reader.rs @@ -18,14 +18,16 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwap; use futures::future::Shared; use futures::FutureExt; use risingwave_backup::error::BackupError; use risingwave_backup::meta_snapshot::MetaSnapshot; use risingwave_backup::storage::{ - DummyMetaSnapshotStorage, MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage, + BoxedMetaSnapshotStorage, DummyMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage, }; use risingwave_backup::MetaSnapshotId; +use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; @@ -40,39 +42,48 @@ type VersionHolder = ( tokio::sync::mpsc::UnboundedReceiver, ); -pub async fn parse_meta_snapshot_storage( - storage_url: &str, - storage_directory: &str, -) -> StorageResult { +async fn create_snapshot_store(config: &StoreConfig) -> StorageResult { let backup_object_store = Arc::new( parse_remote_object_store( - storage_url, + &config.0, Arc::new(ObjectStoreMetrics::unused()), "Meta Backup", ) .await, ); - let store = Arc::new( - ObjectStoreMetaSnapshotStorage::new(storage_directory, backup_object_store).await?, - ); + let store = + Box::new(ObjectStoreMetaSnapshotStorage::new(&config.1, backup_object_store).await?); Ok(store) } type InflightRequest = Shared> + Send>>>; +/// (url, dir) +type StoreConfig = (String, String); /// `BackupReader` helps to access historical hummock versions, /// which are persisted in meta snapshots (aka backups). pub struct BackupReader { versions: parking_lot::RwLock>, inflight_request: parking_lot::Mutex>, - store: MetaSnapshotStorageRef, + store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>, refresh_tx: tokio::sync::mpsc::UnboundedSender, } impl BackupReader { - pub fn new(store: MetaSnapshotStorageRef) -> BackupReaderRef { + pub async fn new(storage_url: &str, storage_directory: &str) -> StorageResult { + let config = (storage_url.to_string(), storage_directory.to_string()); + let store = create_snapshot_store(&config).await?; + tracing::info!( + "backup reader is initialized: url={}, dir={}", + config.0, + config.1 + ); + Ok(Self::with_store((store, config))) + } + + fn with_store(store: (BoxedMetaSnapshotStorage, StoreConfig)) -> BackupReaderRef { let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel(); let instance = Arc::new(Self { - store, + store: ArcSwap::from_pointee(store), versions: Default::default(), inflight_request: Default::default(), refresh_tx, @@ -82,9 +93,24 @@ impl BackupReader { } pub fn unused() -> BackupReaderRef { - Self::new(Arc::new(DummyMetaSnapshotStorage::default())) + Self::with_store(( + Box::::default(), + StoreConfig::default(), + )) + } + + async fn set_store(&self, config: StoreConfig) -> StorageResult<()> { + let new_store = create_snapshot_store(&config).await?; + tracing::info!( + "backup reader is updated: url={}, dir={}", + config.0, + config.1 + ); + self.store.store(Arc::new((new_store, config))); + Ok(()) } + /// Watches latest manifest id to keep local manifest update to date. async fn start_manifest_refresher( backup_reader: BackupReaderRef, mut refresh_rx: tokio::sync::mpsc::UnboundedReceiver, @@ -95,11 +121,13 @@ impl BackupReader { break; } let expect_manifest_id = expect_manifest_id.unwrap(); - let previous_id = backup_reader.store.manifest().manifest_id; + // Use the same store throughout one run. + let current_store = backup_reader.store.load_full(); + let previous_id = current_store.0.manifest().manifest_id; if expect_manifest_id <= previous_id { continue; } - if let Err(e) = backup_reader.store.refresh_manifest().await { + if let Err(e) = current_store.0.refresh_manifest().await { // reschedule refresh request tracing::warn!("failed to refresh backup manifest, will retry. {}", e); let backup_reader_clone = backup_reader.clone(); @@ -110,8 +138,8 @@ impl BackupReader { continue; } // purge stale version cache - let manifest: HashSet = backup_reader - .store + let manifest: HashSet = current_store + .0 .manifest() .snapshot_metadata .iter() @@ -138,9 +166,11 @@ impl BackupReader { self: &BackupReaderRef, epoch: u64, ) -> StorageResult> { + // Use the same store throughout the call. + let current_store = self.store.load_full(); // 1. check manifest to locate snapshot, if any. - let snapshot_id = self - .store + let snapshot_id = current_store + .0 .manifest() .snapshot_metadata .iter() @@ -163,7 +193,7 @@ impl BackupReader { } else { let this = self.clone(); let f = async move { - let snapshot = this.store.get(snapshot_id).await.map_err(|e| { + let snapshot = current_store.0.get(snapshot_id).await.map_err(|e| { format!("failed to get meta snapshot {}. {}", snapshot_id, e) })?; let version_holder = build_version_holder(snapshot); @@ -184,6 +214,34 @@ impl BackupReader { self.inflight_request.lock().remove(&snapshot_id); result } + + pub async fn watch_config_change( + &self, + mut rx: tokio::sync::watch::Receiver, + ) { + loop { + if rx.changed().await.is_err() { + break; + } + let p = rx.borrow().load(); + let config = ( + p.backup_storage_url().to_string(), + p.backup_storage_directory().to_string(), + ); + if config == self.store.load().1 { + continue; + } + if let Err(e) = self.set_store(config.clone()).await { + // Retry is driven by periodic system params notification. + tracing::warn!( + "failed to update backup reader: url={}, dir={}, {:#?}", + config.0, + config.1, + e + ); + } + } + } } fn build_version_holder(s: MetaSnapshot) -> VersionHolder { diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index 6cc3dd73eb97a..c5527ad7eed99 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -80,9 +80,7 @@ use self::event_handler::ReadVersionMappingType; use self::iterator::HummockIterator; pub use self::sstable_store::*; use super::monitor::HummockStateStoreMetrics; -#[cfg(any(test, feature = "test"))] -use crate::hummock::backup_reader::BackupReader; -use crate::hummock::backup_reader::BackupReaderRef; +use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; use crate::hummock::compactor::CompactorContext; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::{HummockEvent, HummockEventHandler}; @@ -146,7 +144,6 @@ impl HummockStorage { pub async fn new( options: Arc, sstable_store: SstableStoreRef, - backup_reader: BackupReaderRef, hummock_meta_client: Arc, notification_client: impl NotificationClient, state_store_metrics: Arc, @@ -157,7 +154,12 @@ impl HummockStorage { hummock_meta_client.clone(), options.sstable_id_remote_fetch_number, )); - + let backup_reader = BackupReader::new( + &options.backup_storage_url, + &options.backup_storage_directory, + ) + .await + .map_err(HummockError::read_backup_error)?; let filter_key_extractor_manager = Arc::new(FilterKeyExtractorManager::default()); let write_limiter = Arc::new(WriteLimiter::default()); let (event_tx, mut event_rx) = unbounded_channel(); @@ -272,6 +274,10 @@ impl HummockStorage { pub fn get_pinned_version(&self) -> PinnedVersion { self.pinned_version.load().deref().deref().clone() } + + pub fn backup_reader(&self) -> BackupReaderRef { + self.backup_reader.clone() + } } #[cfg(any(test, feature = "test"))] @@ -327,7 +333,6 @@ impl HummockStorage { Self::new( options, sstable_store, - BackupReader::unused(), hummock_meta_client, notification_client, Arc::new(HummockStateStoreMetrics::unused()), diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index c87f3b905a848..60ebb3c7edf84 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -23,7 +23,7 @@ use risingwave_object_store::object::{ }; use crate::error::StorageResult; -use crate::hummock::backup_reader::{parse_meta_snapshot_storage, BackupReader}; +use crate::hummock::backup_reader::BackupReaderRef; use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::{ @@ -599,17 +599,9 @@ impl StateStoreImpl { )); let notification_client = RpcNotificationClient::new(hummock_meta_client.get_inner().clone()); - - let backup_store = parse_meta_snapshot_storage( - &opts.backup_storage_url, - &opts.backup_storage_directory, - ) - .await?; - let backup_reader = BackupReader::new(backup_store); let inner = HummockStorage::new( opts.clone(), sstable_store, - backup_reader, hummock_meta_client.clone(), notification_client, state_store_metrics.clone(), @@ -645,6 +637,7 @@ pub trait HummockTrait { fn sstable_store(&self) -> SstableStoreRef; fn filter_key_extractor_manager(&self) -> &FilterKeyExtractorManagerRef; fn get_memory_limiter(&self) -> Arc; + fn backup_reader(&self) -> BackupReaderRef; fn as_hummock(&self) -> Option<&HummockStorage>; } @@ -665,6 +658,10 @@ impl HummockTrait for HummockStorage { self.get_memory_limiter() } + fn backup_reader(&self) -> BackupReaderRef { + self.backup_reader() + } + fn as_hummock(&self) -> Option<&HummockStorage> { Some(self) } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 7188fdb1f27f8..6655732276765 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -41,7 +41,6 @@ use risingwave_pb::catalog::Table as ProstTable; use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::HummockMetaClient; -use risingwave_storage::hummock::backup_reader::BackupReader; use risingwave_storage::hummock::compactor::{CompactionExecutor, CompactorContext}; use risingwave_storage::hummock::sstable_store::SstableStoreRef; use risingwave_storage::hummock::{ @@ -185,7 +184,6 @@ async fn compaction_test( let store = HummockStorage::new( storage_opts.clone(), sstable_store.clone(), - BackupReader::unused(), meta_client.clone(), get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node), state_store_metrics.clone(),