[BUG] Raise error when Ray Data tensor cannot be pickled and disable compliant nested types #2428

Merged: 12 commits, Jun 22, 2024
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -22,7 +22,7 @@ jobs:
matrix:
python-version: ['3.8', '3.10']
daft-runner: [py, ray]
pyarrow-version: [7.0.0, 12.0.1]
pyarrow-version: [7.0.0, 13.0.0]
enable-aqe: [0, 1]
os: [ubuntu-20.04, windows-latest]
exclude:
2 changes: 1 addition & 1 deletion benchmarking/parquet/benchmark-requirements.txt
@@ -1,5 +1,5 @@
pytest==7.4.0
pytest-benchmark==4.0.0
pytest-memray==1.4.1
pyarrow==12.0.1
pyarrow==13.0.0
boto3==1.28.3
24 changes: 22 additions & 2 deletions daft/dataframe/dataframe.py
@@ -1778,10 +1778,16 @@
return col_name in self.column_names

@DataframePublicAPI
def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.DataFrame":
def to_pandas(
self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False
) -> "pandas.DataFrame":
"""Converts the current DataFrame to a `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
If results have not been computed yet, collect will be called.

Args:
cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False.
coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to False. See `pyarrow.Table.to_pandas <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas>`__ for more information.

Returns:
pandas.DataFrame: `pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__ converted from a Daft DataFrame

@@ -1795,6 +1801,7 @@
pd_df = result.to_pandas(
schema=self._builder.schema(),
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)
return pd_df
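
For context, a minimal usage sketch of the new keyword argument (hypothetical data; assumes pandas >= 2.0 and pyarrow >= 13.0.0 are installed):

```python
import datetime

import pyarrow as pa

import daft

# Hypothetical table with a microsecond-resolution timestamp column.
arrow_table = pa.table(
    {
        "id": [1, 2],
        "ts": pa.array(
            [datetime.datetime(2024, 6, 1), datetime.datetime(2024, 6, 2)],
            type=pa.timestamp("us"),
        ),
    }
)
df = daft.from_arrow(arrow_table)

# With pandas >= 2.0 and pyarrow >= 13.0.0, temporal columns keep their original
# resolution (datetime64[us] here) by default; coerce_temporal_nanoseconds=True
# restores the pre-13.0.0 behavior of coercing them to nanoseconds.
pdf = df.to_pandas(coerce_temporal_nanoseconds=True)
print(pdf["ts"].dtype)  # expected: datetime64[ns]
```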

@@ -1901,6 +1908,8 @@
@classmethod
def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame":
"""Creates a DataFrame from a `Ray Dataset <https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html#ray.data.Dataset>`__."""
from ray.exceptions import RayTaskError

context = get_context()
if context.runner_config.name != "ray":
raise ValueError("Daft needs to be running on the Ray Runner for this operation")
@@ -1912,7 +1921,18 @@

partition_set, schema = ray_runner_io.partition_set_from_ray_dataset(ds)
cache_entry = context.runner().put_partition_set_into_cache(partition_set)
size_bytes = partition_set.size_bytes()
try:
size_bytes = partition_set.size_bytes()
except RayTaskError as e:
import pyarrow as pa
from packaging.version import parse

if "extension<arrow.fixed_shape_tensor>" in str(e) and parse(pa.__version__) < parse("13.0.0"):
raise ValueError(
f"Reading Ray Dataset tensors is only supported with PyArrow >= 13.0.0, found {pa.__version__}. See this issue for more information: https://github.com/apache/arrow/pull/35933"
) from e
raise e

num_rows = len(partition_set)
assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
builder = LogicalPlanBuilder.from_in_memory_scan(
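A rough sketch of how the new error path might surface on pyarrow < 13.0.0 (Ray runner assumed to be configured; the dataset contents are illustrative):

```python
import numpy as np
import ray

import daft

daft.context.set_runner_ray()  # _from_ray_dataset requires the Ray runner

# Illustrative Ray Dataset containing a fixed-shape tensor column.
ds = ray.data.from_items([{"img": np.zeros((2, 2), dtype=np.int64)} for _ in range(4)])

try:
    df = daft.from_ray_dataset(ds)
except ValueError as e:
    # On pyarrow < 13.0.0 the arrow.fixed_shape_tensor extension type cannot be
    # pickled across Ray tasks, so Daft now surfaces a descriptive ValueError
    # instead of an opaque RayTaskError.
    print(e)
```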
2 changes: 2 additions & 0 deletions daft/runners/partitioning.py
@@ -217,11 +217,13 @@ def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
merged_partition = self._get_merged_vpartition()
return merged_partition.to_pandas(
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
11 changes: 9 additions & 2 deletions daft/table/micropartition.py
@@ -150,9 +150,16 @@ def to_pydict(self) -> dict[str, list]:
def to_pylist(self) -> list[dict[str, Any]]:
return self.to_table().to_pylist()

def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
return self.to_table().to_pandas(
schema=schema, cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype
schema=schema,
cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype,
coerce_temporal_nanoseconds=coerce_temporal_nanoseconds,
)

###
21 changes: 18 additions & 3 deletions daft/table/table.py
@@ -217,7 +217,14 @@ def to_pylist(self) -> list[dict[str, Any]]:
column_names = self.column_names()
return [{colname: table[colname][i] for colname in column_names} for i in range(len(self))]

def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame:
def to_pandas(
self,
schema: Schema | None = None,
cast_tensors_to_ray_tensor_dtype: bool = False,
coerce_temporal_nanoseconds: bool = False,
) -> pd.DataFrame:
from packaging.version import parse

if not _PANDAS_AVAILABLE:
raise ImportError("Unable to import Pandas - please ensure that it is installed.")
python_fields = set()
@@ -236,12 +243,20 @@ def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dty
column = column_series.to_pylist()
else:
# Arrow-native field, so provide column as Arrow array.
column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas()
column_arrow = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
if parse(pa.__version__) < parse("13.0.0"):
column = column_arrow.to_pandas()
else:
column = column_arrow.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds)
table[colname] = column

return pd.DataFrame.from_dict(table)
else:
return self.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas()
arrow_table = self.to_arrow(cast_tensors_to_ray_tensor_dtype)
if parse(pa.__version__) < parse("13.0.0"):
return arrow_table.to_pandas()
else:
return arrow_table.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds)

###
# Compute methods (Table -> Table)
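For reference, a small pyarrow-only sketch of the behavior difference that the version gate above accounts for (assumes pyarrow >= 13.0.0 and pandas >= 2.0; the keyword does not exist on older pyarrow, hence the parse(pa.__version__) check):

```python
import datetime

import pyarrow as pa

arrow_table = pa.table(
    {"ts": pa.array([datetime.datetime(2024, 6, 1)], type=pa.timestamp("us"))}
)

# pyarrow >= 13.0.0 with pandas >= 2.0 preserves the microsecond resolution by default...
print(arrow_table.to_pandas()["ts"].dtype)  # datetime64[us]

# ...and only coerces to nanoseconds when explicitly asked.
print(arrow_table.to_pandas(coerce_temporal_nanoseconds=True)["ts"].dtype)  # datetime64[ns]
```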
10 changes: 6 additions & 4 deletions daft/table/table_io.py
@@ -453,7 +453,7 @@ def write_tabular(
format = pads.ParquetFileFormat()
inflation_factor = execution_config.parquet_inflation_factor
target_file_size = execution_config.parquet_target_filesize
opts = format.make_write_options(compression=compression)
opts = format.make_write_options(compression=compression, use_compliant_nested_type=False)
elif file_format == FileFormat.Csv:
format = pads.CsvFileFormat()
opts = None
@@ -633,12 +633,14 @@ def file_visitor(written_file, protocol=protocol):

format = pads.ParquetFileFormat()

opts = format.make_write_options(compression="zstd", use_compliant_nested_type=False)

_write_tabular_arrow_table(
arrow_table=arrow_table,
schema=file_schema,
full_path=resolved_path,
format=format,
opts=format.make_write_options(compression="zstd"),
opts=opts,
fs=fs,
rows_per_file=rows_per_file,
rows_per_row_group=rows_per_row_group,
@@ -735,10 +737,10 @@ def file_visitor(written_file: Any) -> None:
target_row_groups = max(math.ceil(size_bytes / target_row_group_size / inflation_factor), 1)
rows_per_row_group = max(min(math.ceil(num_rows / target_row_groups), rows_per_file), 1)

opts = pads.ParquetFileFormat().make_write_options(use_compliant_nested_type=False)

format = pads.ParquetFileFormat()

opts = format.make_write_options(use_compliant_nested_type=False)

_write_tabular_arrow_table(
arrow_table=arrow_batch,
schema=None,
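Background: pyarrow 13.0.0 changed the default of use_compliant_nested_type to True, which writes list columns with the Parquet-spec child field name "element" instead of the legacy "item"; the write options above pin it back to False so written schemas stay unchanged. A minimal sketch of the same pattern (output path is illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as pads

table = pa.table({"scores": [[1, 2], [3]]})

fmt = pads.ParquetFileFormat()
# use_compliant_nested_type=False preserves the pre-13.0.0 "item" child field name,
# so files written with newer pyarrow stay schema-compatible with readers that
# expect the old list layout.
opts = fmt.make_write_options(compression="zstd", use_compliant_nested_type=False)

pads.write_dataset(table, "/tmp/daft_parquet_example", format=fmt, file_options=opts)
```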
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -34,7 +34,7 @@ Pillow==9.5.0
opencv-python==4.8.1.78

# Pyarrow
pyarrow==12.0.1
pyarrow==13.0.0
# Ray
ray[data, client]==2.7.1; python_version < '3.8'
ray[data, client]==2.10.0; python_version >= '3.8'
@@ -17,7 +17,7 @@ uvicorn==0.23.2
uvloop==0.17.0
watchfiles==0.19.0
websockets==11.0.3
pyarrow==12.0.1
pyarrow==13.0.0
slowapi==0.1.8

# Pin numpy version otherwise pyarrow doesn't work
20 changes: 10 additions & 10 deletions tests/integration/sql/test_sql.py
@@ -28,7 +28,7 @@ def test_sql_show(test_db) -> None:
def test_sql_create_dataframe_ok(test_db, pdf) -> None:
df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db)

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -40,7 +40,7 @@ def test_sql_partitioned_read(test_db, num_partitions, pdf) -> None:

df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col="id")
assert df.num_partitions() == num_partitions
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -53,7 +53,7 @@ def test_sql_partitioned_read_with_custom_num_partitions_and_partition_col(
f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col=partition_col, num_partitions=num_partitions
)
assert df.num_partitions() == num_partitions
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -66,7 +66,7 @@ def test_sql_partitioned_read_with_non_uniformly_distributed_column(test_db, num
num_partitions=num_partitions,
)
assert df.num_partitions() == num_partitions
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -122,7 +122,7 @@ def test_sql_read_with_binary_filter_pushdowns(test_db, column, operator, value,
df = df.where(df[column] <= value)
pdf = pdf[pdf[column] <= value]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -133,7 +133,7 @@ def test_sql_read_with_is_null_filter_pushdowns(test_db, num_partitions, pdf) ->

pdf = pdf[pdf["null_col"].isnull()]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -144,7 +144,7 @@ def test_sql_read_with_not_null_filter_pushdowns(test_db, num_partitions, pdf) -

pdf = pdf[pdf["null_col"].notnull()]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -155,7 +155,7 @@ def test_sql_read_with_if_else_filter_pushdown(test_db, num_partitions, pdf) ->

pdf = pdf[(pdf["id"] > 100) & (pdf["float_col"] > 150) | (pdf["float_col"] < 50)]

assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -165,7 +165,7 @@ def test_sql_read_with_is_in_filter_pushdown(test_db, num_partitions, pdf) -> No
df = df.where(df["id"].is_in([1, 2, 3]))

pdf = pdf[pdf["id"].isin([1, 2, 3])]
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
@@ -220,7 +220,7 @@ def create_conn():
return sqlalchemy.create_engine(test_db).connect()

df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", create_conn)
assert_df_equals(df.to_pandas(), pdf, sort_key="id")
assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id")


@pytest.mark.integration()
19 changes: 10 additions & 9 deletions tests/series/test_if_else.py
@@ -428,29 +428,25 @@ def test_series_if_else_extension_type(uuid_ext_type, if_true_storage, if_false_
(
np.arange(16).reshape((4, 2, 2)),
np.arange(16, 32).reshape((4, 2, 2)),
np.array(
[[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]]
),
np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[12, 13], [14, 15]]]),
),
# Broadcast left
(
np.arange(4).reshape((1, 2, 2)),
np.arange(16, 32).reshape((4, 2, 2)),
np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]),
np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[0, 1], [2, 3]]]),
),
# Broadcast right
(
np.arange(16).reshape((4, 2, 2)),
np.arange(16, 20).reshape((1, 2, 2)),
np.array(
[[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]]
),
np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[12, 13], [14, 15]]]),
),
# Broadcast both
(
np.arange(4).reshape((1, 2, 2)),
np.arange(16, 20).reshape((1, 2, 2)),
np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]),
np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[0, 1], [2, 3]]]),
),
],
)
@@ -467,7 +463,12 @@ def test_series_if_else_canonical_tensor_extension_type(if_true, if_false, expec
DataType.from_arrow_type(if_true_arrow.type.storage_type.value_type), (2, 2)
)
result_arrow = result.to_arrow()
np.testing.assert_equal(result_arrow.to_numpy_ndarray(), expected)

# null element conversion to numpy is not well defined in pyarrow and changes between releases
# so this is a workaround to ensure our tests pass regardless of the pyarrow version
assert not result_arrow[2].is_valid
result_array_filtered = result_arrow.filter(pa.array([True, True, False, True]))
np.testing.assert_equal(result_array_filtered.to_numpy_ndarray(), expected)


@pytest.mark.parametrize(
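The workaround added in this test can be shown in isolation; a sketch mirroring the same pattern (assumes pyarrow >= 12.0.0 for the canonical fixed-shape tensor extension type):

```python
import numpy as np
import pyarrow as pa

tensor_type = pa.fixed_shape_tensor(pa.int64(), (2, 2))
storage = pa.array(
    [[0, 1, 2, 3], [4, 5, 6, 7], None, [12, 13, 14, 15]],
    type=pa.list_(pa.int64(), 4),
)
arr = pa.ExtensionArray.from_storage(tensor_type, storage)

# Converting arrays that contain nulls with to_numpy_ndarray() is not well defined
# and has changed between pyarrow releases, so check validity explicitly and drop
# the null element before converting.
assert not arr[2].is_valid
dense = arr.filter(pa.array([True, True, False, True])).to_numpy_ndarray()
np.testing.assert_equal(
    dense,
    np.array([[[0, 1], [2, 3]], [[4, 5], [6, 7]], [[12, 13], [14, 15]]]),
)
```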