Add blocksize to DocumentDataset.read_* that uses dask_cudf.read_* #285
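This PR lets DocumentDataset.read_json / read_parquet partition input by a byte blocksize via dask_cudf.read_* instead of only a fixed files_per_partition. Below is a minimal usage sketch assuming the signatures shown in the diff (the paths and input_meta values are placeholders, and the exact import path may differ in the released package):

    from nemo_curator.datasets import DocumentDataset

    # New path in this PR: let dask_cudf split the input into ~1 GB partitions.
    dataset = DocumentDataset.read_json(
        "/path/to/jsonl_dir",                      # placeholder directory of .jsonl files
        backend="cudf",
        blocksize="1gb",                           # mutually exclusive with files_per_partition
        input_meta={"id": "str", "text": "str"},   # placeholder schema
        columns=["id", "text"],
    )

    # Pre-existing behavior: group a fixed number of files into each partition.
    # blocksize must be unset explicitly because the diff gives it a "1gb" default.
    dataset_fpp = DocumentDataset.read_json(
        "/path/to/jsonl_dir",
        backend="cudf",
        files_per_partition=2,
        blocksize=None,
    )

Per the read_data dispatch in this diff, setting both blocksize and files_per_partition raises a ValueError, and the blocksize path is only taken for the cudf backend with jsonl, or with parquet when add_filename is False; other combinations fall back to the files-per-partition reader.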

Merged

Commits (28)
04139d0  fc (praateekmahajan, Oct 8, 2024)
953c84e  merge conflict (praateekmahajan, Nov 15, 2024)
b2de5cb  review comments (praateekmahajan, Nov 15, 2024)
eb49b70  make blocksize work with parquet (praateekmahajan, Nov 18, 2024)
386d443  filetype (praateekmahajan, Nov 19, 2024)
f4c963a  complete merge (praateekmahajan, Nov 19, 2024)
aa47a37  fix merge (praateekmahajan, Nov 19, 2024)
fed553e  Merge branch 'main' of github.com:NVIDIA/NeMo-Curator into praateek/t… (praateekmahajan, Nov 20, 2024)
c1ea0fb  add test cases (praateekmahajan, Nov 22, 2024)
3a0f13f  add test file (praateekmahajan, Nov 22, 2024)
9c6428c  failing test for select_columns (praateekmahajan, Nov 22, 2024)
1e8a7fc  rename func name (praateekmahajan, Nov 22, 2024)
2f18132  add test case for different columns (praateekmahajan, Dec 6, 2024)
a540d05  improve test for different_cols (praateekmahajan, Dec 6, 2024)
999e46b  .. (praateekmahajan, Dec 6, 2024)
2599f26  review comments + add warnings for inconsistent schemas (praateekmahajan, Dec 13, 2024)
09ca9d9  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
70efd69  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
91671ec  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
a4fcd2f  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
8e4827f  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
0cd86a6  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
5871b83  Update nemo_curator/utils/distributed_utils.py (praateekmahajan, Dec 15, 2024)
0284045  Merge branch 'main' of github.com:praateekmahajan/NeMo-Curator into p… (praateekmahajan, Dec 16, 2024)
7aeca38  Merge branch 'main' of github.com:NVIDIA/NeMo-Curator into praateek/t… (praateekmahajan, Dec 16, 2024)
fdd0c69  Merge branch 'praateek/try-dask-cudf-read-json' of github.com:praatee… (praateekmahajan, Dec 16, 2024)
fc196d5  fix tests (praateekmahajan, Dec 16, 2024)
e52d0ea  Merge branch 'main' into praateek/try-dask-cudf-read-json (sarahyurick, Dec 17, 2024)
22 changes: 13 additions & 9 deletions nemo_curator/datasets/doc_dataset.py
@@ -43,7 +43,8 @@ def read_json(
cls,
input_files: Union[str, List[str]],
backend: str = "pandas",
files_per_partition: int = 1,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
@@ -54,8 +55,9 @@ def read_json(
input_files=input_files,
file_type="jsonl",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
input_meta=input_meta,
columns=columns,
**kwargs,
@@ -67,7 +69,8 @@ def read_parquet(
cls,
input_files,
backend="pandas",
files_per_partition=1,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename=False,
columns: Optional[List[str]] = None,
**kwargs,
@@ -77,8 +80,9 @@ def read_parquet(
input_files=input_files,
file_type="parquet",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
files_per_partition=files_per_partition,
blocksize=blocksize,
columns=columns,
**kwargs,
)
@@ -89,8 +93,6 @@ def read_pickle(
cls,
input_files,
backend="pandas",
files_per_partition=1,
add_filename=False,
columns: Optional[List[str]] = None,
**kwargs,
):
@@ -99,8 +101,6 @@ def read_pickle(
input_files=input_files,
file_type="pickle",
backend=backend,
files_per_partition=files_per_partition,
add_filename=add_filename,
columns=columns,
**kwargs,
)
@@ -191,8 +191,9 @@ def _read_json_or_parquet(
input_files: Union[str, List[str]],
file_type: str,
backend: str,
files_per_partition: int,
add_filename: bool,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = None,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
@@ -224,6 +225,7 @@ def _read_json_or_parquet(
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
@@ -243,6 +245,7 @@ def _read_json_or_parquet(
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
@@ -268,6 +271,7 @@ def _read_json_or_parquet(
file_type=file_type,
backend=backend,
files_per_partition=files_per_partition,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
164 changes: 130 additions & 34 deletions nemo_curator/utils/distributed_utils.py
@@ -22,7 +22,7 @@
from contextlib import nullcontext
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Dict, List, Literal, Optional, Union

import dask.dataframe as dd
import numpy as np
@@ -261,6 +261,20 @@ def _set_torch_to_use_rmm():
torch.cuda.memory.change_current_allocator(rmm_torch_allocator)


def select_and_sort_columns(
df: Union[dd.DataFrame, dask_cudf.DataFrame],
columns: List[str],
add_filename: bool,
) -> Union[dd.DataFrame, dask_cudf.DataFrame]:
# TODO : Reviewer TAL if filetype check is needed
if columns is not None:
if add_filename and "filename" not in columns:
columns.append("filename")
df = df[columns]
df = df[sorted(df.columns)]
Collaborator Author:
The sorting also isn't respected under the read_files_fpp path. Removing the sorting still results in test_read_data_select_columns failures, but far fewer than when we sort.

Collaborator:
I like this a lot: the underlying data can have columns whose order is not respected, so sorting here makes a lot of sense, especially for formats like jsonl.

return df
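A minimal pandas sketch of why the column sort discussed above helps (pandas stands in for the Dask/cuDF partitions here; normalize is a hypothetical helper mirroring select_and_sort_columns, not part of the PR):

    import pandas as pd

    def normalize(df, columns=None):
        # Select the requested columns, then sort the column order so every
        # partition ends up with an identical schema.
        if columns is not None:
            df = df[columns]
        return df[sorted(df.columns)]

    # Two jsonl-like sources that store the same fields in different orders.
    df_a = pd.DataFrame({"text": ["a"], "id": [1]})   # columns: text, id
    df_b = pd.DataFrame({"id": [2], "text": ["b"]})   # columns: id, text

    print(normalize(df_a).columns.tolist())  # ['id', 'text']
    print(normalize(df_b).columns.tolist())  # ['id', 'text'] -- same order for both partitions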


def read_single_partition(
files,
backend="cudf",
@@ -343,13 +357,80 @@ def read_single_partition(
else:
df = read_f(files, **read_kwargs, **kwargs)

if filetype in ["jsonl", "json"] and columns is not None:
if add_filename and "filename" not in columns:
columns.append("filename")
df = df[columns]
print(f"Reading with {read_kwargs=}", flush=True)
return select_and_sort_columns(df, columns, add_filename)

df = df[sorted(df.columns)]
return df

def read_data_cudf_blocksize(
input_files: List[str],
file_type: Literal["parquet", "jsonl"],
blocksize: str,
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
) -> dask_cudf.DataFrame:
import dask_cudf

read_kwargs = dict()
if file_type == "jsonl":
read_func = dask_cudf.read_json
read_kwargs["lines"] = True
if input_meta is not None:
read_kwargs["prune_columns"] = True
read_kwargs["dtype"] = (
ast.literal_eval(input_meta)
if isinstance(input_meta, str)
else input_meta
)
if add_filename:
read_kwargs["include_path_column"] = add_filename

elif file_type == "parquet":
if add_filename:
msg = "add_filename and blocksize cannot be set at the same time for parquet files"
raise ValueError(msg)
read_func = dask_cudf.read_parquet
read_kwargs["columns"] = columns
else:
msg = f"Reading with blocksize is only supported for jsonl and parquet files, not {file_type=}"
raise ValueError(msg)

print(f"Reading {blocksize=} with {read_kwargs=} {kwargs=}", flush=True)
df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs)
return select_and_sort_columns(df, columns, add_filename)


def read_data_fpp(
input_files: List[str],
file_type: Literal["parquet", "json", "jsonl"],
backend: Literal["cudf", "pandas"] = "cudf",
add_filename: bool = False,
files_per_partition: Optional[int] = None,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
) -> Union[dd.DataFrame, dask_cudf.DataFrame]:
input_files = sorted(input_files)
if files_per_partition > 1:
input_files = [
input_files[i : i + files_per_partition]
for i in range(0, len(input_files), files_per_partition)
]
else:
input_files = [[file] for file in input_files]

return dd.from_map(
read_single_partition,
input_files,
filetype=file_type,
backend=backend,
add_filename=add_filename,
input_meta=input_meta,
enforce_metadata=False,
columns=columns,
**kwargs,
)


def read_pandas_pickle(
@@ -375,10 +456,11 @@ def read_pandas_pickle(


def read_data(
input_files,
input_files: Union[str, List[str]],
file_type: str = "pickle",
backend: str = "cudf",
files_per_partition: int = 1,
backend: Literal["cudf", "pandas"] = "cudf",
blocksize: Optional[str] = None,
files_per_partition: Optional[int] = 1,
add_filename: bool = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
@@ -402,39 +484,53 @@ def read_data(
A Dask-cuDF or a Dask-pandas DataFrame.

"""
if backend == "cudf":
# Try using cuDF. If not available, this will throw an error.
test_obj = cudf.Series

if isinstance(input_files, str):
input_files = [input_files]
if file_type == "pickle":
df = read_pandas_pickle(
input_files[0], add_filename=add_filename, columns=columns, **kwargs
)
df = dd.from_pandas(df, npartitions=16)
if backend == "cudf":
df = df.to_backend("cudf")

elif file_type in ["json", "jsonl", "parquet"]:
df = select_and_sort_columns(df, columns, add_filename)
elif file_type in {"json", "jsonl", "parquet"}:
print(f"Reading {len(input_files)} files", flush=True)
input_files = sorted(input_files)
if files_per_partition > 1:
input_files = [
input_files[i : i + files_per_partition]
for i in range(0, len(input_files), files_per_partition)
]
if blocksize is not None and files_per_partition is not None:
msg = "blocksize and files_per_partition cannot be set at the same time"
raise ValueError(msg)

if (
blocksize is not None
and backend == "cudf"
and (file_type == "jsonl" or (file_type == "parquet" and not add_filename))
):
return read_data_cudf_blocksize(
input_files,
file_type=file_type,
blocksize=blocksize,
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
**kwargs,
)
else:
input_files = [[file] for file in input_files]
return dd.from_map(
read_single_partition,
input_files,
filetype=file_type,
backend=backend,
add_filename=add_filename,
input_meta=input_meta,
enforce_metadata=False,
columns=columns,
**kwargs,
)
if backend == "cudf" and (
file_type == "jsonl" or (file_type == "parquet" and not add_filename)
):
warnings.warn(
"Consider passing in blocksize for better control over memory usage."
)
return read_data_fpp(
input_files,
file_type=file_type,
backend=backend,
add_filename=add_filename,
files_per_partition=files_per_partition,
input_meta=input_meta,
columns=columns,
**kwargs,
)
else:
raise RuntimeError("Could not read data, please check file type")
return df