From bae3a1e5487baad70c1ecaa5cc11442d8b57188f Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Thu, 20 Jun 2024 17:30:02 -0700 Subject: [PATCH 01/12] [BUG] Throw more understandable error when Ray Data tensor cannot be pickled --- .github/workflows/python-package.yml | 2 +- benchmarking/parquet/benchmark-requirements.txt | 2 +- daft/dataframe/dataframe.py | 15 ++++++++++++++- requirements-dev.txt | 2 +- .../retry_server/retry-server-requirements.txt | 2 +- 5 files changed, 18 insertions(+), 5 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 894b54ef0c..cca46ce14f 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -22,7 +22,7 @@ jobs: matrix: python-version: ['3.8', '3.10'] daft-runner: [py, ray] - pyarrow-version: [7.0.0, 12.0.1] + pyarrow-version: [7.0.0, 13.0.0] enable-aqe: [0, 1] os: [ubuntu-20.04, windows-latest] exclude: diff --git a/benchmarking/parquet/benchmark-requirements.txt b/benchmarking/parquet/benchmark-requirements.txt index 248520aae3..a308fcd5f5 100644 --- a/benchmarking/parquet/benchmark-requirements.txt +++ b/benchmarking/parquet/benchmark-requirements.txt @@ -1,5 +1,5 @@ pytest==7.4.0 pytest-benchmark==4.0.0 pytest-memray==1.4.1 -pyarrow==12.0.1 +pyarrow==13.0.0 boto3==1.28.3 diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 824e2ea302..f355aae864 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -1901,6 +1901,8 @@ def to_ray_dataset(self) -> "ray.data.dataset.DataSet": @classmethod def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame": """Creates a DataFrame from a `Ray Dataset `__.""" + from ray.exceptions import RayTaskError + context = get_context() if context.runner_config.name != "ray": raise ValueError("Daft needs to be running on the Ray Runner for this operation") @@ -1912,7 +1914,18 @@ def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame": partition_set, schema = ray_runner_io.partition_set_from_ray_dataset(ds) cache_entry = context.runner().put_partition_set_into_cache(partition_set) - size_bytes = partition_set.size_bytes() + try: + size_bytes = partition_set.size_bytes() + except RayTaskError as e: + import pyarrow as pa + from packaging.version import parse + + if "extension" in str(e) and parse(pa.__version__) < parse("13.0.0"): + raise ValueError( + f"Reading Ray Dataset tensors is only supported with PyArrow >= 13.0.0, found {pa.__version__}. 
See this issue for more information: https://github.com/apache/arrow/pull/35933" + ) + raise e + num_rows = len(partition_set) assert size_bytes is not None, "In-memory data should always have non-None size in bytes" builder = LogicalPlanBuilder.from_in_memory_scan( diff --git a/requirements-dev.txt b/requirements-dev.txt index 29b0ef07d3..623fdefd08 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -34,7 +34,7 @@ Pillow==9.5.0 opencv-python==4.8.1.78 # Pyarrow -pyarrow==12.0.1 +pyarrow==13.0.0 # Ray ray[data, client]==2.7.1; python_version < '3.8' ray[data, client]==2.10.0; python_version >= '3.8' diff --git a/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt b/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt index d3d0697331..fc550f4eb1 100644 --- a/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt +++ b/tests/integration/io/docker-compose/retry_server/retry-server-requirements.txt @@ -17,7 +17,7 @@ uvicorn==0.23.2 uvloop==0.17.0 watchfiles==0.19.0 websockets==11.0.3 -pyarrow==12.0.1 +pyarrow==13.0.0 slowapi==0.1.8 # Pin numpy version otherwise pyarrow doesn't work From 281da7ce46d6b9e4b7a4f2fea9ed86e66c936595 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 12:14:49 -0700 Subject: [PATCH 02/12] preserve original stack trace --- daft/dataframe/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index f355aae864..c0fb27af9b 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -1923,7 +1923,7 @@ def _from_ray_dataset(cls, ds: "ray.data.dataset.DataSet") -> "DataFrame": if "extension" in str(e) and parse(pa.__version__) < parse("13.0.0"): raise ValueError( f"Reading Ray Dataset tensors is only supported with PyArrow >= 13.0.0, found {pa.__version__}. 
See this issue for more information: https://github.com/apache/arrow/pull/35933" - ) + ) from e raise e num_rows = len(partition_set) From 355144079f4edcbfc15d47789b8f0fec15976740 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 12:15:35 -0700 Subject: [PATCH 03/12] use compliant nested type names --- src/daft-core/src/datatypes/dtype.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/daft-core/src/datatypes/dtype.rs b/src/daft-core/src/datatypes/dtype.rs index c5094fee57..fc530cf4cb 100644 --- a/src/daft-core/src/datatypes/dtype.rs +++ b/src/daft-core/src/datatypes/dtype.rs @@ -152,18 +152,18 @@ impl DataType { DataType::Utf8 => Ok(ArrowType::LargeUtf8), DataType::FixedSizeList(child_dtype, size) => Ok(ArrowType::FixedSizeList( Box::new(arrow2::datatypes::Field::new( - "item", + "element", child_dtype.to_arrow()?, true, )), *size, )), DataType::List(field) => Ok(ArrowType::LargeList(Box::new( - arrow2::datatypes::Field::new("item", field.to_arrow()?, true), + arrow2::datatypes::Field::new("element", field.to_arrow()?, true), ))), DataType::Map(field) => Ok(ArrowType::Map( Box::new(arrow2::datatypes::Field::new( - "item", + "element", field.to_arrow()?, true, )), From 30d2a33a765f5dfcbda8d3439475f8fd634598fe Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 12:39:36 -0700 Subject: [PATCH 04/12] disable use compliant nested type --- daft/table/table_io.py | 10 ++++++---- src/daft-core/src/datatypes/dtype.rs | 6 +++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index b8954d81cc..d47e606264 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -453,7 +453,7 @@ def write_tabular( format = pads.ParquetFileFormat() inflation_factor = execution_config.parquet_inflation_factor target_file_size = execution_config.parquet_target_filesize - opts = format.make_write_options(compression=compression) + opts = format.make_write_options(compression=compression, use_compliant_nested_type=False) elif file_format == FileFormat.Csv: format = pads.CsvFileFormat() opts = None @@ -633,12 +633,14 @@ def file_visitor(written_file, protocol=protocol): format = pads.ParquetFileFormat() + opts = format.make_write_options(compression="zstd", use_compliant_nested_type=False) + _write_tabular_arrow_table( arrow_table=arrow_table, schema=file_schema, full_path=resolved_path, format=format, - opts=format.make_write_options(compression="zstd"), + opts=opts, fs=fs, rows_per_file=rows_per_file, rows_per_row_group=rows_per_row_group, @@ -735,10 +737,10 @@ def file_visitor(written_file: Any) -> None: target_row_groups = max(math.ceil(size_bytes / target_row_group_size / inflation_factor), 1) rows_per_row_group = max(min(math.ceil(num_rows / target_row_groups), rows_per_file), 1) - opts = pads.ParquetFileFormat().make_write_options(use_compliant_nested_type=False) - format = pads.ParquetFileFormat() + opts = format.make_write_options(use_compliant_nested_type=False) + _write_tabular_arrow_table( arrow_table=arrow_batch, schema=None, diff --git a/src/daft-core/src/datatypes/dtype.rs b/src/daft-core/src/datatypes/dtype.rs index fc530cf4cb..c5094fee57 100644 --- a/src/daft-core/src/datatypes/dtype.rs +++ b/src/daft-core/src/datatypes/dtype.rs @@ -152,18 +152,18 @@ impl DataType { DataType::Utf8 => Ok(ArrowType::LargeUtf8), DataType::FixedSizeList(child_dtype, size) => Ok(ArrowType::FixedSizeList( Box::new(arrow2::datatypes::Field::new( - "element", + "item", child_dtype.to_arrow()?, true, )), 
*size, )), DataType::List(field) => Ok(ArrowType::LargeList(Box::new( - arrow2::datatypes::Field::new("element", field.to_arrow()?, true), + arrow2::datatypes::Field::new("item", field.to_arrow()?, true), ))), DataType::Map(field) => Ok(ArrowType::Map( Box::new(arrow2::datatypes::Field::new( - "element", + "item", field.to_arrow()?, true, )), From f4b739682c02b888545c894e3c2f80f4e726e83b Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 13:49:09 -0700 Subject: [PATCH 05/12] fix test --- tests/series/test_if_else.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/series/test_if_else.py b/tests/series/test_if_else.py index 090d232538..896afde4a6 100644 --- a/tests/series/test_if_else.py +++ b/tests/series/test_if_else.py @@ -428,29 +428,25 @@ def test_series_if_else_extension_type(uuid_ext_type, if_true_storage, if_false_ ( np.arange(16).reshape((4, 2, 2)), np.arange(16, 32).reshape((4, 2, 2)), - np.array( - [[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]] - ), + np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[12, 13], [14, 15]]]), ), # Broadcast left ( np.arange(4).reshape((1, 2, 2)), np.arange(16, 32).reshape((4, 2, 2)), - np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]), + np.array([[[0, 1], [2, 3]], [[20, 21], [22, 23]], [[0, 1], [2, 3]]]), ), # Broadcast right ( np.arange(16).reshape((4, 2, 2)), np.arange(16, 20).reshape((1, 2, 2)), - np.array( - [[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[12, 13], [14, 15]]] - ), + np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[12, 13], [14, 15]]]), ), # Broadcast both ( np.arange(4).reshape((1, 2, 2)), np.arange(16, 20).reshape((1, 2, 2)), - np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[np.nan, np.nan], [np.nan, np.nan]], [[0, 1], [2, 3]]]), + np.array([[[0, 1], [2, 3]], [[16, 17], [18, 19]], [[0, 1], [2, 3]]]), ), ], ) @@ -467,7 +463,12 @@ def test_series_if_else_canonical_tensor_extension_type(if_true, if_false, expec DataType.from_arrow_type(if_true_arrow.type.storage_type.value_type), (2, 2) ) result_arrow = result.to_arrow() - np.testing.assert_equal(result_arrow.to_numpy_ndarray(), expected) + + # null element conversion to numpy is not well defined in pyarrow and changes between releases + # so this is a workaround to ensure our tests pass regardless of the pyarrow version + assert not result_arrow[2].is_valid + result_array_filtered = result_arrow.filter(pa.array([True, True, False, True])) + np.testing.assert_equal(result_array_filtered.to_numpy_ndarray(), expected) @pytest.mark.parametrize( From d0680a16824031c67014a559912ba08c8bfd2374 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 14:47:41 -0700 Subject: [PATCH 06/12] convert datetimes to ns --- tests/conftest.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 1efafe82cb..405c8f0939 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ import pandas as pd import pyarrow as pa import pytest +from pandas.api.types import is_datetime64_any_dtype import daft from daft.table import MicroPartition @@ -176,6 +177,11 @@ def assert_df_equals( df_series = daft_pd_df[col] pd_series = pd_df[col] + # pyarrow 13.0.0 no longer coerces timestamp units to ns which causes assert_series_equal to fail + # so we need to manually convert the timestamp columns to ns + if is_datetime64_any_dtype(df_series): + 
df_series = pd.to_datetime(df_series, unit="ns") + try: pd.testing.assert_series_equal(df_series, pd_series, check_dtype=check_dtype) except AssertionError: From d196883cf295144b7fab628027f4b333bd585f84 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 15:04:34 -0700 Subject: [PATCH 07/12] fix pandas datetime checks --- tests/conftest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 405c8f0939..16735e0d32 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -179,8 +179,10 @@ def assert_df_equals( # pyarrow 13.0.0 no longer coerces timestamp units to ns which causes assert_series_equal to fail # so we need to manually convert the timestamp columns to ns - if is_datetime64_any_dtype(df_series): + if is_datetime64_any_dtype(df_series.dtype): df_series = pd.to_datetime(df_series, unit="ns") + if is_datetime64_any_dtype(pd_series.dtype): + pd_series = pd.to_datetime(pd_series, unit="ns") try: pd.testing.assert_series_equal(df_series, pd_series, check_dtype=check_dtype) From 7f9e7706913c9231b74e802d3d1954db3862dba3 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 15:17:14 -0700 Subject: [PATCH 08/12] add print --- tests/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 16735e0d32..9735906ef1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -180,8 +180,10 @@ def assert_df_equals( # pyarrow 13.0.0 no longer coerces timestamp units to ns which causes assert_series_equal to fail # so we need to manually convert the timestamp columns to ns if is_datetime64_any_dtype(df_series.dtype): + print(f"Converting {col} to datetime64[ns]") df_series = pd.to_datetime(df_series, unit="ns") if is_datetime64_any_dtype(pd_series.dtype): + print(f"Converting {col} to datetime64[ns]") pd_series = pd.to_datetime(pd_series, unit="ns") try: From 7e88028c373dcac99c0c36bdd5dd98a4975ad8a4 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 15:38:09 -0700 Subject: [PATCH 09/12] use astype instead of to_datetime --- tests/conftest.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9735906ef1..5082f0134b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -180,11 +180,9 @@ def assert_df_equals( # pyarrow 13.0.0 no longer coerces timestamp units to ns which causes assert_series_equal to fail # so we need to manually convert the timestamp columns to ns if is_datetime64_any_dtype(df_series.dtype): - print(f"Converting {col} to datetime64[ns]") - df_series = pd.to_datetime(df_series, unit="ns") + df_series = df_series.astype("datetime64[ns]") if is_datetime64_any_dtype(pd_series.dtype): - print(f"Converting {col} to datetime64[ns]") - pd_series = pd.to_datetime(pd_series, unit="ns") + pd_series = pd_series.astype("datetime64[ns]") try: pd.testing.assert_series_equal(df_series, pd_series, check_dtype=check_dtype) From 91d6d61f630d274d76d3f2da50443af25ec1c44d Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 16:07:05 -0700 Subject: [PATCH 10/12] make datetime check looser --- tests/conftest.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 5082f0134b..8fba919a8b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,7 +5,6 @@ import pandas as pd import pyarrow as pa import pytest -from pandas.api.types import is_datetime64_any_dtype import daft from daft.table import MicroPartition @@ 
-177,15 +176,10 @@ def assert_df_equals( df_series = daft_pd_df[col] pd_series = pd_df[col] - # pyarrow 13.0.0 no longer coerces timestamp units to ns which causes assert_series_equal to fail - # so we need to manually convert the timestamp columns to ns - if is_datetime64_any_dtype(df_series.dtype): - df_series = df_series.astype("datetime64[ns]") - if is_datetime64_any_dtype(pd_series.dtype): - pd_series = pd_series.astype("datetime64[ns]") - try: - pd.testing.assert_series_equal(df_series, pd_series, check_dtype=check_dtype) + pd.testing.assert_series_equal( + df_series, pd_series, check_dtype=check_dtype, check_datetimelike_compat=True + ) except AssertionError: print(f"Failed assertion for col: {col}") raise From ab4d7e2d714b9320ac7048e32fb5c64bdce55658 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 16:30:49 -0700 Subject: [PATCH 11/12] add to_pandas coerce_temporal_nanoseconds --- daft/dataframe/dataframe.py | 9 ++++++++- daft/runners/partitioning.py | 2 ++ daft/table/micropartition.py | 11 +++++++++-- daft/table/table.py | 21 ++++++++++++++++++--- tests/conftest.py | 4 +--- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index c0fb27af9b..40ddff8f80 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -1778,10 +1778,16 @@ def __contains__(self, col_name: str) -> bool: return col_name in self.column_names @DataframePublicAPI - def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.DataFrame": + def to_pandas( + self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = True + ) -> "pandas.DataFrame": """Converts the current DataFrame to a `pandas DataFrame `__. If results have not computed yet, collect will be called. + Args: + cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False. + coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to True. See `pyarrow.Table.to_pandas `__ for more information. 
+ Returns: pandas.DataFrame: `pandas DataFrame `__ converted from a Daft DataFrame @@ -1795,6 +1801,7 @@ def to_pandas(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> "pandas.D pd_df = result.to_pandas( schema=self._builder.schema(), cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype, + coerce_temporal_nanoseconds=coerce_temporal_nanoseconds, ) return pd_df diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 7bcca40665..2981ab7529 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -217,11 +217,13 @@ def to_pandas( self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False, + coerce_temporal_nanoseconds: bool = True, ) -> pd.DataFrame: merged_partition = self._get_merged_vpartition() return merged_partition.to_pandas( schema=schema, cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype, + coerce_temporal_nanoseconds=coerce_temporal_nanoseconds, ) def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table: diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index c00ca23062..932b827753 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -150,9 +150,16 @@ def to_pydict(self) -> dict[str, list]: def to_pylist(self) -> list[dict[str, Any]]: return self.to_table().to_pylist() - def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame: + def to_pandas( + self, + schema: Schema | None = None, + cast_tensors_to_ray_tensor_dtype: bool = False, + coerce_temporal_nanoseconds: bool = True, + ) -> pd.DataFrame: return self.to_table().to_pandas( - schema=schema, cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype + schema=schema, + cast_tensors_to_ray_tensor_dtype=cast_tensors_to_ray_tensor_dtype, + coerce_temporal_nanoseconds=coerce_temporal_nanoseconds, ) ### diff --git a/daft/table/table.py b/daft/table/table.py index 2166ddcc8b..9f2773b5e6 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -217,7 +217,14 @@ def to_pylist(self) -> list[dict[str, Any]]: column_names = self.column_names() return [{colname: table[colname][i] for colname in column_names} for i in range(len(self))] - def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False) -> pd.DataFrame: + def to_pandas( + self, + schema: Schema | None = None, + cast_tensors_to_ray_tensor_dtype: bool = False, + coerce_temporal_nanoseconds: bool = True, + ) -> pd.DataFrame: + from packaging.version import parse + if not _PANDAS_AVAILABLE: raise ImportError("Unable to import Pandas - please ensure that it is installed.") python_fields = set() @@ -236,12 +243,20 @@ def to_pandas(self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dty column = column_series.to_pylist() else: # Arrow-native field, so provide column as Arrow array. 
- column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas() + column_arrow = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype) + if parse(pa.__version__) < parse("13.0.0"): + column = column_arrow.to_pandas() + else: + column = column_arrow.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds) table[colname] = column return pd.DataFrame.from_dict(table) else: - return self.to_arrow(cast_tensors_to_ray_tensor_dtype).to_pandas() + arrow_table = self.to_arrow(cast_tensors_to_ray_tensor_dtype) + if parse(pa.__version__) < parse("13.0.0"): + return arrow_table.to_pandas() + else: + return arrow_table.to_pandas(coerce_temporal_nanoseconds=coerce_temporal_nanoseconds) ### # Compute methods (Table -> Table) diff --git a/tests/conftest.py b/tests/conftest.py index 8fba919a8b..1efafe82cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -177,9 +177,7 @@ def assert_df_equals( pd_series = pd_df[col] try: - pd.testing.assert_series_equal( - df_series, pd_series, check_dtype=check_dtype, check_datetimelike_compat=True - ) + pd.testing.assert_series_equal(df_series, pd_series, check_dtype=check_dtype) except AssertionError: print(f"Failed assertion for col: {col}") raise From 27b7b08747f4c0f9add73744be613da19f0d86f8 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Fri, 21 Jun 2024 16:52:27 -0700 Subject: [PATCH 12/12] set coerce_temporal_nanoseconds to False by default --- daft/dataframe/dataframe.py | 4 ++-- daft/runners/partitioning.py | 2 +- daft/table/micropartition.py | 2 +- daft/table/table.py | 2 +- tests/integration/sql/test_sql.py | 20 ++++++++++---------- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 40ddff8f80..b366aba27f 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -1779,14 +1779,14 @@ def __contains__(self, col_name: str) -> bool: @DataframePublicAPI def to_pandas( - self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = True + self, cast_tensors_to_ray_tensor_dtype: bool = False, coerce_temporal_nanoseconds: bool = False ) -> "pandas.DataFrame": """Converts the current DataFrame to a `pandas DataFrame `__. If results have not computed yet, collect will be called. Args: cast_tensors_to_ray_tensor_dtype (bool): Whether to cast tensors to Ray tensor dtype. Defaults to False. - coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to True. See `pyarrow.Table.to_pandas `__ for more information. + coerce_temporal_nanoseconds (bool): Whether to coerce temporal columns to nanoseconds. Only applicable to pandas version >= 2.0 and pyarrow version >= 13.0.0. Defaults to False. See `pyarrow.Table.to_pandas `__ for more information. 
Returns: pandas.DataFrame: `pandas DataFrame `__ converted from a Daft DataFrame diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index 2981ab7529..ff395594ff 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -217,7 +217,7 @@ def to_pandas( self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False, - coerce_temporal_nanoseconds: bool = True, + coerce_temporal_nanoseconds: bool = False, ) -> pd.DataFrame: merged_partition = self._get_merged_vpartition() return merged_partition.to_pandas( diff --git a/daft/table/micropartition.py b/daft/table/micropartition.py index 932b827753..66d1f8dd99 100644 --- a/daft/table/micropartition.py +++ b/daft/table/micropartition.py @@ -154,7 +154,7 @@ def to_pandas( self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False, - coerce_temporal_nanoseconds: bool = True, + coerce_temporal_nanoseconds: bool = False, ) -> pd.DataFrame: return self.to_table().to_pandas( schema=schema, diff --git a/daft/table/table.py b/daft/table/table.py index 9f2773b5e6..23854735c8 100644 --- a/daft/table/table.py +++ b/daft/table/table.py @@ -221,7 +221,7 @@ def to_pandas( self, schema: Schema | None = None, cast_tensors_to_ray_tensor_dtype: bool = False, - coerce_temporal_nanoseconds: bool = True, + coerce_temporal_nanoseconds: bool = False, ) -> pd.DataFrame: from packaging.version import parse diff --git a/tests/integration/sql/test_sql.py b/tests/integration/sql/test_sql.py index ed3821208d..3ca30ce6b2 100644 --- a/tests/integration/sql/test_sql.py +++ b/tests/integration/sql/test_sql.py @@ -28,7 +28,7 @@ def test_sql_show(test_db) -> None: def test_sql_create_dataframe_ok(test_db, pdf) -> None: df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db) - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -40,7 +40,7 @@ def test_sql_partitioned_read(test_db, num_partitions, pdf) -> None: df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col="id") assert df.num_partitions() == num_partitions - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -53,7 +53,7 @@ def test_sql_partitioned_read_with_custom_num_partitions_and_partition_col( f"SELECT * FROM {TEST_TABLE_NAME}", test_db, partition_col=partition_col, num_partitions=num_partitions ) assert df.num_partitions() == num_partitions - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -66,7 +66,7 @@ def test_sql_partitioned_read_with_non_uniformly_distributed_column(test_db, num num_partitions=num_partitions, ) assert df.num_partitions() == num_partitions - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -122,7 +122,7 @@ def test_sql_read_with_binary_filter_pushdowns(test_db, column, operator, value, df = df.where(df[column] <= value) pdf = pdf[pdf[column] <= value] - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -133,7 +133,7 @@ def test_sql_read_with_is_null_filter_pushdowns(test_db, num_partitions, pdf) 
-> pdf = pdf[pdf["null_col"].isnull()] - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -144,7 +144,7 @@ def test_sql_read_with_not_null_filter_pushdowns(test_db, num_partitions, pdf) - pdf = pdf[pdf["null_col"].notnull()] - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -155,7 +155,7 @@ def test_sql_read_with_if_else_filter_pushdown(test_db, num_partitions, pdf) -> pdf = pdf[(pdf["id"] > 100) & (pdf["float_col"] > 150) | (pdf["float_col"] < 50)] - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -165,7 +165,7 @@ def test_sql_read_with_is_in_filter_pushdown(test_db, num_partitions, pdf) -> No df = df.where(df["id"].is_in([1, 2, 3])) pdf = pdf[pdf["id"].isin([1, 2, 3])] - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration() @@ -220,7 +220,7 @@ def create_conn(): return sqlalchemy.create_engine(test_db).connect() df = daft.read_sql(f"SELECT * FROM {TEST_TABLE_NAME}", create_conn) - assert_df_equals(df.to_pandas(), pdf, sort_key="id") + assert_df_equals(df.to_pandas(coerce_temporal_nanoseconds=True), pdf, sort_key="id") @pytest.mark.integration()