Skip to content

Commit

Permalink
use REAPI batch API for small blob writes (#12537)
Browse files Browse the repository at this point in the history
## Problem

Pants currently uses the `ByteStream` API exposed by REAPI servers to write all blobs including small blobs. This incurs unnecessary network overhead when writing small blobs which could have been inlined into the request if `BatchUpdateBlobs` had been used.

## Solution

1. Add a `--remote-store-batch-api-size-limit` option which controls the size threshold for when to switch from `BatchUpdateBlobs` to `ByteStream.Write`.
2. If both local configuration (i.e., `--remote-store-batch-api-size-limit`) and REAPI server capabilities (i.e., `max_batch_total_size_bytes`) allow a write via the batch API, then write via the batch API; otherwise, continue to use `ByteStream.Write`.
  • Loading branch information
Tom Dyas authored Aug 13, 2021
1 parent c213d53 commit 79cd70e
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def __init__(
store_chunk_upload_timeout=execution_options.remote_store_chunk_upload_timeout_seconds,
store_rpc_retries=execution_options.remote_store_rpc_retries,
store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
cache_warnings_behavior=execution_options.remote_cache_warnings.value,
cache_eager_fetch=execution_options.remote_cache_eager_fetch,
cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
Expand Down
10 changes: 10 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ class ExecutionOptions:
remote_store_chunk_upload_timeout_seconds: int
remote_store_rpc_retries: int
remote_store_rpc_concurrency: int
remote_store_batch_api_size_limit: int

remote_cache_eager_fetch: bool
remote_cache_warnings: RemoteCacheWarningsBehavior
Expand Down Expand Up @@ -378,6 +379,7 @@ def from_options(
remote_store_chunk_upload_timeout_seconds=bootstrap_options.remote_store_chunk_upload_timeout_seconds,
remote_store_rpc_retries=bootstrap_options.remote_store_rpc_retries,
remote_store_rpc_concurrency=dynamic_remote_options.store_rpc_concurrency,
remote_store_batch_api_size_limit=bootstrap_options.remote_store_batch_api_size_limit,
# Remote cache setup.
remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch,
remote_cache_warnings=bootstrap_options.remote_cache_warnings,
Expand Down Expand Up @@ -456,6 +458,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
remote_store_chunk_upload_timeout_seconds=60,
remote_store_rpc_retries=2,
remote_store_rpc_concurrency=128,
remote_store_batch_api_size_limit=4194304,
# Remote cache setup.
remote_cache_eager_fetch=True,
remote_cache_warnings=RemoteCacheWarningsBehavior.first_only,
Expand Down Expand Up @@ -1220,6 +1223,13 @@ def register_bootstrap_options(cls, register):
default=DEFAULT_EXECUTION_OPTIONS.remote_store_rpc_concurrency,
help="The number of concurrent requests allowed to the remote store service.",
)
register(
"--remote-store-batch-api-size-limit",
type=int,
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_batch_api_size_limit,
help="The maximum total size of blobs allowed to be sent in a single batch API call to the remote store.",
)

register(
"--remote-cache-warnings",
Expand Down
12 changes: 11 additions & 1 deletion src/rust/engine/fs/brfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,15 @@ async fn main() {
.long("rpc-concurrency-limit")
.required(false)
.default_value("128")
).get_matches();
).arg(
Arg::with_name("batch-api-size-limit")
.help("Maximum total size of blobs allowed to be sent in a single batch API call to the remote store.")
.takes_value(true)
.long("batch-api-size-limit")
.required(false)
.default_value("4194304")
)
.get_matches();

let mount_path = args.value_of("mount-path").unwrap();
let store_path = args.value_of("local-store-path").unwrap();
Expand Down Expand Up @@ -756,6 +764,8 @@ async fn main() {
value_t!(args.value_of("rpc-concurrency-limit"), usize)
.expect("Bad rpc-concurrency-limit flag"),
None,
value_t!(args.value_of("batch-api-size-limit"), usize)
.expect("Bad batch-api-size-limit flag"),
)
.expect("Error making remote store"),
None => local_only_store,
Expand Down
9 changes: 9 additions & 0 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ to this directory.",
.required(false)
.default_value("128")
)
.arg(
Arg::with_name("batch-api-size-limit")
.help("Maximum total size of blobs allowed to be sent in a single batch API call to the remote store.")
.takes_value(true)
.long("batch-api-size-limit")
.required(false)
.default_value("4194304"))
.get_matches(),
).await {
Ok(_) => {}
Expand Down Expand Up @@ -327,6 +334,8 @@ async fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
value_t!(top_match.value_of("rpc-concurrency-limit"), usize)
.expect("Bad rpc-concurrency-limit flag"),
None,
value_t!(top_match.value_of("batch-api-size-limit"), usize)
.expect("Bad batch-api-size-limit flag"),
),
true,
)
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ impl Store {
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<Store, String> {
Ok(Store {
local: self.local,
Expand All @@ -340,6 +341,7 @@ impl Store {
rpc_retries,
rpc_concurrency_limit,
capabilities_cell_opt,
batch_api_size_limit,
)?)),
})
}
Expand Down
65 changes: 61 additions & 4 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use hashing::Digest;
use log::Level;
use remexec::{
capabilities_client::CapabilitiesClient,
content_addressable_storage_client::ContentAddressableStorageClient, ServerCapabilities,
content_addressable_storage_client::ContentAddressableStorageClient, BatchUpdateBlobsRequest,
ServerCapabilities,
};
use tonic::{Code, Request, Status};
use workunit_store::{in_workunit, Metric, ObservationMetric, WorkunitMetadata};
Expand All @@ -33,6 +34,7 @@ pub struct ByteStore {
cas_client: Arc<ContentAddressableStorageClient<LayeredService>>,
capabilities_cell: Arc<DoubleCheckedCell<ServerCapabilities>>,
capabilities_client: Arc<CapabilitiesClient<LayeredService>>,
batch_api_size_limit: usize,
}

impl fmt::Debug for ByteStore {
Expand Down Expand Up @@ -75,6 +77,7 @@ impl ByteStore {
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<ByteStore, String> {
let tls_client_config = if cas_address.starts_with("https://") {
Some(grpc_util::create_tls_config(root_ca_certs)?)
Expand Down Expand Up @@ -107,6 +110,7 @@ impl ByteStore {
capabilities_cell: capabilities_cell_opt
.unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())),
capabilities_client,
batch_api_size_limit,
})
}

Expand Down Expand Up @@ -203,6 +207,60 @@ impl ByteStore {
digest: Digest,
bytes: ByteSource,
) -> Result<(), ByteStoreError>
where
ByteSource: Fn(Range<usize>) -> Bytes + Send + Sync + 'static,
{
let len = digest.size_bytes;

let max_batch_total_size_bytes = {
let capabilities = self.get_capabilities().await?;

capabilities
.cache_capabilities
.as_ref()
.map(|c| c.max_batch_total_size_bytes as usize)
.unwrap_or_default()
};

let batch_api_allowed_by_local_config = len <= self.batch_api_size_limit;
let batch_api_allowed_by_server_config =
max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes;
if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config {
self.store_bytes_source_batch(digest, bytes).await
} else {
self.store_bytes_source_stream(digest, bytes).await
}
}

/// Uploads a single blob to the remote store in one shot via the CAS
/// `BatchUpdateBlobs` RPC, inlining the full blob payload into the request
/// message instead of streaming it in chunks.
///
/// The caller (`store_bytes_source`) is responsible for only dispatching here
/// when the blob size satisfies both the local `batch_api_size_limit` and the
/// server-advertised `max_batch_total_size_bytes`.
async fn store_bytes_source_batch<ByteSource>(
&self,
digest: Digest,
bytes: ByteSource,
) -> Result<(), ByteStoreError>
where
ByteSource: Fn(Range<usize>) -> Bytes + Send + Sync + 'static,
{
// Build a batch request carrying exactly this one blob. An empty
// `instance_name` is sent when none is configured.
let request = BatchUpdateBlobsRequest {
instance_name: self.instance_name.clone().unwrap_or_default(),
requests: vec![remexec::batch_update_blobs_request::Request {
digest: Some(digest.into()),
// Materialize the blob's entire byte range for inlining.
data: bytes(0..digest.size_bytes),
}],
};

// Clone the client handle so we can issue a call requiring `&mut self`.
let mut client = self.cas_client.as_ref().clone();
// NOTE(review): only transport-level gRPC errors are surfaced here; the
// per-blob statuses inside the `BatchUpdateBlobs` response are not
// inspected — confirm the server rejects failed blobs at the RPC level.
client
.batch_update_blobs(request)
.await
.map_err(ByteStoreError::Grpc)?;
Ok(())
}

async fn store_bytes_source_stream<ByteSource>(
&self,
digest: Digest,
bytes: ByteSource,
) -> Result<(), ByteStoreError>
where
ByteSource: Fn(Range<usize>) -> Bytes + Send + Sync + 'static,
{
Expand Down Expand Up @@ -464,8 +522,7 @@ impl ByteStore {
}
}

#[allow(dead_code)]
async fn get_capabilities(&self) -> Result<&remexec::ServerCapabilities, String> {
async fn get_capabilities(&self) -> Result<&remexec::ServerCapabilities, ByteStoreError> {
let capabilities_fut = async {
let mut request = remexec::GetCapabilitiesRequest::default();
if let Some(s) = self.instance_name.as_ref() {
Expand All @@ -477,7 +534,7 @@ impl ByteStore {
.get_capabilities(request)
.await
.map(|r| r.into_inner())
.map_err(status_to_str)
.map_err(ByteStoreError::Grpc)
};

self
Expand Down
3 changes: 3 additions & 0 deletions src/rust/engine/fs/store/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async fn write_file_multiple_chunks() {
1,
256,
None,
0, // disable batch API, force streaming API
)
.unwrap();

Expand Down Expand Up @@ -230,6 +231,7 @@ async fn write_connection_error() {
1,
256,
None,
super::tests::STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();
let error = store
Expand Down Expand Up @@ -307,6 +309,7 @@ fn new_byte_store(cas: &StubCAS) -> ByteStore {
1,
256,
None,
super::tests::STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap()
}
Expand Down
7 changes: 7 additions & 0 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::{
MEGABYTES,
};

pub(crate) const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024;

impl LoadMetadata {
fn is_remote(&self) -> bool {
match self {
Expand Down Expand Up @@ -105,6 +107,7 @@ fn new_store<P: AsRef<Path>>(dir: P, cas_address: &str) -> Store {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap()
}
Expand Down Expand Up @@ -847,6 +850,7 @@ async fn instance_name_upload() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -876,6 +880,7 @@ async fn instance_name_download() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -927,6 +932,7 @@ async fn auth_upload() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -958,6 +964,7 @@ async fn auth_download() {
1,
256,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl StoreSetup {
1,
256,
None,
4 * 1024 * 1024,
)
.unwrap();
StoreSetup {
Expand Down
6 changes: 6 additions & 0 deletions src/rust/engine/process_execution/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tonic::{Code, Status};
const OVERALL_DEADLINE_SECS: Duration = Duration::from_secs(10 * 60);
const RETRY_INTERVAL: Duration = Duration::from_micros(0);
const STORE_CONCURRENCY_LIMIT: usize = 256;
const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024;
const EXEC_CONCURRENCY_LIMIT: usize = 256;
const CACHE_CONCURRENCY_LIMIT: usize = 256;

Expand Down Expand Up @@ -871,6 +872,7 @@ async fn sends_headers() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -1071,6 +1073,7 @@ async fn ensure_inline_stdio_is_stored() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -1452,6 +1455,7 @@ async fn execute_missing_file_uploads_if_known() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();
store
Expand Down Expand Up @@ -1532,6 +1536,7 @@ async fn execute_missing_file_errors_if_unknown() {
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap();

Expand Down Expand Up @@ -2278,6 +2283,7 @@ pub(crate) fn make_store(
1,
STORE_CONCURRENCY_LIMIT,
None,
STORE_BATCH_API_SIZE_LIMIT,
)
.unwrap()
}
Expand Down
5 changes: 5 additions & 0 deletions src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ struct Opt {
#[structopt(long, default_value = "128")]
store_rpc_concurrency: usize,

/// Total size of blobs allowed to be sent in a single API call.
#[structopt(long, default_value = "4194304")]
store_batch_api_size_limit: usize,

/// Number of concurrent requests to the execution service.
#[structopt(long, default_value = "128")]
execution_rpc_concurrency: usize,
Expand Down Expand Up @@ -251,6 +255,7 @@ async fn main() {
args.store_rpc_retries,
args.store_rpc_concurrency,
None,
args.store_batch_api_size_limit,
)
}
(None, None) => Ok(local_only_store),
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct RemotingOptions {
pub store_chunk_upload_timeout: Duration,
pub store_rpc_retries: usize,
pub store_rpc_concurrency: usize,
pub store_batch_api_size_limit: usize,
pub cache_warnings_behavior: RemoteCacheWarningsBehavior,
pub cache_eager_fetch: bool,
pub cache_rpc_concurrency: usize,
Expand Down Expand Up @@ -151,6 +152,7 @@ impl Core {
remoting_opts.store_rpc_retries,
remoting_opts.store_rpc_concurrency,
capabilities_cell_opt,
remoting_opts.store_batch_api_size_limit,
)
} else {
Ok(local_only)
Expand Down
Loading

0 comments on commit 79cd70e

Please sign in to comment.