From 3e853f837f4297f68041c0759e4a76695f1eed50 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 5 Nov 2024 12:56:09 -0800 Subject: [PATCH 01/14] add new read_parquet API to dask_cudf --- python/dask_cudf/dask_cudf/backends.py | 136 +---- python/dask_cudf/dask_cudf/io/__init__.py | 4 +- python/dask_cudf/dask_cudf/io/parquet.py | 552 ++++++++++++++++-- .../dask_cudf/io/tests/test_parquet.py | 16 +- 4 files changed, 519 insertions(+), 189 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index fb02e0ac772..9c5d5523019 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -700,140 +700,10 @@ def from_dict( ) @staticmethod - def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs): - import dask_expr as dx - import fsspec - - if ( - isinstance(filesystem, fsspec.AbstractFileSystem) - or isinstance(filesystem, str) - and filesystem.lower() == "fsspec" - ): - # Default "fsspec" filesystem - from dask_cudf._legacy.io.parquet import CudfEngine + def read_parquet(*args, **kwargs): + from dask_cudf.io.parquet import read_parquet as read_parquet_expr - _raise_unsupported_parquet_kwargs(**kwargs) - return _default_backend( - dx.read_parquet, - path, - *args, - filesystem=filesystem, - engine=CudfEngine, - **kwargs, - ) - - else: - # EXPERIMENTAL filesystem="arrow" support. - # This code path uses PyArrow for IO, which is only - # beneficial for remote storage (e.g. S3) - - from fsspec.utils import stringify_path - from pyarrow import fs as pa_fs - - # CudfReadParquetPyarrowFS requires import of distributed beforehand - # (See: https://github.com/dask/dask/issues/11352) - import distributed # noqa: F401 - from dask.core import flatten - from dask.dataframe.utils import pyarrow_strings_enabled - - from dask_cudf.io.parquet import CudfReadParquetPyarrowFS - - if args: - raise ValueError(f"Unexpected positional arguments: {args}") - - if not ( - isinstance(filesystem, pa_fs.FileSystem) - or isinstance(filesystem, str) - and filesystem.lower() in ("arrow", "pyarrow") - ): - raise ValueError(f"Unexpected filesystem value: {filesystem}.") - - if not PYARROW_GE_15: - raise NotImplementedError( - "Experimental Arrow filesystem support requires pyarrow>=15" - ) - - if not isinstance(path, str): - path = stringify_path(path) - - # Extract kwargs - columns = kwargs.pop("columns", None) - filters = kwargs.pop("filters", None) - categories = kwargs.pop("categories", None) - index = kwargs.pop("index", None) - storage_options = kwargs.pop("storage_options", None) - dtype_backend = kwargs.pop("dtype_backend", None) - calculate_divisions = kwargs.pop("calculate_divisions", False) - ignore_metadata_file = kwargs.pop("ignore_metadata_file", False) - metadata_task_size = kwargs.pop("metadata_task_size", None) - split_row_groups = kwargs.pop("split_row_groups", "infer") - blocksize = kwargs.pop("blocksize", "default") - aggregate_files = kwargs.pop("aggregate_files", None) - parquet_file_extension = kwargs.pop( - "parquet_file_extension", (".parq", ".parquet", ".pq") - ) - arrow_to_pandas = kwargs.pop("arrow_to_pandas", None) - open_file_options = kwargs.pop("open_file_options", None) - - # Validate and normalize kwargs - kwargs["dtype_backend"] = dtype_backend - if arrow_to_pandas is not None: - raise ValueError( - "arrow_to_pandas not supported for the 'cudf' backend." 
- ) - if open_file_options is not None: - raise ValueError( - "The open_file_options argument is no longer supported " - "by the 'cudf' backend." - ) - if filters is not None: - for filter in flatten(filters, container=list): - _, op, val = filter - if op == "in" and not isinstance(val, (set, list, tuple)): - raise TypeError( - "Value of 'in' filter must be a list, set or tuple." - ) - if metadata_task_size is not None: - raise NotImplementedError( - "metadata_task_size is not supported when using the pyarrow filesystem." - ) - if split_row_groups != "infer": - raise NotImplementedError( - "split_row_groups is not supported when using the pyarrow filesystem." - ) - if parquet_file_extension != (".parq", ".parquet", ".pq"): - raise NotImplementedError( - "parquet_file_extension is not supported when using the pyarrow filesystem." - ) - if blocksize is not None and blocksize != "default": - warnings.warn( - "blocksize is not supported when using the pyarrow filesystem." - "blocksize argument will be ignored." - ) - if aggregate_files is not None: - warnings.warn( - "aggregate_files is not supported when using the pyarrow filesystem. " - "Please use the 'dataframe.parquet.minimum-partition-size' config." - "aggregate_files argument will be ignored." - ) - - return dx.new_collection( - CudfReadParquetPyarrowFS( - path, - columns=dx._util._convert_to_list(columns), - filters=filters, - categories=categories, - index=index, - calculate_divisions=calculate_divisions, - storage_options=storage_options, - filesystem=filesystem, - ignore_metadata_file=ignore_metadata_file, - arrow_to_pandas=arrow_to_pandas, - pyarrow_strings_enabled=pyarrow_strings_enabled(), - kwargs=kwargs, - _series=isinstance(columns, str), - ) - ) + return read_parquet_expr(*args, **kwargs) @staticmethod def read_csv( diff --git a/python/dask_cudf/dask_cudf/io/__init__.py b/python/dask_cudf/dask_cudf/io/__init__.py index 1e0f24d78ce..e92853e0fec 100644 --- a/python/dask_cudf/dask_cudf/io/__init__.py +++ b/python/dask_cudf/dask_cudf/io/__init__.py @@ -22,9 +22,7 @@ read_text = _deprecated_api( "dask_cudf.io.read_text", new_api="dask_cudf.read_text" ) -read_parquet = _deprecated_api( - "dask_cudf.io.read_parquet", new_api="dask_cudf.read_parquet" -) +read_parquet = parquet.read_parquet to_parquet = _deprecated_api( "dask_cudf.io.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 48cea7266af..143c12382f1 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -1,9 +1,24 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
+ import functools +import itertools +import math +import warnings +import numpy as np import pandas as pd -from dask_expr.io.io import FusedParquetIO -from dask_expr.io.parquet import FragmentWrapper, ReadParquetPyarrowFS +from dask_expr._expr import Elemwise +from dask_expr._util import _convert_to_list +from dask_expr.io.io import FusedIO, FusedParquetIO +from dask_expr.io.parquet import ( + FragmentWrapper, + ReadParquetFSSpec, + ReadParquetPyarrowFS, +) + +from dask.dataframe.io.parquet.arrow import _filters_to_expression +from dask.dataframe.io.parquet.core import ParquetFunctionWrapper +from dask.utils import parse_bytes import cudf @@ -13,44 +28,140 @@ from dask_cudf._legacy.io.parquet import CudfEngine # noqa: F401 -class CudfFusedParquetIO(FusedParquetIO): +class NoOp(Elemwise): + # Workaround - Always wrap read_parquet operations + # in a NoOp to trigger tune_up optimizations. + _parameters = ["frame"] + _is_length_preserving = True + _projection_passthrough = True + _filter_passthrough = True + _preserves_partitioning_information = True + @staticmethod - def _load_multiple_files( - frag_filters, - columns, - schema, - *to_pandas_args, - ): - import pyarrow as pa + def operation(x): + return x - from dask.base import apply, tokenize - from dask.threaded import get - token = tokenize(frag_filters, columns, schema) - name = f"pq-file-{token}" - dsk = { - (name, i): ( - CudfReadParquetPyarrowFS._fragment_to_table, - frag, - filter, - columns, - schema, - ) - for i, (frag, filter) in enumerate(frag_filters) +class CudfReadParquetFSSpec(ReadParquetFSSpec): + def approx_statistics(self): + # Use a few files to approximate column-size statistics + + # Account for filters + ds_filters = None + if self.filters is not None: + ds_filters = _filters_to_expression(self.filters) + + # Use average total_uncompressed_size of three files + n_sample = 3 + column_sizes = {} + for i, frag in enumerate( + self._dataset_info["ds"].get_fragments(ds_filters) + ): + md = frag.metadata + for rg in range(md.num_row_groups): + row_group = md.row_group(rg) + for col in range(row_group.num_columns): + column = row_group.column(col) + name = column.path_in_schema + if name not in column_sizes: + column_sizes[name] = np.zeros(n_sample, dtype="int64") + column_sizes[name][i] += column.total_uncompressed_size + if (i + 1) >= n_sample: + break + + # Reorganize stats to look like arrow-fs version + return { + "columns": [ + { + "path_in_schema": name, + "total_uncompressed_size": np.mean(sizes), + } + for name, sizes in column_sizes.items() + ] } - dsk[name] = ( - apply, - pa.concat_tables, - [list(dsk.keys())], - {"promote_options": "permissive"}, - ) - return CudfReadParquetPyarrowFS._table_to_pandas( - get(dsk, name), - *to_pandas_args, - ) + + # ## OLD + # @property + # def _fusion_compression_factor(self): + # if self.operand("columns") is None: + # return 1 + # nr_original_columns = max(len(self._dataset_info["schema"].names) - 1, 1) + # return max( + # len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 + # ) + + @functools.cached_property + def _fusion_compression_factor(self): + blocksize = self.blocksize + if blocksize is None or self.aggregate_files: + # NOTE: We cannot fuse files *again* if + # aggregate_files is True (this creates + # too much OOM risk) + return 1 + elif blocksize == "default": + blocksize = "256MiB" + + approx_stats = self.approx_statistics() + projected_size = 0 + col_op = self.operand("columns") or self.columns + for col in approx_stats["columns"]: + if 
col["path_in_schema"] in col_op or ( + (split_name := col["path_in_schema"].split(".")) + and split_name[0] in col_op + ): + projected_size += col["total_uncompressed_size"] + + if projected_size < 1: + return 1 + + aggregate_files = max(1, int(parse_bytes(blocksize) / projected_size)) + return max(1 / aggregate_files, 0.001) + + def _tune_up(self, parent): + if self._fusion_compression_factor >= 1: + return + if isinstance(parent, FusedIO): + return + return parent.substitute(self, CudfFusedIO(self)) class CudfReadParquetPyarrowFS(ReadParquetPyarrowFS): + _parameters = [ + "path", + "columns", + "filters", + "categories", + "index", + "storage_options", + "filesystem", + "blocksize", + "ignore_metadata_file", + "calculate_divisions", + "arrow_to_pandas", + "pyarrow_strings_enabled", + "kwargs", + "_partitions", + "_series", + "_dataset_info_cache", + ] + _defaults = { + "columns": None, + "filters": None, + "categories": None, + "index": None, + "storage_options": None, + "filesystem": None, + "blocksize": "256 MiB", + "ignore_metadata_file": True, + "calculate_divisions": False, + "arrow_to_pandas": None, + "pyarrow_strings_enabled": True, + "kwargs": None, + "_partitions": None, + "_series": False, + "_dataset_info_cache": None, + } + @functools.cached_property def _dataset_info(self): from dask_cudf._legacy.io.parquet import ( @@ -84,11 +195,94 @@ def _dataset_info(self): @staticmethod def _table_to_pandas(table, index_name): - df = cudf.DataFrame.from_arrow(table) - if index_name is not None: - df = df.set_index(index_name) + if isinstance(table, cudf.DataFrame): + df = table + else: + df = cudf.DataFrame.from_arrow(table) + if index_name is not None: + return df.set_index(index_name) return df + @staticmethod + def _fragments_to_cudf_dataframe( + fragment_wrappers, + filters, + columns, + schema, + ): + from dask.dataframe.io.utils import _is_local_fs + + from cudf.io.parquet import _apply_post_filters, _normalize_filters + + if not isinstance(fragment_wrappers, list): + fragment_wrappers = [fragment_wrappers] + + filesystem = None + paths, row_groups = [], [] + for fw in fragment_wrappers: + frag = fw.fragment if isinstance(fw, FragmentWrapper) else fw + paths.append(frag.path) + row_groups.append( + [rg.id for rg in frag.row_groups] if frag.row_groups else None + ) + if filesystem is None: + filesystem = frag.filesystem + + if _is_local_fs(filesystem): + filesystem = None + else: + from fsspec.implementations.arrow import ArrowFSWrapper + + filesystem = ArrowFSWrapper(filesystem) + protocol = filesystem.protocol + paths = [f"{protocol}://{path}" for path in paths] + + filters = _normalize_filters(filters) + projected_columns = None + if columns and filters: + projected_columns = [c for c in columns if c is not None] + columns = sorted( + set(v[0] for v in itertools.chain.from_iterable(filters)) + | set(projected_columns) + ) + + if row_groups == [None for path in paths]: + row_groups = None + + df = cudf.read_parquet( + paths, + columns=columns, + filters=filters, + row_groups=row_groups, + dataset_kwargs={"schema": schema}, + ) + + # Apply filters (if any are defined) + df = _apply_post_filters(df, filters) + if projected_columns: + # Elements of `projected_columns` may now be in the index. + # We must filter these names from our projection + projected_columns = [ + col for col in projected_columns if col in df._column_names + ] + df = df[projected_columns] + + # TODO: Deal with hive partitioning. + # Note that ReadParquetPyarrowFS does NOT support this yet anyway. 
+ return df + + @functools.cached_property + def _use_device_io(self): + from dask.dataframe.io.utils import _is_local_fs + + # Use host for remote filesystem only + return _is_local_fs(self.fs) + # TODO: Use KvikIO-S3 support when available. + # or ( + # self.fs.type_name + # == "s3" # TODO: and cudf.get_option("kvikio_remote_io") + # ) + def _filtered_task(self, index: int): columns = self.columns.copy() index_name = self.index.name @@ -99,10 +293,14 @@ def _filtered_task(self, index: int): if columns is None: columns = list(schema.names) columns.append(index_name) + + frag_to_table = self._fragment_to_table + if self._use_device_io: + frag_to_table = self._fragments_to_cudf_dataframe return ( self._table_to_pandas, ( - self._fragment_to_table, + frag_to_table, FragmentWrapper(self.fragments[index], filesystem=self.fs), self.filters, columns, @@ -111,18 +309,286 @@ def _filtered_task(self, index: int): index_name, ) + @property + def _fusion_compression_factor(self): + blocksize = self.blocksize + if blocksize is None: + return 1 + elif blocksize == "default": + blocksize = "256MiB" + + approx_stats = self.approx_statistics() + projected_size = 0 + col_op = self.operand("columns") or self.columns + for col in approx_stats["columns"]: + if col["path_in_schema"] in col_op or ( + (split_name := col["path_in_schema"].split(".")) + and split_name[0] in col_op + ): + projected_size += col["total_uncompressed_size"] + + if projected_size < 1: + return 1 + + aggregate_files = max(1, int(parse_bytes(blocksize) / projected_size)) + return max(1 / aggregate_files, 0.001) + def _tune_up(self, parent): if self._fusion_compression_factor >= 1: return - if isinstance(parent, CudfFusedParquetIO): + fused_cls = ( + CudfFusedParquetIO + if self._use_device_io + else CudfFusedParquetIOHost + ) + if isinstance(parent, fused_cls): return - return parent.substitute(self, CudfFusedParquetIO(self)) + return parent.substitute(self, fused_cls(self)) + + +class CudfFusedIO(FusedIO): + def _task(self, index: int): + expr = self.operand("_expr") + bucket = self._fusion_buckets[index] + + io_func = expr._filtered_task(0)[0] + if not isinstance( + io_func, ParquetFunctionWrapper + ) or io_func.common_kwargs.get("partitions", None): + # Just use "simple" fusion if we have an unexpected + # callable, or we are dealing with hive partitioning. + return (cudf.concat, [expr._filtered_task(i) for i in bucket]) + + pieces = [] + for i in bucket: + piece = expr._filtered_task(i)[1] + if isinstance(piece, list): + pieces.extend(piece) + else: + pieces.append(piece) + return (io_func, pieces) + + +class CudfFusedParquetIO(FusedParquetIO): + @functools.cached_property + def _fusion_buckets(self): + partitions = self.operand("_expr")._partitions + npartitions = len(partitions) + + step = math.ceil(1 / self.operand("_expr")._fusion_compression_factor) + + # TODO: Heuristic to limit fusion should probably + # account for the number of workers. For now, just + # limiting fusion to 100 partitions at once. + step = min(step, 100) + + buckets = [ + partitions[i : i + step] for i in range(0, npartitions, step) + ] + return buckets + + @classmethod + def _load_multiple_files( + cls, + frag_filters, + columns, + schema, + *to_pandas_args, + ): + frag_to_table = CudfReadParquetPyarrowFS._fragments_to_cudf_dataframe + return CudfReadParquetPyarrowFS._table_to_pandas( + frag_to_table( + [frag[0] for frag in frag_filters], + frag_filters[0][1], # TODO: Check for consistent filters? 
+ columns, + schema, + ), + *to_pandas_args, + ) + + +class CudfFusedParquetIOHost(CudfFusedParquetIO): + @classmethod + def _load_multiple_files( + cls, + frag_filters, + columns, + schema, + *to_pandas_args, + ): + import pyarrow as pa + + from dask.base import apply, tokenize + from dask.threaded import get + + token = tokenize(frag_filters, columns, schema) + name = f"pq-file-{token}" + dsk = { + (name, i): ( + CudfReadParquetPyarrowFS._fragment_to_table, + frag, + filter, + columns, + schema, + ) + for i, (frag, filter) in enumerate(frag_filters) + } + dsk[name] = ( + apply, + pa.concat_tables, + [list(dsk.keys())], + {"promote_options": "permissive"}, + ) + + return CudfReadParquetPyarrowFS._table_to_pandas( + get(dsk, name), + *to_pandas_args, + ) + + +def read_parquet( + path, + *args, + columns=None, + filters=None, + categories=None, + index=None, + storage_options=None, + dtype_backend=None, + calculate_divisions=False, + ignore_metadata_file=False, + metadata_task_size=None, + split_row_groups="infer", + blocksize="default", + aggregate_files=None, + parquet_file_extension=(".parq", ".parquet", ".pq"), + filesystem="fsspec", + engine=None, + arrow_to_pandas=None, + **kwargs, +): + import dask_expr as dx + from fsspec.utils import stringify_path + from pyarrow import fs as pa_fs + + from dask.core import flatten + from dask.dataframe.utils import pyarrow_strings_enabled + + from dask_cudf.backends import PYARROW_GE_15 + + if args: + raise ValueError(f"Unexpected positional arguments: {args}") + + if dtype_backend is not None: + raise NotImplementedError( + "dtype_backend is not supported by the 'cudf' backend." + ) + if arrow_to_pandas is not None: + raise NotImplementedError( + "arrow_to_pandas is not supported by the 'cudf' backend." + ) + if engine not in (None, "cudf", CudfEngine): + raise NotImplementedError( + "engine={engine} is not supported by the 'cudf' backend." + ) + + if not isinstance(path, str): + path = stringify_path(path) + + kwargs["dtype_backend"] = None + if arrow_to_pandas: + kwargs["arrow_to_pandas"] = None + + if filters is not None: + for filter in flatten(filters, container=list): + _, op, val = filter + if op == "in" and not isinstance(val, (set, list, tuple)): + raise TypeError( + "Value of 'in' filter must be a list, set or tuple." + ) + + if ( + isinstance(filesystem, pa_fs.FileSystem) + or isinstance(filesystem, str) + and filesystem.lower() in ("arrow", "pyarrow") + ): + # EXPERIMENTAL filesystem="arrow" support. + # This code path may use PyArrow for remote IO. + + # CudfReadParquetPyarrowFS requires import of distributed beforehand + # (See: https://github.com/dask/dask/issues/11352) + import distributed # noqa: F401 + + if not PYARROW_GE_15: + raise ValueError( + "pyarrow>=15.0.0 is required to use the pyarrow filesystem." + ) + if metadata_task_size is not None: + warnings.warn( + "metadata_task_size is not supported when using the pyarrow filesystem." + " This argument will be ignored!" + ) + if aggregate_files is not None: + warnings.warn( + "aggregate_files is not supported when using the pyarrow filesystem." + " This argument will be ignored!" + ) + if split_row_groups != "infer": + warnings.warn( + "split_row_groups is not supported when using the pyarrow filesystem." + " This argument will be ignored!" + ) + if parquet_file_extension != (".parq", ".parquet", ".pq"): + raise NotImplementedError( + "parquet_file_extension is not supported when using the pyarrow filesystem." 
+ ) + + return dx.new_collection( + NoOp( + CudfReadParquetPyarrowFS( + path, + columns=_convert_to_list(columns), + filters=filters, + categories=categories, + index=index, + calculate_divisions=calculate_divisions, + storage_options=storage_options, + filesystem=filesystem, + blocksize=blocksize, + ignore_metadata_file=ignore_metadata_file, + arrow_to_pandas=None, + pyarrow_strings_enabled=pyarrow_strings_enabled(), + kwargs=kwargs, + _series=isinstance(columns, str), + ), + ) + ) + + return dx.new_collection( + NoOp( + CudfReadParquetFSSpec( + path, + columns=_convert_to_list(columns), + filters=filters, + categories=categories, + index=index, + blocksize=blocksize, + storage_options=storage_options, + calculate_divisions=calculate_divisions, + ignore_metadata_file=ignore_metadata_file, + metadata_task_size=metadata_task_size, + split_row_groups=split_row_groups, + aggregate_files=aggregate_files, + parquet_file_extension=parquet_file_extension, + filesystem=filesystem, + engine=CudfEngine, + kwargs=kwargs, + _series=isinstance(columns, str), + ), + ) + ) -read_parquet = _deprecated_api( - "dask_cudf.io.parquet.read_parquet", - new_api="dask_cudf.read_parquet", -) to_parquet = _deprecated_api( "dask_cudf.io.parquet.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 522a21e12a5..3ccd44ec696 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -46,7 +46,7 @@ def test_roundtrip_backend_dispatch(tmpdir): tmpdir = str(tmpdir) ddf.to_parquet(tmpdir, engine="pyarrow") with dask.config.set({"dataframe.backend": "cudf"}): - ddf2 = dd.read_parquet(tmpdir, index=False) + ddf2 = dd.read_parquet(tmpdir, index=False, blocksize=None) assert isinstance(ddf2, dask_cudf.DataFrame) dd.assert_eq(ddf.reset_index(drop=False), ddf2) @@ -100,7 +100,7 @@ def test_roundtrip_from_dask_index_false(tmpdir): tmpdir = str(tmpdir) ddf.to_parquet(tmpdir, engine="pyarrow") - ddf2 = dask_cudf.read_parquet(tmpdir, index=False) + ddf2 = dask_cudf.read_parquet(tmpdir, index=False, blocksize=None) dd.assert_eq(ddf.reset_index(drop=False), ddf2) @@ -667,7 +667,7 @@ def test_to_parquet_append(tmpdir, write_metadata_file): write_metadata_file=write_metadata_file, write_index=False, ) - ddf2 = dask_cudf.read_parquet(tmpdir) + ddf2 = dask_cudf.read_parquet(tmpdir, blocksize=None) dd.assert_eq(cudf.concat([df, df]), ddf2) @@ -677,13 +677,9 @@ def test_deprecated_api_paths(tmpdir): with pytest.warns(match="dask_cudf.io.to_parquet is now deprecated"): dask_cudf.io.to_parquet(df, tmpdir) - # Encourage top-level read_parquet import only - with pytest.warns(match="dask_cudf.io.read_parquet is now deprecated"): - df2 = dask_cudf.io.read_parquet(tmpdir) + # Allow internal read_parquet import + df2 = dask_cudf.io.read_parquet(tmpdir) dd.assert_eq(df, df2, check_divisions=False) - with pytest.warns( - match="dask_cudf.io.parquet.read_parquet is now deprecated" - ): - df2 = dask_cudf.io.parquet.read_parquet(tmpdir) + df2 = dask_cudf.io.parquet.read_parquet(tmpdir) dd.assert_eq(df, df2, check_divisions=False) From b30c529af3699525432ada9edc3588c60909b3c6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 5 Nov 2024 13:04:32 -0800 Subject: [PATCH 02/14] fix non-expr deprecation --- python/dask_cudf/dask_cudf/io/parquet.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git 
a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 143c12382f1..9901d20847b 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -22,7 +22,7 @@ import cudf -from dask_cudf import _deprecated_api +from dask_cudf import QUERY_PLANNING_ON, _deprecated_api # Dask-expr imports CudfEngine from this module from dask_cudf._legacy.io.parquet import CudfEngine # noqa: F401 @@ -446,7 +446,7 @@ def _load_multiple_files( ) -def read_parquet( +def read_parquet_expr( path, *args, columns=None, @@ -589,6 +589,13 @@ def read_parquet( ) +if QUERY_PLANNING_ON: + read_parquet = read_parquet_expr +else: + read_parquet = _deprecated_api( + "dask_cudf.io.parquet.read_parquet", + new_api="dask_cudf.read_parquet", + ) to_parquet = _deprecated_api( "dask_cudf.io.parquet.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", From e3c640a478ab55a41caf44116428c7dc621959b4 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 5 Nov 2024 16:33:15 -0800 Subject: [PATCH 03/14] fix CudfReadParquetFSSpec fusion --- python/dask_cudf/dask_cudf/io/parquet.py | 40 +++++++++++------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 9901d20847b..45eba8371bd 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -80,42 +80,38 @@ def approx_statistics(self): ] } - # ## OLD - # @property - # def _fusion_compression_factor(self): - # if self.operand("columns") is None: - # return 1 - # nr_original_columns = max(len(self._dataset_info["schema"].names) - 1, 1) - # return max( - # len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 - # ) - @functools.cached_property def _fusion_compression_factor(self): - blocksize = self.blocksize - if blocksize is None or self.aggregate_files: - # NOTE: We cannot fuse files *again* if - # aggregate_files is True (this creates - # too much OOM risk) + if self.blocksize is None: + # Let blocksize=None disable fusion + return 1 + + # At this point, `blockwise` was already used to + # split/aggregate files. Therefore, we now + # need to figure out whether we should fuse + # the current partitions to handle column + # projection. + # NOTE: We don't know if the current partitions + # are multiple files or file fragments. 
+ + if self.operand("columns") is None: return 1 - elif blocksize == "default": - blocksize = "256MiB" approx_stats = self.approx_statistics() - projected_size = 0 + projected_size, original_size = 0, 0 col_op = self.operand("columns") or self.columns for col in approx_stats["columns"]: + original_size += col["total_uncompressed_size"] if col["path_in_schema"] in col_op or ( (split_name := col["path_in_schema"].split(".")) and split_name[0] in col_op ): projected_size += col["total_uncompressed_size"] - if projected_size < 1: + if original_size < 1 or projected_size < 1: return 1 - aggregate_files = max(1, int(parse_bytes(blocksize) / projected_size)) - return max(1 / aggregate_files, 0.001) + return max(projected_size / original_size, 0.001) def _tune_up(self, parent): if self._fusion_compression_factor >= 1: @@ -317,8 +313,8 @@ def _fusion_compression_factor(self): elif blocksize == "default": blocksize = "256MiB" - approx_stats = self.approx_statistics() projected_size = 0 + approx_stats = self.approx_statistics() col_op = self.operand("columns") or self.columns for col in approx_stats["columns"]: if col["path_in_schema"] in col_op or ( From e48202658c0d39bba99d21427a4919b6faf1a41f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 5 Nov 2024 18:49:05 -0800 Subject: [PATCH 04/14] correct for aggregate_files=False --- python/dask_cudf/dask_cudf/io/parquet.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 45eba8371bd..11a484a848d 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -90,11 +90,12 @@ def _fusion_compression_factor(self): # split/aggregate files. Therefore, we now # need to figure out whether we should fuse # the current partitions to handle column - # projection. - # NOTE: We don't know if the current partitions - # are multiple files or file fragments. + # projection. We don't know if the current + # partitions are multiple files or file fragments. + # Therefore, we need to use a "multiplier" below + # to correct aggregate_files=False partitioning. - if self.operand("columns") is None: + if self.operand("columns") is None and self.aggregate_files: return 1 approx_stats = self.approx_statistics() @@ -111,7 +112,16 @@ def _fusion_compression_factor(self): if original_size < 1 or projected_size < 1: return 1 - return max(projected_size / original_size, 0.001) + # The multiplier corrects for the fact that the original + # files would have been fused for aggregate_files=True + multiplier = 1.0 + if not self.aggregate_files: + multiplier = original_size / parse_bytes(self.blocksize) + + return max( + (projected_size / original_size) * multiplier, + 0.001, + ) def _tune_up(self, parent): if self._fusion_compression_factor >= 1: From 2ad1867c3190b6217f2d60556242916555ef5275 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 6 Nov 2024 08:04:31 -0800 Subject: [PATCH 05/14] update default blocksize, and add docstring --- python/dask_cudf/dask_cudf/io/parquet.py | 307 ++++++++++++++++++----- 1 file changed, 250 insertions(+), 57 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 11a484a848d..9c7015539c7 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -1,9 +1,12 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
+from __future__ import annotations + import functools import itertools import math import warnings +from typing import TYPE_CHECKING, Any import numpy as np import pandas as pd @@ -18,6 +21,7 @@ from dask.dataframe.io.parquet.arrow import _filters_to_expression from dask.dataframe.io.parquet.core import ParquetFunctionWrapper +from dask.tokenize import tokenize from dask.utils import parse_bytes import cudf @@ -27,6 +31,32 @@ # Dask-expr imports CudfEngine from this module from dask_cudf._legacy.io.parquet import CudfEngine # noqa: F401 +if TYPE_CHECKING: + from collections.abc import MutableMapping + + +_DEVICE_SIZE_CACHE: int | None = None +_STATS_CACHE: MutableMapping[str, Any] = {} + + +def _normalize_blocksize(fraction: float = 0.03125): + global _DEVICE_SIZE_CACHE + + try: + # Plan A: Use PyNVML to set the blocksize + # (Default is 1/32 the total memory of device 0) + import pynvml + + if _DEVICE_SIZE_CACHE is None: + pynvml.nvmlInit() + handle = pynvml.nvmlDeviceGetHandleByIndex(0) + _DEVICE_SIZE_CACHE = pynvml.nvmlDeviceGetMemoryInfo(handle).total + + return int(_DEVICE_SIZE_CACHE * fraction) + except ImportError: + # Fall back to a 256MiB default + return "256MiB" + class NoOp(Elemwise): # Workaround - Always wrap read_parquet operations @@ -45,59 +75,69 @@ def operation(x): class CudfReadParquetFSSpec(ReadParquetFSSpec): def approx_statistics(self): # Use a few files to approximate column-size statistics - - # Account for filters - ds_filters = None - if self.filters is not None: - ds_filters = _filters_to_expression(self.filters) - - # Use average total_uncompressed_size of three files - n_sample = 3 - column_sizes = {} - for i, frag in enumerate( - self._dataset_info["ds"].get_fragments(ds_filters) - ): - md = frag.metadata - for rg in range(md.num_row_groups): - row_group = md.row_group(rg) - for col in range(row_group.num_columns): - column = row_group.column(col) - name = column.path_in_schema - if name not in column_sizes: - column_sizes[name] = np.zeros(n_sample, dtype="int64") - column_sizes[name][i] += column.total_uncompressed_size - if (i + 1) >= n_sample: - break - - # Reorganize stats to look like arrow-fs version - return { - "columns": [ - { - "path_in_schema": name, - "total_uncompressed_size": np.mean(sizes), - } - for name, sizes in column_sizes.items() - ] - } + global _STATS_CACHE + + key = tokenize(self._dataset_info["ds"].files[:10], self.filters) + try: + return _STATS_CACHE[key] + + except KeyError: + # Account for filters + ds_filters = None + if self.filters is not None: + ds_filters = _filters_to_expression(self.filters) + + # Use average total_uncompressed_size of three files + n_sample = 3 + column_sizes = {} + for i, frag in enumerate( + self._dataset_info["ds"].get_fragments(ds_filters) + ): + md = frag.metadata + for rg in range(md.num_row_groups): + row_group = md.row_group(rg) + for col in range(row_group.num_columns): + column = row_group.column(col) + name = column.path_in_schema + if name not in column_sizes: + column_sizes[name] = np.zeros( + n_sample, dtype="int64" + ) + column_sizes[name][i] += column.total_uncompressed_size + if (i + 1) >= n_sample: + break + + # Reorganize stats to look like arrow-fs version + _STATS_CACHE[key] = { + "columns": [ + { + "path_in_schema": name, + "total_uncompressed_size": np.mean(sizes), + } + for name, sizes in column_sizes.items() + ] + } + return _STATS_CACHE[key] @functools.cached_property def _fusion_compression_factor(self): + # Disable fusion when blocksize=None if self.blocksize is None: - # Let 
blocksize=None disable fusion - return 1 - - # At this point, `blockwise` was already used to - # split/aggregate files. Therefore, we now - # need to figure out whether we should fuse - # the current partitions to handle column - # projection. We don't know if the current - # partitions are multiple files or file fragments. - # Therefore, we need to use a "multiplier" below - # to correct aggregate_files=False partitioning. - - if self.operand("columns") is None and self.aggregate_files: return 1 + # At this point, we *may* have used `blockwise` + # already to split or aggregate files. We don't + # *know* if the current partitions correspond to + # individual/full files, multiple/aggregated files + # or partial/split files. + # + # Therefore, we need to use the statistics from + # a few files to estimate the current partition + # size. This size should be similar to `blocksize` + # *if* aggregate_files is True or if the files + # are *smaller* than `blocksize`. + + # Step 1: Sample statistics approx_stats = self.approx_statistics() projected_size, original_size = 0, 0 col_op = self.operand("columns") or self.columns @@ -108,20 +148,35 @@ def _fusion_compression_factor(self): and split_name[0] in col_op ): projected_size += col["total_uncompressed_size"] - if original_size < 1 or projected_size < 1: return 1 - # The multiplier corrects for the fact that the original - # files would have been fused for aggregate_files=True - multiplier = 1.0 - if not self.aggregate_files: - multiplier = original_size / parse_bytes(self.blocksize) + # Step 2: Estimate the correction factor + # (Correct for possible pre-optimization fusion/splitting) + blocksize = parse_bytes(self.blocksize) + if original_size > blocksize: + # Input files are bigger than blocksize + # and we already split these large files. + # (correction_factor > 1) + correction_factor = original_size / blocksize + elif self.aggregate_files: + # Input files are smaller than blocksize + # and we already aggregate small files. + # (correction_factor == 1) + correction_factor = 1 + else: + # Input files are smaller than blocksize + # but we haven't aggregate small files yet. + # (correction_factor < 1) + correction_factor = original_size / blocksize + + # Step 3. Estimate column-projection factor + if self.operand("columns") is None: + projection_factor = 1 + else: + projection_factor = projected_size / original_size - return max( - (projected_size / original_size) * multiplier, - 0.001, - ) + return max(projection_factor * correction_factor, 0.001) def _tune_up(self, parent): if self._fusion_compression_factor >= 1: @@ -473,6 +528,137 @@ def read_parquet_expr( arrow_to_pandas=None, **kwargs, ): + """ + Read a Parquet file into a Dask-cuDF DataFrame. + + This reads a directory of Parquet data into a DataFrame collection. + Partitioning behavior mostly depends on the ``blocksize`` argument. + + .. note:: + Dask may automatically resize partitions at optimization time. + Please set ``blocksize=None`` to disable this behavior in Dask cuDF. + (NOTE: This will not disable fusion for the "pandas" backend) + + .. note:: + Specifying ``filesystem="arrow"`` leverages a complete reimplementation of + the Parquet reader that is solely based on PyArrow. It is faster than the + legacy implementation in some cases, but doesn't yet support all features. + + Parameters + ---------- + path : str or list + Source directory for data, or path(s) to individual parquet files. + Prefix with a protocol like ``s3://`` to read from alternative + filesystems. 
To read from multiple files you can pass a globstring or a + list of paths, with the caveat that they must all have the same + protocol. + columns : str or list, default None + Field name(s) to read in as columns in the output. By default all + non-index fields will be read (as determined by the pandas parquet + metadata, if present). Provide a single field name instead of a list to + read in the data as a Series. + filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None + List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``. + Using this argument will result in row-wise filtering of the final partitions. + + Predicates can be expressed in disjunctive normal form (DNF). This means that + the inner-most tuple describes a single column predicate. These inner predicates + are combined with an AND conjunction into a larger predicate. The outer-most + list then combines all of the combined filters with an OR disjunction. + + Predicates can also be expressed as a ``List[Tuple]``. These are evaluated + as an AND conjunction. To express OR in predicates, one must use the + (preferred for "pyarrow") ``List[List[Tuple]]`` notation. + index : str, list or False, default None + Field name(s) to use as the output frame index. By default will be + inferred from the pandas parquet file metadata, if present. Use ``False`` + to read all fields as columns. + categories : list or dict, default None + For any fields listed here, if the parquet encoding is Dictionary, + the column will be created with dtype category. Use only if it is + guaranteed that the column is encoded as dictionary in all row-groups. + If a list, assumes up to 2**16-1 labels; if a dict, specify the number + of labels expected; if None, will load categories automatically for + data written by dask, not otherwise. + storage_options : dict, default None + Key/value pairs to be passed on to the file-system backend, if any. + Note that the default file-system backend can be configured with the + ``filesystem`` argument, described below. + calculate_divisions : bool, default False + Whether to use min/max statistics from the footer metadata (or global + ``_metadata`` file) to calculate divisions for the output DataFrame + collection. Divisions will not be calculated if statistics are missing. + This option will be ignored if ``index`` is not specified and there is + no physical index column specified in the custom "pandas" Parquet + metadata. Note that ``calculate_divisions=True`` may be extremely slow + when no global ``_metadata`` file is present, especially when reading + from remote storage. Set this to ``True`` only when known divisions + are needed for your workload (see :ref:`dataframe-design-partitions`). + ignore_metadata_file : bool, default False + Whether to ignore the global ``_metadata`` file (when one is present). + If ``True``, or if the global ``_metadata`` file is missing, the parquet + metadata may be gathered and processed in parallel. Parallel metadata + processing is currently supported for ``ArrowDatasetEngine`` only. + metadata_task_size : int, default configurable + If parquet metadata is processed in parallel (see ``ignore_metadata_file`` + description above), this argument can be used to specify the number of + dataset files to be processed by each task in the Dask graph. If this + argument is set to ``0``, parallel metadata processing will be disabled. 
+ The default values for local and remote filesystems can be specified + with the "metadata-task-size-local" and "metadata-task-size-remote" + config fields, respectively (see "dataframe.parquet"). + split_row_groups : 'infer', 'adaptive', bool, or int, default 'infer' + WARNING: The ``split_row_groups`` argument is now deprecated, please use + ``blocksize`` instead. + + blocksize : int, float or str, default 'default' + The desired size of each output ``DataFrame`` partition in terms of total + (uncompressed) parquet storage space. This argument may be used to split + large files or aggregate small files into the same partition. Use ``None`` + for a simple 1:1 mapping between files and partitions. Use a float value + less than 1.0 to specify the fractional size of the partitions with + respect to the total memory of the first NVIDIA GPU on your machine. + Default is 1/32 the total memory of a single GPU. + aggregate_files : bool or str, default None + WARNING: The behavior of ``aggregate_files=True`` is now obsolete + when query-planning is enabled (the default). Small files are now + aggregated automatically according to the ``blocksize`` setting. + Please expect this argument to be deprecated in a future release. + + WARNING: Passing a string argument to ``aggregate_files`` will result + in experimental behavior that may be removed at any time. + + parquet_file_extension: str, tuple[str], or None, default (".parq", ".parquet", ".pq") + A file extension or an iterable of extensions to use when discovering + parquet files in a directory. Files that don't match these extensions + will be ignored. This argument only applies when ``paths`` corresponds + to a directory and no ``_metadata`` file is present (or + ``ignore_metadata_file=True``). Passing in ``parquet_file_extension=None`` + will treat all files in the directory as parquet files. + + The purpose of this argument is to ensure that the engine will ignore + unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files). + It may be necessary to change this argument if the data files in your + parquet dataset do not end in ".parq", ".parquet", or ".pq". + filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use. + dataset: dict, default None + Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object. + These options may include a "filesystem" key to configure the desired + file-system backend. However, the top-level ``filesystem`` argument will always + take precedence. + + **Note**: The ``dataset`` options may include a "partitioning" key. + However, since ``pyarrow.dataset.Partitioning`` + objects cannot be serialized, the value can be a dict of key-word + arguments for the ``pyarrow.dataset.partitioning`` API + (e.g. ``dataset={"partitioning": {"flavor": "hive", "schema": ...}}``). + Note that partitioned columns will not be converted to categorical + dtypes when a custom partitioning schema is specified in this way. + read: dict, default None + Dictionary of options to pass through to ``CudfEngine.read_partitions`` + using the ``read`` key-word argument. + """ + import dask_expr as dx from fsspec.utils import stringify_path from pyarrow import fs as pa_fs @@ -513,6 +699,12 @@ def read_parquet_expr( "Value of 'in' filter must be a list, set or tuple." 
) + # Normalize blocksize input + if blocksize == "default": + blocksize = _normalize_blocksize() + elif isinstance(blocksize, float) and blocksize < 1: + blocksize = _normalize_blocksize(blocksize) + if ( isinstance(filesystem, pa_fs.FileSystem) or isinstance(filesystem, str) @@ -597,6 +789,7 @@ def read_parquet_expr( if QUERY_PLANNING_ON: read_parquet = read_parquet_expr + read_parquet.__doc__ = read_parquet_expr.__doc__ else: read_parquet = _deprecated_api( "dask_cudf.io.parquet.read_parquet", From 3fb23fdab47086dade47bee7343e606f7093e852 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 12 Nov 2024 11:52:55 -0800 Subject: [PATCH 06/14] revise _normalize_blocksize --- python/dask_cudf/dask_cudf/io/parquet.py | 42 ++++++++++++++++-------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 3e2b44d0b24..214fc0c5ec7 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -40,23 +40,39 @@ _STATS_CACHE: MutableMapping[str, Any] = {} -def _normalize_blocksize(fraction: float = 0.03125): - global _DEVICE_SIZE_CACHE - +def _get_device_size(): try: - # Plan A: Use PyNVML to set the blocksize - # (Default is 1/32 the total memory of device 0) + # Plan A: Use PyNVML to check device size import pynvml - if _DEVICE_SIZE_CACHE is None: - pynvml.nvmlInit() - handle = pynvml.nvmlDeviceGetHandleByIndex(0) - _DEVICE_SIZE_CACHE = pynvml.nvmlDeviceGetMemoryInfo(handle).total - - return int(_DEVICE_SIZE_CACHE * fraction) + pynvml.nvmlInit() + handle = pynvml.nvmlDeviceGetHandleByIndex(0) + return pynvml.nvmlDeviceGetMemoryInfo(handle).total except ImportError: - # Fall back to a 256MiB default - return "256MiB" + # Fall back to a conservative 8GiB default + return "8GiB" + + +def _normalize_blocksize(fraction: float = 0.03125): + # Set the blocksize to fraction * . + # We use the smallest worker device to set . + # (Default blocksize is 1/32 * ) + global _DEVICE_SIZE_CACHE + + if _DEVICE_SIZE_CACHE is None: + try: + # Check distributed workers (if a client exists) + from distributed import get_client + + client = get_client() + # TODO: Check "GPU" worker resources only. 
+ # Depends on (https://github.com/rapidsai/dask-cuda/pull/1401) + device_size = min(client.run(_get_device_size).values()) + except (ImportError, ValueError): + device_size = _get_device_size() + _DEVICE_SIZE_CACHE = device_size + + return int(_DEVICE_SIZE_CACHE * fraction) class NoOp(Elemwise): From 0552b330ccb901370948551fc81b477027f4be67 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 13 Nov 2024 08:24:48 -0800 Subject: [PATCH 07/14] fix test --- python/dask_cudf/dask_cudf/io/tests/test_parquet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 3ccd44ec696..27a0d3ebb1c 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -681,5 +681,6 @@ def test_deprecated_api_paths(tmpdir): df2 = dask_cudf.io.read_parquet(tmpdir) dd.assert_eq(df, df2, check_divisions=False) - df2 = dask_cudf.io.parquet.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) + if dask_cudf.QUERY_PLANNING_ON: + df2 = dask_cudf.io.parquet.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False) From 564c13cf0df0387a11cc81f2eec741031c527bd9 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 13 Nov 2024 10:41:48 -0800 Subject: [PATCH 08/14] proper test fix - and disable dataset processing unless necessary --- python/cudf/cudf/io/parquet.py | 7 +++++++ python/dask_cudf/dask_cudf/_legacy/io/parquet.py | 3 ++- python/dask_cudf/dask_cudf/io/__init__.py | 11 +++++++++-- python/dask_cudf/dask_cudf/io/parquet.py | 3 ++- .../dask_cudf/dask_cudf/io/tests/test_parquet.py | 15 +++++++++++---- 5 files changed, 31 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index ce99f98b559..fff6a454471 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -368,6 +368,13 @@ def _process_dataset( file_list = paths if len(paths) == 1 and ioutils.is_directory(paths[0]): paths = ioutils.stringify_pathlike(paths[0]) + elif ( + filters is None + and (dataset_kwargs or {}).get("partitioning", None) is None + ): + # Skip dataset processing if we have no filters + # or hive/directory partitioning to deal with. + return paths, row_groups, [], {} # Convert filters to ds.Expression if filters is not None: diff --git a/python/dask_cudf/dask_cudf/_legacy/io/parquet.py b/python/dask_cudf/dask_cudf/_legacy/io/parquet.py index 39ac6474958..c0638e4a1c3 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/parquet.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/parquet.py @@ -86,7 +86,8 @@ def _read_paths( ) dataset_kwargs = dataset_kwargs or {} - dataset_kwargs["partitioning"] = partitioning or "hive" + if partitions: + dataset_kwargs["partitioning"] = partitioning or "hive" # Use cudf to read in data try: diff --git a/python/dask_cudf/dask_cudf/io/__init__.py b/python/dask_cudf/dask_cudf/io/__init__.py index e92853e0fec..212951336c9 100644 --- a/python/dask_cudf/dask_cudf/io/__init__.py +++ b/python/dask_cudf/dask_cudf/io/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from dask_cudf import _deprecated_api +from dask_cudf import _deprecated_api, QUERY_PLANNING_ON from . 
import csv, orc, json, parquet, text # noqa: F401 @@ -22,7 +22,14 @@ read_text = _deprecated_api( "dask_cudf.io.read_text", new_api="dask_cudf.read_text" ) -read_parquet = parquet.read_parquet +if QUERY_PLANNING_ON: + read_parquet = parquet.read_parquet +else: + read_parquet = _deprecated_api( + "The legacy dask_cudf.io.read_parquet API", + new_api="dask_cudf.read_parquet", + rec="", + ) to_parquet = _deprecated_api( "dask_cudf.io.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 214fc0c5ec7..b3fbba4492f 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -821,8 +821,9 @@ def read_parquet_expr( read_parquet.__doc__ = read_parquet_expr.__doc__ else: read_parquet = _deprecated_api( - "dask_cudf.io.parquet.read_parquet", + "The legacy dask_cudf.io.parquet.read_parquet API", new_api="dask_cudf.read_parquet", + rec="", ) to_parquet = _deprecated_api( "dask_cudf.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 27a0d3ebb1c..6efe6c4f388 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -677,10 +677,17 @@ def test_deprecated_api_paths(tmpdir): with pytest.warns(match="dask_cudf.io.to_parquet is now deprecated"): dask_cudf.io.to_parquet(df, tmpdir) - # Allow internal read_parquet import - df2 = dask_cudf.io.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) - if dask_cudf.QUERY_PLANNING_ON: + df2 = dask_cudf.io.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False) + df2 = dask_cudf.io.parquet.read_parquet(tmpdir) dd.assert_eq(df, df2, check_divisions=False) + else: + with pytest.warns(match="legacy dask_cudf.io.read_parquet"): + df2 = dask_cudf.io.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False) + + with pytest.warns(match="legacy dask_cudf.io.parquet.read_parquet"): + df2 = dask_cudf.io.parquet.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False) From 64dd105d96a2c3a74bd3a5e725f8db409324a5ca Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 13 Nov 2024 12:33:47 -0800 Subject: [PATCH 09/14] preserve default hive handling in cudf --- python/cudf/cudf/io/parquet.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index fff6a454471..750c6cec180 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -370,7 +370,8 @@ def _process_dataset( paths = ioutils.stringify_pathlike(paths[0]) elif ( filters is None - and (dataset_kwargs or {}).get("partitioning", None) is None + and isinstance(dataset_kwargs, dict) + and dataset_kwargs.get("partitioning") is None ): # Skip dataset processing if we have no filters # or hive/directory partitioning to deal with. 
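For context between these commits, here is a minimal usage sketch of the read_parquet behavior this series adds. It is illustrative only: the dataset paths ("dataset/", "s3://bucket/dataset/") are placeholders, and the parameters shown are the ones documented in the docstring and tests added above (blocksize handling, and the experimental filesystem="arrow" path, which needs pyarrow>=15 and distributed installed).

    import dask_cudf

    # Default blocksize: roughly 1/32 of a single GPU's total memory.
    # Partitions may be fused or split automatically at optimization time.
    df = dask_cudf.read_parquet("dataset/")

    # blocksize=None keeps a simple 1:1 mapping between files and partitions
    # and disables automatic fusion (as used in the updated tests above).
    df = dask_cudf.read_parquet("dataset/", blocksize=None)

    # A float < 1 is treated as a fraction of GPU memory, and
    # filesystem="arrow" opts into the experimental PyArrow-based IO path.
    df = dask_cudf.read_parquet(
        "s3://bucket/dataset/", blocksize=0.125, filesystem="arrow"
    )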
From 0291b95827a6054453087e71a1e36ecbfe65a96f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 18 Nov 2024 07:10:09 -0800 Subject: [PATCH 10/14] address code review --- python/dask_cudf/dask_cudf/io/parquet.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index b3fbba4492f..89e15af1145 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -37,7 +37,6 @@ _DEVICE_SIZE_CACHE: int | None = None -_STATS_CACHE: MutableMapping[str, Any] = {} def _get_device_size(): @@ -90,13 +89,13 @@ def operation(x): class CudfReadParquetFSSpec(ReadParquetFSSpec): + _STATS_CACHE: MutableMapping[str, Any] = {} + def approx_statistics(self): # Use a few files to approximate column-size statistics - global _STATS_CACHE - key = tokenize(self._dataset_info["ds"].files[:10], self.filters) try: - return _STATS_CACHE[key] + return self._STATS_CACHE[key] except KeyError: # Account for filters @@ -125,7 +124,7 @@ def approx_statistics(self): break # Reorganize stats to look like arrow-fs version - _STATS_CACHE[key] = { + self._STATS_CACHE[key] = { "columns": [ { "path_in_schema": name, @@ -134,7 +133,7 @@ def approx_statistics(self): for name, sizes in column_sizes.items() ] } - return _STATS_CACHE[key] + return self._STATS_CACHE[key] @functools.cached_property def _fusion_compression_factor(self): From 0832e5522c9891c857f5b80e592be7287224a9bf Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 18 Nov 2024 11:26:57 -0800 Subject: [PATCH 11/14] sample single worker for device size --- python/dask_cudf/dask_cudf/io/parquet.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 89e15af1145..7016cb323a3 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -39,17 +39,22 @@ _DEVICE_SIZE_CACHE: int | None = None -def _get_device_size(): +def _get_min_device_size(): try: - # Plan A: Use PyNVML to check device size + # Use PyNVML to find minimum device size + # on the same node as this worker. + # (Assume this is representative of the cluster) import pynvml pynvml.nvmlInit() - handle = pynvml.nvmlDeviceGetHandleByIndex(0) - return pynvml.nvmlDeviceGetMemoryInfo(handle).total - except ImportError: + device_sizes = [] + for i in range(pynvml.nvmlDeviceGetCount()): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) + device_sizes.append(pynvml.nvmlDeviceGetMemoryInfo(handle).total) + return min(device_sizes) + except (ImportError, ValueError): # Fall back to a conservative 8GiB default - return "8GiB" + return 8 * 1024**3 def _normalize_blocksize(fraction: float = 0.03125): @@ -66,9 +71,9 @@ def _normalize_blocksize(fraction: float = 0.03125): client = get_client() # TODO: Check "GPU" worker resources only. 
# Depends on (https://github.com/rapidsai/dask-cuda/pull/1401) - device_size = min(client.run(_get_device_size).values()) + device_size = client.submit(_get_min_device_size).result() except (ImportError, ValueError): - device_size = _get_device_size() + device_size = _get_min_device_size() _DEVICE_SIZE_CACHE = device_size return int(_DEVICE_SIZE_CACHE * fraction) From 21959474991a3d2d943f2533ee18bf5d35aa9f18 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 19 Nov 2024 06:39:40 -0800 Subject: [PATCH 12/14] use CUDA_VISIBLE_DEVICES --- python/dask_cudf/dask_cudf/io/parquet.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 7016cb323a3..26302e6f066 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -5,6 +5,7 @@ import functools import itertools import math +import os import warnings from typing import TYPE_CHECKING, Any @@ -39,19 +40,16 @@ _DEVICE_SIZE_CACHE: int | None = None -def _get_min_device_size(): +def _get_device_size(): try: - # Use PyNVML to find minimum device size - # on the same node as this worker. - # (Assume this is representative of the cluster) + # Use PyNVML to find the worker device size. import pynvml pynvml.nvmlInit() - device_sizes = [] - for i in range(pynvml.nvmlDeviceGetCount()): - handle = pynvml.nvmlDeviceGetHandleByIndex(i) - device_sizes.append(pynvml.nvmlDeviceGetMemoryInfo(handle).total) - return min(device_sizes) + dev = int(os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0]) + handle = pynvml.nvmlDeviceGetHandleByIndex(dev) + return pynvml.nvmlDeviceGetMemoryInfo(handle).total + except (ImportError, ValueError): # Fall back to a conservative 8GiB default return 8 * 1024**3 @@ -71,9 +69,9 @@ def _normalize_blocksize(fraction: float = 0.03125): client = get_client() # TODO: Check "GPU" worker resources only. # Depends on (https://github.com/rapidsai/dask-cuda/pull/1401) - device_size = client.submit(_get_min_device_size).result() + device_size = min(client.run(_get_device_size).values()) except (ImportError, ValueError): - device_size = _get_min_device_size() + device_size = _get_device_size() _DEVICE_SIZE_CACHE = device_size return int(_DEVICE_SIZE_CACHE * fraction) From 0ecf6dd93ac10c753a19a2feefe97c2a1c4b3628 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 19 Nov 2024 08:15:01 -0800 Subject: [PATCH 13/14] support mig --- python/dask_cudf/dask_cudf/io/parquet.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 26302e6f066..996287a0fa0 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -46,8 +46,13 @@ def _get_device_size(): import pynvml pynvml.nvmlInit() - dev = int(os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0]) - handle = pynvml.nvmlDeviceGetHandleByIndex(dev) + index = os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0] + if not index.isnumeric(): + # This means index is UUID. This works for both MIG and non-MIG device UUIDs. 
+ handle = pynvml.nvmlDeviceGetHandleByUUID(str.encode(index)) + else: + # This is a device index + handle = pynvml.nvmlDeviceGetHandleByIndex(int(index)) return pynvml.nvmlDeviceGetMemoryInfo(handle).total except (ImportError, ValueError): From fa03396aed214e7e233c082c7b3afba7ec852efd Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Tue, 19 Nov 2024 10:45:09 -0600 Subject: [PATCH 14/14] Update python/dask_cudf/dask_cudf/io/parquet.py --- python/dask_cudf/dask_cudf/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 996287a0fa0..bf8fae552c2 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -47,7 +47,7 @@ def _get_device_size(): pynvml.nvmlInit() index = os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0] - if not index.isnumeric(): + if index and not index.isnumeric(): # This means index is UUID. This works for both MIG and non-MIG device UUIDs. handle = pynvml.nvmlDeviceGetHandleByUUID(str.encode(index)) else: