[data] Support retries across datasinks and sources (ray-project#50091)
## Why are these changes needed?

* Wraps all filesystem calls with `RetryingPyFileSystem`, so transient I/O errors are retried automatically.
* Avoids sprawling, redundant changes to the Reader and Datasource objects by capturing the iteration access point with `iterate_with_retry` (see the usage sketch below).
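
A minimal sketch of the pattern (not code from this PR; it assumes a Ray build that includes these changes, and the file path is made up for illustration):

```python
import pyarrow.fs

from ray.data._internal.util import RetryingPyFileSystem, iterate_with_retry
from ray.data.context import DataContext

ctx = DataContext.get_current()

# Wrap any pyarrow filesystem so individual calls (open_input_stream,
# get_file_info, create_dir, ...) are retried when the error message matches
# one of the patterns in DataContext.retried_io_errors.
fs = RetryingPyFileSystem.wrap(pyarrow.fs.LocalFileSystem(), context=ctx)


# Retry a whole iteration from a single access point instead of threading
# retry logic through every Reader/Datasource method.
def read_lines():
    # Hypothetical path, used only for illustration.
    with fs.open_input_stream("/tmp/example.txt") as f:
        yield from f.read().decode().splitlines()


for line in iterate_with_retry(
    read_lines, "iterate example file", match=ctx.retried_io_errors
):
    print(line)
```

In the diff below, `ParquetDatasource` wraps its resolved filesystem the same way, and the WebDataset datasource applies `iterate_with_retry` to its tar-file iterator.
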
## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Richard Liaw <[email protected]>
richardliaw authored and ryanaoleary committed Jan 29, 2025
1 parent 0c8b860 commit ef4c567
Showing 10 changed files with 381 additions and 154 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/datasource/bigquery_datasink.py
@@ -8,8 +8,8 @@
import pyarrow.parquet as pq

import ray
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.datasource import bigquery_datasource
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.util import _check_import
from ray.data.block import Block, BlockAccessor
3 changes: 1 addition & 2 deletions python/ray/data/_internal/datasource/parquet_datasink.py
@@ -6,7 +6,6 @@
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.util import call_with_retry
from ray.data.block import Block, BlockAccessor
from ray.data.context import DataContext
from ray.data.datasource.file_based_datasource import _resolve_kwargs
from ray.data.datasource.file_datasink import _FileDatasink
from ray.data.datasource.filename_provider import FilenameProvider
@@ -95,7 +94,7 @@ def write_blocks_to_path():
call_with_retry(
write_blocks_to_path,
description=f"write '{filename}' to '{self.path}'",
match=DataContext.get_current().retried_io_errors,
match=self._data_context.retried_io_errors,
max_attempts=WRITE_FILE_MAX_ATTEMPTS,
max_backoff_s=WRITE_FILE_RETRY_MAX_BACKOFF_SECONDS,
)
4 changes: 4 additions & 0 deletions python/ray/data/_internal/datasource/parquet_datasource.py
@@ -21,6 +21,7 @@
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.util import (
RetryingPyFileSystem,
_check_pyarrow_version,
_is_local_scheme,
call_with_retry,
@@ -196,6 +197,9 @@ def __init__(
)

paths, filesystem = _resolve_paths_and_filesystem(paths, filesystem)
filesystem = RetryingPyFileSystem.wrap(
filesystem, context=DataContext.get_current()
)

# HACK: PyArrow's `ParquetDataset` errors if input paths contain non-parquet
# files. To avoid this, we expand the input paths with the default metadata
15 changes: 10 additions & 5 deletions python/ray/data/_internal/datasource/tfrecords_datasource.py
@@ -127,14 +127,19 @@ def _tfx_read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block
raise RuntimeError(f"Failed to read TFRecord file {full_path}.")

def _resolve_full_path(self, relative_path):
if isinstance(self._filesystem, pyarrow.fs.S3FileSystem):
from ray.data._internal.util import RetryingPyFileSystem

filesystem = self._filesystem
if isinstance(filesystem, RetryingPyFileSystem):
filesystem = filesystem.unwrap()
if isinstance(filesystem, pyarrow.fs.S3FileSystem):
return f"s3://{relative_path}"
if isinstance(self._filesystem, pyarrow.fs.GcsFileSystem):
if isinstance(filesystem, pyarrow.fs.GcsFileSystem):
return f"gs://{relative_path}"
if isinstance(self._filesystem, pyarrow.fs.HadoopFileSystem):
if isinstance(filesystem, pyarrow.fs.HadoopFileSystem):
return f"hdfs:///{relative_path}"
if isinstance(self._filesystem, pyarrow.fs.PyFileSystem):
protocol = self._filesystem.handler.fs.protocol
if isinstance(filesystem, pyarrow.fs.PyFileSystem):
protocol = filesystem.handler.fs.protocol
if isinstance(protocol, list) or isinstance(protocol, tuple):
protocol = protocol[0]
if protocol == "gcs":
6 changes: 3 additions & 3 deletions python/ray/data/_internal/datasource/webdataset_datasource.py
@@ -9,7 +9,6 @@
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import ray
from ray.data._internal.util import iterate_with_retry
from ray.data.block import BlockAccessor
from ray.data.datasource.file_based_datasource import FileBasedDatasource
@@ -353,9 +352,10 @@ def get_tar_file_iterator():
)

# S3 can raise transient errors during iteration
ctx = ray.data.DataContext.get_current()
files = iterate_with_retry(
get_tar_file_iterator, "iterate tar file", match=ctx.retried_io_errors
get_tar_file_iterator,
"iterate tar file",
match=self._data_context.retried_io_errors,
)

samples = _group_by_keys(files, meta=dict(__url__=path), suffixes=self.suffixes)
260 changes: 252 additions & 8 deletions python/ray/data/_internal/util.py
@@ -24,14 +24,14 @@
)

import numpy as np
import pyarrow

import ray
from ray._private.utils import _get_pyarrow_version
from ray.data.context import DEFAULT_READ_OP_MIN_NUM_BLOCKS, WARN_PREFIX, DataContext

if TYPE_CHECKING:
import pandas
import pyarrow

from ray.data._internal.compute import ComputeStrategy
from ray.data._internal.planner.exchange.sort_task_spec import SortKey
@@ -1108,6 +1108,251 @@ def _run_transforming_worker(worker_id: int):
interrupted_event.set()


class RetryingContextManager:
def __init__(
self,
f: pyarrow.NativeFile,
context: DataContext,
max_attempts: int = 10,
max_backoff_s: int = 32,
):
self._f = f
self._data_context = context
self._max_attempts = max_attempts
self._max_backoff_s = max_backoff_s

def _retry_operation(self, operation: Callable, description: str):
"""Execute an operation with retries."""
return call_with_retry(
operation,
description=description,
match=self._data_context.retried_io_errors,
max_attempts=self._max_attempts,
max_backoff_s=self._max_backoff_s,
)

def __enter__(self):
return self._retry_operation(self._f.__enter__, "enter file context")

def __exit__(self, exc_type, exc_value, traceback):
self._retry_operation(
lambda: self._f.__exit__(exc_type, exc_value, traceback),
"exit file context",
)


class RetryingPyFileSystem(pyarrow.fs.PyFileSystem):
def __init__(self, handler: "RetryingPyFileSystemHandler"):
if not isinstance(handler, RetryingPyFileSystemHandler):
raise ValueError("handler must be a RetryingPyFileSystemHandler")
super().__init__(handler)

@property
def data_context(self):
return self.handler.data_context

def unwrap(self):
return self.handler.unwrap()

@classmethod
def wrap(
cls,
fs: "pyarrow.fs.FileSystem",
context: DataContext,
max_attempts: int = 10,
max_backoff_s: int = 32,
):
if isinstance(fs, RetryingPyFileSystem):
return fs
handler = RetryingPyFileSystemHandler(fs, context, max_attempts, max_backoff_s)
return cls(handler)

def __reduce__(self):
# Serialization of this class breaks for some reason without this
return (self.__class__, (self.handler,))

@classmethod
def __setstate__(cls, state):
# Serialization of this class breaks for some reason without this
return cls(*state)


class RetryingPyFileSystemHandler(pyarrow.fs.FileSystemHandler):
"""Wrapper for filesystem objects that adds retry functionality for file operations.
This class wraps any filesystem object and adds automatic retries for common
file operations that may fail transiently.
"""

def __init__(
self,
fs: "pyarrow.fs.FileSystem",
context: DataContext,
max_attempts: int = 10,
max_backoff_s: int = 32,
):
"""Initialize the retrying filesystem wrapper.
Args:
fs: The underlying filesystem to wrap
context: DataContext for retry settings
max_attempts: Maximum number of retry attempts
max_backoff_s: Maximum backoff time in seconds
"""
assert not isinstance(
fs, RetryingPyFileSystem
), "Cannot wrap a RetryingPyFileSystem"
self._fs = fs
self._data_context = context
self._max_attempts = max_attempts
self._max_backoff_s = max_backoff_s

@property
def data_context(self):
return self._data_context

def _retry_operation(self, operation: Callable, description: str):
"""Execute an operation with retries."""
return call_with_retry(
operation,
description=description,
match=self._data_context.retried_io_errors,
max_attempts=self._max_attempts,
max_backoff_s=self._max_backoff_s,
)

def unwrap(self):
return self._fs

def copy_file(self, src: str, dest: str):
"""Copy a file."""
return self._retry_operation(
lambda: self._fs.copy_file(src, dest), f"copy file from {src} to {dest}"
)

def create_dir(self, path: str, recursive: bool):
"""Create a directory and subdirectories."""
return self._retry_operation(
lambda: self._fs.create_dir(path, recursive=recursive),
f"create directory {path}",
)

def delete_dir(self, path: str):
"""Delete a directory and its contents, recursively."""
return self._retry_operation(
lambda: self._fs.delete_dir(path), f"delete directory {path}"
)

def delete_dir_contents(self, path: str, missing_dir_ok: bool = False):
"""Delete a directory's contents, recursively."""
return self._retry_operation(
lambda: self._fs.delete_dir_contents(path, missing_dir_ok=missing_dir_ok),
f"delete directory contents {path}",
)

def delete_file(self, path: str):
"""Delete a file."""
return self._retry_operation(
lambda: self._fs.delete_file(path), f"delete file {path}"
)

def delete_root_dir_contents(self):
return self._retry_operation(
lambda: self._fs.delete_dir_contents("/", accept_root_dir=True),
"delete root dir contents",
)

def equals(self, other: "pyarrow.fs.FileSystem") -> bool:
"""Test if this filesystem equals another."""
return self._fs.equals(other)

def get_file_info(self, paths: List[str]):
"""Get info for the given files."""
return self._retry_operation(
lambda: self._fs.get_file_info(paths),
f"get file info for {paths}",
)

def get_file_info_selector(self, selector):
return self._retry_operation(
lambda: self._fs.get_file_info(selector),
f"get file info for {selector}",
)

def get_type_name(self):
return "RetryingPyFileSystem"

def move(self, src: str, dest: str):
"""Move / rename a file or directory."""
return self._retry_operation(
lambda: self._fs.move(src, dest), f"move from {src} to {dest}"
)

def normalize_path(self, path: str) -> str:
"""Normalize filesystem path."""
return self._retry_operation(
lambda: self._fs.normalize_path(path), f"normalize path {path}"
)

def open_append_stream(
self,
path: str,
metadata=None,
) -> "pyarrow.NativeFile":
"""Open an output stream for appending.
Compression is disabled in this method because it is handled in the
PyFileSystem abstract class.
"""
return self._retry_operation(
lambda: self._fs.open_append_stream(
path,
compression=None,
metadata=metadata,
),
f"open append stream for {path}",
)

def open_input_stream(
self,
path: str,
) -> "pyarrow.NativeFile":
"""Open an input stream for sequential reading.
Compression is disabled in this method because it is handled in the
PyFileSystem abstract class.
"""
return self._retry_operation(
lambda: self._fs.open_input_stream(path, compression=None),
f"open input stream for {path}",
)

def open_output_stream(
self,
path: str,
metadata=None,
) -> "pyarrow.NativeFile":
"""Open an output stream for sequential writing."
Compression is disabled in this method because it is handled in the
PyFileSystem abstract class.
"""
return self._retry_operation(
lambda: self._fs.open_output_stream(
path,
compression=None,
metadata=metadata,
),
f"open output stream for {path}",
)

def open_input_file(self, path: str) -> "pyarrow.NativeFile":
"""Open an input file for random access reading."""
return self._retry_operation(
lambda: self._fs.open_input_file(path), f"open input file {path}"
)


def call_with_retry(
f: Callable[[], Any],
description: str,
@@ -1133,17 +1378,18 @@ def call_with_retry(
try:
return f()
except Exception as e:
is_retryable = match is None or any(
[pattern in str(e) for pattern in match]
)
is_retryable = match is None or any(pattern in str(e) for pattern in match)
if is_retryable and i + 1 < max_attempts:
# Retry with binary exponential backoff with random jitter.
backoff = min((2 ** (i + 1)), max_backoff_s) * random.random()
backoff = min((2 ** (i + 1)), max_backoff_s) * (random.random())
logger.debug(
f"Retrying {i+1} attempts to {description} after {backoff} seconds."
)
time.sleep(backoff)
else:
logger.debug(
f"Did not find a match for {str(e)}. Raising after {i+1} attempts."
)
raise e from None


@@ -1184,9 +1430,7 @@ def iterate_with_retry(
yield item
return
except Exception as e:
is_retryable = match is None or any(
[pattern in str(e) for pattern in match]
)
is_retryable = match is None or any(pattern in str(e) for pattern in match)
if is_retryable and attempt + 1 < max_attempts:
# Retry with binary exponential backoff with random jitter.
backoff = min((2 ** (attempt + 1)), max_backoff_s) * random.random()