Skip to content

Commit

Permalink
[Feat][Core] Implement single file module for runtime_env (#47807)
Browse files Browse the repository at this point in the history
Supports single file modules in `py_module` runtime_env.

Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness authored Oct 11, 2024
1 parent 211e739 commit 61a4220
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 66 deletions.
10 changes: 4 additions & 6 deletions doc/source/ray-core/handling-dependencies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ To ensure your local changes show up across all Ray workers and can be imported

ray.get(f.remote())

Note: This feature is currently limited to modules that are packages with a single directory containing an ``__init__.py`` file. For single-file modules, you may use ``working_dir``.

.. _runtime-environments-api-ref:

API Reference
Expand Down Expand Up @@ -358,13 +356,15 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime
Note: If the local directory contains symbolic links, Ray follows the links and the files they point to are uploaded to the cluster.

- ``py_modules`` (List[str|module]): Specifies Python modules to be available for import in the Ray workers. (For more ways to specify packages, see also the ``pip`` and ``conda`` fields below.)
Each entry must be either (1) a path to a local directory, (2) a URI to a remote zip or wheel file (see :ref:`remote-uris` for details), (3) a Python module object, or (4) a path to a local `.whl` file.
Each entry must be either (1) a path to a local file or directory, (2) a URI to a remote zip or wheel file (see :ref:`remote-uris` for details), (3) a Python module object, or (4) a path to a local `.whl` file.

- Examples of entries in the list:

- ``"."``

- ``"/local_dependency/my_module"``
- ``"/local_dependency/my_dir_module"``

- ``"/local_dependency/my_file_module.py"``

- ``"s3://bucket/my_module.zip"``

Expand All @@ -380,8 +380,6 @@ The ``runtime_env`` is a Python dictionary or a Python class :class:`ray.runtime

Note: For option (1), if the local directory contains a ``.gitignore`` file, the files and paths specified there are not uploaded to the cluster. You can disable this by setting the environment variable `RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1` on the machine doing the uploading.

Note: This feature is currently limited to modules that are packages with a single directory containing an ``__init__.py`` file. For single-file modules, you may use ``working_dir``.

- ``excludes`` (List[str]): When used with ``working_dir`` or ``py_modules``, specifies a list of files or paths to exclude from being uploaded to the cluster.
This field uses the pattern-matching syntax used by ``.gitignore`` files: see `<https://git-scm.com/docs/gitignore>`_ for details.
Note: In accordance with ``.gitignore`` syntax, if there is a separator (``/``) at the beginning or middle (or both) of the pattern, then the pattern is interpreted relative to the level of the ``working_dir``.
Expand Down
140 changes: 104 additions & 36 deletions python/ray/_private/runtime_env/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,56 @@ def _dir_travel(
excludes.pop()


def _hash_file_content_or_directory_name(
filepath: Path,
relative_path: Path,
logger: Optional[logging.Logger] = default_logger,
) -> bytes:
"""Helper function to create hash of a single file or directory.
This function will hash the path of the file or directory,
and if it's a file, it'll hash its content too.
"""

BUF_SIZE = 4096 * 1024

sha1 = hashlib.sha1()
sha1.update(str(filepath.relative_to(relative_path)).encode())
if not filepath.is_dir():
try:
f = filepath.open("rb")
except Exception as e:
logger.debug(
f"Skipping contents of file {filepath} when calculating package hash "
f"because the file could not be opened: {e}"
)
else:
try:
data = f.read(BUF_SIZE)
while len(data) != 0:
sha1.update(data)
data = f.read(BUF_SIZE)
finally:
f.close()

return sha1.digest()


def _hash_file(
filepath: Path,
relative_path: Path,
logger: Optional[logging.Logger] = default_logger,
) -> bytes:
"""Helper function to create hash of a single file.
It'll hash the path of the file and its content to create a hash value.
"""
file_hash = _hash_file_content_or_directory_name(
filepath, relative_path, logger=logger
)
return _xor_bytes(file_hash, b"0" * 8)


def _hash_directory(
root: Path,
relative_path: Path,
Expand All @@ -147,30 +197,13 @@ def _hash_directory(
hash(file_name, file_content) to create a hash value.
"""
hash_val = b"0" * 8
BUF_SIZE = 4096 * 1024

def handler(path: Path):
sha1 = hashlib.sha1()
sha1.update(str(path.relative_to(relative_path)).encode())
if not path.is_dir():
try:
f = path.open("rb")
except Exception as e:
logger.debug(
f"Skipping contents of file {path} when calculating package hash "
f"because the file could not be opened: {e}"
)
else:
try:
data = f.read(BUF_SIZE)
while len(data) != 0:
sha1.update(data)
data = f.read(BUF_SIZE)
finally:
f.close()

file_hash = _hash_file_content_or_directory_name(
path, relative_path, logger=logger
)
nonlocal hash_val
hash_val = _xor_bytes(hash_val, sha1.digest())
hash_val = _xor_bytes(hash_val, file_hash)

excludes = [] if excludes is None else [excludes]
_dir_travel(root, excludes, handler, logger=logger)
Expand Down Expand Up @@ -378,16 +411,16 @@ def _get_local_path(base_directory: str, pkg_uri: str) -> str:
return os.path.join(base_directory, pkg_name)


def _zip_directory(
directory: str,
def _zip_files(
path_str: str,
excludes: List[str],
output_path: str,
include_parent_dir: bool = False,
logger: Optional[logging.Logger] = default_logger,
) -> None:
"""Zip the target directory and write it to the output_path.
"""Zip the target file or directory and write it to the output_path.
directory: The directory to zip.
path_str: The file or directory to zip.
excludes (List(str)): The directories or file to be excluded.
output_path: The output path for the zip file.
include_parent_dir: If true, includes the top-level directory as a
Expand All @@ -396,7 +429,10 @@ def _zip_directory(
pkg_file = Path(output_path).absolute()
with ZipFile(pkg_file, "w", strict_timestamps=False) as zip_handler:
# Put all files in the directory into the zip file.
dir_path = Path(directory).absolute()
file_path = Path(path_str).absolute()
dir_path = file_path
if file_path.is_file():
dir_path = file_path.parent

def handler(path: Path):
# Pack this path if it's an empty directory or it's a file.
Expand All @@ -415,8 +451,8 @@ def handler(path: Path):
to_path = dir_path.name / to_path
zip_handler.write(path, to_path)

excludes = [_get_excludes(dir_path, excludes)]
_dir_travel(dir_path, excludes, handler, logger=logger)
excludes = [_get_excludes(file_path, excludes)]
_dir_travel(file_path, excludes, handler, logger=logger)


def package_exists(pkg_uri: str) -> bool:
Expand Down Expand Up @@ -451,6 +487,38 @@ def get_uri_for_package(package: Path) -> str:
)


def get_uri_for_file(file: str) -> str:
"""Get a content-addressable URI from a file's content.
This function will generate the name of the package by the file.
The final package name is: _ray_pkg_<HASH_VAL>.zip of this package.
e.g., _ray_pkg_029f88d5ecc55e1e4d64fc6e388fd103.zip
Examples:
>>> get_uri_for_file("/my_file.py") # doctest: +SKIP
_ray_pkg_af2734982a741.zip
Args:
file: The file.
Returns:
URI (str)
Raises:
ValueError if the file doesn't exist.
"""
filepath = Path(file).absolute()
if not filepath.exists() or not filepath.is_file():
raise ValueError(f"File {filepath} must be an existing file")

hash_val = _hash_file(filepath, filepath.parent)

return "{protocol}://{pkg_name}.zip".format(
protocol=Protocol.GCS.value, pkg_name=RAY_PKG_PREFIX + hash_val.hex()
)


def get_uri_for_directory(directory: str, excludes: Optional[List[str]] = None) -> str:
"""Get a content-addressable URI from a directory's contents.
Expand Down Expand Up @@ -515,7 +583,7 @@ def upload_package_to_gcs(pkg_uri: str, pkg_bytes: bytes) -> None:


def create_package(
directory: str,
module_path: str,
target_path: Path,
include_parent_dir: bool = False,
excludes: Optional[List[str]] = None,
Expand All @@ -528,11 +596,11 @@ def create_package(
logger = default_logger

if not target_path.exists():
logger.info(f"Creating a file package for local directory '{directory}'.")
_zip_directory(
directory,
logger.info(f"Creating a file package for local module '{module_path}'.")
_zip_files(
module_path,
excludes,
target_path,
str(target_path),
include_parent_dir=include_parent_dir,
logger=logger,
)
Expand All @@ -541,7 +609,7 @@ def create_package(
def upload_package_if_needed(
pkg_uri: str,
base_directory: str,
directory: str,
module_path: str,
include_parent_dir: bool = False,
excludes: Optional[List[str]] = None,
logger: Optional[logging.Logger] = default_logger,
Expand All @@ -556,7 +624,7 @@ def upload_package_if_needed(
Args:
pkg_uri: URI of the package to upload.
base_directory: Directory where package files are stored.
directory: Directory to be uploaded.
module_path: The module to be uploaded, either a single .py file or a directory.
include_parent_dir: If true, includes the top-level directory as a
directory inside the zip file.
excludes: List specifying files to exclude.
Expand Down Expand Up @@ -586,7 +654,7 @@ def upload_package_if_needed(
f"{time.time_ns()}_{os.getpid()}_{package_file.name}"
)
create_package(
directory,
module_path,
package_file,
include_parent_dir=include_parent_dir,
excludes=excludes,
Expand Down
37 changes: 24 additions & 13 deletions python/ray/_private/runtime_env/py_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
delete_package,
download_and_unpack_package,
get_local_dir_from_uri,
get_uri_for_file,
get_uri_for_directory,
get_uri_for_package,
install_wheel_package,
Expand Down Expand Up @@ -71,15 +72,20 @@ def upload_py_modules_if_needed(
elif isinstance(module, Path):
module_path = str(module)
elif isinstance(module, ModuleType):
# NOTE(edoakes): Python allows some installed Python packages to
# be split into multiple directories. We could probably handle
# this, but it seems tricky & uncommon. If it's a problem for
# users, we can add this support on demand.
if len(module.__path__) > 1:
raise ValueError(
"py_modules only supports modules whose __path__ has length 1."
)
[module_path] = module.__path__
if not hasattr(module, "__path__"):
# This is a single-file module.
module_path = module.__file__
else:
# NOTE(edoakes): Python allows some installed Python packages to
# be split into multiple directories. We could probably handle
# this, but it seems tricky & uncommon. If it's a problem for
# users, we can add this support on demand.
if len(module.__path__) > 1:
raise ValueError(
"py_modules only supports modules whose __path__"
" has length 1 or those who are single-file."
)
[module_path] = module.__path__
else:
raise TypeError(
"py_modules must be a list of file paths, URIs, "
Expand All @@ -90,17 +96,21 @@ def upload_py_modules_if_needed(
module_uri = module_path
else:
# module_path is a local path.
if Path(module_path).is_dir():
if Path(module_path).is_dir() or Path(module_path).suffix == ".py":
is_dir = Path(module_path).is_dir()
excludes = runtime_env.get("excludes", None)
module_uri = get_uri_for_directory(module_path, excludes=excludes)
if is_dir:
module_uri = get_uri_for_directory(module_path, excludes=excludes)
else:
module_uri = get_uri_for_file(module_path)
if upload_fn is None:
try:
upload_package_if_needed(
module_uri,
scratch_dir,
module_path,
excludes=excludes,
include_parent_dir=True,
include_parent_dir=is_dir,
logger=logger,
)
except Exception as e:
Expand Down Expand Up @@ -136,7 +146,8 @@ def upload_py_modules_if_needed(
upload_fn(module_path, excludes=None, is_file=True)
else:
raise ValueError(
"py_modules entry must be a directory or a .whl file; "
"py_modules entry must be a .py file, "
"a directory, or a .whl file; "
f"got {module_path}"
)

Expand Down
5 changes: 5 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,11 @@ def tmp_working_dir():
with hello_file.open(mode="w") as f:
f.write("world")

test_file_module = path / "file_module.py"
with test_file_module.open(mode="w") as f:
f.write("def hello():\n")
f.write(" return 'hello'\n")

module_path = path / "test_module"
module_path.mkdir(parents=True)

Expand Down
Loading

0 comments on commit 61a4220

Please sign in to comment.