Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remote caching support #10960

Merged
merged 9 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/python/pants/engine/internals/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ def new_scheduler(
speculation_strategy=execution_options.process_execution_speculation_strategy,
use_local_cache=execution_options.process_execution_use_local_cache,
local_enable_nailgun=execution_options.process_execution_local_enable_nailgun,
remote_cache_read=execution_options.remote_cache_read,
remote_cache_write=execution_options.remote_cache_write,
)

return cast(
Expand Down
30 changes: 30 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class ExecutionOptions:
remote_execution_headers: Any
remote_execution_overall_deadline_secs: int
process_execution_local_enable_nailgun: bool
remote_cache_read: bool
remote_cache_write: bool

@classmethod
def from_bootstrap_options(cls, bootstrap_options):
Expand All @@ -112,6 +114,8 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_headers=bootstrap_options.remote_execution_headers,
remote_execution_overall_deadline_secs=bootstrap_options.remote_execution_overall_deadline_secs,
process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun,
remote_cache_read=bootstrap_options.remote_cache_read,
remote_cache_write=bootstrap_options.remote_cache_write,
)


Expand All @@ -138,6 +142,8 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_execution_headers={},
remote_execution_overall_deadline_secs=60 * 60, # one hour
process_execution_local_enable_nailgun=False,
remote_cache_read=False,
remote_cache_write=False,
)


Expand Down Expand Up @@ -759,6 +765,20 @@ def register_bootstrap_options(cls, register):
help="Whether or not to use nailgun to run the requests that are marked as nailgunnable.",
advanced=True,
)
register(
"--remote-cache-read",
type=bool,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_read,
advanced=True,
help="Whether to enable reading from a remote cache",
)
register(
"--remote-cache-write",
type=bool,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_write,
advanced=True,
help="Whether to enable writing results to a remote cache",
)

@classmethod
def register_options(cls, register):
Expand Down Expand Up @@ -918,3 +938,13 @@ def validate_instance(cls, opts):
"The `--remote-execution-server` option requires also setting "
"`--remote-store-server`. Often these have the same value."
)
if opts.remote_cache_read and not opts.remote_store_server:
raise OptionsError(
"The `--remote-cache-read` option requires also setting "
"`--remote-store-server` to work properly."
)
if opts.remote_cache_write and not opts.remote_store_server:
raise OptionsError(
"The `--remote-cache-write` option requires also setting "
"`--remote-store-server` to work properly."
)
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ pub mod remote;
#[cfg(test)]
pub mod remote_tests;

pub mod remote_cache;
#[cfg(test)]
mod remote_cache_tests;

pub mod speculate;
#[cfg(test)]
mod speculate_tests;
Expand Down
197 changes: 103 additions & 94 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::{
Context, ExecutionStats, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform,
PlatformConstraint, Process, ProcessMetadata,
};
use bazel_protos::remote_execution_grpc::ActionCacheClient;

// Environment variable which is exclusively used for cache key invalidation.
// This may be not specified in an Process, and may be populated only by the
Expand Down Expand Up @@ -183,17 +184,6 @@ impl CommandRunner {
self.platform
}

async fn store_proto_locally<P: protobuf::Message>(&self, proto: &P) -> Result<Digest, String> {
let command_bytes = proto
.write_to_bytes()
.map_err(|e| format!("Error serializing proto {:?}", e))?;
self
.store
.store_file_bytes(Bytes::from(command_bytes), true)
.await
.map_err(|e| format!("Error saving proto to local store: {:?}", e))
}

async fn get_capabilities(
&self,
) -> Result<&bazel_protos::remote_execution::ServerCapabilities, String> {
Expand All @@ -218,85 +208,6 @@ impl CommandRunner {
.await
}

/// Check the remote Action Cache for a cached result of running the given `action_digest`.
///
/// This check is necessary because some RE servers do not short-circuit the Execute method
/// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache
/// explicitly in order to avoid duplicating already-cached work. This behavior matches
/// the Bazel RE client.
async fn check_action_cache(
&self,
action_digest: Digest,
metadata: &ProcessMetadata,
context: &Context,
) -> Result<Option<FallibleProcessResultWithPlatform>, String> {
let mut request = bazel_protos::remote_execution::GetActionResultRequest::new();
if let Some(ref instance_name) = metadata.instance_name {
request.set_instance_name(instance_name.clone());
}
request.set_action_digest(action_digest.into());

let call_opt = call_option(&self.headers, Some(context.build_id.clone()))?;

let action_result_response = self
.action_cache_client
.get_action_result_async_opt(&request, call_opt)
.unwrap()
.compat()
.await;

match action_result_response {
Ok(action_result) => {
let response = populate_fallible_execution_result(
self.store.clone(),
&action_result,
vec![],
self.platform,
false,
)
.compat()
.await?;
Ok(Some(response))
}
Err(err) => match err {
grpcio::Error::RpcFailure(rpc_status)
if rpc_status.status == grpcio::RpcStatusCode::NOT_FOUND =>
{
Ok(None)
}
_ => Err(rpcerror_to_string(err)),
},
}
}

async fn ensure_action_stored_locally(
&self,
command: &Command,
action: &Action,
) -> Result<(Digest, Digest), String> {
let (command_digest, action_digest) = future::try_join(
self.store_proto_locally(command),
self.store_proto_locally(action),
)
.await?;

Ok((command_digest, action_digest))
}

async fn ensure_action_uploaded(
&self,
store: &Store,
command_digest: Digest,
action_digest: Digest,
input_files: Digest,
) -> Result<(), String> {
let _ = store
.ensure_remote_has_recursive(vec![command_digest, action_digest, input_files])
.compat()
.await?;
Ok(())
}

// Monitors the operation stream returned by the REv2 Execute and WaitExecution methods.
// Outputs progress reported by the server and returns the next actionable operation
// or gRPC status back to the main loop (plus the operation name so the main loop can
Expand Down Expand Up @@ -797,7 +708,7 @@ impl crate::CommandRunner for CommandRunner {
context.workunit_store.clone(),
"ensure_action_stored_locally".to_owned(),
WorkunitMetadata::with_level(Level::Debug),
self.ensure_action_stored_locally(&command, &action),
ensure_action_stored_locally(&self.store, &command, &action),
|_, md| md,
)
.await?;
Expand All @@ -808,7 +719,15 @@ impl crate::CommandRunner for CommandRunner {
context.workunit_store.clone(),
"check_action_cache".to_owned(),
WorkunitMetadata::with_level(Level::Debug),
self.check_action_cache(action_digest, &self.metadata, &context),
check_action_cache(
action_digest,
&self.metadata,
self.platform,
&context,
self.action_cache_client.clone(),
&self.headers,
self.store.clone(),
),
|_, md| md,
)
.await?;
Expand All @@ -825,7 +744,7 @@ impl crate::CommandRunner for CommandRunner {
context.workunit_store.clone(),
"ensure_action_uploaded".to_owned(),
WorkunitMetadata::with_level(Level::Debug),
self.ensure_action_uploaded(&store, command_digest, action_digest, request.input_files),
ensure_action_uploaded(&store, command_digest, action_digest, request.input_files),
|_, md| md,
)
.await?;
Expand Down Expand Up @@ -1213,6 +1132,7 @@ pub fn extract_output_files(
// Retrieve the Tree proto and hash its root `Directory` proto to obtain the digest
// of the output directory needed to construct the series of `Directory` protos needed
// for the final merge of the output directories.
log::info!("processing dir={:?}", dir);
let tree_digest: Digest = dir.get_tree_digest().try_into()?;
let root_digest_opt = store.load_tree_from_remote(tree_digest).await?;
let root_digest = root_digest_opt
Expand Down Expand Up @@ -1246,6 +1166,7 @@ pub fn extract_output_files(
.get_output_files()
.iter()
.map(|output_file| {
log::info!("processing output_file={:?}", output_file);
let output_file_path_buf = PathBuf::from(output_file.get_path());
let digest: Result<Digest, String> = output_file.get_digest().try_into();
path_map.insert(output_file_path_buf.clone(), digest?);
Expand Down Expand Up @@ -1315,6 +1236,94 @@ pub fn extract_output_files(
.to_boxed()
}

/// Check the remote Action Cache for a cached result of running the given `action_digest`.
///
/// This check is necessary because some RE servers do not short-circuit the Execute method
/// by checking the Action Cache (e.g., BuildBarn). Thus, this client must check the cache
/// explicitly in order to avoid duplicating already-cached work. This behavior matches
/// the Bazel RE client.
pub async fn check_action_cache(
action_digest: Digest,
metadata: &ProcessMetadata,
platform: Platform,
context: &Context,
action_cache_client: Arc<ActionCacheClient>,
headers: &BTreeMap<String, String>,
store: Store,
) -> Result<Option<FallibleProcessResultWithPlatform>, String> {
let mut request = bazel_protos::remote_execution::GetActionResultRequest::new();
if let Some(ref instance_name) = metadata.instance_name {
request.set_instance_name(instance_name.clone());
}
request.set_action_digest(action_digest.into());

let call_opt = call_option(headers, Some(context.build_id.clone()))?;

let action_result_response = action_cache_client
.get_action_result_async_opt(&request, call_opt)
.unwrap()
.compat()
.await;

match action_result_response {
Ok(action_result) => {
let response =
populate_fallible_execution_result(store.clone(), &action_result, vec![], platform, false)
.compat()
.await?;
Ok(Some(response))
}
Err(err) => match err {
grpcio::Error::RpcFailure(rpc_status)
if rpc_status.status == grpcio::RpcStatusCode::NOT_FOUND =>
{
Ok(None)
}
_ => Err(rpcerror_to_string(err)),
},
}
}

pub async fn store_proto_locally<P: protobuf::Message>(
store: &Store,
proto: &P,
) -> Result<Digest, String> {
let command_bytes = proto
.write_to_bytes()
.map_err(|e| format!("Error serializing proto {:?}", e))?;
store
.store_file_bytes(Bytes::from(command_bytes), true)
.await
.map_err(|e| format!("Error saving proto to local store: {:?}", e))
}

pub async fn ensure_action_stored_locally(
store: &Store,
command: &Command,
action: &Action,
) -> Result<(Digest, Digest), String> {
let (command_digest, action_digest) = future::try_join(
store_proto_locally(store, command),
store_proto_locally(store, action),
)
.await?;

Ok((command_digest, action_digest))
}

pub async fn ensure_action_uploaded(
store: &Store,
command_digest: Digest,
action_digest: Digest,
input_files: Digest,
) -> Result<(), String> {
let _ = store
.ensure_remote_has_recursive(vec![command_digest, action_digest, input_files])
.compat()
.await?;
Ok(())
}

pub fn format_error(error: &bazel_protos::status::Status) -> String {
let error_code_enum = bazel_protos::code::Code::from_i32(error.get_code());
let error_code = match error_code_enum {
Expand Down Expand Up @@ -1350,7 +1359,7 @@ fn rpcerror_to_status_or_string(
}
}

fn rpcerror_to_string(error: grpcio::Error) -> String {
pub(crate) fn rpcerror_to_string(error: grpcio::Error) -> String {
match error {
grpcio::Error::RpcFailure(status) => format!(
"{:?}: {:?}",
Expand Down
Loading