try: startup speedup #4366

Merged · 14 commits · May 29, 2023
32 changes: 32 additions & 0 deletions pageserver/src/bin/pageserver.rs
@@ -335,13 +335,35 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;

// Every tenant load operation carries a clone of this sender while it is ongoing; the clone is
// dropped once the operation finishes, successfully or otherwise. Once all clones have been
// dropped, the initial load is done and the global background tasks can start.
let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));

// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
init_done_tx,
))?;

BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
let init_done = async move { init_done_rx.lock().await.recv().await };
init_done.await;

let elapsed = init_started_at.elapsed();

tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed."
);
}
});

// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -353,6 +375,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
init_done_rx.clone(),
)?;
}

@@ -390,6 +413,7 @@
);

if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let init_done_rx = init_done_rx.clone();
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
@@ -405,6 +429,14 @@
"consumption metrics collection",
true,
async move {
// First wait for the initial load to complete before the first iteration.
//
// This is because we only process active tenants and timelines, and
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which is not rate-limited.
let init_done = async move { init_done_rx.lock().await.recv().await };
init_done.await;

pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
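Note: the core mechanism above is a `tokio::sync::mpsc` channel used purely as a completion signal. Every startup tenant load holds a clone of the sender without ever sending on it, and waiters call `recv()` on the shared receiver, which returns `None` only once the last sender clone has been dropped. Below is a minimal, self-contained sketch of that pattern (task bodies, durations, and names are illustrative, not the pageserver code; assumes `tokio = { version = "1", features = ["full"] }`):

```rust
use std::sync::Arc;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Channel used purely as a completion signal: nothing is ever sent on it.
    let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
    // Several background tasks want to wait on the same receiver, so share it behind a Mutex.
    let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));

    let init_started_at = Instant::now();

    // Spawn a few "tenant load" tasks; each holds a sender clone for its whole duration.
    for i in 0..3u64 {
        let tx = init_done_tx.clone();
        tokio::spawn(async move {
            let _init_done_tx = tx; // dropped when this task finishes, success or failure
            tokio::time::sleep(Duration::from_millis(50 * (i + 1))).await;
            println!("tenant load {i} done");
        });
    }
    // Drop the original sender so only the per-load clones keep the channel open.
    drop(init_done_tx);

    // A waiter: recv() returns None once every sender clone has been dropped.
    let rx = init_done_rx.clone();
    let waiter = tokio::spawn(async move {
        let _ = rx.lock().await.recv().await;
        println!(
            "Initial load completed in {} ms",
            init_started_at.elapsed().as_millis()
        );
    });

    waiter.await.unwrap();
}
```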
5 changes: 5 additions & 0 deletions pageserver/src/disk_usage_eviction_task.rs
@@ -82,6 +82,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
init_done_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<()>>>,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -98,6 +99,10 @@
"disk usage based eviction",
false,
async move {
// wait until initial load is complete, because we cannot evict from loading tenants.
let init_done = async move { init_done_rx.lock().await.recv().await };
init_done.await;

disk_usage_eviction_task(
&state,
task_config,
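Both the disk-usage eviction task here and the consumption-metrics task earlier wait on the same `Arc<Mutex<Receiver<()>>>`. That works because a tokio mpsc receiver keeps returning `None` from `recv()` once the channel is closed and drained, so each waiter can take the mutex in turn and still observe the signal. A small sketch of two waiters sharing one receiver (the task names are made up for illustration):

```rust
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (init_done_tx, init_done_rx) = tokio::sync::mpsc::channel::<()>(1);
    let init_done_rx = Arc::new(tokio::sync::Mutex::new(init_done_rx));

    // Two independent waiters sharing the same receiver behind a mutex.
    let mut waiters = Vec::new();
    for name in ["disk-usage eviction", "consumption metrics"] {
        let rx = init_done_rx.clone();
        waiters.push(tokio::spawn(async move {
            // recv() yields None once all senders are dropped, and keeps yielding None
            // on subsequent calls, so both waiters eventually unblock.
            let _ = rx.lock().await.recv().await;
            println!("{name}: initial load done, starting background work");
        }));
    }

    // Simulate the initial load finishing: drop the last sender.
    tokio::time::sleep(Duration::from_millis(20)).await;
    drop(init_done_tx);

    for w in waiters {
        w.await.unwrap();
    }
}
```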
4 changes: 4 additions & 0 deletions pageserver/src/tenant.rs
@@ -887,6 +887,7 @@ impl Tenant {
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done_tx: Option<tokio::sync::mpsc::Sender<()>>,
ctx: &RequestContext,
) -> Arc<Tenant> {
let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
@@ -920,6 +921,9 @@
"initial tenant load",
false,
async move {
// Keep the sender alive for as long as the initial load is ongoing; it is
// None for loads spawned after init_tenant_mgr.
let _init_done_tx = init_done_tx;
let doit = async {
tenant_clone.load(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
18 changes: 14 additions & 4 deletions pageserver/src/tenant/mgr.rs
@@ -63,6 +63,7 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done_tx: tokio::sync::mpsc::Sender<()>,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -119,6 +120,7 @@
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done_tx.clone()),
&ctx,
) {
Ok(tenant) => {
@@ -154,6 +156,7 @@ pub fn schedule_local_tenant_processing(
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done_tx: Option<tokio::sync::mpsc::Sender<()>>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -207,7 +210,14 @@
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
Tenant::spawn_load(
conf,
tenant_id,
broker_client,
remote_storage,
init_done_tx,
ctx,
)
};
Ok(tenant)
}
@@ -291,7 +301,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233

let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

@@ -437,7 +447,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}

let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -510,7 +520,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");

let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

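In mgr.rs the sender is threaded through as an `Option`: the startup scan in `init_tenant_mgr` passes `Some(init_done_tx.clone())` into each load, while `create_tenant`, `load_tenant`, and `attach_tenant` pass `None` so that tenants added after startup cannot delay the init-done signal. A rough sketch of that shape, with illustrative stand-in names rather than the real pageserver API:

```rust
use std::time::Duration;
use tokio::sync::mpsc::Sender;

/// Spawn the load of one tenant. `init_done_tx` is Some(..) only when the load is part
/// of the initial startup scan; holding the clone for the duration of the load is what
/// keeps the init-done channel open until every startup load has finished.
fn spawn_load(tenant_id: u64, init_done_tx: Option<Sender<()>>) {
    tokio::spawn(async move {
        // Keep the sender alive while this load runs; it is simply dropped at the end.
        let _init_done_tx = init_done_tx;
        println!("loading tenant {tenant_id}");
        tokio::time::sleep(Duration::from_millis(10)).await;
    });
}

#[tokio::main]
async fn main() {
    let (init_done_tx, mut init_done_rx) = tokio::sync::mpsc::channel::<()>(1);

    // Startup scan: every load carries a clone of the sender.
    for tenant_id in [1, 2, 3] {
        spawn_load(tenant_id, Some(init_done_tx.clone()));
    }
    drop(init_done_tx);

    // Blocks until all startup loads have dropped their sender clones.
    let _ = init_done_rx.recv().await;
    println!("initial load complete");

    // A tenant created or attached later must not delay the signal above, so it gets None.
    spawn_load(4, None);
    tokio::time::sleep(Duration::from_millis(20)).await;
}
```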
@@ -715,8 +715,14 @@ impl ConnectionManagerState {
&self,
node_to_omit: Option<NodeId>,
) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
let currently_at = self.timeline.get_last_record_lsn();
self.applicable_connection_candidates()
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
.filter(|&(sk_id, info, _)| {
// avoid connecting until there is something to download, which we learn about
// through broker messages
Lsn(info.commit_lsn) > currently_at
})
.max_by_key(|(_, info, _)| info.commit_lsn)
}

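The last hunk changes safekeeper candidate selection: a candidate is only considered if its `commit_lsn` is ahead of the timeline's last record LSN, so no WAL-receiver connection is opened when there is nothing to download. A simplified, self-contained sketch of the same filter-then-max selection (the types here are stand-ins, not the real `SafekeeperTimelineInfo`):

```rust
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct NodeId(u64);

#[derive(Debug)]
struct CandidateInfo {
    commit_lsn: Lsn,
}

/// Pick the safekeeper with the highest commit_lsn, but only among those that are
/// actually ahead of what we already have locally; otherwise connecting is pointless.
fn select_candidate<'a>(
    candidates: &'a [(NodeId, CandidateInfo)],
    currently_at: Lsn,
    node_to_omit: Option<NodeId>,
) -> Option<&'a (NodeId, CandidateInfo)> {
    candidates
        .iter()
        .filter(|(sk_id, _)| Some(*sk_id) != node_to_omit)
        // avoid connecting until there is something to download
        .filter(|(_, info)| info.commit_lsn > currently_at)
        .max_by_key(|(_, info)| info.commit_lsn)
}

fn main() {
    let candidates = [
        (NodeId(1), CandidateInfo { commit_lsn: Lsn(100) }),
        (NodeId(2), CandidateInfo { commit_lsn: Lsn(250) }),
        (NodeId(3), CandidateInfo { commit_lsn: Lsn(250) }),
    ];
    // Nothing new to fetch: every candidate is at or behind the local LSN.
    assert!(select_candidate(&candidates, Lsn(300), None).is_none());
    // Otherwise the candidate with the highest commit_lsn ahead of us wins.
    let best = select_candidate(&candidates, Lsn(100), None).unwrap();
    assert_eq!(best.1.commit_lsn, Lsn(250));
    println!("selected {:?}", best.0);
}
```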