Add best-effort limits on async file opens to reduce file handle counts (#20055)

As described in #19765, `2.17.x` uses more file handles than previous
versions. Based on the location of the reported error, I suspect that
this is due to the move from using the LMDB store for all files to
using the filesystem-based store for large files (#18153).

In particular: rather than digesting files inside `spawn_blocking` while
capturing them into the LMDB store (which bounded concurrency via the
tokio runtime's [limit on blocking
threads](https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.max_blocking_threads)),
`fn store` moved to digesting them using tokio's async file APIs, which
impose no such limit on the number of concurrently open files.
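
To make the difference concrete, here is a simplified sketch of the two
patterns. This is not the Pants code itself: `digest_of` and the paths are
stand-ins, and only the concurrency behavior matters. In the first pattern
the file handle lives entirely inside one blocking task, so at most
`max_blocking_threads` files are open at once; in the second, each in-flight
future holds an open handle across awaits, and nothing bounds how many such
futures run concurrently.

    use std::io::Read;

    // Stand-in for real content hashing.
    fn digest_of(bytes: &[u8]) -> u64 {
        bytes.iter().map(|b| *b as u64).sum()
    }

    // Old pattern: open, read, and close the file entirely inside one
    // blocking task; concurrency is capped by the blocking thread pool.
    async fn digest_blocking(path: std::path::PathBuf) -> std::io::Result<u64> {
        tokio::task::spawn_blocking(move || {
            let mut file = std::fs::File::open(&path)?;
            let mut buf = Vec::new();
            file.read_to_end(&mut buf)?;
            Ok(digest_of(&buf))
        })
        .await
        .expect("blocking task panicked")
    }

    // New pattern (pre-fix): the future holds an open async file across
    // awaits, and nothing limits how many of these exist at once.
    async fn digest_async(path: std::path::PathBuf) -> std::io::Result<u64> {
        use tokio::io::AsyncReadExt;
        let mut file = tokio::fs::File::open(&path).await?;
        let mut buf = Vec::new();
        file.read_to_end(&mut buf).await?;
        Ok(digest_of(&buf))
    }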

This change adds a semaphore around (some) file opens to provide a
best-effort limit on the number of files open for capture at once (see
the sketch below). It additionally (in the first commit) fixes an
extraneous file handle that was being kept open during capture.
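
The key detail, visible in the diff's `FileSource` below, is that the
`SemaphorePermit` is returned alongside the `File`, so the permit is held for
the handle's whole lifetime rather than just the duration of the `open` call.
A minimal standalone sketch of that pattern (the `BoundedOpener` name and
`main` are illustrative; assumes tokio's `fs`, `sync`, `rt`, and `macros`
features):

    use tokio::sync::{Semaphore, SemaphorePermit};

    struct BoundedOpener {
        open_files: Semaphore,
    }

    impl BoundedOpener {
        // Returns the permit together with the file: when the caller drops
        // the tuple, the handle closes and the permit is released together.
        async fn open_readonly(
            &self,
            path: &std::path::Path,
        ) -> Result<(tokio::fs::File, SemaphorePermit<'_>), String> {
            let permit = self
                .open_files
                .acquire()
                .await
                .map_err(|e| format!("semaphore closed: {e}"))?;
            let file = tokio::fs::File::open(path)
                .await
                .map_err(|e| e.to_string())?;
            Ok((file, permit))
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let opener = BoundedOpener {
            open_files: Semaphore::new(1024),
        };
        // At most 1024 files obtained this way can be open concurrently.
        let (_file, _permit) = opener
            .open_readonly(std::path::Path::new("/etc/hosts"))
            .await?;
        Ok(())
    }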

Fixes #19765.
stuhood authored and WorkerPants committed Oct 24, 2023
1 parent a4a3a3e commit fd405ec
Showing 1 changed file with 62 additions and 9 deletions.
71 changes: 62 additions & 9 deletions src/rust/engine/fs/store/src/local.rs
@@ -22,6 +22,7 @@ use task_executor::Executor;
 use tempfile::Builder;
 use tokio::fs::hard_link;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::sync::{Semaphore, SemaphorePermit};
 use workunit_store::ObservationMetric;

 /// How big a file must be to be stored as a file on disk.
@@ -58,6 +59,7 @@ trait UnderlyingByteStore {
         initial_lease: bool,
         src_is_immutable: bool,
         expected_digest: Digest,
+        file_source: &FileSource,
         src: PathBuf,
     ) -> Result<(), String>;

@@ -113,13 +115,18 @@ impl UnderlyingByteStore for ShardedLmdb
         initial_lease: bool,
         src_is_immutable: bool,
         expected_digest: Digest,
+        _file_source: &FileSource,
         src: PathBuf,
     ) -> Result<(), String> {
         self.store(
             initial_lease,
             src_is_immutable,
             expected_digest,
-            move || std::fs::File::open(&src),
+            move || {
+                // NB: This file access is bounded by the number of blocking threads on the runtime, and
+                // so we don't bother to acquire against the file handle limit in this case.
+                std::fs::File::open(&src)
+            },
         )
         .await
     }
@@ -396,11 +403,13 @@ impl UnderlyingByteStore for ShardedFSDB
         _initial_lease: bool,
         src_is_immutable: bool,
         expected_digest: Digest,
+        file_source: &FileSource,
         src: PathBuf,
     ) -> Result<(), String> {
         let mut attempts = 0;
         loop {
-            let reader = tokio::fs::File::open(src.clone())
+            let (reader, _permit) = file_source
+                .open_readonly(&src)
                 .await
                 .map_err(|e| format!("Failed to open {src:?}: {e}"))?;

@@ -519,6 +528,29 @@
     }
 }

+/// A best-effort limit on the number of concurrent attempts to open files.
+#[derive(Debug)]
+struct FileSource {
+    open_files: Semaphore,
+}
+
+impl FileSource {
+    async fn open_readonly(
+        &self,
+        path: &Path,
+    ) -> Result<(tokio::fs::File, SemaphorePermit), String> {
+        let permit = self
+            .open_files
+            .acquire()
+            .await
+            .map_err(|e| format!("Failed to acquire permit to open file: {e}"))?;
+        let file = tokio::fs::File::open(path)
+            .await
+            .map_err(|e| e.to_string())?;
+        Ok((file, permit))
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct ByteStore {
     inner: Arc<InnerStore>,
@@ -532,6 +564,7 @@ struct InnerStore {
     file_lmdb: Result<Arc<ShardedLmdb>, String>,
     directory_lmdb: Result<Arc<ShardedLmdb>, String>,
     file_fsdb: ShardedFSDB,
+    file_source: FileSource,
 }

 impl ByteStore {
@@ -582,6 +615,13 @@ impl ByteStore {
                     dest_initializer: Arc::new(Mutex::default()),
                     hardlinkable_destinations: Arc::new(Mutex::default()),
                 },
+                // NB: This is much larger than the number of cores on modern machines, but still small
+                // enough to be a "reasonable" number of open files to set in `ulimit`. This is a
+                // best-effort limit (because it does-not/cannot cover all of the places where we open
+                // files).
+                file_source: FileSource {
+                    open_files: Semaphore::new(1024),
+                },
             }),
         })
     }
@@ -790,17 +830,28 @@ impl ByteStore {
         src_is_immutable: bool,
         src: PathBuf,
     ) -> Result<Digest, String> {
-        let mut file = tokio::fs::File::open(src.clone())
-            .await
-            .map_err(|e| format!("Failed to open {src:?}: {e}"))?;
-        let digest = async_copy_and_hash(&mut file, &mut tokio::io::sink())
-            .await
-            .map_err(|e| format!("Failed to hash {src:?}: {e}"))?;
+        let digest = {
+            let (mut file, _permit) = self
+                .inner
+                .file_source
+                .open_readonly(&src)
+                .await
+                .map_err(|e| format!("Failed to open {src:?}: {e}"))?;
+            async_copy_and_hash(&mut file, &mut tokio::io::sink())
+                .await
+                .map_err(|e| format!("Failed to hash {src:?}: {e}"))?
+        };

         if ByteStore::should_use_fsdb(entry_type, digest.size_bytes) {
             self.inner
                 .file_fsdb
-                .store(initial_lease, src_is_immutable, digest, src)
+                .store(
+                    initial_lease,
+                    src_is_immutable,
+                    digest,
+                    &self.inner.file_source,
+                    src,
+                )
                 .await?;
         } else {
             let dbs = match entry_type {
@@ -809,6 +860,8 @@
             };
             let _ = dbs
                 .store(initial_lease, src_is_immutable, digest, move || {
+                    // NB: This file access is bounded by the number of blocking threads on the runtime, and
+                    // so we don't bother to acquire against the file handle limit in this case.
                     std::fs::File::open(&src)
                 })
                 .await;
