diff --git a/src/python/pants/engine/internals/native.py b/src/python/pants/engine/internals/native.py index 024f3db1835..951b2ca134a 100644 --- a/src/python/pants/engine/internals/native.py +++ b/src/python/pants/engine/internals/native.py @@ -260,6 +260,8 @@ def new_scheduler( speculation_strategy=execution_options.process_execution_speculation_strategy, use_local_cache=execution_options.process_execution_use_local_cache, local_enable_nailgun=execution_options.process_execution_local_enable_nailgun, + remote_cache_read=execution_options.remote_cache_read, + remote_cache_write=execution_options.remote_cache_write, ) return cast( diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 8660dd65937..6eee976c069 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -86,6 +86,8 @@ class ExecutionOptions: remote_execution_headers: Any remote_execution_overall_deadline_secs: int process_execution_local_enable_nailgun: bool + remote_cache_read: bool + remote_cache_write: bool @classmethod def from_bootstrap_options(cls, bootstrap_options): @@ -112,6 +114,8 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_headers=bootstrap_options.remote_execution_headers, remote_execution_overall_deadline_secs=bootstrap_options.remote_execution_overall_deadline_secs, process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun, + remote_cache_read=bootstrap_options.remote_cache_read, + remote_cache_write=bootstrap_options.remote_cache_write, ) @@ -138,6 +142,8 @@ def from_bootstrap_options(cls, bootstrap_options): remote_execution_headers={}, remote_execution_overall_deadline_secs=60 * 60, # one hour process_execution_local_enable_nailgun=False, + remote_cache_read=False, + remote_cache_write=False, ) @@ -759,6 +765,20 @@ def register_bootstrap_options(cls, register): help="Whether or not to use nailgun to run the requests that are 
marked as nailgunnable.", advanced=True, ) + register( + "--remote-cache-read", + type=bool, + default=DEFAULT_EXECUTION_OPTIONS.remote_cache_read, + advanced=True, + help="Whether to enable reading from a remote cache", + ) + register( + "--remote-cache-write", + type=bool, + default=DEFAULT_EXECUTION_OPTIONS.remote_cache_write, + advanced=True, + help="Whether to enable writing results to a remote cache", + ) @classmethod def register_options(cls, register): @@ -918,3 +938,13 @@ def validate_instance(cls, opts): "The `--remote-execution-server` option requires also setting " "`--remote-store-server`. Often these have the same value." ) + if opts.remote_cache_read and not opts.remote_store_server: + raise OptionsError( + "The `--remote-cache-read` option requires also setting " + "`--remote-store-server` to work properly." + ) + if opts.remote_cache_write and not opts.remote_store_server: + raise OptionsError( + "The `--remote-cache-write` option requires also setting " + "`--remote-store-server` to work properly." 
+ ) diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 9dd1cb559ed..f11491b7128 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -57,6 +57,10 @@ pub mod remote; #[cfg(test)] pub mod remote_tests; +pub mod remote_cache; +#[cfg(test)] +mod remote_cache_tests; + pub mod speculate; #[cfg(test)] mod speculate_tests; diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index b9188621725..2118d1f5500 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -33,6 +33,7 @@ use crate::{ Context, ExecutionStats, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, PlatformConstraint, Process, ProcessMetadata, }; +use bazel_protos::remote_execution_grpc::ActionCacheClient; // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an Process, and may be populated only by the @@ -183,17 +184,6 @@ impl CommandRunner { self.platform } - async fn store_proto_locally(&self, proto: &P) -> Result { - let command_bytes = proto - .write_to_bytes() - .map_err(|e| format!("Error serializing proto {:?}", e))?; - self - .store - .store_file_bytes(Bytes::from(command_bytes), true) - .await - .map_err(|e| format!("Error saving proto to local store: {:?}", e)) - } - async fn get_capabilities( &self, ) -> Result<&bazel_protos::remote_execution::ServerCapabilities, String> { @@ -218,85 +208,6 @@ impl CommandRunner { .await } - /// Check the remote Action Cache for a cached result of running the given `action_digest`. - /// - /// This check is necessary because some RE servers do not short-circuit the Execute method - /// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache - /// explicitly in order to avoid duplicating already-cached work. 
This behavior matches - /// the Bazel RE client. - async fn check_action_cache( - &self, - action_digest: Digest, - metadata: &ProcessMetadata, - context: &Context, - ) -> Result, String> { - let mut request = bazel_protos::remote_execution::GetActionResultRequest::new(); - if let Some(ref instance_name) = metadata.instance_name { - request.set_instance_name(instance_name.clone()); - } - request.set_action_digest(action_digest.into()); - - let call_opt = call_option(&self.headers, Some(context.build_id.clone()))?; - - let action_result_response = self - .action_cache_client - .get_action_result_async_opt(&request, call_opt) - .unwrap() - .compat() - .await; - - match action_result_response { - Ok(action_result) => { - let response = populate_fallible_execution_result( - self.store.clone(), - &action_result, - vec![], - self.platform, - false, - ) - .compat() - .await?; - Ok(Some(response)) - } - Err(err) => match err { - grpcio::Error::RpcFailure(rpc_status) - if rpc_status.status == grpcio::RpcStatusCode::NOT_FOUND => - { - Ok(None) - } - _ => Err(rpcerror_to_string(err)), - }, - } - } - - async fn ensure_action_stored_locally( - &self, - command: &Command, - action: &Action, - ) -> Result<(Digest, Digest), String> { - let (command_digest, action_digest) = future::try_join( - self.store_proto_locally(command), - self.store_proto_locally(action), - ) - .await?; - - Ok((command_digest, action_digest)) - } - - async fn ensure_action_uploaded( - &self, - store: &Store, - command_digest: Digest, - action_digest: Digest, - input_files: Digest, - ) -> Result<(), String> { - let _ = store - .ensure_remote_has_recursive(vec![command_digest, action_digest, input_files]) - .compat() - .await?; - Ok(()) - } - // Monitors the operation stream returned by the REv2 Execute and WaitExecution methods. 
// Outputs progress reported by the server and returns the next actionable operation // or gRPC status back to the main loop (plus the operation name so the main loop can @@ -797,7 +708,7 @@ impl crate::CommandRunner for CommandRunner { context.workunit_store.clone(), "ensure_action_stored_locally".to_owned(), WorkunitMetadata::with_level(Level::Debug), - self.ensure_action_stored_locally(&command, &action), + ensure_action_stored_locally(&self.store, &command, &action), |_, md| md, ) .await?; @@ -808,7 +719,15 @@ impl crate::CommandRunner for CommandRunner { context.workunit_store.clone(), "check_action_cache".to_owned(), WorkunitMetadata::with_level(Level::Debug), - self.check_action_cache(action_digest, &self.metadata, &context), + check_action_cache( + action_digest, + &self.metadata, + self.platform, + &context, + self.action_cache_client.clone(), + &self.headers, + self.store.clone(), + ), |_, md| md, ) .await?; @@ -825,7 +744,7 @@ impl crate::CommandRunner for CommandRunner { context.workunit_store.clone(), "ensure_action_uploaded".to_owned(), WorkunitMetadata::with_level(Level::Debug), - self.ensure_action_uploaded(&store, command_digest, action_digest, request.input_files), + ensure_action_uploaded(&store, command_digest, action_digest, request.input_files), |_, md| md, ) .await?; @@ -1213,6 +1132,7 @@ pub fn extract_output_files( // Retrieve the Tree proto and hash its root `Directory` proto to obtain the digest // of the output directory needed to construct the series of `Directory` protos needed // for the final merge of the output directories. 
+ log::info!("processing dir={:?}", dir); let tree_digest: Digest = dir.get_tree_digest().try_into()?; let root_digest_opt = store.load_tree_from_remote(tree_digest).await?; let root_digest = root_digest_opt @@ -1246,6 +1166,7 @@ pub fn extract_output_files( .get_output_files() .iter() .map(|output_file| { + log::info!("processing output_file={:?}", output_file); let output_file_path_buf = PathBuf::from(output_file.get_path()); let digest: Result = output_file.get_digest().try_into(); path_map.insert(output_file_path_buf.clone(), digest?); @@ -1315,6 +1236,94 @@ pub fn extract_output_files( .to_boxed() } +/// Check the remote Action Cache for a cached result of running the given `action_digest`. +/// +/// This check is necessary because some RE servers do not short-circuit the Execute method +/// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache +/// explicitly in order to avoid duplicating already-cached work. This behavior matches +/// the Bazel RE client. 
+pub async fn check_action_cache( + action_digest: Digest, + metadata: &ProcessMetadata, + platform: Platform, + context: &Context, + action_cache_client: Arc, + headers: &BTreeMap, + store: Store, +) -> Result, String> { + let mut request = bazel_protos::remote_execution::GetActionResultRequest::new(); + if let Some(ref instance_name) = metadata.instance_name { + request.set_instance_name(instance_name.clone()); + } + request.set_action_digest(action_digest.into()); + + let call_opt = call_option(headers, Some(context.build_id.clone()))?; + + let action_result_response = action_cache_client + .get_action_result_async_opt(&request, call_opt) + .unwrap() + .compat() + .await; + + match action_result_response { + Ok(action_result) => { + let response = + populate_fallible_execution_result(store.clone(), &action_result, vec![], platform, false) + .compat() + .await?; + Ok(Some(response)) + } + Err(err) => match err { + grpcio::Error::RpcFailure(rpc_status) + if rpc_status.status == grpcio::RpcStatusCode::NOT_FOUND => + { + Ok(None) + } + _ => Err(rpcerror_to_string(err)), + }, + } +} + +pub async fn store_proto_locally( + store: &Store, + proto: &P, +) -> Result { + let command_bytes = proto + .write_to_bytes() + .map_err(|e| format!("Error serializing proto {:?}", e))?; + store + .store_file_bytes(Bytes::from(command_bytes), true) + .await + .map_err(|e| format!("Error saving proto to local store: {:?}", e)) +} + +pub async fn ensure_action_stored_locally( + store: &Store, + command: &Command, + action: &Action, +) -> Result<(Digest, Digest), String> { + let (command_digest, action_digest) = future::try_join( + store_proto_locally(store, command), + store_proto_locally(store, action), + ) + .await?; + + Ok((command_digest, action_digest)) +} + +pub async fn ensure_action_uploaded( + store: &Store, + command_digest: Digest, + action_digest: Digest, + input_files: Digest, +) -> Result<(), String> { + let _ = store + .ensure_remote_has_recursive(vec![command_digest, 
action_digest, input_files]) + .compat() + .await?; + Ok(()) +} + pub fn format_error(error: &bazel_protos::status::Status) -> String { let error_code_enum = bazel_protos::code::Code::from_i32(error.get_code()); let error_code = match error_code_enum { @@ -1350,7 +1359,7 @@ fn rpcerror_to_status_or_string( } } -fn rpcerror_to_string(error: grpcio::Error) -> String { +pub(crate) fn rpcerror_to_string(error: grpcio::Error) -> String { match error { grpcio::Error::RpcFailure(status) => format!( "{:?}: {:?}", diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs new file mode 100644 index 00000000000..4c08a691e13 --- /dev/null +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -0,0 +1,462 @@ +use std::collections::{BTreeMap, VecDeque}; +use std::convert::TryInto; +use std::path::Component; +use std::sync::Arc; + +use async_trait::async_trait; +use bazel_protos::call_option; +use bazel_protos::remote_execution::{ + ActionResult, Command, FileNode, Tree, UpdateActionResultRequest, +}; +use fs::RelativePath; +use futures::compat::Future01CompatExt; +use hashing::Digest; +use store::Store; +use workunit_store::{with_workunit, Level, WorkunitMetadata}; + +use crate::remote::make_execute_request; +use crate::{ + Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process, + ProcessMetadata, +}; + +/// This `CommandRunner` implementation caches results remotely using the Action Cache service +/// of the Remote Execution API. +/// +/// This runner expects to sit between the local cache CommandRunner and the CommandRunner +/// that is actually executing the Process. Thus, the local cache will be checked first, +/// then the remote cache, and then execution (local or remote) as necessary if neither cache +/// has a hit. On the way back out of the stack, the result will be stored remotely and +/// then locally. 
+#[derive(Clone)] +pub struct CommandRunner { + underlying: Arc, + metadata: ProcessMetadata, + store: Store, + action_cache_client: Arc, + headers: BTreeMap, + platform: Platform, + cache_read: bool, + cache_write: bool, +} + +impl CommandRunner { + pub fn new( + underlying: Arc, + metadata: ProcessMetadata, + store: Store, + action_cache_address: &str, + root_ca_certs: Option>, + oauth_bearer_token: Option, + headers: BTreeMap, + platform: Platform, + cache_read: bool, + cache_write: bool, + ) -> Result { + let env = Arc::new(grpcio::EnvBuilder::new().build()); + let channel = { + let builder = grpcio::ChannelBuilder::new(env); + if let Some(ref root_ca_certs) = root_ca_certs { + let creds = grpcio::ChannelCredentialsBuilder::new() + .root_cert(root_ca_certs.clone()) + .build(); + builder.secure_connect(action_cache_address, creds) + } else { + builder.connect(action_cache_address) + } + }; + let action_cache_client = Arc::new( + bazel_protos::remote_execution_grpc::ActionCacheClient::new(channel), + ); + + let mut headers = headers; + if let Some(oauth_bearer_token) = oauth_bearer_token { + headers.insert( + String::from("authorization"), + format!("Bearer {}", oauth_bearer_token.trim()), + ); + } + + // Validate any configured static headers. + call_option(&headers, None)?; + + Ok(CommandRunner { + underlying, + metadata, + store, + action_cache_client, + headers, + platform, + cache_read, + cache_write, + }) + } + + /// Create a REAPI `Tree` protobuf for an output directory by traversing down from a Pants + /// merged final output directory to find the specific path to extract. (REAPI requires + /// output directories to be stored as `Tree` protos that contain all of the `Directory` + /// protos that constitute the directory tree.) 
+ pub(crate) async fn make_tree_for_output_directory( + root_directory_digest: Digest, + directory_path: RelativePath, + store: &Store, + ) -> Result { + // Traverse down from the root directory digest to find the directory digest for + // the output directory. + let mut current_directory_digest = root_directory_digest; + for next_path_component in directory_path.as_ref().components() { + let next_name = match next_path_component { + Component::Normal(name) => name + .to_str() + .ok_or_else(|| format!("unable to convert '{:?}' to string", name))?, + _ => return Err("illegal state: unexpected path component in relative path".into()), + }; + + // Load the Directory proto corresponding to `current_directory_digest`. + let current_directory = match store.load_directory(current_directory_digest).await? { + Some((dir, _)) => dir, + None => { + return Err(format!( + "illegal state: directory for digest {:?} did not exist locally", + ¤t_directory_digest + )) + } + }; + + // Scan the current directory for the current path component. + let dir_node = match current_directory + .directories + .iter() + .find(|dn| dn.name == next_name) + { + Some(dn) => dn, + None => { + return Err(format!( + "unable to find path component {:?} in directory", + next_name + )) + } + }; + + // Set the current directory digest to be the digest in the DirectoryNode just found. + // If there are more path components, then the search will continue there. + // Otherwise, if this loop ends then the final Directory digest has been found. + current_directory_digest = dir_node.get_digest().try_into()?; + } + + // At this point, `current_directory_digest` holds the digest of the output directory. + // This will be the root of the Tree. Add it to a queue of digests to traverse. 
+ let mut tree = Tree::new(); + + let mut digest_queue = VecDeque::new(); + digest_queue.push_back(current_directory_digest); + + while let Some(directory_digest) = digest_queue.pop_front() { + let directory = match store.load_directory(directory_digest).await? { + Some((dir, _)) => dir, + None => { + return Err(format!( + "illegal state: directory for digest {:?} did not exist locally", + ¤t_directory_digest + )) + } + }; + + // Add all of the digests for subdirectories into the queue so they are processed + // in future iterations of the loop. + for subdirectory_node in &directory.directories { + digest_queue.push_back(subdirectory_node.get_digest().try_into()?); + } + + // Store this directory either as the `root` or one of the `children` if not the root. + if directory_digest == current_directory_digest { + tree.set_root(directory); + } else { + tree.mut_children().push(directory) + } + } + + Ok(tree) + } + + pub(crate) async fn extract_output_file( + root_directory_digest: Digest, + file_path: RelativePath, + store: &Store, + ) -> Result { + // Traverse down from the root directory digest to find the directory digest for + // the output directory. + let mut current_directory_digest = root_directory_digest; + let parent_path = file_path.as_ref().parent(); + let components_opt = parent_path.map(|x| x.components()); + if let Some(components) = components_opt { + for next_path_component in components { + let next_name = match next_path_component { + Component::Normal(name) => name + .to_str() + .ok_or_else(|| format!("unable to convert '{:?}' to string", name))?, + _ => { + return Err( + "Illegal state: Found an unexpected path component in relative path.".into(), + ) + } + }; + + // Load the Directory proto corresponding to `current_directory_digest`. + let current_directory = match store.load_directory(current_directory_digest).await? 
{ + Some((dir, _)) => dir, + None => { + return Err(format!( + "Illegal state: The directory for digest {:?} did not exist locally.", + ¤t_directory_digest + )) + } + }; + + // Scan the current directory for the current path component. + let dir_node = match current_directory + .directories + .iter() + .find(|dn| dn.name == next_name) + { + Some(dn) => dn, + None => { + return Err(format!( + "Unable to find path component {:?} in directory.", + next_name + )) + } + }; + + // Set the current directory digest to be the digest in the DirectoryNode just found. + // If there are more path components, then the search will continue there. + // Otherwise, if this loop ends then the final Directory digest has been found. + current_directory_digest = dir_node.get_digest().try_into()?; + } + } + + // Load the final directory. + let directory = match store.load_directory(current_directory_digest).await? { + Some((dir, _)) => dir, + None => { + return Err(format!( + "Illegal state: The directory for digest {:?} did not exist locally.", + ¤t_directory_digest + )) + } + }; + + // Search for the file. 
+ let file_base_name = file_path.as_ref().file_name().unwrap(); + directory + .files + .iter() + .find(|n| n.get_name() == file_base_name) + .cloned() + .ok_or_else(|| format!("File {:?} did not exist locally.", file_path)) + } + + async fn make_action_result( + &self, + command: &Command, + result: &FallibleProcessResultWithPlatform, + store: &Store, + ) -> Result { + let mut action_result = ActionResult::new(); + action_result.set_exit_code(result.exit_code); + + action_result.set_stdout_digest(result.stdout_digest.into()); + action_result.set_stderr_digest(result.stderr_digest.into()); + + let mut tree_digests = Vec::new(); + for output_directory in &command.output_directories { + let tree = Self::make_tree_for_output_directory( + result.output_directory, + RelativePath::new(output_directory).unwrap(), + store, + ) + .await?; + + let tree_digest = crate::remote::store_proto_locally(&self.store, &tree).await?; + tree_digests.push(tree_digest); + + action_result.mut_output_directories().push({ + let mut directory = bazel_protos::remote_execution::OutputDirectory::new(); + directory.set_path(String::new()); + directory.set_tree_digest(tree_digest.into()); + directory + }); + } + + store + .ensure_remote_has_recursive(tree_digests) + .compat() + .await?; + + let mut file_digests = Vec::new(); + for output_file in &command.output_files { + let file_node = Self::extract_output_file( + result.output_directory, + RelativePath::new(output_file).unwrap(), + store, + ) + .await?; + + file_digests.push(file_node.get_digest().try_into()?); + + action_result.mut_output_files().push({ + let mut file = bazel_protos::remote_execution::OutputFile::new(); + let digest: Digest = file_node.get_digest().try_into()?; + file.set_digest(digest.into()); + file.set_is_executable(file_node.get_is_executable()); + file.set_path(output_file.to_owned()); + file + }) + } + + store + .ensure_remote_has_recursive(file_digests) + .compat() + .await?; + + Ok(action_result) + } + + /// Stores an 
execution result into the remote Action Cache. + async fn update_action_cache( + &self, + context: &Context, + request: &Process, + result: &FallibleProcessResultWithPlatform, + metadata: &ProcessMetadata, + command: &Command, + action_digest: Digest, + command_digest: Digest, + ) -> Result<(), String> { + // Upload the action (and related data, i.e. the embedded command and input files). + // Assumption: The Action and related data has already been stored locally. + with_workunit( + context.workunit_store.clone(), + "ensure_action_uploaded".to_owned(), + WorkunitMetadata::with_level(Level::Debug), + crate::remote::ensure_action_uploaded( + &self.store, + command_digest, + action_digest, + request.input_files, + ), + |_, md| md, + ) + .await?; + + // Create an ActionResult from the process result. + let action_result = self + .make_action_result(command, result, &self.store) + .await?; + + let mut update_action_cache_request = UpdateActionResultRequest::new(); + if let Some(ref instance_name) = metadata.instance_name { + update_action_cache_request.set_instance_name(instance_name.clone()); + } + update_action_cache_request.set_action_digest(action_digest.into()); + update_action_cache_request.set_action_result(action_result); + + let call_opt = call_option(&self.headers, Some(context.build_id.clone()))?; + + self + .action_cache_client + .update_action_result_async_opt(&update_action_cache_request, call_opt) + .unwrap() + .compat() + .await + .map_err(crate::remote::rpcerror_to_string)?; + + Ok(()) + } +} + +#[async_trait] +impl crate::CommandRunner for CommandRunner { + async fn run( + &self, + req: MultiPlatformProcess, + context: Context, + ) -> Result { + // Construct the REv2 ExecuteRequest and related data for this execution request. + let request = self.extract_compatible_request(&req).unwrap(); + let (action, command, _execute_request) = + make_execute_request(&request, self.metadata.clone())?; + + // Ensure the action and command are stored locally. 
+ let (command_digest, action_digest) = with_workunit( + context.workunit_store.clone(), + "ensure_action_stored_locally".to_owned(), + WorkunitMetadata::with_level(Level::Debug), + crate::remote::ensure_action_stored_locally(&self.store, &command, &action), + |_, md| md, + ) + .await?; + + // Check the remote Action Cache to see if this request was already computed. + // If so, return immediately with the result. + if self.cache_read { + let response = with_workunit( + context.workunit_store.clone(), + "check_action_cache".to_owned(), + WorkunitMetadata::with_level(Level::Debug), + crate::remote::check_action_cache( + action_digest, + &self.metadata, + self.platform, + &context, + self.action_cache_client.clone(), + &self.headers, + self.store.clone(), + ), + |_, md| md, + ) + .await; + match response { + Ok(cached_response_opt) => { + log::debug!( + "remote cache response: digest={:?}: {:?}", + action_digest, + cached_response_opt + ); + + if let Some(cached_response) = cached_response_opt { + return Ok(cached_response); + } + } + Err(err) => { + log::warn!("Failed to read from remote cache: {}", err); + } + }; + } + + let result = self.underlying.run(req, context.clone()).await?; + if result.exit_code == 0 && self.cache_write { + // Store the result in the remote cache if not the product of a remote execution. 
+ if let Err(err) = self + .update_action_cache( + &context, + &request, + &result, + &self.metadata, + &command, + action_digest, + command_digest, + ) + .await + { + log::warn!("Failed to update remote cache: {}", err) + } + } + + Ok(result) + } + + fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option { + self.underlying.extract_compatible_request(req) + } +} diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs new file mode 100644 index 00000000000..aac4c68be85 --- /dev/null +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -0,0 +1,276 @@ +use std::collections::BTreeMap; +use std::convert::TryInto; +use std::io::Write; +use std::path::PathBuf; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use mock::{StubActionCache, StubCAS}; +use store::{BackoffConfig, Store}; +use tempfile::TempDir; +use testutil::data::{TestData, TestDirectory}; +use testutil::relative_paths; +use workunit_store::WorkunitStore; + +use crate::{ + CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches, + Platform, Process, ProcessMetadata, +}; +use fs::RelativePath; +use hashing::Digest; + +struct RoundtripResults { + uncached: Result, + maybe_cached: Result, +} + +fn create_local_runner() -> (Box, Store, TempDir, StubCAS) { + let runtime = task_executor::Executor::new(); + let base_dir = TempDir::new().unwrap(); + let named_cache_dir = base_dir.path().join("named_cache_dir"); + let stub_cas = StubCAS::builder().build(); + let store_dir = base_dir.path().join("store_dir"); + let store = Store::with_remote( + runtime.clone(), + store_dir, + vec![stub_cas.address()], + None, + None, + None, + 1, + 10 * 1024 * 1024, + Duration::from_secs(1), + BackoffConfig::new(Duration::from_millis(10), 1.0, Duration::from_millis(10)).unwrap(), + 1, + 1, + ) + .unwrap(); + let runner = Box::new(crate::local::CommandRunner::new( + 
store.clone(), + runtime.clone(), + base_dir.path().to_owned(), + NamedCaches::new(named_cache_dir), + true, + )); + (runner, store, base_dir, stub_cas) +} + +fn create_cached_runner( + local: Box, + store: Store, +) -> (Box, TempDir, StubActionCache) { + let cache_dir = TempDir::new().unwrap(); + + let metadata = ProcessMetadata { + instance_name: None, + cache_key_gen_version: None, + platform_properties: vec![], + }; + + let action_cache = StubActionCache::new().unwrap(); + + let runner = Box::new( + crate::remote_cache::CommandRunner::new( + local.into(), + metadata, + store, + &action_cache.address(), + None, + None, + BTreeMap::default(), + Platform::current().unwrap(), + true, + true, + ) + .expect("caching command runner"), + ); + + (runner, cache_dir, action_cache) +} + +fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) { + let script_dir = TempDir::new().unwrap(); + let script_path = script_dir.path().join("script"); + std::fs::File::create(&script_path) + .and_then(|mut file| { + writeln!( + file, + "echo -n {} > roland && echo Hello && echo >&2 World; exit {}", + TestData::roland().string(), + script_exit_code + ) + }) + .unwrap(); + + let process = Process::new(vec![ + testutil::path::find_bash(), + format!("{}", script_path.display()), + ]) + .output_files(relative_paths(&["roland"]).collect()); + + (process, script_path, script_dir) +} + +async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults { + let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); + let (process, script_path, _script_dir) = create_script(script_exit_code); + + let local_result = local.run(process.clone().into(), Context::default()).await; + + let (caching, _cache_dir, _stub_action_cache) = create_cached_runner(local, store.clone()); + + let uncached_result = caching + .run(process.clone().into(), Context::default()) + .await; + + assert_eq!(local_result, uncached_result); + + // Removing the file means that were the command to be 
run again without any caching, it would + // fail due to a FileNotFound error. So, If the second run succeeds, that implies that the + // cache was successfully used. + std::fs::remove_file(&script_path).unwrap(); + let maybe_cached_result = caching.run(process.into(), Context::default()).await; + + RoundtripResults { + uncached: uncached_result, + maybe_cached: maybe_cached_result, + } +} + +#[tokio::test] +async fn cache_success() { + let workunit_store = WorkunitStore::new(false); + workunit_store.init_thread_state(None); + + let results = run_roundtrip(0).await; + assert_eq!(results.uncached, results.maybe_cached); +} + +#[tokio::test] +async fn failures_not_cached() { + let workunit_store = WorkunitStore::new(false); + workunit_store.init_thread_state(None); + + let results = run_roundtrip(1).await; + assert_ne!(results.uncached, results.maybe_cached); + assert_eq!(results.uncached.unwrap().exit_code, 1); + assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found +} + +#[tokio::test] +async fn skip_cache_on_error() { + let workunit_store = WorkunitStore::new(false); + workunit_store.init_thread_state(None); + + let (local, store, _local_runner_dir, _stub_cas) = create_local_runner(); + let (caching, _cache_dir, stub_action_cache) = create_cached_runner(local, store.clone()); + let (process, _script_path, _script_dir) = create_script(0); + + stub_action_cache + .always_errors + .store(true, Ordering::SeqCst); + + // Run once to ensure the cache is skipped on errors. + let result = caching + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + + assert_eq!(result.exit_code, 0); +} + +#[tokio::test] +async fn make_tree_from_directory() { + let store_dir = TempDir::new().unwrap(); + let executor = task_executor::Executor::new(); + let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); + + // Prepare the store to contain /pets/cats/roland. 
We will then extract varios pieces of it + // into Tree protos. + + store + .store_file_bytes(TestData::roland().bytes(), false) + .await + .expect("Error saving file bytes"); + store + .record_directory(&TestDirectory::containing_roland().directory(), true) + .await + .expect("Error saving directory"); + store + .record_directory(&TestDirectory::nested().directory(), true) + .await + .expect("Error saving directory"); + let directory_digest = store + .record_directory(&TestDirectory::double_nested().directory(), true) + .await + .expect("Error saving directory"); + + let tree = crate::remote_cache::CommandRunner::make_tree_for_output_directory( + directory_digest, + RelativePath::new("pets").unwrap(), + &store, + ) + .await + .unwrap(); + + let root_dir = tree.get_root(); + assert_eq!(root_dir.get_files().len(), 0); + assert_eq!(root_dir.get_directories().len(), 1); + let dir_node = &root_dir.get_directories()[0]; + assert_eq!(dir_node.get_name(), "cats"); + let dir_digest: Digest = dir_node.get_digest().try_into().unwrap(); + assert_eq!(dir_digest, TestDirectory::containing_roland().digest()); + let children = tree.get_children(); + assert_eq!(children.len(), 1); + let child_dir = &children[0]; + assert_eq!(child_dir.get_files().len(), 1); + assert_eq!(child_dir.get_directories().len(), 0); + let file_node = &child_dir.get_files()[0]; + assert_eq!(file_node.get_name(), "roland"); + let file_digest: Digest = file_node.get_digest().try_into().unwrap(); + assert_eq!(file_digest, TestData::roland().digest()); + + // Test that extracting a non-existent output directory fails. 
+ crate::remote_cache::CommandRunner::make_tree_for_output_directory( + directory_digest, + RelativePath::new("animals").unwrap(), + &store, + ) + .await + .unwrap_err(); +} + +#[tokio::test] +async fn extract_output_file() { + let store_dir = TempDir::new().unwrap(); + let executor = task_executor::Executor::new(); + let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); + + // Prepare the store to contain /pets/cats/roland. We will then extract various pieces of it + // into Tree protos. + + store + .store_file_bytes(TestData::roland().bytes(), false) + .await + .expect("Error saving file bytes"); + store + .record_directory(&TestDirectory::containing_roland().directory(), true) + .await + .expect("Error saving directory"); + let directory_digest = store + .record_directory(&TestDirectory::nested().directory(), true) + .await + .expect("Error saving directory"); + + let file_node = crate::remote_cache::CommandRunner::extract_output_file( + directory_digest, + RelativePath::new("cats/roland").unwrap(), + &store, + ) + .await + .unwrap(); + + assert_eq!(file_node.get_name(), "roland"); + let file_digest: Digest = file_node.get_digest().try_into().unwrap(); + assert_eq!(file_digest, TestData::roland().digest()); +} diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 7107852d35c..2959013668f 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -96,6 +96,8 @@ pub struct ExecutionStrategyOptions { pub speculation_strategy: String, pub use_local_cache: bool, pub local_enable_nailgun: bool, + pub remote_cache_read: bool, + pub remote_cache_write: bool, } impl Core { @@ -192,6 +194,7 @@ impl Core { fn make_command_runner( store: &Store, + remote_store_servers: &[String], executor: &Executor, local_execution_root_dir: &Path, named_caches_dir: &Path, @@ -202,6 +205,14 @@ impl Core { exec_strategy_opts: &ExecutionStrategyOptions, remoting_opts: &RemotingOptions, ) -> Result, String> { + if 
(exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write) + && remoting_opts.execution_enable + { + return Err( + "Remote caching mode and remote execution mode cannot be enabled concurrently".into(), + ); + } + let local_command_runner = Core::make_local_execution_runner( store, executor, @@ -243,7 +254,29 @@ impl Core { local_command_runner }; - let maybe_cached_command_runner = if exec_strategy_opts.use_local_cache { + let maybe_remote_cached_command_runner = + if exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write { + let action_cache_address = remote_store_servers + .first() + .ok_or_else(|| "at least one remote store must be specified".to_owned())?; + + Box::new(process_execution::remote_cache::CommandRunner::new( + command_runner.into(), + process_execution_metadata.clone(), + store.clone(), + action_cache_address.as_str(), + root_ca_certs.clone(), + oauth_bearer_token.clone(), + remoting_opts.execution_headers.clone(), + Platform::current()?, + exec_strategy_opts.remote_cache_read, + exec_strategy_opts.remote_cache_write, + )?) 
+ } else { + command_runner + }; + + let maybe_local_cached_command_runner = if exec_strategy_opts.use_local_cache { let process_execution_store = ShardedLmdb::new( local_store_dir.join("processes"), 5 * GIGABYTES, @@ -252,16 +285,16 @@ impl Core { ) .map_err(|err| format!("Could not initialize store for process cache: {:?}", err))?; Box::new(process_execution::cache::CommandRunner::new( - command_runner.into(), + maybe_remote_cached_command_runner.into(), process_execution_store, store.clone(), process_execution_metadata.clone(), )) } else { - command_runner + maybe_remote_cached_command_runner }; - Ok(maybe_cached_command_runner) + Ok(maybe_local_cached_command_runner) } fn load_certificates( @@ -344,13 +377,20 @@ impl Core { None }; + let need_remote_store = remoting_opts.execution_enable + || exec_strategy_opts.remote_cache_read + || exec_strategy_opts.remote_cache_write; + if need_remote_store && remote_store_servers.is_empty() { + return Err("Remote store required but none provided".into()); + } + let store = safe_create_dir_all_ioerror(&local_store_dir) .map_err(|e| format!("Error making directory {:?}: {:?}", local_store_dir, e)) .and_then(|_| { Core::make_store( &executor, &local_store_dir, - remoting_opts.execution_enable && !remote_store_servers.is_empty(), + need_remote_store, &remoting_opts, &remote_store_servers, &root_ca_certs, @@ -367,6 +407,7 @@ impl Core { let command_runner = Core::make_command_runner( &store, + &remote_store_servers, &executor, &local_execution_root_dir, &named_caches_dir, diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 9028b76567b..a6d11e7692b 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -474,7 +474,9 @@ py_class!(class PyExecutionStrategyOptions |py| { speculation_delay: f64, speculation_strategy: String, use_local_cache: bool, - local_enable_nailgun: bool + local_enable_nailgun: bool, + remote_cache_read: bool, + 
remote_cache_write: bool ) -> CPyResult { Self::create_instance(py, ExecutionStrategyOptions { @@ -485,6 +487,8 @@ py_class!(class PyExecutionStrategyOptions |py| { speculation_strategy, use_local_cache, local_enable_nailgun, + remote_cache_read, + remote_cache_write, } ) } diff --git a/src/rust/engine/testutil/mock/src/action_cache.rs b/src/rust/engine/testutil/mock/src/action_cache.rs new file mode 100644 index 00000000000..f43c0acbcca --- /dev/null +++ b/src/rust/engine/testutil/mock/src/action_cache.rs @@ -0,0 +1,155 @@ +// Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( +clippy::all, +clippy::default_trait_access, +clippy::expl_impl_clone_on_copy, +clippy::if_not_else, +clippy::needless_continue, +clippy::unseparated_literal_suffix, +// TODO: Falsely triggers for async/await: +// see https://github.com/rust-lang/rust-clippy/issues/5360 +// clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( +clippy::len_without_is_empty, +clippy::redundant_field_names, +clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. 
+#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] + +use std::collections::HashMap; +use std::convert::TryInto; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use bazel_protos::remote_execution::{ + ActionResult, GetActionResultRequest, UpdateActionResultRequest, +}; +use grpcio::{RpcContext, UnarySink}; +use hashing::{Digest, Fingerprint}; +use parking_lot::Mutex; + +pub struct StubActionCache { + server_transport: grpcio::Server, + pub action_map: Arc>>, + pub always_errors: Arc, +} + +#[derive(Clone)] +struct ActionCacheResponder { + action_map: Arc>>, + always_errors: Arc, +} + +impl bazel_protos::remote_execution_grpc::ActionCache for ActionCacheResponder { + fn get_action_result( + &self, + _: RpcContext<'_>, + req: GetActionResultRequest, + sink: UnarySink, + ) { + if self.always_errors.load(Ordering::SeqCst) { + sink.fail(grpcio::RpcStatus::new( + grpcio::RpcStatusCode::UNAVAILABLE, + Some("unavailable".to_owned()), + )); + return; + } + + let action_digest: Digest = match req.get_action_digest().try_into() { + Ok(digest) => digest, + Err(_) => { + sink.fail(grpcio::RpcStatus::new( + grpcio::RpcStatusCode::INTERNAL, + Some("Unable to extract action_digest.".to_owned()), + )); + return; + } + }; + + let action_map = self.action_map.lock(); + let action_result = match action_map.get(&action_digest.0) { + Some(ar) => ar.clone(), + None => { + sink.fail(grpcio::RpcStatus::new( + grpcio::RpcStatusCode::NOT_FOUND, + Some(format!( + "ActionResult for Action {:?} does not exist", + action_digest + )), + )); + return; + } + }; + + sink.success(action_result); + } + + fn update_action_result( + &self, + _: RpcContext<'_>, + req: UpdateActionResultRequest, + sink: UnarySink, + ) { + let action_digest: Digest = match req.get_action_digest().try_into() { + Ok(digest) => digest, + Err(_) => { + sink.fail(grpcio::RpcStatus::new( + 
grpcio::RpcStatusCode::INTERNAL, + Some("Unable to extract action_digest.".to_owned()), + )); + return; + } + }; + + let mut action_map = self.action_map.lock(); + action_map.insert(action_digest.0, req.get_action_result().clone()); + + sink.success(req.get_action_result().clone()); + } +} + +impl StubActionCache { + pub fn new() -> Result { + let action_map = Arc::new(Mutex::new(HashMap::new())); + let always_errors = Arc::new(AtomicBool::new(false)); + let responder = ActionCacheResponder { + action_map: action_map.clone(), + always_errors: always_errors.clone(), + }; + + let env = Arc::new(grpcio::Environment::new(1)); + let mut server_transport = grpcio::ServerBuilder::new(env) + .register_service(bazel_protos::remote_execution_grpc::create_action_cache( + responder, + )) + .bind("127.0.0.1", 0) + .build() + .unwrap(); + server_transport.start(); + + Ok(StubActionCache { + server_transport, + action_map, + always_errors, + }) + } + + /// + /// The address on which this server is listening over insecure HTTP transport. + /// + pub fn address(&self) -> String { + let bind_addr = self.server_transport.bind_addrs().next().unwrap(); + format!("{}:{}", bind_addr.0, bind_addr.1) + } +} diff --git a/src/rust/engine/testutil/mock/src/lib.rs b/src/rust/engine/testutil/mock/src/lib.rs index bd9a7428b34..369f69741f5 100644 --- a/src/rust/engine/testutil/mock/src/lib.rs +++ b/src/rust/engine/testutil/mock/src/lib.rs @@ -27,7 +27,10 @@ // Arc can be more clear than needing to grok Orderings: #![allow(clippy::mutex_atomic)] +mod action_cache; mod cas; -pub use crate::cas::StubCAS; pub mod execution_server; + +pub use crate::action_cache::StubActionCache; +pub use crate::cas::StubCAS; pub use crate::execution_server::MockExecution;