diff --git a/conftest.py b/conftest.py
index 1485e41e..7f9a6721 100644
--- a/conftest.py
+++ b/conftest.py
@@ -56,9 +56,12 @@ def bigquery_client(project_id, private_key_path):
 @pytest.fixture()
 def random_dataset_id(bigquery_client):
     import google.api_core.exceptions
+    from google.cloud import bigquery

     dataset_id = "".join(["pandas_gbq_", str(uuid.uuid4()).replace("-", "_")])
-    dataset_ref = bigquery_client.dataset(dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, dataset_id
+    )
     yield dataset_id
     try:
         bigquery_client.delete_dataset(dataset_ref, delete_contents=True)
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index f1ebae90..46570643 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -3,8 +3,8 @@ Changelog

 .. _changelog-0.14.0:

-0.14.0 / TBD
-------------
+0.14.0 / 2020-10-05
+-------------------

 - Add ``dtypes`` argument to ``read_gbq``. Use this argument to override the
   default ``dtype`` for a particular column in the query results. For
@@ -22,6 +22,7 @@ Changelog
 Dependency updates
 ~~~~~~~~~~~~~~~~~~

+- Support ``google-cloud-bigquery-storage`` 2.0 and higher. (:issue:`329`)
 - Update the minimum version of ``pandas`` to 0.20.1.
   (:issue:`331`)
diff --git a/pandas_gbq/exceptions.py b/pandas_gbq/exceptions.py
index 96711455..dde45081 100644
--- a/pandas_gbq/exceptions.py
+++ b/pandas_gbq/exceptions.py
@@ -12,3 +12,12 @@ class InvalidPrivateKeyFormat(ValueError):
     """

     pass
+
+
+class PerformanceWarning(RuntimeWarning):
+    """
+    Raised when a performance-related feature is requested, but unsupported.
+
+    Such warnings can occur when dependencies for the requested feature
+    aren't up-to-date.
+    """
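Since the new `PerformanceWarning` subclasses `RuntimeWarning`, callers can promote it to a hard error with the standard `warnings` filters. A minimal sketch, not part of this diff, assuming `pandas_gbq` is importable:

```python
import warnings

from pandas_gbq.exceptions import PerformanceWarning

# Escalate the degraded-performance warning to an error, e.g. in CI, so an
# outdated google-cloud-bigquery is caught early instead of silently falling
# back to the slower REST-based row download path.
warnings.simplefilter("error", category=PerformanceWarning)
```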
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 4671227d..7ae4fcf1 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -14,14 +14,8 @@

 bigquery = None
 google_exceptions = None

-try:
-    # The BigQuery Storage API client is an optional dependency. It is only
-    # required when use_bqstorage_api=True.
-    from google.cloud import bigquery_storage_v1beta1
-except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
-
 from pandas_gbq.exceptions import AccessDenied
+from pandas_gbq.exceptions import PerformanceWarning
 import pandas_gbq.schema
 import pandas_gbq.timestamp
@@ -30,7 +24,9 @@

 BIGQUERY_INSTALLED_VERSION = None
 BIGQUERY_CLIENT_INFO_VERSION = "1.12.0"
+BIGQUERY_BQSTORAGE_VERSION = "1.24.0"
 HAS_CLIENT_INFO = False
+HAS_BQSTORAGE_SUPPORT = False

 try:
     import tqdm  # noqa
@@ -39,7 +35,7 @@


 def _check_google_client_version():
-    global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, SHOW_VERBOSE_DEPRECATION
+    global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, HAS_BQSTORAGE_SUPPORT, SHOW_VERBOSE_DEPRECATION

     try:
         import pkg_resources
@@ -47,11 +43,14 @@ def _check_google_client_version():
     except ImportError:
         raise ImportError("Could not import pkg_resources (setuptools).")

-    # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
+    # https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
     bigquery_minimum_version = pkg_resources.parse_version("1.11.0")
     bigquery_client_info_version = pkg_resources.parse_version(
         BIGQUERY_CLIENT_INFO_VERSION
     )
+    bigquery_bqstorage_version = pkg_resources.parse_version(
+        BIGQUERY_BQSTORAGE_VERSION
+    )
     BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
         "google-cloud-bigquery"
     ).parsed_version
@@ -59,6 +58,9 @@ def _check_google_client_version():
     HAS_CLIENT_INFO = (
         BIGQUERY_INSTALLED_VERSION >= bigquery_client_info_version
     )
+    HAS_BQSTORAGE_SUPPORT = (
+        BIGQUERY_INSTALLED_VERSION >= bigquery_bqstorage_version
+    )

     if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version:
         raise ImportError(
@@ -548,14 +550,30 @@ def _download_results(
         if user_dtypes is None:
             user_dtypes = {}

-        try:
-            bqstorage_client = None
-            if max_results is None:
-                # Only use the BigQuery Storage API if the full result set is requested.
-                bqstorage_client = _make_bqstorage_client(
-                    self.use_bqstorage_api, self.credentials
-                )
+        if self.use_bqstorage_api and not HAS_BQSTORAGE_SUPPORT:
+            warnings.warn(
+                (
+                    "use_bqstorage_api was set, but have google-cloud-bigquery "
+                    "version {}. Requires google-cloud-bigquery version "
+                    "{} or later."
+                ).format(
+                    BIGQUERY_INSTALLED_VERSION, BIGQUERY_BQSTORAGE_VERSION
+                ),
+                PerformanceWarning,
+                stacklevel=4,
+            )
+
+        create_bqstorage_client = self.use_bqstorage_api
+        if max_results is not None:
+            create_bqstorage_client = False
+
+        to_dataframe_kwargs = {}
+        if HAS_BQSTORAGE_SUPPORT:
+            to_dataframe_kwargs[
+                "create_bqstorage_client"
+            ] = create_bqstorage_client

+        try:
             query_job.result()
             # Get the table schema, so that we can list rows.
             destination = self.client.get_table(query_job.destination)
@@ -568,16 +586,11 @@ def _download_results(
             conversion_dtypes.update(user_dtypes)
             df = rows_iter.to_dataframe(
                 dtypes=conversion_dtypes,
-                bqstorage_client=bqstorage_client,
                 progress_bar_type=progress_bar_type,
+                **to_dataframe_kwargs
             )
         except self.http_error as ex:
             self.process_http_error(ex)
-        finally:
-            if bqstorage_client:
-                # Clean up open socket resources. See:
-                # https://github.com/pydata/pandas-gbq/issues/294
-                bqstorage_client.transport.channel.close()

         if df.empty:
             df = _cast_empty_df_dtypes(schema_fields, df)
@@ -763,27 +776,6 @@ def _cast_empty_df_dtypes(schema_fields, df):
     return df


-def _make_bqstorage_client(use_bqstorage_api, credentials):
-    if not use_bqstorage_api:
-        return None
-
-    if bigquery_storage_v1beta1 is None:
-        raise ImportError(
-            "Install the google-cloud-bigquery-storage and fastavro/pyarrow "
-            "packages to use the BigQuery Storage API."
-        )
-
-    import google.api_core.gapic_v1.client_info
-    import pandas
-
-    client_info = google.api_core.gapic_v1.client_info.ClientInfo(
-        user_agent="pandas-{}".format(pandas.__version__)
-    )
-    return bigquery_storage_v1beta1.BigQueryStorageClient(
-        credentials=credentials, client_info=client_info
-    )
-
-
 def read_gbq(
     query,
     project_id=None,
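The compatibility trick in `_download_results` above: `RowIterator.to_dataframe` only accepts `create_bqstorage_client` from google-cloud-bigquery 1.24.0 onward, so the keyword travels in a dict that stays empty on older installs. A condensed sketch of the same pattern, assuming google-cloud-bigquery is installed (`download` is a stand-in name, not an API in this diff):

```python
import pkg_resources

# Mirrors the check added to _check_google_client_version().
_installed = pkg_resources.get_distribution(
    "google-cloud-bigquery"
).parsed_version
HAS_BQSTORAGE_SUPPORT = _installed >= pkg_resources.parse_version("1.24.0")


def download(rows_iter, use_bqstorage_api):
    to_dataframe_kwargs = {}
    if HAS_BQSTORAGE_SUPPORT:
        # Pass the keyword only when the installed client understands it;
        # older versions raise TypeError on an unexpected argument.
        to_dataframe_kwargs["create_bqstorage_client"] = use_bqstorage_api
    return rows_iter.to_dataframe(**to_dataframe_kwargs)
```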
diff --git a/setup.py b/setup.py
index bf2fde71..4e3d01c9 100644
--- a/setup.py
+++ b/setup.py
@@ -22,7 +22,7 @@ def readme():
     "pydata-google-auth",
     "google-auth",
     "google-auth-oauthlib",
-    "google-cloud-bigquery[bqstorage,pandas]>=1.11.1,<2.0.0dev",
+    "google-cloud-bigquery[bqstorage,pandas]>=1.11.1,<3.0.0dev",
 ]

 extras = {"tqdm": "tqdm>=4.23.0"}
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 2004519a..9d160b47 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -28,7 +28,9 @@ def gbq_connector(project, credentials):
 def random_dataset(bigquery_client, random_dataset_id):
     from google.cloud import bigquery

-    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, random_dataset_id
+    )
     dataset = bigquery.Dataset(dataset_ref)
     bigquery_client.create_dataset(dataset)
     return dataset
@@ -38,7 +40,9 @@ def random_dataset(bigquery_client, random_dataset_id):
 def tokyo_dataset(bigquery_client, random_dataset_id):
     from google.cloud import bigquery

-    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, random_dataset_id
+    )
     dataset = bigquery.Dataset(dataset_ref)
     dataset.location = "asia-northeast1"
     bigquery_client.create_dataset(dataset)
diff --git a/tests/system/test_read_gbq_with_bqstorage.py b/tests/system/test_read_gbq_with_bqstorage.py
index 31f2e484..4dc1fb25 100644
--- a/tests/system/test_read_gbq_with_bqstorage.py
+++ b/tests/system/test_read_gbq_with_bqstorage.py
@@ -6,7 +6,7 @@

 import pytest

-pytest.importorskip("google.cloud.bigquery_storage_v1beta1")
+pytest.importorskip("google.cloud.bigquery", minversion="1.24.0")


 @pytest.fixture
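`Client.dataset()` is deprecated in recent google-cloud-bigquery releases, which is why the fixtures above now build references directly; the constructor form works across 1.x and 2.x. A minimal sketch, assuming default credentials (`my_dataset_id` is a placeholder):

```python
from google.cloud import bigquery

client = bigquery.Client()
# Equivalent to the deprecated client.dataset("my_dataset_id"), but
# supported on google-cloud-bigquery 1.x and 2.x alike.
dataset_ref = bigquery.DatasetReference(client.project, "my_dataset_id")
```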
"service_account_project_id" + df = gbq.read_gbq( + "SELECT 1 AS int_col", + dialect="standard", + credentials=mock_service_account_credentials, + use_bqstorage_api=True, + ) + assert df is not None + + mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100) mock_list_rows.to_dataframe.assert_called_once_with( - dtypes={"int_col": "my-custom-dtype"}, - bqstorage_client=mock.ANY, + create_bqstorage_client=True, + dtypes=mock.ANY, progress_bar_type=mock.ANY, ) @@ -511,6 +532,5 @@ def test_read_gbq_calls_tqdm( mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100) - mock_list_rows.to_dataframe.assert_called_once_with( - dtypes=mock.ANY, bqstorage_client=mock.ANY, progress_bar_type="foobar" - ) + _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args + assert to_dataframe_kwargs["progress_bar_type"] == "foobar"