diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 71fc732ac0a..d1b01eda73e 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -104,7 +104,7 @@ pub struct DirectoryMaterializeMetadata { } impl DirectoryMaterializeMetadata { - pub fn to_path_list(&self) -> Vec<String> { + fn to_path_list(&self) -> Vec<String> { fn recurse( outputs: &mut Vec<String>, path_so_far: PathBuf, @@ -122,6 +122,10 @@ impl DirectoryMaterializeMetadata { recurse(&mut output_paths, PathBuf::new(), self); output_paths } + + pub fn contains(&self, path: &str) -> bool { + self.to_path_list().contains(&path.to_string()) + } } #[derive(Debug)] @@ -726,40 +730,13 @@ impl Store { digest: Digest, ) -> BoxFuture<DirectoryMaterializeMetadata, String> { let root = Arc::new(Mutex::new(None)); - let executor = self.local.executor().clone(); self .materialize_directory_helper( - destination.clone(), + destination, RootOrParentMetadataBuilder::Root(root.clone()), digest, ) - .and_then(move |()| { - let materialize_metadata = Arc::try_unwrap(root).unwrap().into_inner().unwrap().build(); - // We fundamentally materialize files for other processes to read; as such, we must ensure - // data is flushed to disk and visible to them as opposed to just our process. Even though - // we need to re-open all written files, executing all fsyncs at the end of the - // materialize call is significantly faster than doing it as we go. - future::join_all( - materialize_metadata - .to_path_list() - .into_iter() - .map(|path| { - let path = destination.join(path); - executor - .spawn_blocking(move || { - OpenOptions::new() - .write(true) - .create(false) - .open(path)? 
- .sync_all() }) .compat() }) .collect::<Vec<_>>(), ) .map_err(|e| format!("Failed to fsync directory contents: {}", e)) .map(move |_| materialize_metadata) }) + .and_then(move |()| Ok(Arc::try_unwrap(root).unwrap().into_inner().unwrap().build())) .to_boxed() } @@ -831,7 +808,7 @@ impl Store { let child_files = child_files.clone(); let name = file_node.get_name().to_owned(); store - .materialize_file(path, digest, file_node.is_executable, false) + .materialize_file(path, digest, file_node.is_executable) .map(move |metadata| child_files.lock().insert(name, metadata)) .to_boxed() }) @@ -864,16 +841,11 @@ impl Store { .to_boxed() } - /// - /// Materializes a single file. This method is private because generally files should be - /// materialized together via `materialize_directory`, which handles batch fsync'ing. - /// fn materialize_file( &self, destination: PathBuf, digest: Digest, is_executable: bool, - fsync: bool, ) -> BoxFuture { let store = self.clone(); let res = async move { @@ -881,28 +853,19 @@ .load_file_bytes_with(digest, move |bytes| { if destination.exists() { std::fs::remove_file(&destination) - } else { - Ok(()) + .map_err(|e| format!("Failed to overwrite {:?}: {:?}", destination, e))?; } - .and_then(|_| { - OpenOptions::new() - .create(true) - .write(true) - .mode(if is_executable { 0o755 } else { 0o644 }) - .open(&destination) - }) - .and_then(|mut f| { - f.write_all(&bytes)?; - if fsync { - f.sync_all() - } else { - Ok(()) - } - }) - .map_err(|e| format!("Error writing file {:?}: {:?}", destination, e)) + let mut f = OpenOptions::new() + .create(true) + .write(true) + .mode(if is_executable { 0o755 } else { 0o644 }) + .open(&destination) + .map_err(|e| format!("Error opening file {:?} for writing: {:?}", destination, e))?; + f.write_all(&bytes) + .map_err(|e| format!("Error writing file {:?}: {:?}", destination, e))?; + Ok(()) }) .await?; - match write_result { Some((Ok(()), metadata)) => Ok(metadata), Some((Err(e), 
_metadata)) => Err(e), diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index 063b47806e8..f14cd19957b 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -1007,7 +1007,7 @@ async fn materialize_missing_file() { let store_dir = TempDir::new().unwrap(); let store = new_local_store(store_dir.path()); store - .materialize_file(file.clone(), TestData::roland().digest(), false, true) + .materialize_file(file.clone(), TestData::roland().digest(), false) .compat() .await .expect_err("Want unknown digest error"); @@ -1027,7 +1027,7 @@ async fn materialize_file() { .await .expect("Error saving bytes"); store - .materialize_file(file.clone(), testdata.digest(), false, true) + .materialize_file(file.clone(), testdata.digest(), false) .compat() .await .expect("Error materializing file"); @@ -1049,7 +1049,7 @@ async fn materialize_file_executable() { .await .expect("Error saving bytes"); store - .materialize_file(file.clone(), testdata.digest(), true, true) + .materialize_file(file.clone(), testdata.digest(), true) .compat() .await .expect("Error materializing file"); diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index c710203800f..89518106aa8 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -182,6 +182,10 @@ impl RelativePath { pub fn to_str(&self) -> Option<&str> { self.0.to_str() } + + pub fn join(&self, other: Self) -> RelativePath { + RelativePath(self.0.join(other)) + } } impl AsRef<Path> for RelativePath { diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index d8d0a19c55a..00d97ffe97d 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -4,7 +4,6 @@ use fs::{self, GlobExpansionConjunction, GlobMatching, PathGlobs, StrictGlobMatching}; use 
futures::compat::Future01CompatExt; use futures::future::{FutureExt, TryFutureExt}; use futures::stream::{BoxStream, StreamExt, TryStreamExt}; -use lazy_static::lazy_static; use log::{debug, info}; use nails::execution::{ChildOutput, ExitCode}; use shell_quote::bash; @@ -12,7 +11,7 @@ use shell_quote::bash; use std::collections::{BTreeSet, HashSet}; use std::ffi::OsStr; use std::fs::create_dir_all; -use std::io::{BufRead, BufReader, Write}; +use std::io::Write; use std::ops::Neg; use std::os::unix::{ fs::{symlink, OpenOptionsExt}, @@ -24,13 +23,14 @@ use std::str; use std::sync::Arc; use store::{OneOffStoreFileByDigest, Snapshot, Store}; +use parking_lot::RwLock; use tokio::process::Command; use tokio::time::timeout; use tokio_util::codec::{BytesCodec, FramedRead}; use crate::{ Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, - PlatformConstraint, Process, + PlatformConstraint, Process, RelativePath, }; use bytes::{Bytes, BytesMut}; @@ -45,6 +45,7 @@ pub struct CommandRunner { named_caches: NamedCaches, cleanup_local_dirs: bool, platform: Platform, + spawn_lock: Arc<RwLock<bool>>, } impl CommandRunner { @@ -62,6 +63,7 @@ impl CommandRunner { named_caches, cleanup_local_dirs, platform: Platform::current().unwrap(), + spawn_lock: Arc::new(RwLock::new(true)), } } @@ -115,83 +117,6 @@ impl CommandRunner { } } -lazy_static! { - static ref IS_LIKELY_IN_DOCKER: bool = is_likely_in_docker().unwrap_or_else(|e| { - log::warn!( - "Failed to detect whether we are running in docker: {}\n\n\ - Please file an issue at https://github.com/pantsbuild/pants/issues/new", - e - ); - false - }); -} - -/// -/// Attempts to detect whether we are running inside a docker container. -/// -/// NB: Do not call this directly: it is stored in the IS_LIKELY_IN_DOCKER lazy_static. -/// -fn is_likely_in_docker() -> Result<bool, String> { - if cfg!(not(target_os = "linux")) { - return Ok(false); - } - - // Attempt to detect whether we are in docker. 
See: - // https://stackoverflow.com/questions/20010199/how-to-determine-if-a-process-runs-inside-lxc-docker - let cgroups_matches = { - BufReader::new( - std::fs::File::open("/proc/1/cgroup") - .map_err(|e| format!("Failed to inspect `/proc` to detect docker: {}", e))?, - ) - .lines() - .filter_map(|line_result| match line_result { - Ok(line) if line.contains("docker") || line.contains("lxc") => Some(line), - _ => None, - }) - .collect::>() - }; - - if cgroups_matches.is_empty() { - Ok(false) - } else { - log::debug!( - "Detected potential docker container based on cgroups ({}): will `sync` \ - before executing processes.", - cgroups_matches.join(", ") - ); - Ok(true) - } -} - -/// -/// If we are potentially running inside a docker container (TODO: technically only `aufs` is -/// relevant), `sync` the filesystem. Noop on other platforms. -/// -/// See https://github.com/moby/moby/issues/9547. -/// -async fn sync_if_needed() -> Result<(), String> { - if !(*IS_LIKELY_IN_DOCKER) { - return Ok(()); - } - - let output = Command::new("/bin/sync") - .stdin(Stdio::null()) - .output() - .await - .map_err(|e| format!("Failed to spawn `sync` in likely docker container: {}", e))?; - - if !output.status.success() { - let stdout = String::from_utf8_lossy(&output.stdout); - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(format!( - "Failed to run `/bin/sync` in likely docker container ({}): stdout: {}, stderr: {}", - output.status, stdout, stderr - )); - } - - Ok(()) -} - pub struct StreamedHermeticCommand { inner: Command, } @@ -380,16 +305,23 @@ impl CapturedWorkdir for CommandRunner { workdir_path: &'b Path, req: Process, _context: Context, + exclusive_spawn: bool, ) -> Result>, String> { - StreamedHermeticCommand::new(&req.argv[0]) + let mut command = StreamedHermeticCommand::new(&req.argv[0]); + command .args(&req.argv[1..]) .current_dir(if let Some(ref working_directory) = req.working_directory { workdir_path.join(working_directory) } else { 
workdir_path.to_owned() }) - .envs(&req.env) - .stream(&req) + .envs(&req.env); + let _locked = if exclusive_spawn { + *self.spawn_lock.write() + } else { + *self.spawn_lock.read() + }; + command.stream(&req) } } @@ -443,7 +375,7 @@ pub trait CapturedWorkdir { // Start with async materialization of input snapshots, followed by synchronous materialization // of other configured inputs. Note that we don't do this in parallel, as that might cause // non-determinism when paths overlap. - let _metadata = store + let sandbox = store .materialize_directory(workdir_path.clone(), req.input_files) .compat() .await?; @@ -500,6 +432,21 @@ pub trait CapturedWorkdir { }) .await?; + let exclusive_spawn = RelativePath::new(&req.argv[0]).map_or(false, |relative_path| { + let executable_path = if let Some(working_drectory) = &req.working_directory { + working_drectory.join(relative_path) + } else { + relative_path + }; + if let Some(exe) = executable_path.to_str() { + let exe_was_materialized = sandbox.contains(exe); + debug!("Obtaining exclusive spawn lock for process with argv {:?} since we materialized its binary {}.", &req.argv, exe); + exe_was_materialized + } else { + false + } + }); + // Spawn the process. // NB: We fully buffer up the `Stream` above into final `ChildResults` below and so could // instead be using `CommandExt::output_async` above to avoid the `ChildResults::collect_from` @@ -507,11 +454,12 @@ pub trait CapturedWorkdir { // down the line for streaming process results to console logs, etc. as tracked by: // https://github.com/pantsbuild/pants/issues/6089 let child_results_result = { - // Now that all inputs are on disk, `sync` if this platform requires it. 
- sync_if_needed().await?; - - let child_results_future = - ChildResults::collect_from(self.run_in_workdir(&workdir_path, req.clone(), context)?); + let child_results_future = ChildResults::collect_from(self.run_in_workdir( + &workdir_path, + req.clone(), + context, + exclusive_spawn, + )?); if let Some(req_timeout) = req.timeout { timeout(req_timeout, child_results_future) .await @@ -651,5 +599,6 @@ export {} workdir_path: &'b Path, req: Process, context: Context, + exclusive_spawn: bool, ) -> Result>, String>; } diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index 92ac8454756..c4c4484e81e 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -191,6 +191,7 @@ impl CapturedWorkdir for CommandRunner { workdir_path: &'b Path, req: Process, context: Context, + _exclusive_spawn: bool, ) -> Result>, String> { // Separate argument lists, to form distinct EPRs for (1) starting the nailgun server and (2) running the client in it. let ParsedJVMCommandLines {