refactor: just one way to shutdown a tenant (#4407)
We have two ways of shutting down a tenant; we should have just one.

The changes are mostly simple, mechanical refactorings.

Added a `warn!` on the "shutdown all remaining tasks" path; it should trigger
test failures in the interim, until the "tenant/timeline owns all spawned
tasks" issue is solved.

Cc: #4327.
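
The unified entry point is `Tenant::shutdown(freeze_and_flush)`. A minimal sketch of the two call shapes introduced here (error handling elided; `tenant` is any `Arc<Tenant>`):

    // pageserver shutdown: flush in-memory data and wait for remote uploads
    tenant.shutdown(/* freeze_and_flush */ true).await?;

    // detach or ignore: stop all tasks without flushing
    tenant.shutdown(/* freeze_and_flush */ false).await?;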
koivunej authored and awestover committed Jun 14, 2023
1 parent 8c67c60 commit 316142a
Showing 7 changed files with 167 additions and 122 deletions.
6 changes: 0 additions & 6 deletions pageserver/src/lib.rs
@@ -58,12 +58,6 @@ pub async fn shutdown_pageserver(exit_code: i32) {
// the checkpoint and GC tasks.
tenant::mgr::shutdown_all_tenants().await;

// Stop syncing with remote storage.
//
// FIXME: Does this wait for the sync tasks to finish syncing what's queued up?
// Should it?
task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), None, None).await;

// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
// FIXME: We should probably stop accepting commands like attach/detach earlier.
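With uploads now awaited inside tenant shutdown, the remaining shutdown sequence reduces to roughly the following sketch (the `HttpEndpointListener` task kind is an assumption about code outside this diff):

    pub async fn shutdown_pageserver(exit_code: i32) {
        // Shut down all tenants. This also flushes and waits for queued
        // remote uploads, which used to be a separate step here.
        tenant::mgr::shutdown_all_tenants().await;

        // Shut down the HTTP endpoint last, so the server's status stays
        // observable while it's shutting down; assumed task kind.
        task_mgr::shutdown_tasks(Some(TaskKind::HttpEndpointListener), None, None).await;

        std::process::exit(exit_code);
    }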
21 changes: 19 additions & 2 deletions pageserver/src/task_mgr.rs
@@ -476,18 +476,35 @@ pub async fn shutdown_tasks(
&& (timeline_id.is_none() || task_mut.timeline_id == timeline_id)
{
task.cancel.cancel();
victim_tasks.push(Arc::clone(task));
victim_tasks.push((
Arc::clone(task),
task.kind,
task_mut.tenant_id,
task_mut.timeline_id,
));
}
}
}

for task in victim_tasks {
let log_all = kind.is_none() && tenant_id.is_none() && timeline_id.is_none();

for (task, task_kind, tenant_id, timeline_id) in victim_tasks {
let join_handle = {
let mut task_mut = task.mutable.lock().unwrap();
task_mut.join_handle.take()
};
if let Some(mut join_handle) = join_handle {
if log_all {
if tenant_id.is_none() {
// there are only a few of these
info!(name = task.name, kind = ?task_kind, "stopping global task");
} else {
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
}
let completed = tokio::select! {
biased;
_ = &mut join_handle => { true },
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
// allow some time to elapse before logging to cut down the number of log
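For reference, `shutdown_tasks` filters on any combination of task kind, tenant, and timeline; passing `None` for all three shuts down everything that is left and enables the logging added above. Two call shapes from this change (`tenant_id` stands for whichever tenant is being shut down):

    // stop only the WAL receiver managers of one tenant
    task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), Some(tenant_id), None).await;

    // final sweep: stop all remaining tasks, info-logging global ones and
    // warn-logging any left-over tenant/timeline tasks
    task_mgr::shutdown_tasks(None, None, None).await;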
127 changes: 112 additions & 15 deletions pageserver/src/tenant.rs
@@ -486,6 +486,10 @@ impl std::fmt::Display for WaitToBecomeActiveError {
}
}

pub(crate) enum ShutdownError {
AlreadyStopping,
}

impl Tenant {
/// Yet another helper for timeline initialization.
/// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -1439,28 +1443,63 @@ impl Tenant {
Ok(())
}

/// Flush all in-memory data to disk.
/// Flush all in-memory data to disk and remote storage, if any.
///
/// Used at graceful shutdown.
///
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// flushing. We don't want to block everything else while the
// flushing is performed.
let timelines_to_flush = {
async fn freeze_and_flush_on_shutdown(&self) {
let mut js = tokio::task::JoinSet::new();

// execute on each timeline on the JoinSet, join after.
let per_timeline = |timeline_id: TimelineId, timeline: Arc<Timeline>| {
async move {
debug_assert_current_span_has_tenant_and_timeline_id();

match timeline.freeze_and_flush().await {
Ok(()) => {}
Err(e) => {
warn!("failed to freeze and flush: {e:#}");
return;
}
}

let res = if let Some(client) = timeline.remote_client.as_ref() {
// if we did not wait for completion here, our shutdown process might not
// wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is shutting down the RemoteTimelineClient: obviously
// it does not make sense to stop while we wait for it, but what about
// corner cases like s3 suddenly hanging up?
client.wait_completion().await
} else {
Ok(())
};

if let Err(e) = res {
warn!("failed to await for frozen and flushed uploads: {e:#}");
}
}
.instrument(tracing::info_span!("freeze_and_flush_on_shutdown", %timeline_id))
};

{
let timelines = self.timelines.lock().unwrap();
timelines
.iter()
.map(|(_id, timeline)| Arc::clone(timeline))
.collect::<Vec<_>>()
.map(|(id, tl)| (*id, Arc::clone(tl)))
.for_each(|(timeline_id, timeline)| {
js.spawn(per_timeline(timeline_id, timeline));
})
};

for timeline in &timelines_to_flush {
timeline.freeze_and_flush().await?;
while let Some(res) = js.join_next().await {
match res {
Ok(()) => {}
Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
Err(je) if je.is_panic() => { /* logged already */ }
Err(je) => warn!("unexpected JoinError: {je:?}"),
}
}

Ok(())
}

/// Shuts down a timeline's tasks, removes its in-memory structures, and deletes its
@@ -1756,12 +1795,70 @@ impl Tenant {
}
}

/// Shutdown the tenant and join all of the spawned tasks.
///
/// The method caters for all use-cases:
/// - pageserver shutdown (freeze_and_flush == true)
/// - detach + ignore (freeze_and_flush == false)
///
/// This will attempt to shutdown even if tenant is broken.
pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
debug_assert_current_span_has_tenant_id();
// Set tenant (and its timelines) to Stopping state.
//
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for other tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's messed up.
// we just ignore the failure to stop
match self.set_stopping().await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
// assume that this is acceptable
}
Err(SetStoppingError::AlreadyStopping) => return Err(ShutdownError::AlreadyStopping),
};

if freeze_and_flush {
// walreceivers have already begun to shut down with TenantState::Stopping, but we need to
// wait for them to stop.
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.tenant_id),
None,
)
.await;

// this will wait for uploads to complete; in the past, it was done outside tenant
// shutdown in pageserver::shutdown_pageserver.
self.freeze_and_flush_on_shutdown().await;
}

// shutdown all tenant and timeline tasks: gc, compaction, page service
// No new tasks will be started for this tenant because it's in `Stopping` state.
//
// this will additionally shut down and await all timeline tasks.
task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;

Ok(())
}

/// Change tenant status to Stopping, to mark that it is being shut down.
///
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
pub async fn set_stopping(&self) -> Result<(), SetStoppingError> {
async fn set_stopping(&self) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();

// cannot stop before we're done activating, so wait out until we're done activating
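The `JoinSet` draining pattern in `freeze_and_flush_on_shutdown` above reappears in `shutdown_all_tenants` below. As a self-contained sketch, the shared shape is (a hypothetical `drain` helper, not part of this change):

    // Drain a JoinSet, treating cancellation as a bug and panics as
    // already reported by the panic hook.
    async fn drain(js: &mut tokio::task::JoinSet<()>) {
        while let Some(res) = js.join_next().await {
            match res {
                Ok(()) => {}
                Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
                Err(je) if je.is_panic() => { /* logged already */ }
                Err(je) => tracing::warn!("unexpected JoinError: {je:?}"),
            }
        }
    }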
123 changes: 28 additions & 95 deletions pageserver/src/tenant/mgr.rs
@@ -20,9 +20,7 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState,
};
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;

use utils::completion;
@@ -255,53 +253,36 @@ pub async fn shutdown_all_tenants() {
tenants_clone
}
TenantsMap::ShuttingDown(_) => {
// TODO: it is possible that detach and shutdown happen at the same time. As a
// result, during shutdown we do not wait for detach.
error!("already shutting down, this function isn't supposed to be called more than once");
return;
}
}
};

// Set tenant (and its timelines) to Stopping state.
//
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for other tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's messed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (tenant_id, tenant) in tenants_to_shut_down {
join_set.spawn(
async move {
match tenant.set_stopping().await {
let freeze_and_flush = true;

match tenant.shutdown(freeze_and_flush).await {
Ok(()) => debug!("tenant successfully stopped"),
Err(SetStoppingError::Broken) => {
info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well");
},
Err(SetStoppingError::AlreadyStopping) => {
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
Err(super::ShutdownError::AlreadyStopping) => {
warn!("tenant was already shutting down")
}
}

tenant
}
.instrument(info_span!("set_stopping", %tenant_id)),
.instrument(info_span!("shutdown", %tenant_id)),
);
}

let mut panicked = 0;

while let Some(res) = join_set.join_next().await {
match res {
Ok(()) => {}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
@@ -312,50 +293,11 @@ pub async fn shutdown_all_tenants() {
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
Ok(tenant) => tenants_to_freeze_and_flush.push(tenant),
}
}

if panicked > 0 {
warn!(panicked, "observed panicks while stopping tenants");
}

// Shut down all existing walreceiver connections and stop accepting the new ones.
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;

// Ok, no background tasks running anymore. Flush any remaining data in
// memory to disk.
//
// We assume that any incoming connections that might request pages from
// the tenant have already been terminated by the caller, so there
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.

let mut join_set = tokio::task::JoinSet::new();

for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();

join_set.spawn(
async move {
if let Err(err) = tenant.freeze_and_flush().await {
warn!("Could not checkpoint tenant during shutdown: {err:?}");
}
}
.instrument(info_span!("freeze_and_flush", %tenant_id)),
);
}

while let Some(next) = join_set.join_next().await {
match next {
Ok(()) => {}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("no cancelling")
}
Err(join_error) if join_error.is_panic() => { /* reported already */ }
Err(join_error) => warn!("unknown kind of JoinError: {join_error}"),
}
warn!(panicked, "observed panicks while shutting down tenants");
}
}

@@ -671,35 +613,26 @@
// The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
// tenant-wide cleanup operations may take some time (removing the entire tenant directory); we want to
// avoid holding the lock for the entire process.
{
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure
}
Err(SetStoppingError::Broken) => {
// continue the procedure, let's hope the closure can deal with broken tenants
}
Err(SetStoppingError::AlreadyStopping) => {
// the tenant is already stopping or broken, don't do anything
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
None => return Err(TenantStateError::NotFound(tenant_id)),
let tenant = {
TENANTS
.write()
.await
.get(&tenant_id)
.cloned()
.ok_or(TenantStateError::NotFound(tenant_id))?
};

let freeze_and_flush = false;

// shutdown is sure to transition the tenant to Stopping and to wait for all tasks to complete, so
// that we can safely continue with the cleanup.
match tenant.shutdown(freeze_and_flush).await {
Ok(()) => {}
Err(super::ShutdownError::AlreadyStopping) => {
return Err(TenantStateError::IsStopping(tenant_id))
}
}

// shutdown all tenant and timeline tasks: gc, compaction, page service
// No new tasks will be started for this tenant because it's in `Stopping` state.
// Hence, once we're done here, the `tenant_cleanup` callback can mutate tenant on-disk state freely.
task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;

match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
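Condensed, the detach/ignore path now reads roughly as follows (a sketch with simplified error handling; `tenant_cleanup` is the caller-supplied cleanup future):

    let tenant = TENANTS
        .write()
        .await
        .get(&tenant_id)
        .cloned()
        .ok_or(TenantStateError::NotFound(tenant_id))?;

    // stop every task without flushing; afterwards the cleanup may
    // mutate on-disk state freely
    if tenant.shutdown(false).await.is_err() {
        return Err(TenantStateError::IsStopping(tenant_id));
    }
    tenant_cleanup.await?;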