diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index d9d3d9d66244..cbc97e722816 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -335,13 +335,36 @@ fn start_pageserver(
     // Set up remote storage client
     let remote_storage = create_remote_storage_client(conf)?;
 
+    // Every tenant load operation carries a clone of this sender while the load is ongoing;
+    // the clone is dropped once the load finishes, successfully or not. When all senders
+    // are gone, the initial load is done and the global background tasks can start.
+    let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
+    let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));
+
     // Scan the local 'tenants/' directory and start loading the tenants
+    let init_started_at = std::time::Instant::now();
     BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
         conf,
         broker_client.clone(),
         remote_storage.clone(),
+        init_done_tx,
     ))?;
 
+    BACKGROUND_RUNTIME.spawn({
+        let init_done_rx = init_done_rx.clone();
+        async move {
+            let init_done = async move { init_done_rx.lock().await.recv().await };
+            init_done.await;
+
+            let elapsed = init_started_at.elapsed();
+
+            tracing::info!(
+                elapsed_millis = elapsed.as_millis(),
+                "Initial load completed."
+            );
+        }
+    });
+
     // shared state between the disk-usage backed eviction background task and the http endpoint
     // that allows triggering disk-usage based eviction manually. note that the http endpoint
     // is still accessible even if background task is not configured as long as remote storage has
@@ -353,6 +376,7 @@ fn start_pageserver(
             conf,
             remote_storage.clone(),
             disk_usage_eviction_state.clone(),
+            init_done_rx.clone(),
         )?;
     }
 
@@ -390,6 +414,7 @@ fn start_pageserver(
     );
 
     if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
+        let init_done_rx = init_done_rx;
        let metrics_ctx = RequestContext::todo_child(
            TaskKind::MetricsCollection,
            // This task itself shouldn't download anything.
@@ -405,6 +430,14 @@ fn start_pageserver(
             "consumption metrics collection",
             true,
             async move {
+                // First wait for the initial load to complete before the first iteration.
+                //
+                // This is because we only process active tenants and timelines, and
+                // Timeline::get_current_logical_size will spawn the logical size calculation,
+                // which is not otherwise rate-limited.
+                let init_done = async move { init_done_rx.lock().await.recv().await };
+                init_done.await;
+
                 pageserver::consumption_metrics::collect_metrics(
                     metric_collection_endpoint,
                     conf.metric_collection_interval,
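Note for reviewers: the hunks above use the mpsc channel purely as a completion signal; nothing is ever sent on it. Below is a minimal, self-contained sketch of that pattern, not the pageserver code itself: the task bodies and timings are made up, and only `tokio` is assumed.

```rust
use std::{sync::Arc, time::Duration};

#[tokio::main]
async fn main() {
    // Each in-flight load holds a Sender clone; nothing is ever sent.
    // `recv()` returns `None` once every clone has been dropped, i.e.
    // once all loads have finished.
    let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
    let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));

    for i in 0..3u64 {
        let tx = init_done_tx.clone();
        tokio::spawn(async move {
            let _init_done_tx = tx; // dropped when this task exits, success or not
            tokio::time::sleep(Duration::from_millis(10 * i)).await; // simulated load
        });
    }
    // Drop the original so only the per-load clones keep the channel open.
    drop(init_done_tx);

    // Any number of background tasks can share the receiver behind the
    // mutex; after the channel closes, every later `recv()` is `None` too.
    init_done_rx.lock().await.recv().await;
    println!("initial load completed");
}
```

Because a closed channel keeps yielding `None`, each waiter (the logging task, the eviction task, the metrics task) observes completion independently despite sharing one receiver.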
diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index f4a0f3f18eb0..03589691994a 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -82,6 +82,7 @@ pub fn launch_disk_usage_global_eviction_task(
     conf: &'static PageServerConf,
     storage: GenericRemoteStorage,
     state: Arc<State>,
+    init_done_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<()>>>,
 ) -> anyhow::Result<()> {
     let Some(task_config) = &conf.disk_usage_based_eviction else {
         info!("disk usage based eviction task not configured");
@@ -98,6 +99,10 @@ pub fn launch_disk_usage_global_eviction_task(
         "disk usage based eviction",
         false,
         async move {
+            // Wait until the initial load is complete, because we cannot evict from tenants
+            // that are still loading.
+            let init_done = async move { init_done_rx.lock().await.recv().await };
+            init_done.await;
+
             disk_usage_eviction_task(
                 &state,
                 task_config,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 4c8101af8d3d..d6eb82410755 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -895,6 +895,7 @@ impl Tenant {
         tenant_id: TenantId,
         broker_client: storage_broker::BrokerClientChannel,
         remote_storage: Option<GenericRemoteStorage>,
+        init_done_tx: Option<tokio::sync::mpsc::Sender<()>>,
         ctx: &RequestContext,
     ) -> Arc<Tenant> {
         let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
@@ -928,6 +929,9 @@ impl Tenant {
             "initial tenant load",
             false,
             async move {
+                // Keep the sender alive for as long as the initial load is ongoing; it is
+                // None for loads spawned after init_tenant_mgr.
+                let _init_done_tx = init_done_tx;
                 match tenant_clone.load(&ctx).await {
                     Ok(()) => {
                         info!("load finished, activating");
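The tenant.rs hunk relies on RAII rather than an explicit send: the spawned load task merely owns the sender clone until it exits, on any path. A hypothetical stand-in for that shape (the function names and bodies are illustrative, not the real `Tenant::spawn_load`; assumes `tokio` and `anyhow`):

```rust
use tokio::sync::mpsc::Sender;

// Hypothetical stand-in: the sender clone is moved into the task and
// dropped on every exit path, so the channel closes even if the load
// fails. Loads spawned after startup pass `None` and never delay the
// init-done signal.
fn spawn_load(init_done_tx: Option<Sender<()>>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Keep the sender alive for the duration of the load.
        let _init_done_tx = init_done_tx;
        match load().await {
            Ok(()) => println!("load finished, activating"),
            Err(e) => eprintln!("load failed: {e:#}"),
        }
        // `_init_done_tx` is dropped here.
    })
}

// Placeholder for the actual tenant load work.
async fn load() -> anyhow::Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
    let handle = spawn_load(Some(tx));
    // `recv()` returns `None` as soon as the load task drops its sender.
    assert!(rx.recv().await.is_none());
    handle.await.unwrap();
}
```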
     // See https://github.com/neondatabase/neon/issues/4233
 
@@ -509,7 +519,7 @@ pub async fn load_tenant(
             .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
     }
 
-    let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
+    let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
         .with_context(|| {
             format!("Failed to schedule tenant processing in path {tenant_path:?}")
         })?;
@@ -582,7 +592,7 @@ pub async fn attach_tenant(
         .context("check for attach marker file existence")?;
     anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
 
-    let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
+    let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
     // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
     // See https://github.com/neondatabase/neon/issues/4233
 
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 9dd5352a540c..0569bd45e0ce 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1910,6 +1910,23 @@ impl Timeline {
         // no cancellation here, because nothing really waits for this to complete compared
         // to spawn_ondemand_logical_size_calculation.
         let cancel = CancellationToken::new();
+
+        /// Ugly, but necessary until `spawn_blocking` is used for blocking I/O, otherwise
+        /// we could lock up all worker threads.
+        static GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE: once_cell::sync::Lazy<Arc<tokio::sync::Semaphore>> = once_cell::sync::Lazy::new(|| {
+            let cores = std::thread::available_parallelism();
+            // Rationale for half: other blocking work (consumption metrics, per-timeline
+            // eviction tasks) starts later, and we still need to be fast to accept page
+            // reads, so half the cores is a reasonable middle ground.
+            let max_blocked_threads = cores.map(|count| count.get() / 2);
+            let max_blocked_threads = max_blocked_threads.unwrap_or(1);
+            let max_blocked_threads = std::cmp::max(1, max_blocked_threads);
+            tracing::info!("using max {max_blocked_threads} threads for initial logical size");
+            Arc::new(tokio::sync::Semaphore::new(max_blocked_threads))
+        });
+
+        let _permit = GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE.clone().acquire_owned().await.expect("global semaphore is never closed");
+
         let calculated_size = match self_clone
             .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
             .await
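The timeline.rs hunk caps concurrent initial logical size calculations at roughly half the available cores. A runnable sketch of the same global-semaphore pattern (the worker body is a placeholder; assumes `tokio` and `once_cell`):

```rust
use std::sync::Arc;

// Global limiter: at most half the cores, but at least one permit, may run
// the blocking-ish work concurrently, mirroring the hunk above.
static LIMIT: once_cell::sync::Lazy<Arc<tokio::sync::Semaphore>> =
    once_cell::sync::Lazy::new(|| {
        let max = std::thread::available_parallelism()
            .map(|n| n.get() / 2)
            .unwrap_or(1)
            .max(1);
        Arc::new(tokio::sync::Semaphore::new(max))
    });

async fn limited_calculation(id: u32) {
    // `acquire_owned` ties the permit's lifetime to this task rather than
    // to a borrow of the static; it only errors if the semaphore is closed.
    let _permit = LIMIT.clone().acquire_owned().await.expect("never closed");
    // Placeholder for the actual logical size calculation.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    println!("calculation {id} done");
}

#[tokio::main]
async fn main() {
    let tasks: Vec<_> = (0..8u32)
        .map(|i| tokio::spawn(limited_calculation(i)))
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
}
```

`acquire_owned` matches the diff's `.clone().acquire_owned()` call: the permit is held by value inside the spawned task, so it is released when the calculation finishes regardless of how the task exits.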