Make hash cache robust to crash and restart #5032

Merged · 6 commits · Feb 19, 2025
65 changes: 49 additions & 16 deletions accounts-db/src/cache_hash_data.rs
@@ -208,6 +208,9 @@ impl Drop for CacheHashData {
}
}

/// The suffix to append to a cache hash data filename to indicate that the file is being written.
const IN_PROGRESS_SUFFIX: &str = ".in-progress";

impl CacheHashData {
pub(crate) fn new(cache_dir: PathBuf, deletion_policy: DeletionPolicy) -> CacheHashData {
std::fs::create_dir_all(&cache_dir).unwrap_or_else(|err| {
@@ -269,6 +272,12 @@ impl CacheHashData {
if let Ok(dir) = dir {
let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
for entry in dir.flatten() {
if entry.path().ends_with(IN_PROGRESS_SUFFIX) {
// ignore in-progress files and delete them
let _ = fs::remove_file(entry.path());
continue;
}

if let Some(name) = entry.path().file_name() {
pre_existing.insert(PathBuf::from(name));
}
@@ -320,26 +329,45 @@
file_name: impl AsRef<Path>,
data: &SavedTypeSlice,
) -> Result<(), std::io::Error> {
self.save_internal(file_name, data)
// delete any existing file at this path
let cache_path = self.cache_dir.join(file_name.as_ref());
let _ignored = remove_file(&cache_path);

// Append ".in-progress" to the filename to indicate that the file is
// being written
let work_in_progress_file_full_path = self
.cache_dir
.join(Self::get_work_in_progress_file_name(file_name.as_ref()));
Self::save_internal(&work_in_progress_file_full_path, data, &self.stats)?;
// Rename the file to remove the ".in-progress" suffix after the file
// has been successfully written. This is done to ensure that the file is
// not read before it has been completely written. For example, if the
// validator was stopped or crashed in the middle of writing the file, the file
// would be incomplete and would not be read by the validator on next restart.
fs::rename(work_in_progress_file_full_path, cache_path)
}

fn get_work_in_progress_file_name(file_name: impl AsRef<Path>) -> PathBuf {
let mut s = PathBuf::from(file_name.as_ref()).into_os_string();
s.push(IN_PROGRESS_SUFFIX);
s.into()
}

fn save_internal(
&self,
file_name: impl AsRef<Path>,
in_progress_cache_file_full_path: impl AsRef<Path>,
data: &SavedTypeSlice,
stats: &CacheHashDataStats,
) -> Result<(), std::io::Error> {
let mut m = Measure::start("save");
let cache_path = self.cache_dir.join(file_name);
// overwrite any existing file at this path
let _ignored = remove_file(&cache_path);
let _ignored = remove_file(&in_progress_cache_file_full_path);
let cell_size = std::mem::size_of::<EntryType>() as u64;
let mut m1 = Measure::start("create save");
let entries = data.iter().map(Vec::len).sum::<usize>();
let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;

let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
let mmap = CacheHashDataFile::new_map(&in_progress_cache_file_full_path, capacity)?;
m1.stop();
self.stats
stats
.create_save_us
.fetch_add(m1.as_us(), Ordering::Relaxed);
let mut cache_file = CacheHashDataFile {
@@ -351,12 +379,10 @@ impl CacheHashData {
let header = cache_file.get_header_mut();
header.count = entries;

self.stats
stats
.cache_file_size
.fetch_add(capacity as usize, Ordering::Relaxed);
self.stats
.total_entries
.fetch_add(entries, Ordering::Relaxed);
stats.total_entries.fetch_add(entries, Ordering::Relaxed);

let mut m2 = Measure::start("write_to_mmap");
let mut i = 0;
@@ -374,14 +400,14 @@ impl CacheHashData {
// entries will *not* be visible when the reader comes along.
let (_, measure_flush_us) = measure_us!(cache_file.mmap.flush()?);
m.stop();
self.stats
stats
.write_to_mmap_us
.fetch_add(m2.as_us(), Ordering::Relaxed);
self.stats
stats
.flush_mmap_us
.fetch_add(measure_flush_us, Ordering::Relaxed);
self.stats.save_us.fetch_add(m.as_us(), Ordering::Relaxed);
self.stats.saved_to_cache.fetch_add(1, Ordering::Relaxed);
stats.save_us.fetch_add(m.as_us(), Ordering::Relaxed);
stats.saved_to_cache.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
@@ -626,4 +652,11 @@ mod tests {
assert!(parse_filename(bad_filename).is_none());
}
}

#[test]
fn test_get_work_in_progress_file_name() {
let filename = "test";
let work_in_progress_filename = CacheHashData::get_work_in_progress_file_name(filename);
assert_eq!(work_in_progress_filename.as_os_str(), "test.in-progress");
}
}
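
For reference, here is a minimal standalone sketch of the crash-safety pattern the diff applies: write the full contents to a temporary ".in-progress" file, rename it into place only after the write succeeds, and delete any leftover ".in-progress" files on startup. The helper names write_cache_file and clean_in_progress_files are illustrative only and not part of the PR; the real code writes the cache file through a memory map and flushes it before renaming.

use std::fs;
use std::io;
use std::path::{Path, PathBuf};

const IN_PROGRESS_SUFFIX: &str = ".in-progress";

/// Write `data` so that a crash mid-write never leaves a partially written
/// file at `final_path`.
fn write_cache_file(final_path: &Path, data: &[u8]) -> io::Result<()> {
    // Build "<final_path>.in-progress" for the temporary file.
    let mut os = final_path.as_os_str().to_os_string();
    os.push(IN_PROGRESS_SUFFIX);
    let in_progress = PathBuf::from(os);

    // Write the full contents to the temporary file first.
    fs::write(&in_progress, data)?;

    // Rename into place only after the write succeeded; readers therefore
    // only ever see complete files under the final name.
    fs::rename(&in_progress, final_path)
}

/// On startup, delete any in-progress files left behind by a crash or
/// unclean shutdown.
fn clean_in_progress_files(cache_dir: &Path) -> io::Result<()> {
    for entry in fs::read_dir(cache_dir)?.flatten() {
        let path = entry.path();
        let is_in_progress = path
            .file_name()
            .and_then(|name| name.to_str())
            .is_some_and(|name| name.ends_with(IN_PROGRESS_SUFFIX));
        if is_in_progress {
            let _ = fs::remove_file(&path);
        }
    }
    Ok(())
}

The rename step is what makes the scheme robust: on POSIX filesystems a rename within the same directory either completes or leaves the previous state, so a reader never observes a half-written cache file under its final name.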