diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 2116ac8f004..ae3ba6cdcad 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -27,7 +27,7 @@ pub struct WalObjectStore { flush_buffer: Mutex, /// number of snapshotted wal files to retain in object store snapshotted_wal_files_to_keep: u64, - wal_remover: parking_lot::Mutex, + wal_remover: WalFileRemover, } impl WalObjectStore { @@ -108,10 +108,12 @@ impl WalObjectStore { ), )), snapshotted_wal_files_to_keep: num_wal_files_to_keep, - wal_remover: parking_lot::Mutex::new(WalFileRemover { - oldest_wal_file: oldest_wal_file_num, - last_snapshotted_wal_sequence_number: last_wal_sequence_number, - }), + wal_remover: WalFileRemover { + inner: parking_lot::Mutex::new(WalFileRemoverInner { + oldest_wal_file: oldest_wal_file_num, + last_snapshotted_wal_sequence_number: last_wal_sequence_number, + }), + }, } } @@ -397,7 +399,9 @@ impl WalObjectStore { snapshot_permit: OwnedSemaphorePermit, ) { let (oldest, last, curr_num_files) = { - let (oldest, last) = self.wal_remover.lock().get_current_state(); + let (oldest, last) = self + .wal_remover + .update_last_and_get_current_state(snapshot_details.last_wal_sequence_number); let curr_num_files = last - oldest; (oldest, last, curr_num_files) }; @@ -405,6 +409,7 @@ impl WalObjectStore { ?oldest, ?last, ?curr_num_files, + ?self.snapshotted_wal_files_to_keep, ">>> checking num wal files to delete" ); @@ -428,6 +433,15 @@ impl WalObjectStore { debug!(?path, ">>> deleting wal file"); loop { + // if there are errors in between we are changing oldest to + // last_to_delete, this could potentially leave dangling wal + // files that this running process won't be able to catch. + // On restart however that should clamp oldest back to first + // dangling wal file and continue. This is the only way to + // address these dangling wal files. + // + // TODO: Maybe need a separate issue to address any of those left + // over wal files? match self.object_store.delete(&path).await { Ok(_) => break, Err(object_store::Error::Generic { store, source }) => { @@ -445,28 +459,26 @@ impl WalObjectStore { } } - { - self.wal_remover.lock().update_state( - last_to_delete, - snapshot_details.last_wal_sequence_number.as_u64(), - ); - } - } else { - { - self.wal_remover - .lock() - .update_last_wal_num(snapshot_details.last_wal_sequence_number); - } + self.wal_remover.update_oldest_wal_num(last_to_delete); } - // release the permit so the next snapshot can be run when the time comes drop(snapshot_permit); } } fn oldest_wal_file_num(all_wal_file_paths: &[Path]) -> Option { - let file_name = all_wal_file_paths.first()?.filename()?; - WalFileSequenceNumber::from_str(file_name).ok() + let file_name_with_path = all_wal_file_paths.first()?.filename()?; + let wal_file_name = file_name_with_path + .split("/") + .last()? + .split(".wal") + .next()?; + debug!( + ?file_name_with_path, + ?wal_file_name, + ">>> file name path and wal file name" + ); + WalFileSequenceNumber::from_str(wal_file_name).ok() } async fn load_all_wal_file_paths( @@ -832,17 +844,40 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber { #[derive(Debug)] struct WalFileRemover { + inner: parking_lot::Mutex, +} + +impl WalFileRemover { + #[cfg(test)] + fn get_current_state(&self) -> (u64, u64) { + self.inner.lock().get_current_state() + } + + fn update_oldest_wal_num(&self, oldest: u64) { + self.inner.lock().update_oldest_wal_num(oldest); + } + + fn update_last_and_get_current_state(&self, last_wal: WalFileSequenceNumber) -> (u64, u64) { + let mut inner = self.inner.lock(); + inner.update_last_wal_num(last_wal); + inner.get_current_state() + } +} + +#[derive(Debug)] +struct WalFileRemoverInner { oldest_wal_file: Option, last_snapshotted_wal_sequence_number: Option, } -impl WalFileRemover { +impl WalFileRemoverInner { fn get_current_state(&self) -> (u64, u64) { ( - self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(0), + // all wal files start from 1, not 0 + self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(1), self.last_snapshotted_wal_sequence_number .map(|num| num.as_u64()) - .unwrap_or(0), + .unwrap_or(1), ) } @@ -850,9 +885,8 @@ impl WalFileRemover { self.last_snapshotted_wal_sequence_number.replace(last_wal); } - fn update_state(&mut self, oldest: u64, last: u64) { + fn update_oldest_wal_num(&mut self, oldest: u64) { self.oldest_wal_file = Some(WalFileSequenceNumber::new(oldest)); - self.last_snapshotted_wal_sequence_number = Some(WalFileSequenceNumber::new(last)); } } @@ -1486,6 +1520,231 @@ mod tests { ); } + #[test_log::test(tokio::test)] + async fn test_wal_file_removal_after_snapshot() { + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let object_store: Arc = Arc::new(InMemory::new()); + + let notifier: Arc = Arc::new(TestNotifier::default()); + let wal_config = WalConfig { + max_write_buffer_size: 100, + flush_interval: Duration::from_secs(1), + snapshot_size: 1, + gen1_duration: Gen1Duration::new_1m(), + }; + + // load some files into OS + let path1 = wal_path("my_host", WalFileSequenceNumber::new(1)); + let path2 = wal_path("my_host", WalFileSequenceNumber::new(2)); + let path3 = wal_path("my_host", WalFileSequenceNumber::new(3)); + object_store + .put(&path2, PutPayload::from_static(b"boo")) + .await + .unwrap(); + let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string()) + .await + .unwrap(); + + debug!(?all_paths, ">>> test: all paths in object store"); + + let wal = WalObjectStore::new_without_replay( + Arc::clone(&time_provider), + Arc::clone(&object_store), + "my_host", + Arc::clone(¬ifier), + wal_config, + None, + None, + &[], + 1, + ); + + {} + let snapshot_details = SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(1), + end_time_marker: 10, + // snapshot size is 1, so we just snapshotted to a single file + first_wal_sequence_number: WalFileSequenceNumber::new(1), + last_wal_sequence_number: WalFileSequenceNumber::new(1), + forced: false, + }; + // add that wal file to obj store + object_store + .put(&path1, PutPayload::from_static(b"boo")) + .await + .unwrap(); + + let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap(); + // this will not delete any files + wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit) + .await; + + // now add another snapshot + let snapshot_details = SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(2), + end_time_marker: 20, + first_wal_sequence_number: WalFileSequenceNumber::new(2), + last_wal_sequence_number: WalFileSequenceNumber::new(2), + forced: false, + }; + + // add that wal file to obj store + object_store + .put(&path2, PutPayload::from_static(b"boo")) + .await + .unwrap(); + + let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap(); + // this will not delete any files - we've added the 2nd file still no deletes + wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit) + .await; + + let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string()) + .await + .unwrap(); + + debug!( + ?all_paths, + ">>> test: all paths in object store after removal" + ); + + assert!(object_store.get(&path1).await.ok().is_some()); + assert!(object_store.get(&path2).await.ok().is_some()); + + // now add 3rd snapshot + let snapshot_details = SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(3), + end_time_marker: 30, + // snapshot size is 1, so we just snapshotted to a single file + first_wal_sequence_number: WalFileSequenceNumber::new(3), + last_wal_sequence_number: WalFileSequenceNumber::new(3), + forced: false, + }; + + // add that wal file to obj store + object_store + .put(&path3, PutPayload::from_static(b"foo")) + .await + .unwrap(); + + let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap(); + wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit) + .await; + + let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string()) + .await + .unwrap(); + + debug!( + ?all_paths, + ">>> test: all paths in object store after removal" + ); + + let err = object_store.get(&path1).await.err().unwrap(); + + assert!(matches!( + err, + object_store::Error::NotFound { path: _, source: _ } + )); + assert!(object_store.get(&path2).await.ok().is_some()); + assert!(object_store.get(&path3).await.ok().is_some()); + } + + #[test_log::test(tokio::test)] + async fn test_wal_file_removal_after_snapshot_worked_out_example() { + // Say we snapshot every 5 files, and we always want to keep around at least 10 snapshotted files. + // Then say we're currently in this state + // oldest - 20 + // last_snapshot - 30 + // latest - 33 + // All good, keep running and we get two more wal files, then we'll get to the state of + // oldest - 20 + // last_snapshot - 30 + // latest - 35 + // When a snapshot gets triggered. When it's done, you'll have this state: + // oldest - 20 + // last_snapshot - 35 + // latest - (some number >=35 given we may have received more wal files while the snapshot was running) + // So now we kick off the deletion, which should end us in this state: + // oldest - 25 + // last_snapshot - 35 + // latest - >= 35 + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let object_store: Arc = Arc::new(InMemory::new()); + + let notifier: Arc = Arc::new(TestNotifier::default()); + let wal_config = WalConfig { + max_write_buffer_size: 100, + flush_interval: Duration::from_secs(1), + snapshot_size: 1, + gen1_duration: Gen1Duration::new_1m(), + }; + + // load some files into OS + for i in 20..=35 { + let path = wal_path("my_host", WalFileSequenceNumber::new(i)); + object_store + .put(&path, PutPayload::from_static(b"boo")) + .await + .unwrap(); + } + let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string()) + .await + .unwrap(); + + debug!(?all_paths, ">>> test: all paths in object store"); + + let wal = WalObjectStore::new_without_replay( + Arc::clone(&time_provider), + Arc::clone(&object_store), + "my_host", + Arc::clone(¬ifier), + wal_config, + Some(WalFileSequenceNumber::new(30)), + Some(SnapshotSequenceNumber::new(10)), + &all_paths, + 10, + ); + + let snapshot_details = SnapshotDetails { + snapshot_sequence_number: SnapshotSequenceNumber::new(20), + end_time_marker: 10, + first_wal_sequence_number: WalFileSequenceNumber::new(31), + last_wal_sequence_number: WalFileSequenceNumber::new(35), + forced: false, + }; + let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap(); + wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit) + .await; + + let (oldest, last) = wal.wal_remover.get_current_state(); + assert_eq!(25, oldest); + assert_eq!(35, last); + + for i in 20..=24 { + let err = object_store + .get(&wal_path("my_host", WalFileSequenceNumber::new(i))) + .await + .err() + .unwrap(); + + assert!(matches!( + err, + object_store::Error::NotFound { path: _, source: _ } + )); + } + + for i in 25..35 { + let _ = object_store + .get(&wal_path("my_host", WalFileSequenceNumber::new(i))) + .await + .ok() + .unwrap(); + } + } + #[derive(Debug, Default)] struct TestNotifier { notified_writes: parking_lot::Mutex>>,