Add new dask_cudf.read_parquet API #17250

Merged · 28 commits · Nov 20, 2024 (diff below shows changes from 12 commits)

Commits
3e853f8 add new read_parquet API to dask_cudf (rjzamora, Nov 5, 2024)
b30c529 fix non-expr deprecation (rjzamora, Nov 5, 2024)
e3c640a fix CudfReadParquetFSSpec fusion (rjzamora, Nov 6, 2024)
e482026 correct for aggregate_files=False (rjzamora, Nov 6, 2024)
b9af7b7 Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 6, 2024)
2ad1867 update default blocksize, and add docstring (rjzamora, Nov 6, 2024)
6c37a9c Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 6, 2024)
53dfbdf Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 7, 2024)
900ebf6 Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 12, 2024)
3fb23fd revise _normalize_blocksize (rjzamora, Nov 12, 2024)
7af233e Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 13, 2024)
0552b33 fix test (rjzamora, Nov 13, 2024)
564c13c proper test fix - and disable dataset processing unless necessary (rjzamora, Nov 13, 2024)
64dd105 preserve default hive handling in cudf (rjzamora, Nov 13, 2024)
68b8faf Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 13, 2024)
e074004 Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 14, 2024)
a305398 Merge branch 'branch-24.12' into new-read-parquet-api (vyasr, Nov 15, 2024)
b539ed4 Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 18, 2024)
0291b95 address code review (rjzamora, Nov 18, 2024)
beafffb Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 18, 2024)
0832e55 sample single worker for device size (rjzamora, Nov 18, 2024)
db82e0b Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 18, 2024)
df1e283 Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 19, 2024)
2195947 use CUDA_VISIBLE_DEVICES (rjzamora, Nov 19, 2024)
1038170 Merge branch 'new-read-parquet-api' of github.com:rjzamora/cudf into … (rjzamora, Nov 19, 2024)
0ecf6dd support mig (rjzamora, Nov 19, 2024)
fa03396 Update python/dask_cudf/dask_cudf/io/parquet.py (rjzamora, Nov 19, 2024)
1b79270 Merge branch 'branch-24.12' into new-read-parquet-api (madsbk, Nov 20, 2024)
136 changes: 3 additions & 133 deletions python/dask_cudf/dask_cudf/backends.py

```diff
@@ -700,140 +700,10 @@ def from_dict(
         )
 
     @staticmethod
-    def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs):
-        import dask_expr as dx
-        import fsspec
-
-        if (
-            isinstance(filesystem, fsspec.AbstractFileSystem)
-            or isinstance(filesystem, str)
-            and filesystem.lower() == "fsspec"
-        ):
-            # Default "fsspec" filesystem
-            from dask_cudf._legacy.io.parquet import CudfEngine
+    def read_parquet(*args, **kwargs):
+        from dask_cudf.io.parquet import read_parquet as read_parquet_expr
 
-            _raise_unsupported_parquet_kwargs(**kwargs)
-            return _default_backend(
-                dx.read_parquet,
-                path,
-                *args,
-                filesystem=filesystem,
-                engine=CudfEngine,
-                **kwargs,
-            )
-
-        else:
-            # EXPERIMENTAL filesystem="arrow" support.
-            # This code path uses PyArrow for IO, which is only
-            # beneficial for remote storage (e.g. S3)
-
-            from fsspec.utils import stringify_path
-            from pyarrow import fs as pa_fs
-
-            # CudfReadParquetPyarrowFS requires import of distributed beforehand
-            # (See: https://github.com/dask/dask/issues/11352)
-            import distributed  # noqa: F401
-            from dask.core import flatten
-            from dask.dataframe.utils import pyarrow_strings_enabled
-
-            from dask_cudf.io.parquet import CudfReadParquetPyarrowFS
-
-            if args:
-                raise ValueError(f"Unexpected positional arguments: {args}")
-
-            if not (
-                isinstance(filesystem, pa_fs.FileSystem)
-                or isinstance(filesystem, str)
-                and filesystem.lower() in ("arrow", "pyarrow")
-            ):
-                raise ValueError(f"Unexpected filesystem value: {filesystem}.")
-
-            if not PYARROW_GE_15:
-                raise NotImplementedError(
-                    "Experimental Arrow filesystem support requires pyarrow>=15"
-                )
-
-            if not isinstance(path, str):
-                path = stringify_path(path)
-
-            # Extract kwargs
-            columns = kwargs.pop("columns", None)
-            filters = kwargs.pop("filters", None)
-            categories = kwargs.pop("categories", None)
-            index = kwargs.pop("index", None)
-            storage_options = kwargs.pop("storage_options", None)
-            dtype_backend = kwargs.pop("dtype_backend", None)
-            calculate_divisions = kwargs.pop("calculate_divisions", False)
-            ignore_metadata_file = kwargs.pop("ignore_metadata_file", False)
-            metadata_task_size = kwargs.pop("metadata_task_size", None)
-            split_row_groups = kwargs.pop("split_row_groups", "infer")
-            blocksize = kwargs.pop("blocksize", "default")
-            aggregate_files = kwargs.pop("aggregate_files", None)
-            parquet_file_extension = kwargs.pop(
-                "parquet_file_extension", (".parq", ".parquet", ".pq")
-            )
-            arrow_to_pandas = kwargs.pop("arrow_to_pandas", None)
-            open_file_options = kwargs.pop("open_file_options", None)
-
-            # Validate and normalize kwargs
-            kwargs["dtype_backend"] = dtype_backend
-            if arrow_to_pandas is not None:
-                raise ValueError(
-                    "arrow_to_pandas not supported for the 'cudf' backend."
-                )
-            if open_file_options is not None:
-                raise ValueError(
-                    "The open_file_options argument is no longer supported "
-                    "by the 'cudf' backend."
-                )
-            if filters is not None:
-                for filter in flatten(filters, container=list):
-                    _, op, val = filter
-                    if op == "in" and not isinstance(val, (set, list, tuple)):
-                        raise TypeError(
-                            "Value of 'in' filter must be a list, set or tuple."
-                        )
-            if metadata_task_size is not None:
-                raise NotImplementedError(
-                    "metadata_task_size is not supported when using the pyarrow filesystem."
-                )
-            if split_row_groups != "infer":
-                raise NotImplementedError(
-                    "split_row_groups is not supported when using the pyarrow filesystem."
-                )
-            if parquet_file_extension != (".parq", ".parquet", ".pq"):
-                raise NotImplementedError(
-                    "parquet_file_extension is not supported when using the pyarrow filesystem."
-                )
-            if blocksize is not None and blocksize != "default":
-                warnings.warn(
-                    "blocksize is not supported when using the pyarrow filesystem."
-                    "blocksize argument will be ignored."
-                )
-            if aggregate_files is not None:
-                warnings.warn(
-                    "aggregate_files is not supported when using the pyarrow filesystem. "
-                    "Please use the 'dataframe.parquet.minimum-partition-size' config."
-                    "aggregate_files argument will be ignored."
-                )
-
-            return dx.new_collection(
-                CudfReadParquetPyarrowFS(
-                    path,
-                    columns=dx._util._convert_to_list(columns),
-                    filters=filters,
-                    categories=categories,
-                    index=index,
-                    calculate_divisions=calculate_divisions,
-                    storage_options=storage_options,
-                    filesystem=filesystem,
-                    ignore_metadata_file=ignore_metadata_file,
-                    arrow_to_pandas=arrow_to_pandas,
-                    pyarrow_strings_enabled=pyarrow_strings_enabled(),
-                    kwargs=kwargs,
-                    _series=isinstance(columns, str),
-                )
-            )
+        return read_parquet_expr(*args, **kwargs)
 
     @staticmethod
     def read_csv(
```
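With the backend dispatch reduced to a thin wrapper, both the default fsspec path and the experimental PyArrow-filesystem path now sit behind the single `dask_cudf.read_parquet` entry point. A minimal usage sketch follows; the S3 path is hypothetical, and `filesystem="arrow"` is assumed to remain supported by the new implementation, as it was in the removed branch:

```python
# Minimal usage sketch of the new API. The bucket path is hypothetical, and
# filesystem="arrow" is assumed to still be accepted by the new
# dask_cudf.read_parquet, as it was in the removed backend code above.
import dask_cudf

# Default fsspec-based IO:
df = dask_cudf.read_parquet("s3://my-bucket/dataset/")

# EXPERIMENTAL PyArrow-filesystem IO (the removed validation logic required
# pyarrow>=15 and noted it is mainly beneficial for remote storage like S3):
df_arrow = dask_cudf.read_parquet("s3://my-bucket/dataset/", filesystem="arrow")

print(df_arrow.head())
```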
4 changes: 1 addition & 3 deletions python/dask_cudf/dask_cudf/io/__init__.py

```diff
@@ -22,9 +22,7 @@
 read_text = _deprecated_api(
     "dask_cudf.io.read_text", new_api="dask_cudf.read_text"
 )
-read_parquet = _deprecated_api(
-    "dask_cudf.io.read_parquet", new_api="dask_cudf.read_parquet"
-)
+read_parquet = parquet.read_parquet
 to_parquet = _deprecated_api(
     "dask_cudf.io.to_parquet",
     new_api="dask_cudf._legacy.io.parquet.to_parquet",
```
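Note that this change turns `dask_cudf.io.read_parquet` from a deprecation shim back into a live alias of the expression-based reader. A small sketch of what that implies for callers (the glob path is hypothetical):

```python
# Sketch of the aliasing implied by the diff above: dask_cudf.io.read_parquet
# is now the same object as dask_cudf.io.parquet.read_parquet rather than a
# _deprecated_api wrapper. The glob path is hypothetical.
import dask_cudf.io
from dask_cudf.io import parquet

assert dask_cudf.io.read_parquet is parquet.read_parquet

# Calling through the io namespace no longer emits a deprecation warning:
df = dask_cudf.io.read_parquet("data/*.parquet")
```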