From fd7d675a2d852768e4843a2c0c761ec3ec4662d1 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Sat, 10 Oct 2020 14:51:36 -0700 Subject: [PATCH 1/9] add remote caching support [ci skip-build-wheels] --- src/python/pants/engine/internals/native.py | 2 + src/python/pants/option/global_options.py | 20 + src/rust/engine/Cargo.lock | 30 +- src/rust/engine/process_execution/Cargo.toml | 1 + src/rust/engine/process_execution/src/lib.rs | 4 + .../engine/process_execution/src/remote.rs | 197 ++++---- .../process_execution/src/remote_cache.rs | 451 ++++++++++++++++++ .../src/remote_cache_tests.rs | 214 +++++++++ src/rust/engine/src/context.rs | 2 + src/rust/engine/src/externs/interface.rs | 6 +- .../engine/testutil/mock/src/action_cache.rs | 141 ++++++ src/rust/engine/testutil/mock/src/lib.rs | 5 +- 12 files changed, 969 insertions(+), 104 deletions(-) create mode 100644 src/rust/engine/process_execution/src/remote_cache.rs create mode 100644 src/rust/engine/process_execution/src/remote_cache_tests.rs create mode 100644 src/rust/engine/testutil/mock/src/action_cache.rs diff --git a/src/python/pants/engine/internals/native.py b/src/python/pants/engine/internals/native.py index 024f3db1835..951b2ca134a 100644 --- a/src/python/pants/engine/internals/native.py +++ b/src/python/pants/engine/internals/native.py @@ -260,6 +260,8 @@ def new_scheduler( speculation_strategy=execution_options.process_execution_speculation_strategy, use_local_cache=execution_options.process_execution_use_local_cache, local_enable_nailgun=execution_options.process_execution_local_enable_nailgun, + remote_cache_read=execution_options.remote_cache_read, + remote_cache_write=execution_options.remote_cache_write, ) return cast( diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 8660dd65937..d8a0564debb 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -86,6 +86,8 @@ class ExecutionOptions: 
remote_execution_headers: Any remote_execution_overall_deadline_secs: int process_execution_local_enable_nailgun: bool + remote_cache_read: bool + remote_cache_write: bool @classmethod def from_bootstrap_options(cls, bootstrap_options): @@ -112,6 +114,8 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_headers=bootstrap_options.remote_execution_headers, remote_execution_overall_deadline_secs=bootstrap_options.remote_execution_overall_deadline_secs, process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun, + remote_cache_read=bootstrap_options.remote_cache_read, + remote_cache_write=bootstrap_options.remote_cache_write, ) @@ -138,6 +142,8 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_headers={}, remote_execution_overall_deadline_secs=60 * 60, # one hour process_execution_local_enable_nailgun=False, + remote_cache_read=False, + remote_cache_write=False, ) @@ -759,6 +765,20 @@ def register_bootstrap_options(cls, register): help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.", advanced=True, ) + register( + "--remote-cache-read", + type=bool, + default=DEFAULT_EXECUTION_OPTIONS.remote_cache_read, + advanced=True, + help="Whether to enable reading from a remote cache", + ) + register( + "--remote-cache-write", + type=bool, + default=DEFAULT_EXECUTION_OPTIONS.remote_cache_write, + advanced=True, + help="Whether to enable writing results to a remote cache", + ) @classmethod def register_options(cls, register): diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 8848f18a2f8..af6e8298832 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -207,7 +207,7 @@ dependencies = [ "bytes 0.4.12", "clap", "dirs", - "env_logger", + "env_logger 0.5.13", "errno", "fuse", "futures 0.3.5", @@ -695,7 +695,7 @@ dependencies = [ "concrete_time", "cpython", "crossbeam-channel", - "env_logger", + "env_logger 0.5.13", "fnv", 
"fs", "futures 0.1.29", @@ -743,6 +743,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "env_logger" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3" +dependencies = [ + "atty", + "humantime", + "log 0.4.8", + "regex", + "termcolor", +] + [[package]] name = "errno" version = "0.2.4" @@ -837,7 +850,7 @@ dependencies = [ "boxfuture", "bytes 0.4.12", "clap", - "env_logger", + "env_logger 0.5.13", "fs", "futures 0.1.29", "futures 0.3.5", @@ -1067,7 +1080,7 @@ name = "graph" version = "0.0.1" dependencies = [ "async-trait", - "env_logger", + "env_logger 0.5.13", "fnv", "futures 0.3.5", "log 0.4.8", @@ -1479,7 +1492,7 @@ name = "local_cas" version = "0.0.1" dependencies = [ "clap", - "env_logger", + "env_logger 0.5.13", "mock", ] @@ -2103,6 +2116,7 @@ dependencies = [ "concrete_time", "derivative", "double-checked-cell-async", + "env_logger 0.6.2", "fs", "futures 0.1.29", "futures 0.3.5", @@ -2141,7 +2155,7 @@ version = "0.0.1" dependencies = [ "clap", "dirs", - "env_logger", + "env_logger 0.5.13", "fs", "futures 0.3.5", "hashing", @@ -2170,7 +2184,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6325275b85605f58f576456a47af44417edf5956a6f670bb59fbe12aff69597" dependencies = [ "bytes 0.4.12", - "env_logger", + "env_logger 0.5.13", "heck", "itertools 0.7.11", "log 0.4.8", @@ -2601,7 +2615,7 @@ dependencies = [ name = "rule_graph" version = "0.0.1" dependencies = [ - "env_logger", + "env_logger 0.5.13", "indexmap", "log 0.4.8", "petgraph", diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index 2dd89b6d49d..df5968c9190 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -6,6 +6,7 @@ authors = [ "Pants Build " ] publish = false [dependencies] +env_logger = "0.6" async-trait = "0.1" walkdir = "2" 
async_semaphore = { path = "../async_semaphore" } diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 9dd1cb559ed..f11491b7128 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -57,6 +57,10 @@ pub mod remote; #[cfg(test)] pub mod remote_tests; +pub mod remote_cache; +#[cfg(test)] +mod remote_cache_tests; + pub mod speculate; #[cfg(test)] mod speculate_tests; diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index b9188621725..2118d1f5500 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -33,6 +33,7 @@ use crate::{ Context, ExecutionStats, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, PlatformConstraint, Process, ProcessMetadata, }; +use bazel_protos::remote_execution_grpc::ActionCacheClient; // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an Process, and may be populated only by the @@ -183,17 +184,6 @@ impl CommandRunner { self.platform } - async fn store_proto_locally(&self, proto: &P) -> Result { - let command_bytes = proto - .write_to_bytes() - .map_err(|e| format!("Error serializing proto {:?}", e))?; - self - .store - .store_file_bytes(Bytes::from(command_bytes), true) - .await - .map_err(|e| format!("Error saving proto to local store: {:?}", e)) - } - async fn get_capabilities( &self, ) -> Result<&bazel_protos::remote_execution::ServerCapabilities, String> { @@ -218,85 +208,6 @@ impl CommandRunner { .await } - /// Check the remote Action Cache for a cached result of running the given `action_digest`. - /// - /// This check is necessary because some RE servers do not short-circuit the Execute method - /// by checking the Action Cache (e.g., BuildBarn). 
Thus, this client must check the cache - /// explicitly in order to avoid duplicating already-cached work. This behavior matches - /// the Bazel RE client. - async fn check_action_cache( - &self, - action_digest: Digest, - metadata: &ProcessMetadata, - context: &Context, - ) -> Result, String> { - let mut request = bazel_protos::remote_execution::GetActionResultRequest::new(); - if let Some(ref instance_name) = metadata.instance_name { - request.set_instance_name(instance_name.clone()); - } - request.set_action_digest(action_digest.into()); - - let call_opt = call_option(&self.headers, Some(context.build_id.clone()))?; - - let action_result_response = self - .action_cache_client - .get_action_result_async_opt(&request, call_opt) - .unwrap() - .compat() - .await; - - match action_result_response { - Ok(action_result) => { - let response = populate_fallible_execution_result( - self.store.clone(), - &action_result, - vec![], - self.platform, - false, - ) - .compat() - .await?; - Ok(Some(response)) - } - Err(err) => match err { - grpcio::Error::RpcFailure(rpc_status) - if rpc_status.status == grpcio::RpcStatusCode::NOT_FOUND => - { - Ok(None) - } - _ => Err(rpcerror_to_string(err)), - }, - } - } - - async fn ensure_action_stored_locally( - &self, - command: &Command, - action: &Action, - ) -> Result<(Digest, Digest), String> { - let (command_digest, action_digest) = future::try_join( - self.store_proto_locally(command), - self.store_proto_locally(action), - ) - .await?; - - Ok((command_digest, action_digest)) - } - - async fn ensure_action_uploaded( - &self, - store: &Store, - command_digest: Digest, - action_digest: Digest, - input_files: Digest, - ) -> Result<(), String> { - let _ = store - .ensure_remote_has_recursive(vec![command_digest, action_digest, input_files]) - .compat() - .await?; - Ok(()) - } - // Monitors the operation stream returned by the REv2 Execute and WaitExecution methods. 
// Outputs progress reported by the server and returns the next actionable operation // or gRPC status back to the main loop (plus the operation name so the main loop can @@ -797,7 +708,7 @@ impl crate::CommandRunner for CommandRunner { context.workunit_store.clone(), "ensure_action_stored_locally".to_owned(), WorkunitMetadata::with_level(Level::Debug), - self.ensure_action_stored_locally(&command, &action), + ensure_action_stored_locally(&self.store, &command, &action), |_, md| md, ) .await?; @@ -808,7 +719,15 @@ impl crate::CommandRunner for CommandRunner { context.workunit_store.clone(), "check_action_cache".to_owned(), WorkunitMetadata::with_level(Level::Debug), - self.check_action_cache(action_digest, &self.metadata, &context), + check_action_cache( + action_digest, + &self.metadata, + self.platform, + &context, + self.action_cache_client.clone(), + &self.headers, + self.store.clone(), + ), |_, md| md, ) .await?; @@ -825,7 +744,7 @@ impl crate::CommandRunner for CommandRunner { context.workunit_store.clone(), "ensure_action_uploaded".to_owned(), WorkunitMetadata::with_level(Level::Debug), - self.ensure_action_uploaded(&store, command_digest, action_digest, request.input_files), + ensure_action_uploaded(&store, command_digest, action_digest, request.input_files), |_, md| md, ) .await?; @@ -1213,6 +1132,7 @@ pub fn extract_output_files( // Retrieve the Tree proto and hash its root `Directory` proto to obtain the digest // of the output directory needed to construct the series of `Directory` protos needed // for the final merge of the output directories. 
+ log::info!("processing dir={:?}", dir); let tree_digest: Digest = dir.get_tree_digest().try_into()?; let root_digest_opt = store.load_tree_from_remote(tree_digest).await?; let root_digest = root_digest_opt @@ -1246,6 +1166,7 @@ pub fn extract_output_files( .get_output_files() .iter() .map(|output_file| { + log::info!("processing output_file={:?}", output_file); let output_file_path_buf = PathBuf::from(output_file.get_path()); let digest: Result = output_file.get_digest().try_into(); path_map.insert(output_file_path_buf.clone(), digest?); @@ -1315,6 +1236,94 @@ pub fn extract_output_files( .to_boxed() } +/// Check the remote Action Cache for a cached result of running the given `action_digest`. +/// +/// This check is necessary because some RE servers do not short-circuit the Execute method +/// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache +/// explicitly in order to avoid duplicating already-cached work. This behavior matches +/// the Bazel RE client. 
+pub async fn check_action_cache( + action_digest: Digest, + metadata: &ProcessMetadata, + platform: Platform, + context: &Context, + action_cache_client: Arc, + headers: &BTreeMap, + store: Store, +) -> Result, String> { + let mut request = bazel_protos::remote_execution::GetActionResultRequest::new(); + if let Some(ref instance_name) = metadata.instance_name { + request.set_instance_name(instance_name.clone()); + } + request.set_action_digest(action_digest.into()); + + let call_opt = call_option(headers, Some(context.build_id.clone()))?; + + let action_result_response = action_cache_client + .get_action_result_async_opt(&request, call_opt) + .unwrap() + .compat() + .await; + + match action_result_response { + Ok(action_result) => { + let response = + populate_fallible_execution_result(store.clone(), &action_result, vec![], platform, false) + .compat() + .await?; + Ok(Some(response)) + } + Err(err) => match err { + grpcio::Error::RpcFailure(rpc_status) + if rpc_status.status == grpcio::RpcStatusCode::NOT_FOUND => + { + Ok(None) + } + _ => Err(rpcerror_to_string(err)), + }, + } +} + +pub async fn store_proto_locally( + store: &Store, + proto: &P, +) -> Result { + let command_bytes = proto + .write_to_bytes() + .map_err(|e| format!("Error serializing proto {:?}", e))?; + store + .store_file_bytes(Bytes::from(command_bytes), true) + .await + .map_err(|e| format!("Error saving proto to local store: {:?}", e)) +} + +pub async fn ensure_action_stored_locally( + store: &Store, + command: &Command, + action: &Action, +) -> Result<(Digest, Digest), String> { + let (command_digest, action_digest) = future::try_join( + store_proto_locally(store, command), + store_proto_locally(store, action), + ) + .await?; + + Ok((command_digest, action_digest)) +} + +pub async fn ensure_action_uploaded( + store: &Store, + command_digest: Digest, + action_digest: Digest, + input_files: Digest, +) -> Result<(), String> { + let _ = store + .ensure_remote_has_recursive(vec![command_digest, 
action_digest, input_files]) + .compat() + .await?; + Ok(()) +} + pub fn format_error(error: &bazel_protos::status::Status) -> String { let error_code_enum = bazel_protos::code::Code::from_i32(error.get_code()); let error_code = match error_code_enum { @@ -1350,7 +1359,7 @@ fn rpcerror_to_status_or_string( } } -fn rpcerror_to_string(error: grpcio::Error) -> String { +pub(crate) fn rpcerror_to_string(error: grpcio::Error) -> String { match error { grpcio::Error::RpcFailure(status) => format!( "{:?}: {:?}", diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs new file mode 100644 index 00000000000..850c21c08c2 --- /dev/null +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -0,0 +1,451 @@ +use std::collections::{BTreeMap, VecDeque}; +use std::convert::TryInto; +use std::path::Component; +use std::sync::Arc; + +use async_trait::async_trait; +use bazel_protos::call_option; +use bazel_protos::remote_execution::{ + ActionResult, Command, FileNode, Tree, UpdateActionResultRequest, +}; +use fs::RelativePath; +use futures::compat::Future01CompatExt; +use hashing::Digest; +use store::Store; +use workunit_store::{with_workunit, Level, WorkunitMetadata}; + +use crate::remote::make_execute_request; +use crate::{ + Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process, + ProcessMetadata, +}; + +/// This `CommandRunner` implementation caches results remotely using the Action Cache service +/// of the Remote Execution API. +/// +/// This runner expects to sit between the local cache CommandRunner and the CommandRunner +/// that is actually executing the Process. Thus, the local cache will be checked first, +/// then the remote cache, and then execution (local or remote) as necessary if neither cache +/// has a hit. On the way back out of the stack, the result will be stored remotely and +/// then locally. 
+#[derive(Clone)] +pub struct CommandRunner { + underlying: Arc, + metadata: ProcessMetadata, + store: Store, + action_cache_client: Arc, + headers: BTreeMap, + platform: Platform, + cache_read: bool, + cache_write: bool, +} + +impl CommandRunner { + pub fn new( + underlying: Arc, + metadata: ProcessMetadata, + store: Store, + action_cache_address: &str, + root_ca_certs: Option>, + oauth_bearer_token: Option, + headers: BTreeMap, + platform: Platform, + cache_read: bool, + cache_write: bool, + ) -> Result { + let env = Arc::new(grpcio::EnvBuilder::new().build()); + let channel = { + let builder = grpcio::ChannelBuilder::new(env); + if let Some(ref root_ca_certs) = root_ca_certs { + let creds = grpcio::ChannelCredentialsBuilder::new() + .root_cert(root_ca_certs.clone()) + .build(); + builder.secure_connect(action_cache_address, creds) + } else { + builder.connect(action_cache_address) + } + }; + let action_cache_client = Arc::new( + bazel_protos::remote_execution_grpc::ActionCacheClient::new(channel), + ); + + let mut headers = headers; + if let Some(oauth_bearer_token) = oauth_bearer_token { + headers.insert( + String::from("authorization"), + format!("Bearer {}", oauth_bearer_token.trim()), + ); + } + + // Validate any configured static headers. + call_option(&headers, None)?; + + Ok(CommandRunner { + underlying, + metadata, + store, + action_cache_client, + headers, + platform, + cache_read, + cache_write, + }) + } + + /// Create a REAPI `Tree` protobuf for an output directory by traversing down from a Pants + /// merged final output directory to find the specific path to extract. (REAPI requires + /// output directories to be stored as `Tree` protos that contain all of the `Directory` + /// protos that constitute the directory tree.) 
+ async fn make_tree_for_output_directory( + &self, + root_directory_digest: Digest, + directory_path: RelativePath, + store: &Store, + ) -> Result { + // Traverse down from the root directory digest to find the directory digest for + // the output directory. + let mut current_directory_digest = root_directory_digest; + for next_path_component in directory_path.as_ref().components() { + let next_name = match next_path_component { + Component::Normal(name) => name + .to_str() + .ok_or_else(|| format!("unable to convert '{:?}' to string", name))?, + _ => return Err("illegal state: unexpected path component in relative path".into()), + }; + + // Load the Directory proto corresponding to `current_directory_digest`. + let current_directory = match store.load_directory(current_directory_digest).await? { + Some((dir, _)) => dir, + None => { + return Err(format!( + "illegal state: directory for digest {:?} did not exist locally", + ¤t_directory_digest + )) + } + }; + + // Scan the current directory for the current path component. + let dir_node = match current_directory + .directories + .iter() + .find(|dn| dn.name == next_name) + { + Some(dn) => dn, + None => { + return Err(format!( + "unable to find path component {:?} in directory", + next_name + )) + } + }; + + // Set the current directory digest to be the digest in the DirectoryNode just found. + // If there are more path components, then the search will continue there. + // Otherwise, if this loop ends then the final Directory digest has been found. + current_directory_digest = dir_node.get_digest().try_into()?; + } + + // At this point, `current_directory_digest` holds the digest of the output directory. + // This will be the root of the Tree. Add it to a queue of digests to traverse. 
+ let mut tree = Tree::new(); + + let mut digest_queue = VecDeque::new(); + digest_queue.push_back(current_directory_digest); + + while let Some(directory_digest) = digest_queue.pop_front() { + let directory = match store.load_directory(directory_digest).await? { + Some((dir, _)) => dir, + None => { + return Err(format!( + "illegal state: directory for digest {:?} did not exist locally", + ¤t_directory_digest + )) + } + }; + + // Add all of the digests for subdirectories into the queue so they are processed + // in future iterations of the loop. + for subdirectory_node in &directory.directories { + digest_queue.push_back(subdirectory_node.get_digest().try_into()?); + } + + // Store this directory either as the `root` or one of the `children` if not the root. + if directory_digest == current_directory_digest { + tree.set_root(directory); + } else { + tree.mut_children().push(directory) + } + } + + Ok(tree) + } + + async fn extract_output_file( + &self, + root_directory_digest: Digest, + file_path: RelativePath, + store: &Store, + ) -> Result { + // Traverse down from the root directory digest to find the directory digest for + // the output directory. + let mut current_directory_digest = root_directory_digest; + let parent_path = file_path.as_ref().parent(); + let components_opt = parent_path.map(|x| x.components()); + if let Some(components) = components_opt { + for next_path_component in components { + let next_name = match next_path_component { + Component::Normal(name) => name + .to_str() + .ok_or_else(|| format!("unable to convert '{:?}' to string", name))?, + _ => return Err("illegal state: unexpected path component in relative path".into()), + }; + + // Load the Directory proto corresponding to `current_directory_digest`. + let current_directory = match store.load_directory(current_directory_digest).await? 
{ + Some((dir, _)) => dir, + None => { + return Err(format!( + "illegal state: directory for digest {:?} did not exist locally", + ¤t_directory_digest + )) + } + }; + + // Scan the current directory for the current path component. + let dir_node = match current_directory + .directories + .iter() + .find(|dn| dn.name == next_name) + { + Some(dn) => dn, + None => { + return Err(format!( + "unable to find path component {:?} in directory", + next_name + )) + } + }; + + // Set the current directory digest to be the digest in the DirectoryNode just found. + // If there are more path components, then the search will continue there. + // Otherwise, if this loop ends then the final Directory digest has been found. + current_directory_digest = dir_node.get_digest().try_into()?; + } + } + + // Load the final directory. + let directory = match store.load_directory(current_directory_digest).await? { + Some((dir, _)) => dir, + None => { + return Err(format!( + "illegal state: directory for digest {:?} did not exist locally", + ¤t_directory_digest + )) + } + }; + + // Search for the file. 
+ let file_base_name = file_path.as_ref().file_name().unwrap(); + directory + .files + .iter() + .find(|n| n.get_name() == file_base_name) + .cloned() + .ok_or_else(|| format!("file {:?} did not exist did not exist locally", file_path)) + } + + async fn make_action_result( + &self, + command: &Command, + result: &FallibleProcessResultWithPlatform, + store: &Store, + ) -> Result { + let mut action_result = ActionResult::new(); + action_result.set_exit_code(result.exit_code); + + action_result.set_stdout_digest(result.stdout_digest.into()); + action_result.set_stderr_digest(result.stderr_digest.into()); + + let mut tree_digests = Vec::new(); + for output_directory in &command.output_directories { + let tree = self + .make_tree_for_output_directory( + result.output_directory, + RelativePath::new(output_directory).unwrap(), + store, + ) + .await?; + + let tree_digest = crate::remote::store_proto_locally(&self.store, &tree).await?; + tree_digests.push(tree_digest); + + action_result.mut_output_directories().push({ + let mut directory = bazel_protos::remote_execution::OutputDirectory::new(); + directory.set_path(String::new()); + directory.set_tree_digest(tree_digest.into()); + directory + }); + } + + store + .ensure_remote_has_recursive(tree_digests) + .compat() + .await?; + + let mut file_digests = Vec::new(); + for output_file in &command.output_files { + let file_node = self + .extract_output_file( + result.output_directory, + RelativePath::new(output_file).unwrap(), + store, + ) + .await?; + + file_digests.push(file_node.get_digest().try_into()?); + + action_result.mut_output_files().push({ + let mut file = bazel_protos::remote_execution::OutputFile::new(); + let digest: Digest = file_node.get_digest().try_into()?; + file.set_digest(digest.into()); + file.set_is_executable(file_node.get_is_executable()); + file.set_path(output_file.to_owned()); + file + }) + } + + store + .ensure_remote_has_recursive(file_digests) + .compat() + .await?; + + Ok(action_result) + } + + 
/// Stores an execution result into the remote Action Cache. + async fn update_action_cache( + &self, + context: &Context, + request: &Process, + result: &FallibleProcessResultWithPlatform, + metadata: &ProcessMetadata, + command: &Command, + action_digest: Digest, + command_digest: Digest, + ) -> Result<(), String> { + // Upload the action (and related data, i.e. the embedded command and input files). + // Assumption: The Action and related data has already been stored locally. + with_workunit( + context.workunit_store.clone(), + "ensure_action_uploaded".to_owned(), + WorkunitMetadata::with_level(Level::Debug), + crate::remote::ensure_action_uploaded( + &self.store, + command_digest, + action_digest, + request.input_files, + ), + |_, md| md, + ) + .await?; + + // Create an ActionResult from the process result. + let action_result = self + .make_action_result(command, result, &self.store) + .await?; + + let mut update_action_cache_request = UpdateActionResultRequest::new(); + if let Some(ref instance_name) = metadata.instance_name { + update_action_cache_request.set_instance_name(instance_name.clone()); + } + update_action_cache_request.set_action_digest(action_digest.into()); + update_action_cache_request.set_action_result(action_result); + + let call_opt = call_option(&self.headers, Some(context.build_id.clone()))?; + + self + .action_cache_client + .update_action_result_async_opt(&update_action_cache_request, call_opt) + .unwrap() + .compat() + .await + .map_err(crate::remote::rpcerror_to_string)?; + + Ok(()) + } +} + +#[async_trait] +impl crate::CommandRunner for CommandRunner { + async fn run( + &self, + req: MultiPlatformProcess, + context: Context, + ) -> Result { + // Construct the REv2 ExecuteRequest and related data for this execution request. 
+ let request = self.extract_compatible_request(&req).unwrap(); + let (action, command, _execute_request) = + make_execute_request(&request, self.metadata.clone())?; + + // Ensure the action and command are stored locally. + let (command_digest, action_digest) = with_workunit( + context.workunit_store.clone(), + "ensure_action_stored_locally".to_owned(), + WorkunitMetadata::with_level(Level::Debug), + crate::remote::ensure_action_stored_locally(&self.store, &command, &action), + |_, md| md, + ) + .await?; + + // Check the remote Action Cache to see if this request was already computed. + // If so, return immediately with the result. + if self.cache_read { + let cached_response_opt = with_workunit( + context.workunit_store.clone(), + "check_action_cache".to_owned(), + WorkunitMetadata::with_level(Level::Debug), + crate::remote::check_action_cache( + action_digest, + &self.metadata, + self.platform, + &context, + self.action_cache_client.clone(), + &self.headers, + self.store.clone(), + ), + |_, md| md, + ) + .await?; + log::debug!( + "action cache response: digest={:?}: {:?}", + action_digest, + cached_response_opt + ); + if let Some(cached_response) = cached_response_opt { + return Ok(cached_response); + } + } + + let result = self.underlying.run(req, context.clone()).await?; + if result.exit_code == 0 && self.cache_write { + // Store the result in the remote cache if not the product of a remote execution. 
+ self + .update_action_cache( + &context, + &request, + &result, + &self.metadata, + &command, + action_digest, + command_digest, + ) + .await?; + } + + Ok(result) + } + + fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option { + self.underlying.extract_compatible_request(req) + } +} diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs new file mode 100644 index 00000000000..04c7aa51a52 --- /dev/null +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -0,0 +1,214 @@ +use std::collections::BTreeMap; +use std::convert::TryInto; +use std::io::Write; +use std::path::PathBuf; +use std::time::Duration; + +use futures::compat::Future01CompatExt; +use mock::{StubActionCache, StubCAS}; +use store::{BackoffConfig, Store}; +use tempfile::TempDir; +use testutil::data::TestData; +use testutil::relative_paths; +use tokio::runtime::Handle; +use workunit_store::WorkunitStore; + +use crate::{ + CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches, + Platform, Process, ProcessMetadata, +}; + +struct RoundtripResults { + uncached: Result, + maybe_cached: Result, +} + +fn create_local_runner() -> (Box, Store, TempDir, StubCAS) { + let runtime = task_executor::Executor::new(Handle::current()); + let base_dir = TempDir::new().unwrap(); + let named_cache_dir = base_dir.path().join("named_cache_dir"); + let stub_cas = StubCAS::builder().build(); + let store_dir = base_dir.path().join("store_dir"); + let store = Store::with_remote( + runtime.clone(), + store_dir, + vec![stub_cas.address()], + None, + None, + None, + 1, + 10 * 1024 * 1024, + Duration::from_secs(1), + BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), + 1, + 1, + ) + .unwrap(); + let runner = Box::new(crate::local::CommandRunner::new( + store.clone(), + runtime.clone(), + base_dir.path().to_owned(), + 
NamedCaches::new(named_cache_dir), + true, + )); + (runner, store, base_dir, stub_cas) +} + +fn create_cached_runner( + local: Box, + store: Store, +) -> (Box, TempDir, StubActionCache) { + let cache_dir = TempDir::new().unwrap(); + + let metadata = ProcessMetadata { + instance_name: None, + cache_key_gen_version: None, + platform_properties: vec![], + }; + + let action_cache = StubActionCache::new().unwrap(); + + let runner = Box::new( + crate::remote_cache::CommandRunner::new( + local.into(), + metadata, + store, + &action_cache.address(), + None, + None, + BTreeMap::default(), + Platform::current().unwrap(), + true, + true, + ) + .expect("caching command runner"), + ); + + (runner, cache_dir, action_cache) +} + +fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) { + let script_dir = TempDir::new().unwrap(); + let script_path = script_dir.path().join("script"); + std::fs::File::create(&script_path) + .and_then(|mut file| { + writeln!( + file, + "echo -n {} > roland && echo Hello && echo >&2 World; exit {}", + TestData::roland().string(), + script_exit_code + ) + }) + .unwrap(); + + let process = Process::new(vec![ + testutil::path::find_bash(), + format!("{}", script_path.display()), + ]) + .output_files(relative_paths(&["roland"]).collect()); + + (process, script_path, script_dir) +} + +async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { + let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); + let (process, script_path, _script_dir) = create_script(script_exit_code); + + let local_result = local.run(process.clone().into(), Context::default()).await; + + let (caching, _cache_dir, _stub_action_cache) = create_cached_runner(local, store.clone()); + + let uncached_result = caching + .run(process.clone().into(), Context::default()) + .await; + + assert_eq!(local_result, uncached_result); + + // Removing the file means that were the command to be run again without any caching, it would + // fail due to a 
FileNotFound error. So, If the second run succeeds, that implies that the + // cache was successfully used. + std::fs::remove_file(&script_path).unwrap(); + let maybe_cached_result = caching.run(process.into(), Context::default()).await; + + RoundtripResults { + uncached: uncached_result, + maybe_cached: maybe_cached_result, + } +} + +#[tokio::test] +async fn cache_success() { + let workunit_store = WorkunitStore::new(false); + workunit_store.init_thread_state(None); + + let results = run_roundtrip(0).await; + assert_eq!(results.uncached, results.maybe_cached); +} + +#[tokio::test] +async fn failures_not_cached() { + let workunit_store = WorkunitStore::new(false); + workunit_store.init_thread_state(None); + + let results = run_roundtrip(1).await; + assert_ne!(results.uncached, results.maybe_cached); + assert_eq!(results.uncached.unwrap().exit_code, 1); + assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found +} + +#[tokio::test] +async fn recover_from_missing_store_contents() { + env_logger::init(); + let workunit_store = WorkunitStore::new(false); + workunit_store.init_thread_state(None); + + let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); + let (caching, _cache_dir, _stub_action_cache) = create_cached_runner(local, store.clone()); + let (process, _script_path, _script_dir) = create_script(0); + + // Run once to cache the process. + let first_result = caching + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + + // Delete the first child of the output directory parent to confirm that we ensure that more + // than just the root of the output is present when hitting the cache. 
+ { + let output_dir_digest = first_result.output_directory; + let (output_dir, _) = store + .load_directory(output_dir_digest) + .await + .unwrap() + .unwrap(); + let output_child_digest = output_dir + .get_files() + .first() + .unwrap() + .get_digest() + .try_into() + .unwrap(); + let removed = store.remove_file(output_child_digest).await.unwrap(); + assert!(removed); + let result = store + .contents_for_directory(output_dir_digest) + .compat() + .await; + log::info!("{:?}", &result); + assert!(result.err().is_some()); + } + + // Ensure that we don't fail if we re-run. + let second_result = caching + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + + // And that the entire output directory can be loaded. + assert!(store + .contents_for_directory(second_result.output_directory) + .compat() + .await + .ok() + .is_some()) +} diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 7107852d35c..40a5378f5d9 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -96,6 +96,8 @@ pub struct ExecutionStrategyOptions { pub speculation_strategy: String, pub use_local_cache: bool, pub local_enable_nailgun: bool, + pub remote_cache_read: bool, + pub remote_cache_write: bool, } impl Core { diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 9028b76567b..a6d11e7692b 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -474,7 +474,9 @@ py_class!(class PyExecutionStrategyOptions |py| { speculation_delay: f64, speculation_strategy: String, use_local_cache: bool, - local_enable_nailgun: bool + local_enable_nailgun: bool, + remote_cache_read: bool, + remote_cache_write: bool ) -> CPyResult { Self::create_instance(py, ExecutionStrategyOptions { @@ -485,6 +487,8 @@ py_class!(class PyExecutionStrategyOptions |py| { speculation_strategy, use_local_cache, local_enable_nailgun, + remote_cache_read, + 
remote_cache_write, } ) } diff --git a/src/rust/engine/testutil/mock/src/action_cache.rs b/src/rust/engine/testutil/mock/src/action_cache.rs new file mode 100644 index 00000000000..503fbfbe9a0 --- /dev/null +++ b/src/rust/engine/testutil/mock/src/action_cache.rs @@ -0,0 +1,141 @@ +// Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( +clippy::all, +clippy::default_trait_access, +clippy::expl_impl_clone_on_copy, +clippy::if_not_else, +clippy::needless_continue, +clippy::unseparated_literal_suffix, +// TODO: Falsely triggers for async/await: +// see https://github.com/rust-lang/rust-clippy/issues/5360 +// clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( +clippy::len_without_is_empty, +clippy::redundant_field_names, +clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. 
+#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] + +use std::collections::HashMap; +use std::sync::Arc; + +use bazel_protos::remote_execution::{ + ActionResult, GetActionResultRequest, UpdateActionResultRequest, +}; +use grpcio::{RpcContext, UnarySink}; +use hashing::{Digest, Fingerprint}; +use parking_lot::Mutex; +use std::convert::TryInto; + +pub struct StubActionCache { + server_transport: grpcio::Server, + pub action_map: Arc>>, +} + +#[derive(Clone)] +struct ActionCacheResponder { + action_map: Arc>>, +} + +impl bazel_protos::remote_execution_grpc::ActionCache for ActionCacheResponder { + fn get_action_result( + &self, + _: RpcContext<'_>, + req: GetActionResultRequest, + sink: UnarySink, + ) { + let action_digest: Digest = match req.get_action_digest().try_into() { + Ok(digest) => digest, + Err(_) => { + sink.fail(grpcio::RpcStatus::new( + grpcio::RpcStatusCode::INTERNAL, + Some("Unable to extract action_digest.".to_owned()), + )); + return; + } + }; + + let action_map = self.action_map.lock(); + let action_result = match action_map.get(&action_digest.0) { + Some(ar) => ar.clone(), + None => { + sink.fail(grpcio::RpcStatus::new( + grpcio::RpcStatusCode::NOT_FOUND, + Some(format!( + "ActionResult for Action {:?} does not exist", + action_digest + )), + )); + return; + } + }; + + sink.success(action_result); + } + + fn update_action_result( + &self, + _: RpcContext<'_>, + req: UpdateActionResultRequest, + sink: UnarySink, + ) { + let action_digest: Digest = match req.get_action_digest().try_into() { + Ok(digest) => digest, + Err(_) => { + sink.fail(grpcio::RpcStatus::new( + grpcio::RpcStatusCode::INTERNAL, + Some("Unable to extract action_digest.".to_owned()), + )); + return; + } + }; + + let mut action_map = self.action_map.lock(); + action_map.insert(action_digest.0, req.get_action_result().clone()); + + sink.success(req.get_action_result().clone()); + } 
+} + +impl StubActionCache { + pub fn new() -> Result { + let action_map = Arc::new(Mutex::new(HashMap::new())); + let responder = ActionCacheResponder { + action_map: action_map.clone(), + }; + + let env = Arc::new(grpcio::Environment::new(1)); + let mut server_transport = grpcio::ServerBuilder::new(env) + .register_service(bazel_protos::remote_execution_grpc::create_action_cache( + responder, + )) + .bind("127.0.0.1", 0) + .build() + .unwrap(); + server_transport.start(); + + Ok(StubActionCache { + server_transport, + action_map, + }) + } + + /// + /// The address on which this server is listening over insecure HTTP transport. + /// + pub fn address(&self) -> String { + let bind_addr = self.server_transport.bind_addrs().next().unwrap(); + format!("{}:{}", bind_addr.0, bind_addr.1) + } +} diff --git a/src/rust/engine/testutil/mock/src/lib.rs b/src/rust/engine/testutil/mock/src/lib.rs index bd9a7428b34..369f69741f5 100644 --- a/src/rust/engine/testutil/mock/src/lib.rs +++ b/src/rust/engine/testutil/mock/src/lib.rs @@ -27,7 +27,10 @@ // Arc can be more clear than needing to grok Orderings: #![allow(clippy::mutex_atomic)] +mod action_cache; mod cas; -pub use crate::cas::StubCAS; pub mod execution_server; + +pub use crate::action_cache::StubActionCache; +pub use crate::cas::StubCAS; pub use crate::execution_server::MockExecution; From 62f7206c7986515998c5bead6c9abec8035fb48f Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Wed, 21 Oct 2020 16:50:31 -0700 Subject: [PATCH 2/9] load remote caching command runner [ci skip-build-wheels] --- src/rust/engine/src/context.rs | 40 ++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 40a5378f5d9..d8103f2ff54 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -194,6 +194,7 @@ impl Core { fn make_command_runner( store: &Store, + remote_store_servers: &[String], executor: &Executor, 
local_execution_root_dir: &Path, named_caches_dir: &Path, @@ -204,6 +205,14 @@ impl Core { exec_strategy_opts: &ExecutionStrategyOptions, remoting_opts: &RemotingOptions, ) -> Result, String> { + if (exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write) + && remoting_opts.execution_enable + { + return Err( + "Remote caching mode and remote execution mode cannot be enabled concurrently".into(), + ); + } + let local_command_runner = Core::make_local_execution_runner( store, executor, @@ -245,7 +254,29 @@ impl Core { local_command_runner }; - let maybe_cached_command_runner = if exec_strategy_opts.use_local_cache { + let maybe_remote_cached_command_runner = + if exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write { + let action_cache_address = remote_store_servers + .first() + .ok_or_else(|| "at least one remote store must be specified".to_owned())?; + + Box::new(process_execution::remote_cache::CommandRunner::new( + command_runner.into(), + process_execution_metadata.clone(), + store.clone(), + action_cache_address.as_str(), + root_ca_certs.clone(), + oauth_bearer_token.clone(), + remoting_opts.execution_headers.clone(), + Platform::current()?, + exec_strategy_opts.remote_cache_read, + exec_strategy_opts.remote_cache_write, + )?) 
+ } else { + command_runner + }; + + let maybe_local_cached_command_runner = if exec_strategy_opts.use_local_cache { let process_execution_store = ShardedLmdb::new( local_store_dir.join("processes"), 5 * GIGABYTES, @@ -254,16 +285,16 @@ impl Core { ) .map_err(|err| format!("Could not initialize store for process cache: {:?}", err))?; Box::new(process_execution::cache::CommandRunner::new( - command_runner.into(), + maybe_remote_cached_command_runner.into(), process_execution_store, store.clone(), process_execution_metadata.clone(), )) } else { - command_runner + maybe_remote_cached_command_runner }; - Ok(maybe_cached_command_runner) + Ok(maybe_local_cached_command_runner) } fn load_certificates( @@ -369,6 +400,7 @@ impl Core { let command_runner = Core::make_command_runner( &store, + &remote_store_servers, &executor, &local_execution_root_dir, &named_caches_dir, From 845b17373dae4fd8f54e756bcbc16cade6415c1a Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Wed, 21 Oct 2020 17:15:33 -0700 Subject: [PATCH 3/9] fix test compile [ci skip-build-wheels] --- src/rust/engine/process_execution/src/remote_cache_tests.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 04c7aa51a52..6ef588234f5 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -10,7 +10,6 @@ use store::{BackoffConfig, Store}; use tempfile::TempDir; use testutil::data::TestData; use testutil::relative_paths; -use tokio::runtime::Handle; use workunit_store::WorkunitStore; use crate::{ @@ -24,7 +23,7 @@ struct RoundtripResults { } fn create_local_runner() -> (Box, Store, TempDir, StubCAS) { - let runtime = task_executor::Executor::new(Handle::current()); + let runtime = task_executor::Executor::new(); let base_dir = TempDir::new().unwrap(); let named_cache_dir = 
base_dir.path().join("named_cache_dir"); let stub_cas = StubCAS::builder().build(); From 414e7805da93e2e8e861481a373c5a75a152fb8b Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Wed, 21 Oct 2020 20:25:11 -0700 Subject: [PATCH 4/9] skip using cache on errors [ci skip-build-wheels] --- .../process_execution/src/remote_cache.rs | 35 +++++++----- .../src/remote_cache_tests.rs | 53 ++++--------------- .../engine/testutil/mock/src/action_cache.rs | 16 +++++- 3 files changed, 47 insertions(+), 57 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 850c21c08c2..68a0e975956 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -400,7 +400,7 @@ impl crate::CommandRunner for CommandRunner { // Check the remote Action Cache to see if this request was already computed. // If so, return immediately with the result. if self.cache_read { - let cached_response_opt = with_workunit( + let response = with_workunit( context.workunit_store.clone(), "check_action_cache".to_owned(), WorkunitMetadata::with_level(Level::Debug), @@ -415,21 +415,29 @@ impl crate::CommandRunner for CommandRunner { ), |_, md| md, ) - .await?; - log::debug!( - "action cache response: digest={:?}: {:?}", - action_digest, - cached_response_opt - ); - if let Some(cached_response) = cached_response_opt { - return Ok(cached_response); - } + .await; + match response { + Ok(cached_response_opt) => { + log::debug!( + "remote cache response: digest={:?}: {:?}", + action_digest, + cached_response_opt + ); + + if let Some(cached_response) = cached_response_opt { + return Ok(cached_response); + } + } + Err(err) => { + log::warn!("failed to read from remote cache: {}", err); + } + }; } let result = self.underlying.run(req, context.clone()).await?; if result.exit_code == 0 && self.cache_write { // Store the result in the remote cache if not the product of a remote 
execution. - self + if let Err(err) = self .update_action_cache( &context, &request, @@ -439,7 +447,10 @@ impl crate::CommandRunner for CommandRunner { action_digest, command_digest, ) - .await?; + .await + { + log::warn!("failed to update remote cache: {}", err) + } } Ok(result) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 6ef588234f5..d6c8f31f126 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -1,10 +1,9 @@ use std::collections::BTreeMap; -use std::convert::TryInto; use std::io::Write; use std::path::PathBuf; +use std::sync::atomic::Ordering; use std::time::Duration; -use futures::compat::Future01CompatExt; use mock::{StubActionCache, StubCAS}; use store::{BackoffConfig, Store}; use tempfile::TempDir; @@ -156,58 +155,24 @@ async fn failures_not_cached() { } #[tokio::test] -async fn recover_from_missing_store_contents() { +async fn skip_cache_on_error() { env_logger::init(); let workunit_store = WorkunitStore::new(false); workunit_store.init_thread_state(None); let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); - let (caching, _cache_dir, _stub_action_cache) = create_cached_runner(local, store.clone()); + let (caching, _cache_dir, stub_action_cache) = create_cached_runner(local, store.clone()); let (process, _script_path, _script_dir) = create_script(0); - // Run once to cache the process. - let first_result = caching - .run(process.clone().into(), Context::default()) - .await - .unwrap(); + stub_action_cache + .always_errors + .store(true, Ordering::SeqCst); - // Delete the first child of the output directory parent to confirm that we ensure that more - // than just the root of the output is present when hitting the cache. 
- { - let output_dir_digest = first_result.output_directory; - let (output_dir, _) = store - .load_directory(output_dir_digest) - .await - .unwrap() - .unwrap(); - let output_child_digest = output_dir - .get_files() - .first() - .unwrap() - .get_digest() - .try_into() - .unwrap(); - let removed = store.remove_file(output_child_digest).await.unwrap(); - assert!(removed); - let result = store - .contents_for_directory(output_dir_digest) - .compat() - .await; - log::info!("{:?}", &result); - assert!(result.err().is_some()); - } - - // Ensure that we don't fail if we re-run. - let second_result = caching + // Run once to ensure the cache is skipped on errors. + let result = caching .run(process.clone().into(), Context::default()) .await .unwrap(); - // And that the entire output directory can be loaded. - assert!(store - .contents_for_directory(second_result.output_directory) - .compat() - .await - .ok() - .is_some()) + assert_eq!(result.exit_code, 0); } diff --git a/src/rust/engine/testutil/mock/src/action_cache.rs b/src/rust/engine/testutil/mock/src/action_cache.rs index 503fbfbe9a0..f43c0acbcca 100644 --- a/src/rust/engine/testutil/mock/src/action_cache.rs +++ b/src/rust/engine/testutil/mock/src/action_cache.rs @@ -28,6 +28,8 @@ clippy::too_many_arguments #![allow(clippy::mutex_atomic)] use std::collections::HashMap; +use std::convert::TryInto; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use bazel_protos::remote_execution::{ @@ -36,16 +38,17 @@ use bazel_protos::remote_execution::{ use grpcio::{RpcContext, UnarySink}; use hashing::{Digest, Fingerprint}; use parking_lot::Mutex; -use std::convert::TryInto; pub struct StubActionCache { server_transport: grpcio::Server, pub action_map: Arc>>, + pub always_errors: Arc, } #[derive(Clone)] struct ActionCacheResponder { action_map: Arc>>, + always_errors: Arc, } impl bazel_protos::remote_execution_grpc::ActionCache for ActionCacheResponder { @@ -55,6 +58,14 @@ impl 
bazel_protos::remote_execution_grpc::ActionCache for ActionCacheResponder { req: GetActionResultRequest, sink: UnarySink, ) { + if self.always_errors.load(Ordering::SeqCst) { + sink.fail(grpcio::RpcStatus::new( + grpcio::RpcStatusCode::UNAVAILABLE, + Some("unavailable".to_owned()), + )); + return; + } + let action_digest: Digest = match req.get_action_digest().try_into() { Ok(digest) => digest, Err(_) => { @@ -111,8 +122,10 @@ impl bazel_protos::remote_execution_grpc::ActionCache for ActionCacheResponder { impl StubActionCache { pub fn new() -> Result { let action_map = Arc::new(Mutex::new(HashMap::new())); + let always_errors = Arc::new(AtomicBool::new(false)); let responder = ActionCacheResponder { action_map: action_map.clone(), + always_errors: always_errors.clone(), }; let env = Arc::new(grpcio::Environment::new(1)); @@ -128,6 +141,7 @@ impl StubActionCache { Ok(StubActionCache { server_transport, action_map, + always_errors, }) } From b27268a9b95a6110c99cafff666cfc0530542305 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Wed, 21 Oct 2020 21:06:42 -0700 Subject: [PATCH 5/9] add test for extracting trees [ci skip-build-wheels] --- .../process_execution/src/remote_cache.rs | 16 ++--- .../src/remote_cache_tests.rs | 69 ++++++++++++++++++- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 68a0e975956..958f9d3aa4f 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -96,8 +96,7 @@ impl CommandRunner { /// merged final output directory to find the specific path to extract. (REAPI requires /// output directories to be stored as `Tree` protos that contain all of the `Directory` /// protos that constitute the directory tree.) 
- async fn make_tree_for_output_directory( - &self, + pub(crate) async fn make_tree_for_output_directory( root_directory_digest: Digest, directory_path: RelativePath, store: &Store, @@ -268,13 +267,12 @@ impl CommandRunner { let mut tree_digests = Vec::new(); for output_directory in &command.output_directories { - let tree = self - .make_tree_for_output_directory( - result.output_directory, - RelativePath::new(output_directory).unwrap(), - store, - ) - .await?; + let tree = Self::make_tree_for_output_directory( + result.output_directory, + RelativePath::new(output_directory).unwrap(), + store, + ) + .await?; let tree_digest = crate::remote::store_proto_locally(&self.store, &tree).await?; tree_digests.push(tree_digest); diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index d6c8f31f126..1384d869768 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::convert::TryInto; use std::io::Write; use std::path::PathBuf; use std::sync::atomic::Ordering; @@ -7,7 +8,7 @@ use std::time::Duration; use mock::{StubActionCache, StubCAS}; use store::{BackoffConfig, Store}; use tempfile::TempDir; -use testutil::data::TestData; +use testutil::data::{TestData, TestDirectory}; use testutil::relative_paths; use workunit_store::WorkunitStore; @@ -15,6 +16,8 @@ use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches, Platform, Process, ProcessMetadata, }; +use fs::RelativePath; +use hashing::Digest; struct RoundtripResults { uncached: Result, @@ -156,7 +159,6 @@ async fn failures_not_cached() { #[tokio::test] async fn skip_cache_on_error() { - env_logger::init(); let workunit_store = WorkunitStore::new(false); workunit_store.init_thread_state(None); @@ -176,3 +178,66 @@ async fn skip_cache_on_error() { 
assert_eq!(result.exit_code, 0); } + +#[tokio::test] +async fn make_tree_from_directory() { + env_logger::init(); + + let store_dir = TempDir::new().unwrap(); + let executor = task_executor::Executor::new(); + let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); + + // Prepare the store to contain /pets/cats/roland. We will then extract various pieces of it + // into Tree protos. + + store + .store_file_bytes(TestData::roland().bytes(), false) + .await + .expect("Error saving file bytes"); + store + .record_directory(&TestDirectory::containing_roland().directory(), true) + .await + .expect("Error saving directory"); + store + .record_directory(&TestDirectory::nested().directory(), true) + .await + .expect("Error saving directory"); + let directory_digest = store + .record_directory(&TestDirectory::double_nested().directory(), true) + .await + .expect("Error saving directory"); + + let tree = crate::remote_cache::CommandRunner::make_tree_for_output_directory( + directory_digest, + RelativePath::new("pets").unwrap(), + &store, + ) + .await + .unwrap(); + + let root_dir = tree.get_root(); + assert_eq!(root_dir.get_files().len(), 0); + assert_eq!(root_dir.get_directories().len(), 1); + let dir_node = &root_dir.get_directories()[0]; + assert_eq!(dir_node.get_name(), "cats"); + let dir_digest: Digest = dir_node.get_digest().try_into().unwrap(); + assert_eq!(dir_digest, TestDirectory::containing_roland().digest()); + let children = tree.get_children(); + assert_eq!(children.len(), 1); + let child_dir = &children[0]; + assert_eq!(child_dir.get_files().len(), 1); + assert_eq!(child_dir.get_directories().len(), 0); + let file_node = &child_dir.get_files()[0]; + assert_eq!(file_node.get_name(), "roland"); + let file_digest: Digest = file_node.get_digest().try_into().unwrap(); + assert_eq!(file_digest, TestData::roland().digest()); + + // Test that extracting a non-existent output directory fails. 
+ crate::remote_cache::CommandRunner::make_tree_for_output_directory( + directory_digest, + RelativePath::new("animals").unwrap(), + &store, + ) + .await + .unwrap_err(); +} From 63d30f52a98bd3201bb86e57b2f671e38140e4a0 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Wed, 21 Oct 2020 21:32:03 -0700 Subject: [PATCH 6/9] add test for extracting output file [ci skip-build-wheels] --- .../process_execution/src/remote_cache.rs | 16 ++++---- .../src/remote_cache_tests.rs | 37 ++++++++++++++++++- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 958f9d3aa4f..4625c2c2600 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -179,8 +179,7 @@ impl CommandRunner { Ok(tree) } - async fn extract_output_file( - &self, + pub(crate) async fn extract_output_file( root_directory_digest: Digest, file_path: RelativePath, store: &Store, @@ -292,13 +291,12 @@ impl CommandRunner { let mut file_digests = Vec::new(); for output_file in &command.output_files { - let file_node = self - .extract_output_file( - result.output_directory, - RelativePath::new(output_file).unwrap(), - store, - ) - .await?; + let file_node = Self::extract_output_file( + result.output_directory, + RelativePath::new(output_file).unwrap(), + store, + ) + .await?; file_digests.push(file_node.get_digest().try_into()?); diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 1384d869768..aac4c68be85 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -181,8 +181,6 @@ async fn skip_cache_on_error() { #[tokio::test] async fn make_tree_from_directory() { - env_logger::init(); - let store_dir = TempDir::new().unwrap(); let executor = 
task_executor::Executor::new(); let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); @@ -241,3 +239,38 @@ async fn make_tree_from_directory() { .await .unwrap_err(); } + +#[tokio::test] +async fn extract_output_file() { + let store_dir = TempDir::new().unwrap(); + let executor = task_executor::Executor::new(); + let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); + + // Prepare the store to contain /pets/cats/roland. We will then extract varios pieces of it + // into Tree protos. + + store + .store_file_bytes(TestData::roland().bytes(), false) + .await + .expect("Error saving file bytes"); + store + .record_directory(&TestDirectory::containing_roland().directory(), true) + .await + .expect("Error saving directory"); + let directory_digest = store + .record_directory(&TestDirectory::nested().directory(), true) + .await + .expect("Error saving directory"); + + let file_node = crate::remote_cache::CommandRunner::extract_output_file( + directory_digest, + RelativePath::new("cats/roland").unwrap(), + &store, + ) + .await + .unwrap(); + + assert_eq!(file_node.get_name(), "roland"); + let file_digest: Digest = file_node.get_digest().try_into().unwrap(); + assert_eq!(file_digest, TestData::roland().digest()); +} From c9903bb38ba31410f4e58d17ee10a13af09fb207 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Wed, 21 Oct 2020 21:43:24 -0700 Subject: [PATCH 7/9] remove env_logger [ci skip-build-wheels] --- src/rust/engine/Cargo.lock | 30 ++++++-------------- src/rust/engine/process_execution/Cargo.toml | 1 - 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index af6e8298832..8848f18a2f8 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -207,7 +207,7 @@ dependencies = [ "bytes 0.4.12", "clap", "dirs", - "env_logger 0.5.13", + "env_logger", "errno", "fuse", "futures 0.3.5", @@ -695,7 +695,7 @@ dependencies = [ "concrete_time", "cpython", 
"crossbeam-channel", - "env_logger 0.5.13", + "env_logger", "fnv", "fs", "futures 0.1.29", @@ -743,19 +743,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "env_logger" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aafcde04e90a5226a6443b7aabdb016ba2f8307c847d524724bd9b346dd1a2d3" -dependencies = [ - "atty", - "humantime", - "log 0.4.8", - "regex", - "termcolor", -] - [[package]] name = "errno" version = "0.2.4" @@ -850,7 +837,7 @@ dependencies = [ "boxfuture", "bytes 0.4.12", "clap", - "env_logger 0.5.13", + "env_logger", "fs", "futures 0.1.29", "futures 0.3.5", @@ -1080,7 +1067,7 @@ name = "graph" version = "0.0.1" dependencies = [ "async-trait", - "env_logger 0.5.13", + "env_logger", "fnv", "futures 0.3.5", "log 0.4.8", @@ -1492,7 +1479,7 @@ name = "local_cas" version = "0.0.1" dependencies = [ "clap", - "env_logger 0.5.13", + "env_logger", "mock", ] @@ -2116,7 +2103,6 @@ dependencies = [ "concrete_time", "derivative", "double-checked-cell-async", - "env_logger 0.6.2", "fs", "futures 0.1.29", "futures 0.3.5", @@ -2155,7 +2141,7 @@ version = "0.0.1" dependencies = [ "clap", "dirs", - "env_logger 0.5.13", + "env_logger", "fs", "futures 0.3.5", "hashing", @@ -2184,7 +2170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6325275b85605f58f576456a47af44417edf5956a6f670bb59fbe12aff69597" dependencies = [ "bytes 0.4.12", - "env_logger 0.5.13", + "env_logger", "heck", "itertools 0.7.11", "log 0.4.8", @@ -2615,7 +2601,7 @@ dependencies = [ name = "rule_graph" version = "0.0.1" dependencies = [ - "env_logger 0.5.13", + "env_logger", "indexmap", "log 0.4.8", "petgraph", diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index df5968c9190..2dd89b6d49d 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -6,7 +6,6 @@ authors = [ "Pants Build " ] publish = false 
[dependencies] -env_logger = "0.6" async-trait = "0.1" walkdir = "2" async_semaphore = { path = "../async_semaphore" } From 33c38fc11f3003e2cda1f7cc90d0d40b0052f86e Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Thu, 22 Oct 2020 12:32:19 -0700 Subject: [PATCH 8/9] fix bug in remote store setup [ci skip-build-wheels] --- src/python/pants/option/global_options.py | 10 ++++++++++ src/rust/engine/src/context.rs | 9 ++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index d8a0564debb..6eee976c069 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -938,3 +938,13 @@ def validate_instance(cls, opts): "The `--remote-execution-server` option requires also setting " "`--remote-store-server`. Often these have the same value." ) + if opts.remote_cache_read and not opts.remote_store_server: + raise OptionsError( + "The `--remote-cache-read` option requires also setting " + "`--remote-store-server` to work properly." + ) + if opts.remote_cache_write and not opts.remote_store_server: + raise OptionsError( + "The `--remote-cache-write` option requires also setting " + "`--remote-store-server` to work properly." 
+ ) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index d8103f2ff54..2959013668f 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -377,13 +377,20 @@ impl Core { None }; + let need_remote_store = remoting_opts.execution_enable + || exec_strategy_opts.remote_cache_read + || exec_strategy_opts.remote_cache_write; + if need_remote_store && remote_store_servers.is_empty() { + return Err("Remote store required but none provided".into()); + } + let store = safe_create_dir_all_ioerror(&local_store_dir) .map_err(|e| format!("Error making directory {:?}: {:?}", local_store_dir, e)) .and_then(|_| { Core::make_store( &executor, &local_store_dir, - remoting_opts.execution_enable && !remote_store_servers.is_empty(), + need_remote_store, &remoting_opts, &remote_store_servers, &root_ca_certs, From 9a1a2e2e7d3c858daedf7e290ff1d726ba6de961 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Thu, 22 Oct 2020 16:36:46 -0700 Subject: [PATCH 9/9] capitalize error messages [ci skip-build-wheels] --- .../process_execution/src/remote_cache.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 4625c2c2600..4c08a691e13 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -195,7 +195,11 @@ impl CommandRunner { Component::Normal(name) => name .to_str() .ok_or_else(|| format!("unable to convert '{:?}' to string", name))?, - _ => return Err("illegal state: unexpected path component in relative path".into()), + _ => { + return Err( + "Illegal state: Found an unexpected path component in relative path.".into(), + ) + } }; // Load the Directory proto corresponding to `current_directory_digest`. 
@@ -203,7 +207,7 @@ impl CommandRunner { Some((dir, _)) => dir, None => { return Err(format!( - "illegal state: directory for digest {:?} did not exist locally", + "Illegal state: The directory for digest {:?} did not exist locally.", ¤t_directory_digest )) } @@ -218,7 +222,7 @@ impl CommandRunner { Some(dn) => dn, None => { return Err(format!( - "unable to find path component {:?} in directory", + "Unable to find path component {:?} in directory.", next_name )) } @@ -236,7 +240,7 @@ impl CommandRunner { Some((dir, _)) => dir, None => { return Err(format!( - "illegal state: directory for digest {:?} did not exist locally", + "Illegal state: The directory for digest {:?} did not exist locally.", ¤t_directory_digest )) } @@ -249,7 +253,7 @@ impl CommandRunner { .iter() .find(|n| n.get_name() == file_base_name) .cloned() - .ok_or_else(|| format!("file {:?} did not exist did not exist locally", file_path)) + .ok_or_else(|| format!("File {:?} did not exist locally.", file_path)) } async fn make_action_result( @@ -425,7 +429,7 @@ impl crate::CommandRunner for CommandRunner { } } Err(err) => { - log::warn!("failed to read from remote cache: {}", err); + log::warn!("Failed to read from remote cache: {}", err); } }; } @@ -445,7 +449,7 @@ impl crate::CommandRunner for CommandRunner { ) .await { - log::warn!("failed to update remote cache: {}", err) + log::warn!("Failed to update remote cache: {}", err) } }