Add blocksize to DocumentDataset.read_* that uses dask_cudf.read_* #285

Merged

Changes from 11 commits (28 commits total)

Commits:
04139d0  fc (praateekmahajan, Oct 8, 2024)
953c84e  merge conflict (praateekmahajan, Nov 15, 2024)
b2de5cb  review comments (praateekmahajan, Nov 15, 2024)
eb49b70  make blocksize work with parquet (praateekmahajan, Nov 18, 2024)
386d443  filetype (praateekmahajan, Nov 19, 2024)
f4c963a  complete merge (praateekmahajan, Nov 19, 2024)
aa47a37  fix merge (praateekmahajan, Nov 19, 2024)
fed553e  Merge branch 'main' of github.com:NVIDIA/NeMo-Curator into praateek/t… (praateekmahajan, Nov 20, 2024)
c1ea0fb  add test cases (praateekmahajan, Nov 22, 2024)
3a0f13f  add test file (praateekmahajan, Nov 22, 2024)
9c6428c  failing test for select_columns (praateekmahajan, Nov 22, 2024)
1e8a7fc  rename func name (praateekmahajan, Nov 22, 2024)
2f18132  add test case for different columns (praateekmahajan, Dec 6, 2024)
a540d05  improve test for different_cols (praateekmahajan, Dec 6, 2024)
999e46b  .. (praateekmahajan, Dec 6, 2024)
2599f26  review comments + add warnings for inconsistent schemas (praateekmahajan, Dec 13, 2024)
09ca9d9  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
70efd69  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
91671ec  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
a4fcd2f  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
8e4827f  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
0cd86a6  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
5871b83  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
0284045  Merge branch 'main' of github.com:praateekmahajan/NeMo-Curator into p… (praateekmahajan, Dec 16, 2024)
7aeca38  Merge branch 'main' of github.com:NVIDIA/NeMo-Curator into praateek/t… (praateekmahajan, Dec 16, 2024)
fdd0c69  Merge branch 'praateek/try-dask-cudf-read-json' of github.com:praatee… (praateekmahajan, Dec 16, 2024)
fc196d5  fix tests (praateekmahajan, Dec 16, 2024)
e52d0ea  Merge branch 'main' into praateek/try-dask-cudf-read-json (sarahyurick, Dec 17, 2024)
12 changes: 12 additions & 0 deletions docs/user-guide/bestpractices.rst
@@ -24,6 +24,17 @@ Handling GPU Out-of-Memory (OOM) Errors
NeMo Curator is designed to be scalable with large amounts of text data, but OOM errors occur when the available GPU memory is insufficient for a given task.
To help avoid these issues and ensure efficient processing, here are some strategies for managing memory usage and mitigating OOM challenges.

Controlling Partition Sizes
~~~~~~~~~~~~~~~~~~~~~~~~~~~

You should consider using ``files_per_partition`` or ``blocksize`` when reading data. This can help reduce the memory load by processing large datasets in smaller chunks; a short usage sketch follows the discussion below.

#. The ``blocksize`` argument is available for ``jsonl`` and ``parquet`` files. However, for ``parquet`` files it is only available when ``add_filename=False``.
Collaborator:
@praateekmahajan, have we filed issues with cuDF to support these features long term?

  1. Supporting add_filename in cuDF with multiple files (to decrease that perf delta)
  2. The blocksize + add_filename combination for cuDF

Collaborator Author @praateekmahajan, Dec 5, 2024:

Discussed offline; I think there is some confusion. We need to resurrect dask/dask#6575 so that add_filename can also be supported for cuDF + Parquet (if not CPU).


#. For ``blocksize``, the recommendation is to use 1/32 of the total GPU memory. For example, if you have a GPU with 32GB of memory, you can set the blocksize to ``1gb``.

Collaborator:

Given https://github.com/rapidsai/cudf/pull/17250/files, should we also note that we will move to 1/32 by default if nothing is provided?

What do you think?

Collaborator Author:

Discussed offline: we can't support 1/32 as an input value, because dask_cudf.read_parquet supports it but read_json does not (nor do either of the CPU implementations).


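As a quick illustration of the two options above, here is a minimal sketch, assuming a hypothetical list of JSONL files and the usual ``DocumentDataset`` import path; the snippet itself is not part of this diff:

.. code-block:: python

    from nemo_curator.datasets import DocumentDataset

    files = ["data/part_000.jsonl", "data/part_001.jsonl"]  # hypothetical paths

    # Read with a 1 GB blocksize. blocksize and files_per_partition are mutually
    # exclusive, so the other argument is left as None.
    ds_blocksize = DocumentDataset.read_json(
        files,
        backend="cudf",
        blocksize="1gb",
        files_per_partition=None,
    )

    # Alternatively, group a fixed number of files into each partition.
    ds_fpp = DocumentDataset.read_json(
        files,
        backend="cudf",
        blocksize=None,
        files_per_partition=4,
    )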

Utilize RMM Options
~~~~~~~~~~~~~~~~~~~
`RAPIDS Memory Manager (RMM) <https://github.com/rapidsai/rmm>`_ is a package that enables you to allocate device memory in a highly configurable way.
@@ -59,6 +70,7 @@ Alternatively, you can set these flags while initializing your own Dask client,

client = Client(cluster)


Fuzzy Deduplication Guidelines
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Fuzzy deduplication is one of the most computationally expensive algorithms within the NeMo Curator pipeline.
24 changes: 14 additions & 10 deletions nemo_curator/datasets/doc_dataset.py
@@ -45,7 +45,8 @@ def read_json(
cls,
input_files: Union[str, List[str]],
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: int = 1,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
@@ -69,8 +70,9 @@ def read_json(
input_files=input_files,
file_type="jsonl",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
input_meta=input_meta,
columns=columns,
**kwargs,
@@ -82,8 +84,9 @@ def read_parquet(
cls,
input_files: Union[str, List[str]],
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: int = 1,
add_filename: bool = False,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename=False,
columns: Optional[List[str]] = None,
**kwargs,
) -> "DocumentDataset":
@@ -104,8 +107,9 @@ def read_parquet(
input_files=input_files,
file_type="parquet",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
columns=columns,
**kwargs,
)
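For the Parquet reader, a similar hedged sketch (hypothetical paths; recall that blocksize is only honored for Parquet when add_filename=False):

from nemo_curator.datasets import DocumentDataset

ds = DocumentDataset.read_parquet(
    ["data/part_000.parquet", "data/part_001.parquet"],  # hypothetical paths
    backend="cudf",
    blocksize="1gb",
    add_filename=False,
    columns=["id", "text"],
)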
@@ -116,8 +120,6 @@ def read_pickle(
cls,
input_files: Union[str, List[str]],
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: int = 1,
add_filename: bool = False,
columns: Optional[List[str]] = None,
**kwargs,
) -> "DocumentDataset":
@@ -137,8 +139,6 @@ def read_pickle(
input_files=input_files,
file_type="pickle",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
columns=columns,
**kwargs,
)
@@ -229,8 +229,9 @@ def _read_json_or_parquet(
input_files: Union[str, List[str]],
file_type: str,
backend: Literal["cudf", "pandas"],
files_per_partition: int,
add_filename: bool,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = None,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
@@ -262,6 +263,7 @@
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
@@ -281,6 +283,7 @@
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
@@ -306,6 +309,7 @@
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
182 changes: 147 additions & 35 deletions nemo_curator/utils/distributed_utils.py
@@ -16,13 +16,15 @@
import ast
import os

import dask

os.environ["RAPIDS_NO_INITIALIZE"] = "1"
import random
import warnings
from contextlib import nullcontext
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional, Union
from typing import Callable, Dict, List, Literal, Optional, Union

import dask.dataframe as dd
import numpy as np
@@ -267,6 +269,23 @@ def _set_torch_to_use_rmm():
torch.cuda.memory.change_current_allocator(rmm_torch_allocator)


def select_columns(
df: Union[dd.DataFrame, dask_cudf.DataFrame],
columns: List[str],
filetype: Literal["jsonl", "json", "parquet"],
add_filename: bool,
) -> Union[dd.DataFrame, dask_cudf.DataFrame]:
# We exclude parquet because the parquet readers already support column selection
if filetype in ["jsonl", "json"] and columns is not None:
if add_filename and "filename" not in columns:
columns.append("filename")
df = df[columns]

df = df[sorted(df.columns)]

Collaborator Author:
The sorting also isn't respected under the read_files_fpp path. Removing the sorting still results in test_read_data_select_columns failures but far fewer than when we sort.

Collaborator:
I like this a lot; we can have underlying data whose column order is not consistent, and doing this makes a lot of sense, especially for formats like JSONL.


return df

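For reference, a small usage sketch of the new helper; only the module path from this diff is assumed, and the toy DataFrame is purely illustrative:

import dask.dataframe as dd
import pandas as pd

from nemo_curator.utils.distributed_utils import select_columns

pdf = pd.DataFrame({"text": ["a", "b"], "id": [1, 2], "meta": ["x", "y"]})
ddf = dd.from_pandas(pdf, npartitions=1)

# For jsonl/json the helper prunes to the requested columns (plus "filename" when
# add_filename=True) and then sorts them, so partitions read from files with
# differing column order line up.
out = select_columns(ddf, columns=["text", "id"], filetype="jsonl", add_filename=False)
print(list(out.columns))  # ['id', 'text']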

def read_single_partition(
files: List[str],
backend: Literal["cudf", "pandas"] = "cudf",
@@ -349,13 +368,99 @@ def read_single_partition(
else:
df = read_f(files, **read_kwargs, **kwargs)

if filetype in ["jsonl", "json"] and columns is not None:
if add_filename and "filename" not in columns:
columns.append("filename")
df = df[columns]
return select_columns(df, columns, filetype, add_filename)

df = df[sorted(df.columns)]
return df

def read_data_blocksize(
input_files: List[str],
backend: Literal["cudf", "pandas"],
file_type: Literal["parquet", "jsonl"],
blocksize: str,
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
) -> Union[dd.DataFrame, dask_cudf.DataFrame]:

read_kwargs = dict()

postprocessing_func: Optional[Callable[[dd.DataFrame], dd.DataFrame]] = None
if file_type == "jsonl":
if backend == "panads":
warnings.warn(
"Pandas backend with blocksize cannot read multiple JSONL files into a single partition. "
"Use files_per_partition if blocksize exceeds average file size"
)
read_func = dd.read_json
read_kwargs["lines"] = True
if input_meta is not None:
if backend == "cudf":
# To save GPU memory, we prune columns while reading, and keep only those that are
# specified in the input_meta
read_kwargs["prune_columns"] = True

Collaborator, commenting on lines +411 to +414:
Can you just add a note about this upstream?

read_kwargs["dtype"] = (
ast.literal_eval(input_meta)
if isinstance(input_meta, str)
else input_meta
)
if add_filename:

def extract_filename(path: str) -> str:
return os.path.basename(path)

read_kwargs["include_path_column"] = add_filename
read_kwargs["path_converter"] = extract_filename
postprocessing_func = lambda df: df.rename(columns={"path": "filename"})

elif file_type == "parquet":
if add_filename:
msg = "add_filename and blocksize cannot be set at the same time for parquet files"
raise ValueError(msg)
read_func = dd.read_parquet
read_kwargs["columns"] = columns
read_kwargs["aggregate_files"] = True
else:
msg = f"Reading with blocksize is only supported for jsonl and parquet files, not {file_type=}"
raise ValueError(msg)

with dask.config.set({"dataframe.backend": backend}):
df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs)
if postprocessing_func is not None:
df = postprocessing_func(df)
return select_columns(df, columns, file_type, add_filename)

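The add_filename handling above relies on Dask's own path-column support rather than anything NeMo Curator specific. A standalone sketch of that mechanism, with hypothetical file paths:

import os

import dask.dataframe as dd

# include_path_column attaches each row's source path as a "path" column, and
# path_converter lets us keep only the basename, which is then renamed to "filename".
df = dd.read_json(
    ["data/part_000.jsonl", "data/part_001.jsonl"],
    lines=True,
    blocksize="256MB",
    include_path_column=True,
    path_converter=os.path.basename,
)
df = df.rename(columns={"path": "filename"})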

def read_data_fpp(
input_files: List[str],
file_type: Literal["parquet", "json", "jsonl"],
backend: Literal["cudf", "pandas"] = "cudf",
add_filename: bool = False,
files_per_partition: Optional[int] = None,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
) -> Union[dd.DataFrame, dask_cudf.DataFrame]:
input_files = sorted(input_files)
if files_per_partition is not None and files_per_partition > 1:
input_files = [
input_files[i : i + files_per_partition]
for i in range(0, len(input_files), files_per_partition)
]
else:
input_files = [[file] for file in input_files]

return dd.from_map(
read_single_partition,
input_files,
filetype=file_type,
backend=backend,
add_filename=add_filename,
input_meta=input_meta,
enforce_metadata=False,
columns=columns,
**kwargs,
)

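The grouping that read_data_fpp performs before handing work to dd.from_map can be seen with a short, purely illustrative snippet:

files = [f"part_{i:03d}.jsonl" for i in range(5)]
files_per_partition = 2

# Consecutive chunks of files_per_partition files, exactly as in read_data_fpp.
groups = [
    files[i : i + files_per_partition]
    for i in range(0, len(files), files_per_partition)
]
print(groups)
# [['part_000.jsonl', 'part_001.jsonl'], ['part_002.jsonl', 'part_003.jsonl'], ['part_004.jsonl']]
# Each inner list becomes one Dask partition, read by read_single_partition.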

def read_pandas_pickle(
@@ -388,7 +493,8 @@ def read_data(
input_files: Union[str, List[str]],
file_type: str = "pickle",
backend: Literal["cudf", "pandas"] = "cudf",
files_per_partition: int = 1,
blocksize: Optional[str] = None,
files_per_partition: Optional[int] = 1,
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
@@ -412,13 +518,8 @@
A Dask-cuDF or a Dask-pandas DataFrame.

"""
if backend == "cudf":
# Try using cuDF. If it is not available, this will throw an error.
test_obj = cudf.Series

if isinstance(input_files, str):
input_files = [input_files]

if file_type == "pickle":
df = read_pandas_pickle(
input_files[0], add_filename=add_filename, columns=columns, **kwargs
@@ -441,29 +542,40 @@
)

print(f"Reading {len(input_files)} files", flush=True)
input_files = sorted(input_files)

if files_per_partition > 1:
input_files = [
input_files[i : i + files_per_partition]
for i in range(0, len(input_files), files_per_partition)
]

if blocksize is not None and files_per_partition is not None:
msg = "blocksize and files_per_partition cannot be set at the same time"
raise ValueError(msg)

if blocksize is not None and (
file_type == "jsonl" or (file_type == "parquet" and not add_filename)
):
return read_data_blocksize(
input_files,
backend=backend,
file_type=file_type,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
**kwargs,
)
else:
input_files = [[file] for file in input_files]

return dd.from_map(
read_single_partition,
input_files,
filetype=file_type,
backend=backend,
add_filename=add_filename,
input_meta=input_meta,
enforce_metadata=False,
columns=columns,
**kwargs,
)

if backend == "cudf" and (
file_type == "jsonl" or (file_type == "parquet" and not add_filename)
):
warnings.warn(
"Consider passing in blocksize for better control over memory usage."
)
return read_data_fpp(
input_files,
file_type=file_type,
backend=backend,
add_filename=add_filename,
files_per_partition=files_per_partition,
input_meta=input_meta,
columns=columns,
**kwargs,
)
else:
raise RuntimeError("Could not read data, please check file type")

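Putting the dispatch together, a minimal end-to-end sketch (hypothetical file paths; only names defined in this module are assumed):

from nemo_curator.utils.distributed_utils import read_data

files = ["data/part_000.jsonl", "data/part_001.jsonl"]  # hypothetical paths

# blocksize path: taken for jsonl (either backend), or parquet when add_filename=False.
ddf = read_data(
    files,
    file_type="jsonl",
    backend="cudf",
    blocksize="1gb",
    files_per_partition=None,
)

# files_per_partition path: on the cudf backend this also warns that blocksize
# gives better control over memory usage.
ddf_fpp = read_data(
    files,
    file_type="jsonl",
    backend="cudf",
    blocksize=None,
    files_per_partition=2,
)

# Passing both blocksize and files_per_partition raises a ValueError.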