From d8f8ac6627c93712f55b7931fc40dc200f8d4a59 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 14:13:48 +0300 Subject: [PATCH 01/13] fix: delay background tasks until initial load --- pageserver/src/bin/pageserver.rs | 13 +++++++++++++ pageserver/src/disk_usage_eviction_task.rs | 5 +++++ pageserver/src/tenant.rs | 4 ++++ pageserver/src/tenant/mgr.rs | 18 ++++++++++++++---- 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d9d3d9d66244..097408cdbb4d 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -335,11 +335,18 @@ fn start_pageserver( // Set up remote storage client let remote_storage = create_remote_storage_client(conf)?; + // All tenant load operations carry this while they are ongoing; it will be dropped once those + // operations finish either successfully or in some other manner. However, the initial load + // will be then done, and we can start the global background tasks. + let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1); + let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx)); + // Scan the local 'tenants/' directory and start loading the tenants BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, broker_client.clone(), remote_storage.clone(), + init_done_tx, ))?; // shared state between the disk-usage backed eviction background task and the http endpoint @@ -353,6 +360,7 @@ fn start_pageserver( conf, remote_storage.clone(), disk_usage_eviction_state.clone(), + init_done_rx.clone(), )?; } @@ -390,6 +398,7 @@ fn start_pageserver( ); if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { + let init_done_rx = init_done_rx.clone(); let metrics_ctx = RequestContext::todo_child( TaskKind::MetricsCollection, // This task itself shouldn't download anything. 
@@ -405,6 +414,10 @@ fn start_pageserver( "consumption metrics collection", true, async move { + // first wait for initial load to complete before first iteration + let init_done = async move { init_done_rx.lock().await.recv().await }; + init_done.await; + pageserver::consumption_metrics::collect_metrics( metric_collection_endpoint, conf.metric_collection_interval, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index f4a0f3f18eb0..3411d6a1a4e3 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -82,6 +82,7 @@ pub fn launch_disk_usage_global_eviction_task( conf: &'static PageServerConf, storage: GenericRemoteStorage, state: Arc, + init_done_rx: Arc>>, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { info!("disk usage based eviction task not configured"); @@ -98,6 +99,10 @@ pub fn launch_disk_usage_global_eviction_task( "disk usage based eviction", false, async move { + // wait until initial load is complete, because we cannot evict if there are no tenants + let init_done = async move { init_done_rx.lock().await.recv().await }; + init_done.await; + disk_usage_eviction_task( &state, task_config, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 991f5ca1c64e..56e4233597ff 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -887,6 +887,7 @@ impl Tenant { tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + init_done_tx: Option>, ctx: &RequestContext, ) -> Arc { let tenant_conf = match Self::load_tenant_config(conf, tenant_id) { @@ -920,6 +921,9 @@ impl Tenant { "initial tenant load", false, async move { + // keep the sender alive as long as we have the initial load ongoing; it will be + // None for loads spawned after init_tenant_mgr. + let _init_done_tx = init_done_tx; let doit = async { tenant_clone.load(&ctx).await?; tenant_clone.activate(broker_client, &ctx)?; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index dbb9577bf0f3..175b150760e9 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -63,6 +63,7 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + init_done_tx: tokio::sync::mpsc::Sender<()>, ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants let tenants_dir = conf.tenants_path(); @@ -119,6 +120,7 @@ pub async fn init_tenant_mgr( &tenant_dir_path, broker_client.clone(), remote_storage.clone(), + Some(init_done_tx.clone()), &ctx, ) { Ok(tenant) => { @@ -154,6 +156,7 @@ pub fn schedule_local_tenant_processing( tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, + init_done_tx: Option>, ctx: &RequestContext, ) -> anyhow::Result> { anyhow::ensure!( @@ -207,7 +210,14 @@ pub fn schedule_local_tenant_processing( } else { info!("tenant {tenant_id} is assumed to be loadable, starting load operation"); // Start loading the tenant into memory. It will initially be in Loading state. 
- Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx) + Tenant::spawn_load( + conf, + tenant_id, + broker_client, + remote_storage, + init_done_tx, + ctx, + ) }; Ok(tenant) } @@ -291,7 +301,7 @@ pub async fn create_tenant( // See https://github.com/neondatabase/neon/issues/4233 let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?; + schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 @@ -437,7 +447,7 @@ pub async fn load_tenant( .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?; } - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx) + let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -510,7 +520,7 @@ pub async fn attach_tenant( .context("check for attach marker file existence")?; anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file"); - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 From e879d6cb4ce19de627c155ed20c81cf8cba3ff33 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 14:14:34 +0300 Subject: [PATCH 02/13] feat: communicate initial load duration --- pageserver/src/bin/pageserver.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 097408cdbb4d..8396a5f4f284 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -342,6 +342,7 @@ fn start_pageserver( let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx)); // Scan the local 'tenants/' directory and start loading the tenants + let init_started_at = std::time::Instant::now(); BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, broker_client.clone(), @@ -349,6 +350,20 @@ fn start_pageserver( init_done_tx, ))?; + BACKGROUND_RUNTIME.spawn({ + let init_done_rx = init_done_rx.clone(); + async move { + let init_done = async move { init_done_rx.lock().await.recv().await }; + + let elapsed = init_started_at.elapsed(); + + tracing::info!( + elapsed_millis = elapsed.as_millis(), + "Initial load completed." + ); + } + }); + // shared state between the disk-usage backed eviction background task and the http endpoint // that allows triggering disk-usage based eviction manually. 
note that the http endpoint // is still accessible even if background task is not configured as long as remote storage has From 9ea62902aceb763c3887d98c7d0f3153cfe15a10 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 14:20:14 +0300 Subject: [PATCH 03/13] doc: better explanations --- pageserver/src/bin/pageserver.rs | 6 +++++- pageserver/src/disk_usage_eviction_task.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 8396a5f4f284..4dbe8ee40003 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -429,7 +429,11 @@ fn start_pageserver( "consumption metrics collection", true, async move { - // first wait for initial load to complete before first iteration + // first wait for initial load to complete before first iteration. + // + // this is because we only process active tenants and timelines, and the + // Timeline::get_current_logical_size will spawn the logical size calculation, + // which will not be rate-limited. let init_done = async move { init_done_rx.lock().await.recv().await }; init_done.await; diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 3411d6a1a4e3..03589691994a 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -99,7 +99,7 @@ pub fn launch_disk_usage_global_eviction_task( "disk usage based eviction", false, async move { - // wait until initial load is complete, because we cannot evict if there are no tenants + // wait until initial load is complete, because we cannot evict from loading tenants. let init_done = async move { init_done_rx.lock().await.recv().await }; init_done.await; From b5bda5b3200760357cf536a3e97422bc7d0d6d2d Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 14:27:22 +0300 Subject: [PATCH 04/13] fix: avoid connecting to safekeepers without any new wal --- .../src/tenant/timeline/walreceiver/connection_manager.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 6b65e1fd429c..70b3ed1aa270 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -715,8 +715,14 @@ impl ConnectionManagerState { &self, node_to_omit: Option, ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> { + let currently_at = self.timeline.get_last_record_lsn(); self.applicable_connection_candidates() .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit) + .filter(|&(sk_id, info, _)| { + // avoid connecting until there is something to download, which will come through + // broker message + Lsn(info.commit_lsn) > currently_at + }) .max_by_key(|(_, info, _)| info.commit_lsn) } From 269224176da9b16646104f9a6c39650a48ee46a3 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 14:59:55 +0300 Subject: [PATCH 05/13] fixup: do the actual awaiting before reporting --- pageserver/src/bin/pageserver.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4dbe8ee40003..b613740c8061 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -354,6 +354,7 @@ fn start_pageserver( let init_done_rx = init_done_rx.clone(); async move { let init_done = async move { 
init_done_rx.lock().await.recv().await }; + init_done.await; let elapsed = init_started_at.elapsed(); From b7c7a4ef892715cd45be44b9e3bb5defbd609483 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 15:00:08 +0300 Subject: [PATCH 06/13] chore: remove unused binding --- .../src/tenant/timeline/walreceiver/connection_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 70b3ed1aa270..7e4a9b853f0c 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -718,7 +718,7 @@ impl ConnectionManagerState { let currently_at = self.timeline.get_last_record_lsn(); self.applicable_connection_candidates() .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit) - .filter(|&(sk_id, info, _)| { + .filter(|&(_, info, _)| { // avoid connecting until there is something to download, which will come through // broker message Lsn(info.commit_lsn) > currently_at From 756517410f8d3333ee7708593df411993bb625ba Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 15:21:22 +0300 Subject: [PATCH 07/13] chore: extra clone --- pageserver/src/bin/pageserver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index b613740c8061..cbc97e722816 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -414,7 +414,7 @@ fn start_pageserver( ); if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { - let init_done_rx = init_done_rx.clone(); + let init_done_rx = init_done_rx; let metrics_ctx = RequestContext::todo_child( TaskKind::MetricsCollection, // This task itself shouldn't download anything. From d64d317c729f8671b7e17f0398e965eac97f23a7 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 15:45:52 +0300 Subject: [PATCH 08/13] test: expect no connection in test this is rather big change from always eager connections. 
--- test_runner/regress/test_pageserver_api.py | 38 ++++++++++++++-------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 28732872df58..785cc7d154a2 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -6,7 +6,9 @@ DEFAULT_BRANCH_NAME, NeonEnv, NeonEnvBuilder, + wait_for_last_flush_lsn, ) +from fixtures.log_helper import log from fixtures.pageserver.http import PageserverHttpClient from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TimelineId @@ -126,7 +128,7 @@ def expect_updated_msg_lsn( tenant_id: TenantId, timeline_id: TimelineId, prev_msg_lsn: Optional[Lsn], -) -> Lsn: +) -> Optional[Lsn]: timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id) # a successful `timeline_details` response must contain the below fields @@ -134,17 +136,22 @@ def expect_updated_msg_lsn( assert "last_received_msg_lsn" in timeline_details.keys() assert "last_received_msg_ts" in timeline_details.keys() - assert ( - timeline_details["last_received_msg_lsn"] is not None - ), "the last received message's LSN is empty" - - last_msg_lsn = Lsn(timeline_details["last_received_msg_lsn"]) - assert ( - prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn - ), f"the last received message's LSN {last_msg_lsn} hasn't been updated \ - compared to the previous message's LSN {prev_msg_lsn}" + log.info("wal_source_connstr: %s" % (timeline_details["wal_source_connstr"],)) + log.info("last_received_msg_lsn: %s" % (timeline_details["last_received_msg_lsn"],)) + log.info("last_received_msg_ts: %s" % (timeline_details["last_received_msg_ts"],)) - return last_msg_lsn + if prev_msg_lsn is None: + assert ( + timeline_details["last_received_msg_lsn"] is None + ), "the last received message's LSN is empty" + return None + else: + last_msg_lsn = Lsn(timeline_details["last_received_msg_lsn"]) + assert ( + prev_msg_lsn < last_msg_lsn + ), f"the last received message's LSN {last_msg_lsn} hasn't been updated \ + compared to the previous message's LSN {prev_msg_lsn}" + return last_msg_lsn # Test the WAL-receiver related fields in the response to `timeline_details` API call @@ -161,19 +168,22 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): # We need to wait here because it's possible that we don't have access to # the latest WAL yet, when the `timeline_detail` API is first called. # See: https://github.com/neondatabase/neon/issues/1768. - lsn = wait_until( + wait_until( number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, None), ) + initdb_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) # Make a DB modification then expect getting a new WAL receiver's data. 
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)") - wait_until( + flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) + later_lsn = wait_until( number_of_iterations=5, interval=1, - func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, lsn), + func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, initdb_lsn), ) + assert flush_lsn == later_lsn def test_pageserver_http_api_client(neon_simple_env: NeonEnv): From f2604d374952a25ebd3c27f6e7aa1ea821ea4a18 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 16:24:58 +0300 Subject: [PATCH 09/13] chore: sort imports --- test_runner/regress/test_pageserver_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 785cc7d154a2..c3da1215d65a 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -2,13 +2,13 @@ from pathlib import Path from typing import Optional +from fixtures.log_helper import log from fixtures.neon_fixtures import ( DEFAULT_BRANCH_NAME, NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn, ) -from fixtures.log_helper import log from fixtures.pageserver.http import PageserverHttpClient from fixtures.pg_version import PgVersion from fixtures.types import Lsn, TenantId, TimelineId From a80bf2fcccc506ffade5d8bf4eb47d8e4243a254 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 17:08:42 +0300 Subject: [PATCH 10/13] revert: walreceiver modifications --- .../walreceiver/connection_manager.rs | 6 --- test_runner/regress/test_pageserver_api.py | 37 +++++++------------ 2 files changed, 14 insertions(+), 29 deletions(-) diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 7e4a9b853f0c..6b65e1fd429c 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -715,14 +715,8 @@ impl ConnectionManagerState { &self, node_to_omit: Option, ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> { - let currently_at = self.timeline.get_last_record_lsn(); self.applicable_connection_candidates() .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit) - .filter(|&(_, info, _)| { - // avoid connecting until there is something to download, which will come through - // broker message - Lsn(info.commit_lsn) > currently_at - }) .max_by_key(|(_, info, _)| info.commit_lsn) } diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index c3da1215d65a..db82c3898566 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -7,7 +7,6 @@ DEFAULT_BRANCH_NAME, NeonEnv, NeonEnvBuilder, - wait_for_last_flush_lsn, ) from fixtures.pageserver.http import PageserverHttpClient from fixtures.pg_version import PgVersion @@ -128,7 +127,7 @@ def expect_updated_msg_lsn( tenant_id: TenantId, timeline_id: TimelineId, prev_msg_lsn: Optional[Lsn], -) -> Optional[Lsn]: +) -> Lsn: timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id) # a successful `timeline_details` response must contain the below fields @@ -136,22 +135,17 @@ def expect_updated_msg_lsn( assert "last_received_msg_lsn" in timeline_details.keys() assert "last_received_msg_ts" in timeline_details.keys() - log.info("wal_source_connstr: %s" % 
(timeline_details["wal_source_connstr"],)) - log.info("last_received_msg_lsn: %s" % (timeline_details["last_received_msg_lsn"],)) - log.info("last_received_msg_ts: %s" % (timeline_details["last_received_msg_ts"],)) + assert ( + timeline_details["last_received_msg_lsn"] is not None + ), "the last received message's LSN is empty" - if prev_msg_lsn is None: - assert ( - timeline_details["last_received_msg_lsn"] is None - ), "the last received message's LSN is empty" - return None - else: - last_msg_lsn = Lsn(timeline_details["last_received_msg_lsn"]) - assert ( - prev_msg_lsn < last_msg_lsn - ), f"the last received message's LSN {last_msg_lsn} hasn't been updated \ - compared to the previous message's LSN {prev_msg_lsn}" - return last_msg_lsn + last_msg_lsn = Lsn(timeline_details["last_received_msg_lsn"]) + assert ( + prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn + ), f"the last received message's LSN {last_msg_lsn} hasn't been updated \ + compared to the previous message's LSN {prev_msg_lsn}" + + return last_msg_lsn # Test the WAL-receiver related fields in the response to `timeline_details` API call @@ -168,22 +162,19 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): # We need to wait here because it's possible that we don't have access to # the latest WAL yet, when the `timeline_detail` API is first called. # See: https://github.com/neondatabase/neon/issues/1768. - wait_until( + lsn = wait_until( number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, None), ) - initdb_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) # Make a DB modification then expect getting a new WAL receiver's data. endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)") - flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - later_lsn = wait_until( + wait_until( number_of_iterations=5, interval=1, - func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, initdb_lsn), + func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, lsn), ) - assert flush_lsn == later_lsn def test_pageserver_http_api_client(neon_simple_env: NeonEnv): From f174628714793b657ca8df7ec74525ab1b7d98c0 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 17:21:49 +0300 Subject: [PATCH 11/13] hack: limit ongoing init logical size requests --- pageserver/src/tenant/timeline.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 9dd5352a540c..a4dcf56f345a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1910,6 +1910,17 @@ impl Timeline { // no cancellation here, because nothing really waits for this to complete compared // to spawn_ondemand_logical_size_calculation. let cancel = CancellationToken::new(); + + /// Ugly, but necessary until `spawn_blocking` is used for blocking I/O, otherwise + /// we could lock up all worker threads. 
+ static GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE: once_cell::sync::Lazy> = once_cell::sync::Lazy::new(|| { + let cores = std::thread::available_parallelism(); + let max_blocked_threads = cores.map(|count| count.get() / 2); + Arc::new(tokio::sync::Semaphore::new(max_blocked_threads.unwrap_or(1).min(1))) + }); + + let _permit = GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE.clone().acquire_owned().await.expect("global semaphore is never closed"); + let calculated_size = match self_clone .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel) .await From a3be8ad3a8f8ec1e631db95aca39639ec561bcbe Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 20:27:53 +0300 Subject: [PATCH 12/13] chore: python again --- test_runner/regress/test_pageserver_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index db82c3898566..28732872df58 100644 --- a/test_runner/regress/test_pageserver_api.py +++ b/test_runner/regress/test_pageserver_api.py @@ -2,7 +2,6 @@ from pathlib import Path from typing import Optional -from fixtures.log_helper import log from fixtures.neon_fixtures import ( DEFAULT_BRANCH_NAME, NeonEnv, From 793536a87932e725a77fa6511f7852a436660297 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 29 May 2023 20:34:08 +0300 Subject: [PATCH 13/13] fix: failure with min, and docs --- pageserver/src/tenant/timeline.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a4dcf56f345a..0569bd45e0ce 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1915,8 +1915,14 @@ impl Timeline { /// we could lock up all worker threads. static GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE: once_cell::sync::Lazy> = once_cell::sync::Lazy::new(|| { let cores = std::thread::available_parallelism(); + // half rationale: we have other blocking work which will start later: + // consumption metrics and per timeline eviction task. we however need to + // be fast to accept page reads, so perhaps this is a suitable middle ground? let max_blocked_threads = cores.map(|count| count.get() / 2); - Arc::new(tokio::sync::Semaphore::new(max_blocked_threads.unwrap_or(1).min(1))) + let max_blocked_threads = max_blocked_threads.unwrap_or(1); + let max_blocked_threads = std::cmp::max(1, max_blocked_threads); + tracing::info!("using max {max_blocked_threads} threads for initial logical size"); + Arc::new(tokio::sync::Semaphore::new(max_blocked_threads)) }); let _permit = GLOBAL_INITIAL_LOGICAL_SIZES_AT_ONCE.clone().acquire_owned().await.expect("global semaphore is never closed");
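Note on the last two patches: they cap how many initial logical size calculations may run at once with a process-wide semaphore sized to roughly half the available cores, and the final patch replaces an earlier `.min(1)` (which capped the count at one permit, and zero on a single-core machine) with a floor of one. Below is a standalone sketch of that pattern, not the pageserver code itself; the function name `calculate_initial_logical_size` and the surrounding setup are illustrative only.

    use std::sync::Arc;

    use once_cell::sync::Lazy;
    use tokio::sync::Semaphore;

    static INITIAL_SIZE_CALCS_AT_ONCE: Lazy<Arc<Semaphore>> = Lazy::new(|| {
        let cores = std::thread::available_parallelism();
        // Half the cores, but never fewer than one permit; a floor of one is what
        // the final patch enforces in place of the earlier `.min(1)`.
        let max_blocked_threads = std::cmp::max(1, cores.map(|c| c.get() / 2).unwrap_or(1));
        Arc::new(Semaphore::new(max_blocked_threads))
    });

    // Hypothetical caller standing in for the initial logical size calculation.
    async fn calculate_initial_logical_size() {
        // `acquire_owned` consumes an Arc<Semaphore>, hence the clone; the permit
        // is returned when `_permit` is dropped at the end of the calculation.
        let _permit = INITIAL_SIZE_CALCS_AT_ONCE
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed");

        // ...filesystem-heavy size calculation would run here...
    }

The half-the-cores rationale comes from the patch itself: other blocking work (consumption metrics, per-timeline eviction) starts later, and page reads still need free worker threads while the initial calculations run.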
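The central mechanism of the whole series is the init-done channel: init_tenant_mgr hands a clone of an mpsc Sender to every initial tenant load, background tasks await recv() on the shared Receiver, and recv() returns None only once the last sender clone has been dropped, i.e. once every initial load has finished one way or another. A minimal, self-contained sketch of that pattern follows, assuming a tokio runtime with the sync, time, and macros features; the three fake load tasks and their timings are invented for illustration, the real senders being held by the Tenant::spawn_load tasks.

    use std::sync::Arc;
    use std::time::{Duration, Instant};

    #[tokio::main]
    async fn main() {
        // One Sender clone per in-flight load; the Receiver is shared by every waiter.
        let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
        let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));

        let init_started_at = Instant::now();

        // Stand-ins for the per-tenant load tasks spawned by init_tenant_mgr; each
        // holds the sender for as long as its load is ongoing and never sends.
        for i in 0..3u64 {
            let tx = init_done_tx.clone();
            tokio::spawn(async move {
                let _init_done_tx = tx; // dropped when the load finishes, however it finishes
                tokio::time::sleep(Duration::from_millis(50 * (i + 1))).await;
            });
        }
        // Drop the original so only the load tasks keep the channel open.
        drop(init_done_tx);

        // A background task (consumption metrics, disk-usage eviction, ...) parks
        // here until every sender clone is gone; recv() then yields None.
        let rx = init_done_rx.clone();
        tokio::spawn(async move {
            let init_done = async move { rx.lock().await.recv().await };
            init_done.await;
            println!("initial load completed in {:?}", init_started_at.elapsed());
        })
        .await
        .unwrap();
    }

Using channel closure rather than an explicit send is what lets the signal fire even when a load fails: the sender is dropped on any exit path, and the Mutex around the Receiver lets several background tasks await the same one-shot "all loads done" event.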