From 316142ada22b9a8535520708e3d97dc5532da252 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Tue, 6 Jun 2023 15:30:55 +0300
Subject: [PATCH] refactor: just one way to shutdown a tenant (#4407)

We have two ways of shutting down a tenant; we should have just one. The
changes are mostly simple, mechanical refactorings.

The `warn!` added on "shutdown all remaining tasks" should trigger test
failures in the meantime, until the "tenant/timeline owns all spawned
tasks" issue is solved.

Cc: #4327.
---
 pageserver/src/lib.rs                       |   6 -
 pageserver/src/task_mgr.rs                  |  21 +++-
 pageserver/src/tenant.rs                    | 127 +++++++++++++++++---
 pageserver/src/tenant/mgr.rs                | 123 +++++--------------
 test_runner/fixtures/neon_fixtures.py       |   3 +
 test_runner/regress/test_remote_storage.py  |   6 +-
 test_runner/regress/test_timeline_delete.py |   3 +-
 7 files changed, 167 insertions(+), 122 deletions(-)

diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 776cf0dac1b4..40a672bee3fb 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -58,12 +58,6 @@ pub async fn shutdown_pageserver(exit_code: i32) {
     // the checkpoint and GC tasks.
     tenant::mgr::shutdown_all_tenants().await;

-    // Stop syncing with remote storage.
-    //
-    // FIXME: Does this wait for the sync tasks to finish syncing what's queued up?
-    // Should it?
-    task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), None, None).await;
-
     // Shut down the HTTP endpoint last, so that you can still check the server's
     // status while it's shutting down.
     // FIXME: We should probably stop accepting commands like attach/detach earlier.
diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs
index 82aebc6c07a1..4df0e4e6f22c 100644
--- a/pageserver/src/task_mgr.rs
+++ b/pageserver/src/task_mgr.rs
@@ -476,18 +476,35 @@ pub async fn shutdown_tasks(
             && (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
         {
             task.cancel.cancel();
-            victim_tasks.push(Arc::clone(task));
+            victim_tasks.push((
+                Arc::clone(task),
+                task.kind,
+                task_mut.tenant_id,
+                task_mut.timeline_id,
+            ));
         }
     }

-    for task in victim_tasks {
+    let log_all = kind.is_none() && tenant_id.is_none() && timeline_id.is_none();
+
+    for (task, task_kind, tenant_id, timeline_id) in victim_tasks {
         let join_handle = {
             let mut task_mut = task.mutable.lock().unwrap();
             task_mut.join_handle.take()
         };
         if let Some(mut join_handle) = join_handle {
+            if log_all {
+                if tenant_id.is_none() {
+                    // there are only a few of these
+                    info!(name = task.name, kind = ?task_kind, "stopping global task");
+                } else {
+                    // warn to catch these in tests; there shouldn't be any
+                    warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
+                }
+            }
             let completed = tokio::select! {
+                biased;
                 _ = &mut join_handle => { true },
                 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                     // allow some time to elapse before logging to cut down the number of log
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index bcf4495ac2e4..7ce0ed81bc31 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -486,6 +486,10 @@ impl std::fmt::Display for WaitToBecomeActiveError {
     }
 }

+pub(crate) enum ShutdownError {
+    AlreadyStopping,
+}
+
 impl Tenant {
     /// Yet another helper for timeline initialization.
     /// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -1439,28 +1443,63 @@ impl Tenant {
         Ok(())
     }

-    /// Flush all in-memory data to disk.
+    /// Flush all in-memory data to disk and remote storage, if any.
     ///
     /// Used at graceful shutdown.
-    ///
-    pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
-        // Scan through the hashmap and collect a list of all the timelines,
-        // while holding the lock. Then drop the lock and actually perform the
-        // flushing. We don't want to block everything else while the
-        // flushing is performed.
-        let timelines_to_flush = {
+    async fn freeze_and_flush_on_shutdown(&self) {
+        let mut js = tokio::task::JoinSet::new();
+
+        // execute on each timeline on the JoinSet, join after.
+        let per_timeline = |timeline_id: TimelineId, timeline: Arc<Timeline>| {
+            async move {
+                debug_assert_current_span_has_tenant_and_timeline_id();
+
+                match timeline.freeze_and_flush().await {
+                    Ok(()) => {}
+                    Err(e) => {
+                        warn!("failed to freeze and flush: {e:#}");
+                        return;
+                    }
+                }
+
+                let res = if let Some(client) = timeline.remote_client.as_ref() {
+                    // if we did not wait for completion here, our shutdown process might not
+                    // wait for remote uploads to complete at all, as new tasks can be spawned
+                    // forever.
+                    //
+                    // what is problematic is the shutting down of RemoteTimelineClient, because
+                    // obviously it does not make sense to stop while we wait for it, but what
+                    // about corner cases like s3 suddenly hanging up?
+                    client.wait_completion().await
+                } else {
+                    Ok(())
+                };
+
+                if let Err(e) = res {
+                    warn!("failed to await for frozen and flushed uploads: {e:#}");
+                }
+            }
+            .instrument(tracing::info_span!("freeze_and_flush_on_shutdown", %timeline_id))
+        };
+
+        {
             let timelines = self.timelines.lock().unwrap();
             timelines
                 .iter()
-                .map(|(_id, timeline)| Arc::clone(timeline))
-                .collect::<Vec<_>>()
+                .map(|(id, tl)| (*id, Arc::clone(tl)))
+                .for_each(|(timeline_id, timeline)| {
+                    js.spawn(per_timeline(timeline_id, timeline));
+                })
         };

-        for timeline in &timelines_to_flush {
-            timeline.freeze_and_flush().await?;
+        while let Some(res) = js.join_next().await {
+            match res {
+                Ok(()) => {}
+                Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
+                Err(je) if je.is_panic() => { /* logged already */ }
+                Err(je) => warn!("unexpected JoinError: {je:?}"),
+            }
         }
-
-        Ok(())
     }

     /// Shuts down a timeline's tasks, removes its in-memory structures, and deletes its
@@ -1756,12 +1795,70 @@ impl Tenant {
         }
     }

+    /// Shut down the tenant and join all of the spawned tasks.
+    ///
+    /// The method caters for all use-cases:
+    /// - pageserver shutdown (freeze_and_flush == true)
+    /// - detach + ignore (freeze_and_flush == false)
+    ///
+    /// This will attempt to shut down even if the tenant is broken.
+    pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
+        debug_assert_current_span_has_tenant_id();
+        // Set tenant (and its timelines) to Stopping state.
+        //
+        // Since we can only transition into Stopping state after activation is complete,
+        // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
+        //
+        // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
+        // 1. Lock out any new requests to the tenants.
+        // 2. Signal cancellation to WAL receivers (we wait on it below).
+        // 3. Signal cancellation for other tenant background loops.
+        // 4. ???
+        //
+        // The waiting for the cancellation is not done uniformly.
+        // We certainly wait for WAL receivers to shut down.
+        // That is necessary so that no new data comes in before the freeze_and_flush.
+        // But the tenant background loops are joined-on in our caller.
+        // It's messed up.
+        // we just ignore the failure to stop
+        match self.set_stopping().await {
+            Ok(()) => {}
+            Err(SetStoppingError::Broken) => {
+                // assume that this is acceptable
+            }
+            Err(SetStoppingError::AlreadyStopping) => return Err(ShutdownError::AlreadyStopping),
+        };
+
+        if freeze_and_flush {
+            // walreceiver has already begun to shut down with TenantState::Stopping, but we need
+            // to await for them to stop.
+            task_mgr::shutdown_tasks(
+                Some(TaskKind::WalReceiverManager),
+                Some(self.tenant_id),
+                None,
+            )
+            .await;
+
+            // this will wait for uploads to complete; in the past, it was done outside tenant
+            // shutdown in pageserver::shutdown_pageserver.
+            self.freeze_and_flush_on_shutdown().await;
+        }
+
+        // shut down all tenant and timeline tasks: gc, compaction, page service.
+        // No new tasks will be started for this tenant because it's in `Stopping` state.
+        //
+        // this will additionally shut down and await all timeline tasks.
+        task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
+
+        Ok(())
+    }
+
     /// Change tenant status to Stopping, to mark that it is being shut down.
     ///
     /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
     ///
     /// This function is not cancel-safe!
-    pub async fn set_stopping(&self) -> Result<(), SetStoppingError> {
+    async fn set_stopping(&self) -> Result<(), SetStoppingError> {
         let mut rx = self.state.subscribe();

         // cannot stop before we're done activating, so wait out until we're done activating
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 05874bdd72cf..740f9621b695 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -20,9 +20,7 @@ use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind};
 use crate::tenant::config::TenantConfOpt;
-use crate::tenant::{
-    create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
-};
+use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
 use crate::IGNORED_TENANT_FILE_NAME;

 use utils::completion;

@@ -255,46 +253,28 @@ pub async fn shutdown_all_tenants() {
                 tenants_clone
             }
             TenantsMap::ShuttingDown(_) => {
+                // TODO: it is possible that detach and shutdown happen at the same time. as a
+                // result, during shutdown we do not wait for detach.
                 error!("already shutting down, this function isn't supposed to be called more than once");
                 return;
             }
         }
     };

-    // Set tenant (and its timlines) to Stoppping state.
-    //
-    // Since we can only transition into Stopping state after activation is complete,
-    // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
-    //
-    // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
-    // 1. Lock out any new requests to the tenants.
-    // 2. Signal cancellation to WAL receivers (we wait on it below).
-    // 3. Signal cancellation for other tenant background loops.
-    // 4. ???
-    //
-    // The waiting for the cancellation is not done uniformly.
-    // We certainly wait for WAL receivers to shut down.
-    // That is necessary so that no new data comes in before the freeze_and_flush.
-    // But the tenant background loops are joined-on in our caller.
-    // It's mesed up.
     let mut join_set = JoinSet::new();
-    let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
     for (tenant_id, tenant) in tenants_to_shut_down {
         join_set.spawn(
             async move {
-                match tenant.set_stopping().await {
+                let freeze_and_flush = true;
+
+                match tenant.shutdown(freeze_and_flush).await {
                     Ok(()) => debug!("tenant successfully stopped"),
-                    Err(SetStoppingError::Broken) => {
-                        info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well");
-                    },
-                    Err(SetStoppingError::AlreadyStopping) => {
-                        // our task_mgr::shutdown_tasks are going to coalesce on that just fine
+                    Err(super::ShutdownError::AlreadyStopping) => {
+                        warn!("tenant was already shutting down")
                     }
                 }
-
-                tenant
             }
-            .instrument(info_span!("set_stopping", %tenant_id)),
+            .instrument(info_span!("shutdown", %tenant_id)),
         );
     }

@@ -302,6 +282,7 @@
     while let Some(res) = join_set.join_next().await {
         match res {
+            Ok(()) => {}
             Err(join_error) if join_error.is_cancelled() => {
                 unreachable!("we are not cancelling any of the futures");
             }
@@ -312,50 +293,11 @@
             Err(join_error) => {
                 warn!("unknown kind of JoinError: {join_error}");
             }
-            Ok(tenant) => tenants_to_freeze_and_flush.push(tenant),
         }
     }

     if panicked > 0 {
-        warn!(panicked, "observed panicks while stopping tenants");
-    }
-
-    // Shut down all existing walreceiver connections and stop accepting the new ones.
-    task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
-
-    // Ok, no background tasks running anymore. Flush any remaining data in
-    // memory to disk.
-    //
-    // We assume that any incoming connections that might request pages from
-    // the tenant have already been terminated by the caller, so there
-    // should be no more activity in any of the repositories.
-    //
-    // On error, log it but continue with the shutdown for other tenants.
-
-    let mut join_set = tokio::task::JoinSet::new();
-
-    for tenant in tenants_to_freeze_and_flush {
-        let tenant_id = tenant.tenant_id();
-
-        join_set.spawn(
-            async move {
-                if let Err(err) = tenant.freeze_and_flush().await {
-                    warn!("Could not checkpoint tenant during shutdown: {err:?}");
-                }
-            }
-            .instrument(info_span!("freeze_and_flush", %tenant_id)),
-        );
-    }
-
-    while let Some(next) = join_set.join_next().await {
-        match next {
-            Ok(()) => {}
-            Err(join_error) if join_error.is_cancelled() => {
-                unreachable!("no cancelling")
-            }
-            Err(join_error) if join_error.is_panic() => { /* reported already */ }
-            Err(join_error) => warn!("unknown kind of JoinError: {join_error}"),
-        }
+        warn!(panicked, "observed panicks while shutting down tenants");
     }
 }

@@ -671,35 +613,26 @@ where
     // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
    // tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
    // avoid holding the lock for the entire process.
-    {
-        let tenants_accessor = TENANTS.write().await;
-        match tenants_accessor.get(&tenant_id) {
-            Some(tenant) => {
-                let tenant = Arc::clone(tenant);
-                // don't hold TENANTS lock while set_stopping waits for activation to finish
-                drop(tenants_accessor);
-                match tenant.set_stopping().await {
-                    Ok(()) => {
-                        // we won, continue stopping procedure
-                    }
-                    Err(SetStoppingError::Broken) => {
-                        // continue the procedure, let's hope the closure can deal with broken tenants
-                    }
-                    Err(SetStoppingError::AlreadyStopping) => {
-                        // the tenant is already stopping or broken, don't do anything
-                        return Err(TenantStateError::IsStopping(tenant_id));
-                    }
-                }
-            }
-            None => return Err(TenantStateError::NotFound(tenant_id)),
+    let tenant = {
+        TENANTS
+            .write()
+            .await
+            .get(&tenant_id)
+            .cloned()
+            .ok_or(TenantStateError::NotFound(tenant_id))?
+    };
+
+    let freeze_and_flush = false;
+
+    // shutdown is sure to transition the tenant to Stopping and to wait for all tasks to
+    // complete, so that we can safely continue with cleanup.
+    match tenant.shutdown(freeze_and_flush).await {
+        Ok(()) => {}
+        Err(super::ShutdownError::AlreadyStopping) => {
+            return Err(TenantStateError::IsStopping(tenant_id))
         }
     }

-    // shutdown all tenant and timeline tasks: gc, compaction, page service)
-    // No new tasks will be started for this tenant because it's in `Stopping` state.
-    // Hence, once we're done here, the `tenant_cleanup` callback can mutate tenant on-disk state freely.
-    task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;
-
     match tenant_cleanup
         .await
         .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 0c63fd126217..a810c367d808 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1689,6 +1689,9 @@ def assert_no_errors(self):
             else:
                 errors.append(line)

+        for error in errors:
+            log.info(f"not allowed error: {error.strip()}")
+
         assert not errors

     def log_contains(self, pattern: str) -> Optional[str]:
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index aefc8befeb4d..baef8ecacc34 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -693,15 +693,15 @@ def test_empty_branch_remote_storage_upload_on_restart(
         f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
     )

-    # index upload is now hitting the failpoint, should not block the shutdown
-    env.pageserver.stop()
+    # index upload is now hitting the failpoint; it should block the shutdown
+    env.pageserver.stop(immediate=True)

     timeline_path = (
         Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
     )

     local_metadata = env.repo_dir / timeline_path / "metadata"
-    assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
+    assert local_metadata.is_file()

     assert isinstance(env.remote_storage, LocalFsStorage)
     new_branch_on_remote_storage = env.remote_storage.root / timeline_path
diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py
index 1e15a8e7cb23..be79538843c0 100644
--- a/test_runner/regress/test_timeline_delete.py
+++ b/test_runner/regress/test_timeline_delete.py
@@ -271,8 +271,9 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
     env.pageserver.allowed_errors.append(
         ".*Ignoring new state, equal to the existing one: Stopping"
     )
+    # this happens because the stuck timeline is visible to shutdown
     env.pageserver.allowed_errors.append(
-        ".*during shutdown: cannot flush frozen layers when flush_loop is not running, state is Exited"
+        ".*freeze_and_flush_on_shutdown.+: failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited"
     )

     ps_http = env.pageserver.http_client()