Skip to content

Commit

Permalink
Spawning against materialized binaries works.
Browse files Browse the repository at this point in the history
It's generically unsafe to fork+exec against binaries written out in a
multithreaded program when concurrent forks are possible. Even if all
files are opened for writing with O_CLOEXEC (which is the case for
Rust), if thread1 opens a file for writing and then thread2 forks, the
child process forked from thread2 inherits thread1's open file
descriptor. If that child's subsequent exec is delayed past the
fork+exec thread1 does against the file it wrote, then thread1's
fork+exec'd process will encounter ETXTBSY.

OSX "solves" this by retrying some number of times when it hits ETXTBSY,
but Linux does not attempt this hack. O_CLOFORK has been proposed and
adopted by some unices, but not Linux. As such we need to make some
tradeoff to allow this use case. This change introduces a lock around
process spawns (fork+exec) to prevent interleaved fork / exec whenever
there is a spawn that we know exec's a binary we wrote out.

Fixes pantsbuild#10507

[ci skip-build-wheels]
  • Loading branch information
jsirois committed Aug 15, 2020
1 parent 3917782 commit 6b4705c
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 147 deletions.
73 changes: 18 additions & 55 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub struct DirectoryMaterializeMetadata {
}

impl DirectoryMaterializeMetadata {
pub fn to_path_list(&self) -> Vec<String> {
fn to_path_list(&self) -> Vec<String> {
fn recurse(
outputs: &mut Vec<String>,
path_so_far: PathBuf,
Expand All @@ -122,6 +122,10 @@ impl DirectoryMaterializeMetadata {
recurse(&mut output_paths, PathBuf::new(), self);
output_paths
}

/// Returns true if `path` is one of the relative paths recorded by this
/// materialization (as enumerated by `to_path_list`).
///
/// Compares against the listed paths directly rather than allocating a
/// `String` per query as `Vec::contains(&path.to_string())` would.
pub fn contains(&self, path: &str) -> bool {
  self.to_path_list().iter().any(|p| p.as_str() == path)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -726,40 +730,13 @@ impl Store {
digest: Digest,
) -> BoxFuture<DirectoryMaterializeMetadata, String> {
let root = Arc::new(Mutex::new(None));
let executor = self.local.executor().clone();
self
.materialize_directory_helper(
destination.clone(),
destination,
RootOrParentMetadataBuilder::Root(root.clone()),
digest,
)
.and_then(move |()| {
let materialize_metadata = Arc::try_unwrap(root).unwrap().into_inner().unwrap().build();
// We fundamentally materialize files for other processes to read; as such, we must ensure
// data is flushed to disk and visible to them as opposed to just our process. Even though
// we need to re-open all written files, executing all fsyncs at the end of the
// materialize call is significantly faster than doing it as we go.
future::join_all(
materialize_metadata
.to_path_list()
.into_iter()
.map(|path| {
let path = destination.join(path);
executor
.spawn_blocking(move || {
OpenOptions::new()
.write(true)
.create(false)
.open(path)?
.sync_all()
})
.compat()
})
.collect::<Vec<_>>(),
)
.map_err(|e| format!("Failed to fsync directory contents: {}", e))
.map(move |_| materialize_metadata)
})
.and_then(move |()| Ok(Arc::try_unwrap(root).unwrap().into_inner().unwrap().build()))
.to_boxed()
}

Expand Down Expand Up @@ -831,7 +808,7 @@ impl Store {
let child_files = child_files.clone();
let name = file_node.get_name().to_owned();
store
.materialize_file(path, digest, file_node.is_executable, false)
.materialize_file(path, digest, file_node.is_executable)
.map(move |metadata| child_files.lock().insert(name, metadata))
.to_boxed()
})
Expand Down Expand Up @@ -864,45 +841,31 @@ impl Store {
.to_boxed()
}

///
/// Materializes a single file. This method is private because generally files should be
/// materialized together via `materialize_directory`, which handles batch fsync'ing.
///
fn materialize_file(
&self,
destination: PathBuf,
digest: Digest,
is_executable: bool,
fsync: bool,
) -> BoxFuture<LoadMetadata, String> {
let store = self.clone();
let res = async move {
let write_result = store
.load_file_bytes_with(digest, move |bytes| {
if destination.exists() {
std::fs::remove_file(&destination)
} else {
Ok(())
.map_err(|e| format!("Failed to overwrite {:?}: {:?}", destination, e))?;
}
.and_then(|_| {
OpenOptions::new()
.create(true)
.write(true)
.mode(if is_executable { 0o755 } else { 0o644 })
.open(&destination)
})
.and_then(|mut f| {
f.write_all(&bytes)?;
if fsync {
f.sync_all()
} else {
Ok(())
}
})
.map_err(|e| format!("Error writing file {:?}: {:?}", destination, e))
let mut f = OpenOptions::new()
.create(true)
.write(true)
.mode(if is_executable { 0o755 } else { 0o644 })
.open(&destination)
.map_err(|e| format!("Error opening file {:?} for writing: {:?}", destination, e))?;
f.write_all(&bytes)
.map_err(|e| format!("Error writing file {:?}: {:?}", destination, e))?;
Ok(())
})
.await?;

match write_result {
Some((Ok(()), metadata)) => Ok(metadata),
Some((Err(e), _metadata)) => Err(e),
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ async fn materialize_missing_file() {
let store_dir = TempDir::new().unwrap();
let store = new_local_store(store_dir.path());
store
.materialize_file(file.clone(), TestData::roland().digest(), false, true)
.materialize_file(file.clone(), TestData::roland().digest(), false)
.compat()
.await
.expect_err("Want unknown digest error");
Expand All @@ -1027,7 +1027,7 @@ async fn materialize_file() {
.await
.expect("Error saving bytes");
store
.materialize_file(file.clone(), testdata.digest(), false, true)
.materialize_file(file.clone(), testdata.digest(), false)
.compat()
.await
.expect("Error materializing file");
Expand All @@ -1049,7 +1049,7 @@ async fn materialize_file_executable() {
.await
.expect("Error saving bytes");
store
.materialize_file(file.clone(), testdata.digest(), true, true)
.materialize_file(file.clone(), testdata.digest(), true)
.compat()
.await
.expect("Error materializing file");
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ impl RelativePath {
pub fn to_str(&self) -> Option<&str> {
self.0.to_str()
}

/// Appends `other` to this path, returning the combined `RelativePath`.
pub fn join(&self, other: Self) -> RelativePath {
  let mut combined = self.0.clone();
  combined.push(other);
  RelativePath(combined)
}
}

impl AsRef<Path> for RelativePath {
Expand Down
127 changes: 38 additions & 89 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use fs::{self, GlobExpansionConjunction, GlobMatching, PathGlobs, StrictGlobMatc
use futures::compat::Future01CompatExt;
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use log::{debug, info};
use nails::execution::{ChildOutput, ExitCode};
use shell_quote::bash;

use std::collections::{BTreeSet, HashSet};
use std::ffi::OsStr;
use std::fs::create_dir_all;
use std::io::{BufRead, BufReader, Write};
use std::io::Write;
use std::ops::Neg;
use std::os::unix::{
fs::{symlink, OpenOptionsExt},
Expand All @@ -24,13 +23,14 @@ use std::str;
use std::sync::Arc;
use store::{OneOffStoreFileByDigest, Snapshot, Store};

use parking_lot::RwLock;
use tokio::process::Command;
use tokio::time::timeout;
use tokio_util::codec::{BytesCodec, FramedRead};

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform,
PlatformConstraint, Process,
PlatformConstraint, Process, RelativePath,
};

use bytes::{Bytes, BytesMut};
Expand All @@ -45,6 +45,7 @@ pub struct CommandRunner {
named_caches: NamedCaches,
cleanup_local_dirs: bool,
platform: Platform,
spawn_lock: Arc<RwLock<bool>>,
}

impl CommandRunner {
Expand All @@ -62,6 +63,7 @@ impl CommandRunner {
named_caches,
cleanup_local_dirs,
platform: Platform::current().unwrap(),
spawn_lock: Arc::new(RwLock::new(true)),
}
}

Expand Down Expand Up @@ -115,83 +117,6 @@ impl CommandRunner {
}
}

// One-time, cached detection of whether this process is (likely) running inside a
// docker container. Detection failures are logged (with a request to file an issue)
// and treated as "not in docker" so process execution can proceed.
lazy_static! {
static ref IS_LIKELY_IN_DOCKER: bool = is_likely_in_docker().unwrap_or_else(|e| {
log::warn!(
"Failed to detect whether we are running in docker: {}\n\n\
Please file an issue at https://github.com/pantsbuild/pants/issues/new",
e
);
false
});
}

///
/// Attempts to detect whether we are running inside a docker container.
///
/// NB: Do not call this directly: it is stored in the IS_LIKELY_IN_DOCKER lazy_static.
///
///
/// Attempts to detect whether we are running inside a docker container.
///
/// NB: Do not call this directly: it is stored in the IS_LIKELY_IN_DOCKER lazy_static.
///
fn is_likely_in_docker() -> Result<bool, String> {
  // The cgroups-based heuristic below only applies to Linux hosts.
  if cfg!(not(target_os = "linux")) {
    return Ok(false);
  }

  // Attempt to detect whether we are in docker. See:
  // https://stackoverflow.com/questions/20010199/how-to-determine-if-a-process-runs-inside-lxc-docker
  let cgroup_file = std::fs::File::open("/proc/1/cgroup")
    .map_err(|e| format!("Failed to inspect `/proc` to detect docker: {}", e))?;

  // Collect the cgroup entries that mention a container runtime; unreadable
  // lines are skipped, matching the prior best-effort behavior.
  let mut matching_lines = Vec::new();
  for line_result in BufReader::new(cgroup_file).lines() {
    if let Ok(line) = line_result {
      if line.contains("docker") || line.contains("lxc") {
        matching_lines.push(line);
      }
    }
  }

  if matching_lines.is_empty() {
    return Ok(false);
  }

  log::debug!(
    "Detected potential docker container based on cgroups ({}): will `sync` \
before executing processes.",
    matching_lines.join(", ")
  );
  Ok(true)
}

///
/// If we are potentially running inside a docker container (TODO: technically only `aufs` is
/// relevant), `sync` the filesystem. Noop on other platforms.
///
/// See https://github.com/moby/moby/issues/9547.
///
async fn sync_if_needed() -> Result<(), String> {
  // Outside of a (likely) docker container there is nothing to do.
  if !(*IS_LIKELY_IN_DOCKER) {
    return Ok(());
  }

  let output = Command::new("/bin/sync")
    .stdin(Stdio::null())
    .output()
    .await
    .map_err(|e| format!("Failed to spawn `sync` in likely docker container: {}", e))?;

  if output.status.success() {
    return Ok(());
  }

  // Surface the child's output to aid debugging a failed `sync`.
  let stdout = String::from_utf8_lossy(&output.stdout);
  let stderr = String::from_utf8_lossy(&output.stderr);
  Err(format!(
    "Failed to run `/bin/sync` in likely docker container ({}): stdout: {}, stderr: {}",
    output.status, stdout, stderr
  ))
}

// Wraps a `Command` used to launch child processes.
// NOTE(review): per the name, this presumably provides hermetic (scrubbed-environment),
// streamed execution — confirm against the `impl`, which is not visible in this view.
pub struct StreamedHermeticCommand {
inner: Command,
}
Expand Down Expand Up @@ -380,16 +305,23 @@ impl CapturedWorkdir for CommandRunner {
workdir_path: &'b Path,
req: Process,
_context: Context,
exclusive_spawn: bool,
) -> Result<BoxStream<'c, Result<ChildOutput, String>>, String> {
StreamedHermeticCommand::new(&req.argv[0])
let mut command = StreamedHermeticCommand::new(&req.argv[0]);
command
.args(&req.argv[1..])
.current_dir(if let Some(ref working_directory) = req.working_directory {
workdir_path.join(working_directory)
} else {
workdir_path.to_owned()
})
.envs(&req.env)
.stream(&req)
.envs(&req.env);
let _locked = if exclusive_spawn {
*self.spawn_lock.write()
} else {
*self.spawn_lock.read()
};
command.stream(&req)
}
}

Expand Down Expand Up @@ -443,7 +375,7 @@ pub trait CapturedWorkdir {
// Start with async materialization of input snapshots, followed by synchronous materialization
// of other configured inputs. Note that we don't do this in parallel, as that might cause
// non-determinism when paths overlap.
let _metadata = store
let sandbox = store
.materialize_directory(workdir_path.clone(), req.input_files)
.compat()
.await?;
Expand Down Expand Up @@ -500,18 +432,34 @@ pub trait CapturedWorkdir {
})
.await?;

let exclusive_spawn = RelativePath::new(&req.argv[0]).map_or(false, |relative_path| {
let executable_path = if let Some(working_drectory) = &req.working_directory {
working_drectory.join(relative_path)
} else {
relative_path
};
if let Some(exe) = executable_path.to_str() {
let exe_was_materialized = sandbox.contains(exe);
debug!("Obtaining exclusive spawn lock for process with argv {:?} since we materialized its binary {}.", &req.argv, exe);
exe_was_materialized
} else {
false
}
});

// Spawn the process.
// NB: We fully buffer up the `Stream` above into final `ChildResults` below and so could
// instead be using `CommandExt::output_async` above to avoid the `ChildResults::collect_from`
// code. The idea going forward though is we eventually want to pass incremental results on
// down the line for streaming process results to console logs, etc. as tracked by:
// https://github.com/pantsbuild/pants/issues/6089
let child_results_result = {
// Now that all inputs are on disk, `sync` if this platform requires it.
sync_if_needed().await?;

let child_results_future =
ChildResults::collect_from(self.run_in_workdir(&workdir_path, req.clone(), context)?);
let child_results_future = ChildResults::collect_from(self.run_in_workdir(
&workdir_path,
req.clone(),
context,
exclusive_spawn,
)?);
if let Some(req_timeout) = req.timeout {
timeout(req_timeout, child_results_future)
.await
Expand Down Expand Up @@ -651,5 +599,6 @@ export {}
workdir_path: &'b Path,
req: Process,
context: Context,
exclusive_spawn: bool,
) -> Result<BoxStream<'c, Result<ChildOutput, String>>, String>;
}
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl CapturedWorkdir for CommandRunner {
workdir_path: &'b Path,
req: Process,
context: Context,
_exclusive_spawn: bool,
) -> Result<BoxStream<'c, Result<ChildOutput, String>>, String> {
// Separate argument lists, to form distinct EPRs for (1) starting the nailgun server and (2) running the client in it.
let ParsedJVMCommandLines {
Expand Down

0 comments on commit 6b4705c

Please sign in to comment.