Skip to content

Commit

Permalink
refactor: update tests for wal file removal (#25846)
Browse files Browse the repository at this point in the history
* refactor: update tests for wal file removal

- update the last wal file seen first so that removal doesn't
  wait for one more cycle
- add the worked-out example test
- minor tidy-ups (introduce an inner struct so that callers no longer need explicit block scopes around the lock)

* refactor: address PR feedback
  • Loading branch information
praveen-influx authored Jan 16, 2025
1 parent b8a9448 commit 6ebbf26
Showing 1 changed file with 285 additions and 26 deletions.
311 changes: 285 additions & 26 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct WalObjectStore {
flush_buffer: Mutex<FlushBuffer>,
/// number of snapshotted wal files to retain in object store
snapshotted_wal_files_to_keep: u64,
wal_remover: parking_lot::Mutex<WalFileRemover>,
wal_remover: WalFileRemover,
}

impl WalObjectStore {
Expand Down Expand Up @@ -108,10 +108,12 @@ impl WalObjectStore {
),
)),
snapshotted_wal_files_to_keep: num_wal_files_to_keep,
wal_remover: parking_lot::Mutex::new(WalFileRemover {
oldest_wal_file: oldest_wal_file_num,
last_snapshotted_wal_sequence_number: last_wal_sequence_number,
}),
wal_remover: WalFileRemover {
inner: parking_lot::Mutex::new(WalFileRemoverInner {
oldest_wal_file: oldest_wal_file_num,
last_snapshotted_wal_sequence_number: last_wal_sequence_number,
}),
},
}
}

Expand Down Expand Up @@ -397,14 +399,17 @@ impl WalObjectStore {
snapshot_permit: OwnedSemaphorePermit,
) {
let (oldest, last, curr_num_files) = {
let (oldest, last) = self.wal_remover.lock().get_current_state();
let (oldest, last) = self
.wal_remover
.update_last_and_get_current_state(snapshot_details.last_wal_sequence_number);
let curr_num_files = last - oldest;
(oldest, last, curr_num_files)
};
debug!(
?oldest,
?last,
?curr_num_files,
?self.snapshotted_wal_files_to_keep,
">>> checking num wal files to delete"
);

Expand All @@ -428,6 +433,15 @@ impl WalObjectStore {
debug!(?path, ">>> deleting wal file");

loop {
// if there are errors in between we are changing oldest to
// last_to_delete, this could potentially leave dangling wal
// files that this running process won't be able to catch.
// On restart however that should clamp oldest back to first
// dangling wal file and continue. This is the only way to
// address these dangling wal files.
//
// TODO: Maybe need a separate issue to address any of those left
// over wal files?
match self.object_store.delete(&path).await {
Ok(_) => break,
Err(object_store::Error::Generic { store, source }) => {
Expand All @@ -445,28 +459,26 @@ impl WalObjectStore {
}
}

{
self.wal_remover.lock().update_state(
last_to_delete,
snapshot_details.last_wal_sequence_number.as_u64(),
);
}
} else {
{
self.wal_remover
.lock()
.update_last_wal_num(snapshot_details.last_wal_sequence_number);
}
self.wal_remover.update_oldest_wal_num(last_to_delete);
}

// release the permit so the next snapshot can be run when the time comes
drop(snapshot_permit);
}
}

fn oldest_wal_file_num(all_wal_file_paths: &[Path]) -> Option<WalFileSequenceNumber> {
let file_name = all_wal_file_paths.first()?.filename()?;
WalFileSequenceNumber::from_str(file_name).ok()
let file_name_with_path = all_wal_file_paths.first()?.filename()?;
let wal_file_name = file_name_with_path
.split("/")
.last()?
.split(".wal")
.next()?;
debug!(
?file_name_with_path,
?wal_file_name,
">>> file name path and wal file name"
);
WalFileSequenceNumber::from_str(wal_file_name).ok()
}

async fn load_all_wal_file_paths(
Expand Down Expand Up @@ -832,27 +844,49 @@ impl<'a> TryFrom<&'a Path> for WalFileSequenceNumber {

/// Handle for the wal file removal bookkeeping.
///
/// All state lives in a `WalFileRemoverInner` behind a `parking_lot::Mutex`, so the
/// methods on this type take `&self` and acquire the lock themselves.
#[derive(Debug)]
struct WalFileRemover {
/// The mutex-protected bookkeeping state.
inner: parking_lot::Mutex<WalFileRemoverInner>,
}

impl WalFileRemover {
    /// Returns the inner state's `(oldest, last)` pair. Only compiled for tests.
    #[cfg(test)]
    fn get_current_state(&self) -> (u64, u64) {
        let guard = self.inner.lock();
        guard.get_current_state()
    }

    /// Records `oldest` as the new oldest wal file number in the inner state.
    fn update_oldest_wal_num(&self, oldest: u64) {
        let mut guard = self.inner.lock();
        guard.update_oldest_wal_num(oldest);
    }

    /// Stores `last_wal` as the last snapshotted wal sequence number and then reads
    /// back the current state.
    ///
    /// Both steps run under a single lock acquisition, so the returned pair already
    /// reflects the `last_wal` that was just written.
    fn update_last_and_get_current_state(&self, last_wal: WalFileSequenceNumber) -> (u64, u64) {
        let mut guard = self.inner.lock();
        guard.update_last_wal_num(last_wal);
        let state = guard.get_current_state();
        state
    }
}

/// Bookkeeping state used when deciding which wal files to remove.
#[derive(Debug)]
struct WalFileRemoverInner {
/// Lower bound of the wal file range still tracked; presumably the oldest wal file
/// left in object store (TODO confirm). `None` until a value is known, in which case
/// `get_current_state` substitutes a default.
oldest_wal_file: Option<WalFileSequenceNumber>,
/// Last wal sequence number reported by a snapshot. `None` until a value is known,
/// in which case `get_current_state` substitutes a default.
last_snapshotted_wal_sequence_number: Option<WalFileSequenceNumber>,
}

impl WalFileRemover {
impl WalFileRemoverInner {
fn get_current_state(&self) -> (u64, u64) {
(
self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(0),
// all wal files start from 1, not 0
self.oldest_wal_file.map(|num| num.as_u64()).unwrap_or(1),
self.last_snapshotted_wal_sequence_number
.map(|num| num.as_u64())
.unwrap_or(0),
.unwrap_or(1),
)
}

fn update_last_wal_num(&mut self, last_wal: WalFileSequenceNumber) {
self.last_snapshotted_wal_sequence_number.replace(last_wal);
}

fn update_state(&mut self, oldest: u64, last: u64) {
fn update_oldest_wal_num(&mut self, oldest: u64) {
self.oldest_wal_file = Some(WalFileSequenceNumber::new(oldest));
self.last_snapshotted_wal_sequence_number = Some(WalFileSequenceNumber::new(last));
}
}

Expand Down Expand Up @@ -1486,6 +1520,231 @@ mod tests {
);
}

#[test_log::test(tokio::test)]
async fn test_wal_file_removal_after_snapshot() {
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
let wal_config = WalConfig {
max_write_buffer_size: 100,
flush_interval: Duration::from_secs(1),
snapshot_size: 1,
gen1_duration: Gen1Duration::new_1m(),
};

// load some files into OS
let path1 = wal_path("my_host", WalFileSequenceNumber::new(1));
let path2 = wal_path("my_host", WalFileSequenceNumber::new(2));
let path3 = wal_path("my_host", WalFileSequenceNumber::new(3));
object_store
.put(&path2, PutPayload::from_static(b"boo"))
.await
.unwrap();
let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string())
.await
.unwrap();

debug!(?all_paths, ">>> test: all paths in object store");

let wal = WalObjectStore::new_without_replay(
Arc::clone(&time_provider),
Arc::clone(&object_store),
"my_host",
Arc::clone(&notifier),
wal_config,
None,
None,
&[],
1,
);

{}
let snapshot_details = SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
end_time_marker: 10,
// snapshot size is 1, so we just snapshotted to a single file
first_wal_sequence_number: WalFileSequenceNumber::new(1),
last_wal_sequence_number: WalFileSequenceNumber::new(1),
forced: false,
};
// add that wal file to obj store
object_store
.put(&path1, PutPayload::from_static(b"boo"))
.await
.unwrap();

let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap();
// this will not delete any files
wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit)
.await;

// now add another snapshot
let snapshot_details = SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
end_time_marker: 20,
first_wal_sequence_number: WalFileSequenceNumber::new(2),
last_wal_sequence_number: WalFileSequenceNumber::new(2),
forced: false,
};

// add that wal file to obj store
object_store
.put(&path2, PutPayload::from_static(b"boo"))
.await
.unwrap();

let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap();
// this will not delete any files - we've added the 2nd file still no deletes
wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit)
.await;

let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string())
.await
.unwrap();

debug!(
?all_paths,
">>> test: all paths in object store after removal"
);

assert!(object_store.get(&path1).await.ok().is_some());
assert!(object_store.get(&path2).await.ok().is_some());

// now add 3rd snapshot
let snapshot_details = SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(3),
end_time_marker: 30,
// snapshot size is 1, so we just snapshotted to a single file
first_wal_sequence_number: WalFileSequenceNumber::new(3),
last_wal_sequence_number: WalFileSequenceNumber::new(3),
forced: false,
};

// add that wal file to obj store
object_store
.put(&path3, PutPayload::from_static(b"foo"))
.await
.unwrap();

let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap();
wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit)
.await;

let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string())
.await
.unwrap();

debug!(
?all_paths,
">>> test: all paths in object store after removal"
);

let err = object_store.get(&path1).await.err().unwrap();

assert!(matches!(
err,
object_store::Error::NotFound { path: _, source: _ }
));
assert!(object_store.get(&path2).await.ok().is_some());
assert!(object_store.get(&path3).await.ok().is_some());
}

#[test_log::test(tokio::test)]
async fn test_wal_file_removal_after_snapshot_worked_out_example() {
// Say we snapshot every 5 files, and we always want to keep around at least 10 snapshotted files.
// Then say we're currently in this state
// oldest - 20
// last_snapshot - 30
// latest - 33
// All good, keep running and we get two more wal files, then we'll get to the state of
// oldest - 20
// last_snapshot - 30
// latest - 35
// When a snapshot gets triggered. When it's done, you'll have this state:
// oldest - 20
// last_snapshot - 35
// latest - (some number >=35 given we may have received more wal files while the snapshot was running)
// So now we kick off the deletion, which should end us in this state:
// oldest - 25
// last_snapshot - 35
// latest - >= 35
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
let wal_config = WalConfig {
max_write_buffer_size: 100,
flush_interval: Duration::from_secs(1),
snapshot_size: 1,
gen1_duration: Gen1Duration::new_1m(),
};

// load some files into OS
for i in 20..=35 {
let path = wal_path("my_host", WalFileSequenceNumber::new(i));
object_store
.put(&path, PutPayload::from_static(b"boo"))
.await
.unwrap();
}
let all_paths = load_all_wal_file_paths(Arc::clone(&object_store), "my_host".to_string())
.await
.unwrap();

debug!(?all_paths, ">>> test: all paths in object store");

let wal = WalObjectStore::new_without_replay(
Arc::clone(&time_provider),
Arc::clone(&object_store),
"my_host",
Arc::clone(&notifier),
wal_config,
Some(WalFileSequenceNumber::new(30)),
Some(SnapshotSequenceNumber::new(10)),
&all_paths,
10,
);

let snapshot_details = SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(20),
end_time_marker: 10,
first_wal_sequence_number: WalFileSequenceNumber::new(31),
last_wal_sequence_number: WalFileSequenceNumber::new(35),
forced: false,
};
let snapshot_permit = Arc::new(Semaphore::new(1)).acquire_owned().await.unwrap();
wal.remove_snapshot_wal_files(snapshot_details, snapshot_permit)
.await;

let (oldest, last) = wal.wal_remover.get_current_state();
assert_eq!(25, oldest);
assert_eq!(35, last);

for i in 20..=24 {
let err = object_store
.get(&wal_path("my_host", WalFileSequenceNumber::new(i)))
.await
.err()
.unwrap();

assert!(matches!(
err,
object_store::Error::NotFound { path: _, source: _ }
));
}

for i in 25..35 {
let _ = object_store
.get(&wal_path("my_host", WalFileSequenceNumber::new(i)))
.await
.ok()
.unwrap();
}
}

#[derive(Debug, Default)]
struct TestNotifier {
notified_writes: parking_lot::Mutex<Vec<Arc<WalContents>>>,
Expand Down

0 comments on commit 6ebbf26

Please sign in to comment.