diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index 050672531..b0308c389 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -12,7 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Shared helper functions for connecting BigQuery and pandas."""
+"""Shared helper functions for connecting BigQuery and pandas.
+
+NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package,
+instead. See: go/pandas-gbq-and-bigframes-redundancy and
+https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pandas_to_bigquery.py
+"""
 
 import concurrent.futures
 from datetime import datetime
@@ -39,6 +44,16 @@ else:
     import numpy
 
+
+try:
+    import pandas_gbq.schema.pandas_to_bigquery  # type: ignore
+
+    pandas_gbq_import_exception = None
+except ImportError as exc:
+    pandas_gbq = None
+    pandas_gbq_import_exception = exc
+
+
 try:
     import db_dtypes  # type: ignore
 
@@ -429,6 +444,10 @@ def _first_array_valid(series):
 def dataframe_to_bq_schema(dataframe, bq_schema):
     """Convert a pandas DataFrame schema to a BigQuery schema.
 
+    DEPRECATED: Use
+    pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(),
+    instead. See: go/pandas-gbq-and-bigframes-redundancy.
+
     Args:
         dataframe (pandas.DataFrame):
             DataFrame for which the client determines the BigQuery schema.
@@ -444,6 +463,20 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
         The automatically determined schema. Returns None if the type of any
         column cannot be determined.
     """
+    if pandas_gbq is None:
+        warnings.warn(
+            "Loading pandas DataFrame into BigQuery will require pandas-gbq "
+            "package version 0.26.1 or greater in the future. "
+            f"Tried to import pandas-gbq and got: {pandas_gbq_import_exception}",
+            category=FutureWarning,
+        )
+    else:
+        return pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
+            dataframe,
+            override_bigquery_fields=bq_schema,
+            index=True,
+        )
+
     if bq_schema:
         bq_schema = schema._to_schema_fields(bq_schema)
         bq_schema_index = {field.name: field for field in bq_schema}
diff --git a/google/cloud/bigquery/_pyarrow_helpers.py b/google/cloud/bigquery/_pyarrow_helpers.py
index 3c745a611..1b42cd5c7 100644
--- a/google/cloud/bigquery/_pyarrow_helpers.py
+++ b/google/cloud/bigquery/_pyarrow_helpers.py
@@ -12,7 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Shared helper functions for connecting BigQuery and pyarrow."""
+"""Shared helper functions for connecting BigQuery and pyarrow.
+
+NOTE: This module is DEPRECATED. Please make updates in the pandas-gbq package,
+instead. See: go/pandas-gbq-and-bigframes-redundancy and
+https://github.com/googleapis/python-bigquery-pandas/blob/main/pandas_gbq/schema/pyarrow_to_bigquery.py
+"""
 
 from typing import Any
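Both helper modules guard the new dependency with the same import pattern: attempt the import, and on failure record a None sentinel plus the original exception, so the eventual warning can say exactly why the import failed. Below is a minimal, self-contained sketch of that pattern; the names fancy_dep, process, and _legacy_process are hypothetical stand-ins, not code from this patch.

    import warnings

    try:
        import fancy_dep  # hypothetical optional dependency

        fancy_dep_import_exception = None
    except ImportError as exc:
        fancy_dep = None
        fancy_dep_import_exception = exc


    def _legacy_process(data):
        # The pre-existing fallback implementation.
        return list(data)


    def process(data):
        # Delegate to the optional dependency when present; otherwise warn
        # (including the captured ImportError) and fall back.
        if fancy_dep is None:
            warnings.warn(
                "process() will require fancy_dep in a future release. "
                f"Tried to import fancy_dep and got: {fancy_dep_import_exception}",
                category=FutureWarning,
            )
            return _legacy_process(data)
        return fancy_dep.process(data)

Keeping the exception object, rather than just a boolean, makes the FutureWarning actionable: users see the original ImportError message instead of a generic hint.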
diff --git a/noxfile.py b/noxfile.py
index e08956b11..87bd9a70c 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -110,6 +110,14 @@ def default(session, install_extras=True):
     else:
         install_target = "."
     session.install("-e", install_target, "-c", constraints_path)
+
+    # Test with some broken "extras" in case the user didn't install the extra
+    # directly. For example, pandas-gbq is recommended for pandas features, but
+    # we want to test that we fall back to the previous behavior. For context,
+    # see internal document go/pandas-gbq-and-bigframes-redundancy.
+    if session.python == UNIT_TEST_PYTHON_VERSIONS[0]:
+        session.run("python", "-m", "pip", "uninstall", "pandas-gbq", "-y")
+
     session.run("python", "-m", "pip", "freeze")
 
     # Run py.test against the unit tests.
@@ -228,6 +236,13 @@ def system(session):
         extras = "[all]"
     session.install("-e", f".{extras}", "-c", constraints_path)
 
+    # Test with some broken "extras" in case the user didn't install the extra
+    # directly. For example, pandas-gbq is recommended for pandas features, but
+    # we want to test that we fall back to the previous behavior. For context,
+    # see internal document go/pandas-gbq-and-bigframes-redundancy.
+    if session.python == SYSTEM_TEST_PYTHON_VERSIONS[0]:
+        session.run("python", "-m", "pip", "uninstall", "pandas-gbq", "-y")
+
     # print versions of all dependencies
     session.run("python", "-m", "pip", "freeze")
 
diff --git a/pyproject.toml b/pyproject.toml
index ecf21d922..c4e5c2f0d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -74,6 +74,9 @@ bqstorage = [
 ]
 pandas = [
     "pandas >= 1.1.0",
+    "pandas-gbq >= 0.26.1; python_version >= '3.8'",
+    "grpcio >= 1.47.0, < 2.0dev",
+    "grpcio >= 1.49.1, < 2.0dev; python_version >= '3.11'",
     "pyarrow >= 3.0.0",
     "db-dtypes >= 0.3.0, < 2.0.0dev",
     "importlib_metadata >= 1.0.0; python_version < '3.8'",
diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt
index e5e73c5c7..9883fb8cc 100644
--- a/testing/constraints-3.8.txt
+++ b/testing/constraints-3.8.txt
@@ -1,2 +1,11 @@
 grpcio==1.47.0
 pandas==1.2.0
+
+# This constraints file is used to check that lower bounds
+# are correct in pyproject.toml.
+#
+# Pin the version to the lower bound.
+#
+# e.g., if pyproject.toml has "foo >= 1.14.0, < 2.0.0dev",
+# then this file should have foo==1.14.0
+pandas-gbq==0.26.1
diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py
index 85c7b79e6..a9e76d416 100644
--- a/tests/system/test_pandas.py
+++ b/tests/system/test_pandas.py
@@ -1259,7 +1259,7 @@ def test_upload_time_and_datetime_56(bigquery_client, dataset_id):
     df = pandas.DataFrame(
         dict(
             dt=[
-                datetime.datetime(2020, 1, 8, 8, 0, 0),
+                datetime.datetime(2020, 1, 8, 8, 0, 0, tzinfo=datetime.timezone.utc),
                 datetime.datetime(
                     2020,
                     1,
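The noxfile.py change above deliberately breaks the pandas extra on one interpreter per run so CI keeps covering the no-pandas-gbq fallback path. Here is a hedged sketch of the same technique in a standalone noxfile; the session name and the [pandas] extra are illustrative, not this repository's actual configuration.

    import nox

    UNIT_TEST_PYTHON_VERSIONS = ["3.8", "3.11", "3.12"]


    @nox.session(python=UNIT_TEST_PYTHON_VERSIONS)
    def unit(session):
        session.install("-e", ".[pandas]")

        # On one interpreter, remove the optional dependency so this run
        # exercises the legacy code path instead of the pandas-gbq path.
        if session.python == UNIT_TEST_PYTHON_VERSIONS[0]:
            session.run("python", "-m", "pip", "uninstall", "pandas-gbq", "-y")

        # Record the effective environment to simplify debugging CI failures.
        session.run("python", "-m", "pip", "freeze")
        session.run("pytest", "tests/unit")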
diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py
index 3a5fddacc..20a25eddf 100644
--- a/tests/unit/test__pandas_helpers.py
+++ b/tests/unit/test__pandas_helpers.py
@@ -34,6 +34,11 @@
 except ImportError:
     pandas = None
 
+try:
+    import pandas_gbq.schema.pandas_to_bigquery
+except ImportError:
+    pandas_gbq = None
+
 try:
     import geopandas
 except ImportError:
@@ -1280,7 +1285,21 @@ def test_dataframe_to_parquet_compression_method(module_under_test):
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-def test_dataframe_to_bq_schema_w_named_index(module_under_test):
+@pytest.mark.skipif(pandas_gbq is None, reason="Requires `pandas-gbq`")
+def test_dataframe_to_bq_schema_returns_schema_with_pandas_gbq(
+    module_under_test, monkeypatch
+):
+    dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})
+    got = module_under_test.dataframe_to_bq_schema(dataframe, [])
+
+    # Don't assert beyond this, since pandas-gbq is now the source of truth.
+    assert got is not None
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_dataframe_to_bq_schema_w_named_index(module_under_test, monkeypatch):
+    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
+
     df_data = collections.OrderedDict(
         [
             ("str_column", ["hello", "world"]),
@@ -1291,7 +1310,8 @@
     index = pandas.Index(["a", "b"], name="str_index")
     dataframe = pandas.DataFrame(df_data, index=index)
 
-    returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])
+    with pytest.warns(FutureWarning, match="pandas-gbq"):
+        returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])
 
     expected_schema = (
         schema.SchemaField("str_index", "STRING", "NULLABLE"),
@@ -1303,7 +1323,9 @@
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-def test_dataframe_to_bq_schema_w_multiindex(module_under_test):
+def test_dataframe_to_bq_schema_w_multiindex(module_under_test, monkeypatch):
+    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
+
     df_data = collections.OrderedDict(
         [
             ("str_column", ["hello", "world"]),
@@ -1320,7 +1342,8 @@
     )
     dataframe = pandas.DataFrame(df_data, index=index)
 
-    returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])
+    with pytest.warns(FutureWarning, match="pandas-gbq"):
+        returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, [])
 
     expected_schema = (
         schema.SchemaField("str_index", "STRING", "NULLABLE"),
@@ -1334,7 +1357,9 @@
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-def test_dataframe_to_bq_schema_w_bq_schema(module_under_test):
+def test_dataframe_to_bq_schema_w_bq_schema(module_under_test, monkeypatch):
+    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
+
     df_data = collections.OrderedDict(
         [
             ("str_column", ["hello", "world"]),
@@ -1349,7 +1374,10 @@
         {"name": "bool_column", "type": "BOOL", "mode": "REQUIRED"},
     ]
 
-    returned_schema = module_under_test.dataframe_to_bq_schema(dataframe, dict_schema)
+    with pytest.warns(FutureWarning, match="pandas-gbq"):
+        returned_schema = module_under_test.dataframe_to_bq_schema(
+            dataframe, dict_schema
+        )
 
     expected_schema = (
         schema.SchemaField("str_column", "STRING", "NULLABLE"),
@@ -1360,7 +1388,11 @@
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(module_under_test):
+def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(
+    module_under_test, monkeypatch
+):
+    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
+
     dataframe = pandas.DataFrame(
         data=[
             {"id": 10, "status": "FOO", "execution_date": datetime.date(2019, 5, 10)},
@@ -1388,7 +1420,11 @@
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
-def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(module_under_test):
+def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(
+    module_under_test, monkeypatch
+):
+    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
+
     dataframe = pandas.DataFrame(
         data=[
             {"id": 10, "status": "FOO", "created_at": datetime.date(2019, 5, 10)},
@@ -1418,7 +1454,9 @@
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
-def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test):
+def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test, monkeypatch):
+    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
+
     dataframe = pandas.DataFrame(
         data=[
             {"struct_field": {"one": 2}, "status": "FOO"},
@@ -1442,9 +1480,11 @@
 
 
 @pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`")
-def test_dataframe_to_bq_schema_geography(module_under_test):
+def test_dataframe_to_bq_schema_geography(module_under_test, monkeypatch):
     from shapely import wkt
 
+    monkeypatch.setattr(module_under_test, "pandas_gbq", None)
+
     df = geopandas.GeoDataFrame(
         pandas.DataFrame(
             dict(
@@ -1455,7 +1495,10 @@
         ),
         geometry="geo1",
     )
-    bq_schema = module_under_test.dataframe_to_bq_schema(df, [])
+
+    with pytest.warns(FutureWarning, match="pandas-gbq"):
+        bq_schema = module_under_test.dataframe_to_bq_schema(df, [])
+
     assert bq_schema == (
         schema.SchemaField("name", "STRING"),
         schema.SchemaField("geo1", "GEOGRAPHY"),
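The unit-test changes above all follow one pattern: monkeypatch nulls out the module-level pandas_gbq reference to simulate a missing dependency, and pytest.warns(FutureWarning, match="pandas-gbq") pins the deprecation message on the fallback path. A condensed sketch follows; mypackage._helpers is a hypothetical module standing in for the fixture-provided module under test.

    import pandas
    import pytest

    import mypackage._helpers as module_under_test  # hypothetical module path


    def test_schema_detection_warns_without_pandas_gbq(monkeypatch):
        # Simulate the ImportError branch: pretend pandas_gbq never imported.
        monkeypatch.setattr(module_under_test, "pandas_gbq", None)
        dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})

        # The legacy path must still return a schema, and must warn about
        # the upcoming pandas-gbq requirement.
        with pytest.warns(FutureWarning, match="pandas-gbq"):
            got = module_under_test.dataframe_to_bq_schema(dataframe, [])

        assert got is not None

monkeypatch.setattr restores the attribute when the test finishes, so nulling the import has no effect on other tests in the same process.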
{"id": 10, "status": "FOO", "created_at": datetime.date(2019, 5, 10)}, @@ -1418,7 +1454,9 @@ def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(module_under_test): @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") -def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test): +def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test, monkeypatch): + monkeypatch.setattr(module_under_test, "pandas_gbq", None) + dataframe = pandas.DataFrame( data=[ {"struct_field": {"one": 2}, "status": "FOO"}, @@ -1442,9 +1480,11 @@ def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test): @pytest.mark.skipif(geopandas is None, reason="Requires `geopandas`") -def test_dataframe_to_bq_schema_geography(module_under_test): +def test_dataframe_to_bq_schema_geography(module_under_test, monkeypatch): from shapely import wkt + monkeypatch.setattr(module_under_test, "pandas_gbq", None) + df = geopandas.GeoDataFrame( pandas.DataFrame( dict( @@ -1455,7 +1495,10 @@ def test_dataframe_to_bq_schema_geography(module_under_test): ), geometry="geo1", ) - bq_schema = module_under_test.dataframe_to_bq_schema(df, []) + + with pytest.warns(FutureWarning, match="pandas-gbq"): + bq_schema = module_under_test.dataframe_to_bq_schema(df, []) + assert bq_schema == ( schema.SchemaField("name", "STRING"), schema.SchemaField("geo1", "GEOGRAPHY"), diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 462a70bbe..a5af37b6b 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -8391,8 +8391,12 @@ def test_load_table_from_dataframe_w_automatic_schema_detection_fails(self): autospec=True, side_effect=google.api_core.exceptions.NotFound("Table not found"), ) + pandas_gbq_patch = mock.patch( + "google.cloud.bigquery._pandas_helpers.pandas_gbq", + new=None, + ) - with load_patch as load_table_from_file, get_table_patch: + with load_patch as load_table_from_file, get_table_patch, pandas_gbq_patch: with warnings.catch_warnings(record=True) as warned: client.load_table_from_dataframe( dataframe, self.TABLE_REF, location=self.LOCATION @@ -8448,7 +8452,6 @@ def test_load_table_from_dataframe_w_index_and_auto_schema(self): load_patch = mock.patch( "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True ) - get_table_patch = mock.patch( "google.cloud.bigquery.client.Client.get_table", autospec=True, @@ -8460,6 +8463,7 @@ def test_load_table_from_dataframe_w_index_and_auto_schema(self): ] ), ) + with load_patch as load_table_from_file, get_table_patch: client.load_table_from_dataframe( dataframe, self.TABLE_REF, location=self.LOCATION @@ -8580,10 +8584,10 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se client = self._make_client() dataframe = pandas.DataFrame({"x": [1, 2, None, 4]}, dtype="Int64") + load_patch = mock.patch( "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True ) - get_table_patch = mock.patch( "google.cloud.bigquery.client.Client.get_table", autospec=True, @@ -8612,8 +8616,11 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se sent_config = load_table_from_file.mock_calls[0][2]["job_config"] assert sent_config.source_format == job.SourceFormat.PARQUET - assert tuple(sent_config.schema) == ( - SchemaField("x", "INT64", "NULLABLE", None), + assert ( + # Accept either the GoogleSQL or legacy SQL type name from pandas-gbq. 
@@ -8759,7 +8766,7 @@ def test_load_table_from_dataframe_array_fields_w_auto_schema(self):
             data=records, columns=["float_column", "array_column"]
         )
 
-        expected_schema = [
+        expected_schema_googlesql = [
             SchemaField("float_column", "FLOAT"),
             SchemaField(
                 "array_column",
@@ -8767,6 +8774,14 @@ def test_load_table_from_dataframe_array_fields_w_auto_schema(self):
                 mode="REPEATED",
             ),
         ]
+        expected_schema_legacy_sql = [
+            SchemaField("float_column", "FLOAT"),
+            SchemaField(
+                "array_column",
+                "INTEGER",
+                mode="REPEATED",
+            ),
+        ]
 
         load_patch = mock.patch(
             "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
@@ -8802,7 +8817,10 @@ def test_load_table_from_dataframe_array_fields_w_auto_schema(self):
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
         assert sent_config.source_format == job.SourceFormat.PARQUET
 
-        assert sent_config.schema == expected_schema
+        assert (
+            sent_config.schema == expected_schema_googlesql
+            or sent_config.schema == expected_schema_legacy_sql
+        )
 
     def test_load_table_from_dataframe_w_partial_schema(self):
         pandas = pytest.importorskip("pandas")
@@ -8922,7 +8940,6 @@ def test_load_table_from_dataframe_w_partial_schema_extra_types(self):
         load_table_from_file.assert_not_called()
         message = str(exc_context.value)
-
         assert "bq_schema contains fields not present in dataframe" in message
         assert "unknown_col" in message
 
     def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
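For reference, the new primary path these tests guard: with pandas-gbq >= 0.26.1 installed, dataframe_to_bq_schema delegates to pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields, as shown in the _pandas_helpers.py hunk above. A hedged sketch of calling it directly; the returned objects are assumed to be SchemaField-compatible (with name and field_type attributes), which is implied by how the patch returns them.

    import pandas
    import pandas_gbq.schema.pandas_to_bigquery

    df = pandas.DataFrame({"name": ["foo", "bar"], "score": [1.5, 2.5]})

    # The same call the patched helper makes: no overrides, include the index.
    fields = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
        df,
        override_bigquery_fields=[],
        index=True,
    )
    print([(field.name, field.field_type) for field in fields])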