Minor updates: (#2)
* Differentiate deleted and absent statuses returned from the console
  API
* Allow IndexPart format versions 1 and 2 instead of just 1
* Allow skipping the validation phase that downloads every index part
  (SKIP_VALIDATION env var; see the note after the change summary below)
LizardWizzard authored Jul 3, 2023
1 parent 4de5939 · commit a1714ce
Showing 5 changed files with 66 additions and 38 deletions.
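
A note on the new SKIP_VALIDATION switch before the diffs: as the src/main.rs hunk below shows, the skip is keyed on the variable being present at all, not on its value. A minimal standalone sketch of that semantics:

    use std::env;

    fn main() {
        // `is_ok()` only asks whether the variable exists, so
        // SKIP_VALIDATION=0 and SKIP_VALIDATION=1 both enable the skip.
        let skip_validation = env::var("SKIP_VALIDATION").is_ok();
        println!("skip_validation={skip_validation}");
    }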

src/checks.rs (3 additions, 1 deletion)

@@ -161,6 +161,8 @@ impl BranchCheckStats {
     }
 }
 
+const KNOWN_VERSIONS: &[usize] = &[1, 2];
+
 async fn branch_cleanup_and_check_errors(
     id: TenantTimelineId,
     s3_root: &RootTarget,
@@ -196,7 +198,7 @@ async fn branch_cleanup_and_check_errors(
             mut index_part,
             mut s3_layers,
         } => {
-            if index_part.version != 1 {
+            if !KNOWN_VERSIONS.contains(&index_part.version) {
                 branch_check_errors
                     .push(format!("index_part.json version: {}", index_part.version))
             }
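
The relaxed check flags anything outside an allow-list instead of hard-coding version 1. A quick standalone sketch of what it now accepts and rejects (the usize element type is implied by the contains call):

    const KNOWN_VERSIONS: &[usize] = &[1, 2];

    fn main() {
        for version in [1usize, 2, 3] {
            if !KNOWN_VERSIONS.contains(&version) {
                // Mirrors the branch_check_errors push in src/checks.rs.
                println!("index_part.json version: {version} -> flagged");
            } else {
                println!("index_part.json version: {version} -> accepted");
            }
        }
    }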

src/delete_batch_producer.rs (14 additions, 4 deletions)

@@ -283,6 +283,12 @@ where
     Ok(total_entries)
 }
 
+enum FetchResult<A> {
+    Found(A),
+    Deleted,
+    Absent,
+}
+
 async fn split_to_active_and_deleted_entries<I, A, F, Fut>(
     new_entry_ids: Vec<I>,
     find_active_entry: F,
@@ -291,7 +297,7 @@
     I: std::fmt::Display + Send + Sync + 'static + Copy,
     A: Send + 'static,
     F: FnOnce(I) -> Fut + Send + Sync + 'static + Clone,
-    Fut: Future<Output = anyhow::Result<Option<A>>> + Send,
+    Fut: Future<Output = anyhow::Result<FetchResult<A>>> + Send,
 {
     let entries_total = new_entry_ids.len();
     let mut check_tasks = JoinSet::new();
@@ -328,12 +334,16 @@
     while let Some(task_result) = check_tasks.join_next().await {
         let (entry_id, entry_data_fetch_result) = task_result.context("task join")?;
         match entry_data_fetch_result.context("entry data fetch")? {
-            Some(active_entry) => {
+            FetchResult::Found(active_entry) => {
                 info!("Entry {entry_id} is alive, cannot delete");
                 active_entries.push(active_entry);
             }
-            None => {
-                info!("Entry {entry_id} is either deleted or absent in the admin data, can safely delete");
+            FetchResult::Deleted => {
+                info!("Entry {entry_id} deleted in the admin data, can safely delete");
                 entries_to_delete.push(entry_id);
             }
+            FetchResult::Absent => {
+                info!("Entry {entry_id} absent in the admin data, can safely delete");
+                entries_to_delete.push(entry_id);
+            }
         }
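
Splitting the old None into Deleted and Absent changes the log line but not the outcome; both variants still land in the delete list. A simplified standalone sketch of the consuming match (entry types reduced to plain values; the real code runs inside a JoinSet loop with anyhow error contexts):

    enum FetchResult<A> {
        Found(A),
        Deleted,
        Absent,
    }

    fn route(entry_id: u64, result: FetchResult<String>, active: &mut Vec<String>, to_delete: &mut Vec<u64>) {
        match result {
            FetchResult::Found(entry) => {
                println!("Entry {entry_id} is alive, cannot delete");
                active.push(entry);
            }
            // Deleted and Absent are logged differently but routed identically.
            FetchResult::Deleted => {
                println!("Entry {entry_id} deleted in the admin data, can safely delete");
                to_delete.push(entry_id);
            }
            FetchResult::Absent => {
                println!("Entry {entry_id} absent in the admin data, can safely delete");
                to_delete.push(entry_id);
            }
        }
    }

    fn main() {
        let (mut active, mut to_delete) = (Vec::new(), Vec::new());
        route(7, FetchResult::Deleted, &mut active, &mut to_delete);
        route(8, FetchResult::Found("live entry".to_string()), &mut active, &mut to_delete);
        assert_eq!(to_delete, vec![7]);
        assert_eq!(active.len(), 1);
    }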

src/delete_batch_producer/tenant_batch.rs (4 additions, 3 deletions)

@@ -8,6 +8,7 @@ use tracing::info;
 
 use crate::cloud_admin_api::{CloudAdminApiClient, ProjectData};
 use crate::copied_definitions::id::TenantTimelineId;
+use crate::delete_batch_producer::FetchResult;
 use crate::{RootTarget, TenantId, TraversingDepth};
 
 use super::ProcessedS3List;
@@ -46,16 +47,16 @@ pub async fn schedule_cleanup_deleted_tenants(
             Ok(if let Some(console_project) = project_data {
                 if console_project.deleted {
                     delete_sender.send(Either::Left(tenant_id)).ok();
-                    None
+                    FetchResult::Deleted
                 } else {
                     if traversing_depth == TraversingDepth::Timeline {
                         projects_to_check_sender.send(console_project.clone()).ok();
                     }
-                    Some(console_project)
+                    FetchResult::Found(console_project)
                 }
             } else {
                 delete_sender.send(Either::Left(tenant_id)).ok();
-                None
+                FetchResult::Absent
             })
         })
         .await
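
On the producing side, the tenant check now reports which of the three cases it saw. Reduced to its decision table (ProjectData simplified to the one field this logic reads; the channel sends are omitted):

    struct ProjectData {
        deleted: bool,
    }

    enum FetchResult<A> {
        Found(A),
        Deleted,
        Absent,
    }

    fn classify(project_data: Option<ProjectData>) -> FetchResult<ProjectData> {
        match project_data {
            Some(p) if p.deleted => FetchResult::Deleted, // console marked it deleted
            Some(p) => FetchResult::Found(p),             // live in the console
            None => FetchResult::Absent,                  // console has no record at all
        }
    }

    fn main() {
        assert!(matches!(classify(None), FetchResult::Absent));
        assert!(matches!(classify(Some(ProjectData { deleted: true })), FetchResult::Deleted));
    }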

src/delete_batch_producer/timeline_batch.rs (8 additions, 4 deletions)

@@ -8,7 +8,7 @@ use tracing::{info, info_span, Instrument};
 
 use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
 use crate::copied_definitions::id::TenantTimelineId;
-use crate::delete_batch_producer::ProcessedS3List;
+use crate::delete_batch_producer::{FetchResult, ProcessedS3List};
 use crate::{RootTarget, TenantId};
 
 pub async fn schedule_cleanup_deleted_timelines(
@@ -32,9 +32,13 @@ pub async fn schedule_cleanup_deleted_timelines(
     let mut timeline_stats = ProcessedS3List::default();
     while let Some(project_to_check) = projects_to_check_receiver.recv().await {
         let check_client = Arc::clone(admin_client);
+
         let check_s3_client = Arc::clone(s3_client);
+
         let check_delete_sender = Arc::clone(&delete_elements_sender);
+
         let check_root = s3_root_target.clone();
+
         let new_stats = async move {
             let tenant_id_to_check = project_to_check.tenant;
             let check_target = check_root.timelines_root(tenant_id_to_check);
@@ -60,14 +64,14 @@
                 Some(console_branch) => {
                     if console_branch.deleted {
                         check_delete_sender.send(Either::Right(id)).ok();
-                        None
+                        FetchResult::Deleted
                     } else {
-                        Some(console_branch)
+                        FetchResult::Found(console_branch)
                    }
                 }
                 None => {
                     check_delete_sender.send(Either::Right(id)).ok();
-                    None
+                    FetchResult::Absent
                 }
             })
         },

src/main.rs (37 additions, 26 deletions)

@@ -25,15 +25,18 @@ async fn main() -> anyhow::Result<()> {
     let mut node_kind = env::var("NODE_KIND").context("'NODE_KIND' param retrieval")?;
     node_kind.make_ascii_lowercase();
 
-    let _guard = init_logging(&binary_name, dry_run, &node_kind);
+    let skip_validation = env::var("SKIP_VALIDATION").is_ok();
+
+    let _guard = init_logging(&binary_name, dry_run, &node_kind);
     let _main_span = info_span!("main", binary = %binary_name, %dry_run).entered();
     if dry_run {
         info!("Dry run, not removing items for real");
     } else {
         warn!("Dry run disabled, removing bucket items for real");
     }
 
+    info!("skip_validation={skip_validation}");
+
     let sso_account_id_param =
         env::var("SSO_ACCOUNT_ID").context("'SSO_ACCOUNT_ID' param retrieval")?;
     let region_param = env::var("REGION").context("'REGION' param retrieval")?;
@@ -131,33 +134,41 @@ async fn main() -> anyhow::Result<()> {
         batch_producer_stats.timelines_checked()
     );
 
-    if "pageserver" == node_kind.trim() {
-        info!("validating active tenants and timelines for pageserver S3 data");
-
-        // TODO kb real stats for validation + better stats for every place: add and print `min`, `max`, `mean` values at least
-        let validation_stats = checks::validate_pageserver_active_tenant_and_timelines(
-            s3_client,
-            s3_root,
-            cloud_admin_api_client,
-            batch_producer_stats,
-        )
-        .await
-        .context("active tenant and timeline validation")?;
-        info!("Finished active tenant and timeline validation, correct timelines: {}, timeline validation errors: {}",
-            validation_stats.normal_timelines.len(), validation_stats.timelines_with_errors.len());
-        if !validation_stats.timelines_with_errors.is_empty() {
-            warn!(
-                "Validation errors: {:#?}",
-                validation_stats
-                    .timelines_with_errors
-                    .into_iter()
-                    .map(|(id, errors)| (id.to_string(), format!("{errors:?}")))
-                    .collect::<HashMap<_, _>>()
-            );
-        }
-    }
+    if node_kind.trim() != "pageserver" {
+        info!("node_kind != pageserver, finish without performing validation step");
+        return Ok(());
+    }
+
+    if skip_validation {
+        info!("SKIP_VALIDATION env var is set, exiting");
+        return Ok(());
+    }
+
+    info!("validating active tenants and timelines for pageserver S3 data");
+
+    // TODO kb real stats for validation + better stats for every place: add and print `min`, `max`, `mean` values at least
+    let validation_stats = checks::validate_pageserver_active_tenant_and_timelines(
+        s3_client,
+        s3_root,
+        cloud_admin_api_client,
+        batch_producer_stats,
+    )
+    .await
+    .context("active tenant and timeline validation")?;
+    info!("Finished active tenant and timeline validation, correct timelines: {}, timeline validation errors: {}",
+        validation_stats.normal_timelines.len(), validation_stats.timelines_with_errors.len());
+    if !validation_stats.timelines_with_errors.is_empty() {
+        warn!(
+            "Validation errors: {:#?}",
+            validation_stats
+                .timelines_with_errors
+                .into_iter()
+                .map(|(id, errors)| (id.to_string(), format!("{errors:?}")))
+                .collect::<HashMap<_, _>>()
+        );
+    }
 
     info!("Finished S3 removal");
     info!("Done");
 
     Ok(())
 }
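
With the inversion, main guards with early returns instead of nesting the whole validation block inside an if, and the SKIP_VALIDATION check slots in as a second guard. The resulting policy, extracted into a standalone predicate (the function name here is illustrative, not from the tool):

    fn should_validate(node_kind: &str, skip_validation: bool) -> bool {
        // Validation only applies to pageserver S3 data, and SKIP_VALIDATION
        // (presence-checked) turns it off even then.
        node_kind.trim() == "pageserver" && !skip_validation
    }

    fn main() {
        let skip_validation = std::env::var("SKIP_VALIDATION").is_ok();
        println!("validate: {}", should_validate("pageserver", skip_validation));
    }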
