Skip to content

Commit

Permalink
Add support for OpenDAL-backed remote stores (file://, initially) (#1…
Browse files Browse the repository at this point in the history
…9827)

This is the final major step that unlocks #11149: 

- adding new byte store and action cache providers, implemented using
https://opendal.apache.org
- plumbing through to be able to be use them via `remote_store_address =
"..."`
- (and implementing an "example" one, that's backed by the local file
system)

This doesn't yet resolve that issue, as it doesn't implement any sort of
useful "simple" truly-remote cache, but it sets up most of the
infrastructure.

### User facing change

This only adds support for one additional store for now:
`experimental:file:///path/to/dir`, which writes the store to
`/path/to/dir`.

This is store is enough to validate that this functionality (mostly)
works, and even seems like it would be useful for:
- debugging/testing remote cache behaviour (e.g. can set up reduced
reproducers for remote caching bugs using normal file manipulation
commands)
- potentially for real, for using an NFS mount as a remote cache,
without forcing _all_ of Pants' caches to have the NFS overhead.

The user facing functionality here is configured entirely through
`remote_store_address`: that option now supports URIs that have schemes
other than `grpc` or `grpcs`. To start with, just `file`. It also
explicitly supports these schemes being experimental, communicated
through the `experimental:` scheme-prefix. This is to, hopefully, allow
landing the feature earlier and with less risk.

For example, running `pants test ::` in the follow example writes
various blobs to subdirectories of `/tmp/remote-cache-example` (e.g.
`action-cache/e3/2f/e32f034f...` and `byte-store/50/ce/50ce4a68...`),
and a second `pants --no-pantsd --no-local-cache test ::` command is
able to service it from cache, reporting ` //:slow-test succeeded in
5.01s (cached remotely)`

```toml
# pants.toml
[GLOBAL]
pants_version = "2.17.0"

backend_packages = ["pants.backend.shell"]

remote_store_address = "experimental:file:///tmp/remote-cache-example"
remote_cache_read = true
remote_cache_write = true
```

```python
# BUILD
experimental_test_shell_command(name="slow-test", command="sleep 5", tools=["sleep"])
```

### Details of this change

There's three main bits of work:

1. add the new generic OpenDAL byte store and action cache provider (in
the two pairs of `base_opendal.rs` + tests files)
2. hook up the specific FS one (in `store/src/remote.rs` and
`remote/src/remote_cache.rs`)
3. adjust the options parsing to support them, especially the
`experimental:` scheme handling (in `global_options.py`)

None of these are huge in isolation, but they add up to a fair chunk of
code. I think each of those files can be reviewed somewhat in isolation,
in that order.

### Why OpenDAL?

It's used by sccache for similar remote-caching purposes
(https://github.com/search?q=repo%3Amozilla%2Fsccache%20opendal&type=code),
and supports a whole lot of services:

- blob stores (e.g. S3, GCS)
- key-value caches (e.g. Redis)
- CI caches (e.g. GitHub Actions)
- weirder ones (IPFS, Google Drive)

Pants doesn't/shouldn't support all of them, but definitely blob stores
and CI caches seem exciting!

### Next steps

This only adds support for caching "remotely" to the local file system
but I'll add support for GitHub Actions as follow up work (#19831), and
maybe S3 too. Hopefully as part of doing that I'll work through any
authentication (etc.) issues and so it becomes easier for others to add
the backends they need too.

I've suggested we use the URI scheme for deciding on the service, rather
than an explicit `remote_store_service = "reapi"` option, or similar. In
future, some services may use the same scheme, e.g. I imagine several
services might conventionally use `http://` or `https://`. My current
thinking is to solve this similar to pip's [VCS
requirements](https://pip.pypa.io/en/stable/topics/vcs-support/)
(`vcs+transport://...`) or `sqlalchemy` [database
URLs](https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls)
(`dialect+driver://...`), e.g. `remote_store_address = "s3+http://..."`
or `remote_store_address = "http+s3://..."`, and then the provider would
strip off the `s3` part as appropriate.

There's also a whole lot of TODOs that I don't think are critical to
landing this code, and I can chip away at once it's in: by landing this
sooner, I'm hoping we can start having the user-facing parts of #11149
being tested, validated and enjoyed sooner rather than later. I've
already done a loooot of *pre*-factoring (see all the PRs referencing
#11149), so it'd be nice to switch to having the feature in and just do
some normal *re*-factoring. 😅

TODOs mostly tracked in #19902
  • Loading branch information
huonw authored Sep 21, 2023
1 parent 324d976 commit de1a3d3
Show file tree
Hide file tree
Showing 15 changed files with 1,303 additions and 83 deletions.
5 changes: 3 additions & 2 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
LOCAL_STORE_LEASE_TIME_SECS,
ExecutionOptions,
LocalStoreOptions,
normalize_remote_address,
)
from pants.util.contextutil import temporary_file_path
from pants.util.logging import LogLevel
Expand Down Expand Up @@ -192,8 +193,8 @@ def __init__(
execution_headers=execution_options.remote_execution_headers,
execution_overall_deadline_secs=execution_options.remote_execution_overall_deadline_secs,
execution_rpc_concurrency=execution_options.remote_execution_rpc_concurrency,
store_address=execution_options.remote_store_address,
execution_address=execution_options.remote_execution_address,
store_address=normalize_remote_address(execution_options.remote_store_address),
execution_address=normalize_remote_address(execution_options.remote_execution_address),
execution_process_cache_namespace=execution_options.process_execution_cache_namespace,
instance_name=execution_options.remote_instance_name,
root_ca_certs_path=execution_options.remote_ca_certs_path,
Expand Down
298 changes: 247 additions & 51 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,213 @@ class DynamicUIRenderer(Enum):

_G = TypeVar("_G", bound="_GlobMatchErrorBehaviorOptionBase")

_EXPERIMENTAL_SCHEME = "experimental:"


def normalize_remote_address(addr: str | None) -> str | None:
if addr is None:
return None
return addr.removeprefix(_EXPERIMENTAL_SCHEME)


@dataclass(frozen=True)
class _RemoteAddressScheme:
schemes: tuple[str, ...]
supports_execution: bool
experimental: bool
description: str

def rendered_schemes(self) -> tuple[str, ...]:
"""Convert the schemes into what the user needs to write.
For example: `experimental:some-scheme://` if experimental, or `some-scheme://` if not.
This includes the :// because that's clearer in docs etc, even if it's not 'technically'
part of the scheme.
"""
# `experimental:` is used as a prefix-scheme, riffing on `view-source:https://...` in some
# web browsers. This ensures the experimental status is communicated right where a user is
# opting-in to using it.
experimental_prefix = _EXPERIMENTAL_SCHEME if self.experimental else ""
return tuple(f"{experimental_prefix}{scheme}://" for scheme in self.schemes)

@staticmethod
def _validate_address(
schemes: tuple[_RemoteAddressScheme, ...],
addr: str,
require_execution: bool,
context_for_diagnostics: str,
) -> None:
addr_is_experimental = addr.startswith(_EXPERIMENTAL_SCHEME)
experimentalless_addr = addr.removeprefix(_EXPERIMENTAL_SCHEME)

matching_scheme = next(
(
(scheme_str, scheme)
for scheme in schemes
for scheme_str in scheme.schemes
if experimentalless_addr.startswith(f"{scheme_str}://")
),
None,
)

if matching_scheme is None:
# This an address that doesn't seem to have a scheme we understand.
supported_schemes = ", ".join(
f"`{rendered}`" for scheme in schemes for rendered in scheme.rendered_schemes()
)
raise OptionsError(
softwrap(
f"""
{context_for_diagnostics} has invalid value `{addr}`: it does not have a
supported scheme.
The value must start with one of: {supported_schemes}
"""
)
)

scheme_str, scheme = matching_scheme

if scheme.experimental and not addr_is_experimental:
# This is a URL like `some-scheme://` for a scheme that IS experimental, so let's tell
# the user they need to specify it as `experimental:some-scheme://`.
raise OptionsError(
softwrap(
f"""
{context_for_diagnostics} has invalid value `{addr}`: the scheme `{scheme_str}`
is experimental and thus must include the `{_EXPERIMENTAL_SCHEME}` prefix to
opt-in to this less-stable Pants feature.
Specify the value as `{_EXPERIMENTAL_SCHEME}{addr}`, with the
`{_EXPERIMENTAL_SCHEME}` prefix.
"""
)
)

if not scheme.experimental and addr_is_experimental:
# This is a URL like `experimental:some-scheme://...` for a scheme that's NOT experimental,
# so let's tell the user to fix it up as `some-scheme://...`. It's low importance (we
# can unambigiously tell what they mean), so a warning is fine.
logger.warning(
softwrap(
f"""
{context_for_diagnostics} has value `{addr}` including `{_EXPERIMENTAL_SCHEME}`
prefix, but the scheme `{scheme_str}` is not experimental.
Specify the value as `{experimentalless_addr}`, without the `{_EXPERIMENTAL_SCHEME}`
prefix.
"""
)
)

if require_execution and not scheme.supports_execution:
# The address is being used for remote execution, but the scheme doesn't support it.
supported_execution_schemes = ", ".join(
f"`{rendered}`"
for scheme in schemes
if scheme.supports_execution
for rendered in scheme.rendered_schemes()
)
raise OptionsError(
softwrap(
f"""
{context_for_diagnostics} has invalid value `{addr}`: the scheme `{scheme_str}`
does not support remote execution.
Either remove the value (and disable remote execution), or use an address for a
server does support remote execution, starting with one of:
{supported_execution_schemes} """
)
)

# Validated, all good!

@staticmethod
def validate_address(addr: str, require_execution: bool, context_for_diagnostics: str) -> None:
_RemoteAddressScheme._validate_address(
_REMOTE_ADDRESS_SCHEMES,
addr=addr,
require_execution=require_execution,
context_for_diagnostics=context_for_diagnostics,
)

@staticmethod
def address_help(context: str, extra: str, requires_execution: bool) -> Callable[[object], str]:
def render_list_item(scheme_strs: tuple[str, ...], description: str) -> str:
schemes = ", ".join(f"`{s}`" for s in scheme_strs)
return f"- {schemes}: {description}"

def renderer(_: object) -> str:
supported_schemes = [
(scheme.rendered_schemes(), scheme.description)
for scheme in _REMOTE_ADDRESS_SCHEMES
if not requires_execution or (requires_execution and scheme.supports_execution)
]
if requires_execution:
# If this is the help for remote execution, still include the schemes that don't
# support it, but mark them as such.
supported_schemes.append(
(
tuple(
scheme_str
for scheme in _REMOTE_ADDRESS_SCHEMES
if not scheme.supports_execution
for scheme_str in scheme.rendered_schemes()
),
"Remote execution is not supported.",
)
)

schemes = "\n\n".join(
render_list_item(scheme_strs, description)
for scheme_strs, description in supported_schemes
)
extra_inline = f"\n\n{extra}" if extra else ""
return softwrap(
f"""
The URI of a server/entity used as a {context}.{extra_inline}
Supported schemes:
{schemes}
"""
)

return renderer


# This duplicates logic/semantics around choosing a byte store/action cache (and, even, technically,
# remote execution) provider: it'd be nice to have it in one place, but huonw thinks we do the
# validation before starting the engine, and, in any case, we can refactor our way there (the remote
# providers aren't configured in one place yet)
_REMOTE_ADDRESS_SCHEMES = (
_RemoteAddressScheme(
schemes=("grpc", "grpcs"),
supports_execution=True,
experimental=False,
description=softwrap(
"""
Use a [Remote Execution API](https://github.com/bazelbuild/remote-apis) remote
caching/execution server. `grpcs` uses TLS while `grpc` does not. Format:
`grpc[s]://$host:$port`.
"""
),
),
_RemoteAddressScheme(
schemes=("file",),
supports_execution=False,
experimental=True,
description=softwrap(
"""
Use a local directory as a 'remote' store, for testing, debugging, or potentially an NFS
mount. Format: `file://$path`. For example: `file:///tmp/remote-cache-example/` will
store within the `/tmp/remote-cache-example/` directory, creating it if necessary.
"""
),
),
)


@dataclass(frozen=True)
class _GlobMatchErrorBehaviorOptionBase:
Expand Down Expand Up @@ -149,10 +356,10 @@ class AuthPluginResult:
the merge strategy if your plugin sets conflicting headers. Usually, you will want to preserve
the `initial_store_headers` and `initial_execution_headers` passed to the plugin.
If set, the returned `instance_name` will override `[GLOBAL].remote_instance_name`, `store_address`
will override `[GLOBAL].remote_store_address`, and `execution_address` will override
``[GLOBAL].remote_execution_address``. The store address and execution address must be prefixed with
`grpc://` or `grpcs://`.
If set, the returned `instance_name` will override `[GLOBAL].remote_instance_name`,
`store_address` will override `[GLOBAL].remote_store_address`, and `execution_address` will
override ``[GLOBAL].remote_execution_address``. The addresses are interpreted and validated in
the same manner as the corresponding option.
"""

state: AuthPluginState
Expand All @@ -165,23 +372,21 @@ class AuthPluginResult:
plugin_name: str | None = None

def __post_init__(self) -> None:
def assert_valid_address(addr: str | None, field_name: str) -> None:
valid_schemes = [f"{scheme}://" for scheme in ("grpc", "grpcs")]
if addr and not any(addr.startswith(scheme) for scheme in valid_schemes):
name = self.plugin_name or ""
raise ValueError(
softwrap(
f"""
Invalid `{field_name}` in `AuthPluginResult` returned from
`[GLOBAL].remote_auth_plugin` {name}.
Must start with `grpc://` or `grpcs://`, but was {addr}.
"""
)
)
name = self.plugin_name or ""
plugin_context = f"in `AuthPluginResult` returned from `[GLOBAL].remote_auth_plugin` {name}"

assert_valid_address(self.store_address, "store_address")
assert_valid_address(self.execution_address, "execution_address")
if self.store_address:
_RemoteAddressScheme.validate_address(
self.store_address,
require_execution=False,
context_for_diagnostics=f"`store_address` {plugin_context}",
)
if self.execution_address:
_RemoteAddressScheme.validate_address(
self.execution_address,
require_execution=True,
context_for_diagnostics=f"`execution_address` {plugin_context}",
)

@property
def is_available(self) -> bool:
Expand Down Expand Up @@ -468,6 +673,7 @@ def _normalize_address(cls, address: str | None) -> str | None:
# NB: Tonic expects the schemes `http` and `https`, even though they are gRPC requests.
# We validate that users set `grpc` and `grpcs` in the options system / plugin for clarity,
# but then normalize to `http`/`https`.
# TODO: move this logic into the actual remote providers
return re.sub(r"^grpc", "http", address) if address else None


Expand Down Expand Up @@ -1367,6 +1573,7 @@ class BootstrapOptions:
"""
),
)
# TODO: update all these remote_... option helps for the new support for non-REAPI schemes
remote_instance_name = StrOption(
default=None,
advanced=True,
Expand Down Expand Up @@ -1413,13 +1620,10 @@ class BootstrapOptions:
remote_store_address = StrOption(
advanced=True,
default=cast(str, DEFAULT_EXECUTION_OPTIONS.remote_store_address),
help=softwrap(
"""
The URI of a server used for the remote file store.
Format: `scheme://host:port`. The supported schemes are `grpc` and `grpcs`, i.e. gRPC
with TLS enabled. If `grpc` is used, TLS will be disabled.
"""
help=_RemoteAddressScheme.address_help(
"remote file store",
extra="",
requires_execution=False,
),
)
remote_store_headers = DictOption(
Expand Down Expand Up @@ -1488,15 +1692,10 @@ class BootstrapOptions:
remote_execution_address = StrOption(
advanced=True,
default=cast(str, DEFAULT_EXECUTION_OPTIONS.remote_execution_address),
help=softwrap(
"""
The URI of a server used as a remote execution scheduler.
Format: `scheme://host:port`. The supported schemes are `grpc` and `grpcs`, i.e. gRPC
with TLS enabled. If `grpc` is used, TLS will be disabled.
You must also set `[GLOBAL].remote_store_address`, which will often be the same value.
"""
help=_RemoteAddressScheme.address_help(
"remote execution scheduler",
extra="You must also set `[GLOBAL].remote_store_address`, which will often be the same value.",
requires_execution=True,
),
)
remote_execution_headers = DictOption(
Expand Down Expand Up @@ -1782,21 +1981,18 @@ def validate_instance(cls, opts):
)
)

def validate_remote_address(opt_name: str) -> None:
valid_schemes = [f"{scheme}://" for scheme in ("grpc", "grpcs")]
address = getattr(opts, opt_name)
if address and not any(address.startswith(scheme) for scheme in valid_schemes):
raise OptionsError(
softwrap(
f"""
The `{opt_name}` option must begin with one of {valid_schemes}, but
was {address}.
"""
)
)

validate_remote_address("remote_execution_address")
validate_remote_address("remote_store_address")
if opts.remote_execution_address:
_RemoteAddressScheme.validate_address(
opts.remote_execution_address,
require_execution=True,
context_for_diagnostics="The `[GLOBAL].remote_execution_address` option",
)
if opts.remote_store_address:
_RemoteAddressScheme.validate_address(
opts.remote_store_address,
require_execution=False,
context_for_diagnostics="The `[GLOBAL].remote_store_address` option",
)

# Ensure that remote headers are ASCII.
def validate_remote_headers(opt_name: str) -> None:
Expand Down
Loading

0 comments on commit de1a3d3

Please sign in to comment.