BUG: update library to support google-cloud-bigquery 2.0 #334

Merged · 3 commits · Oct 5, 2020
conftest.py: 4 additions & 1 deletion
@@ -56,9 +56,12 @@ def bigquery_client(project_id, private_key_path):
 @pytest.fixture()
 def random_dataset_id(bigquery_client):
     import google.api_core.exceptions
+    from google.cloud import bigquery

     dataset_id = "".join(["pandas_gbq_", str(uuid.uuid4()).replace("-", "_")])
-    dataset_ref = bigquery_client.dataset(dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, dataset_id
+    )
     yield dataset_id
     try:
         bigquery_client.delete_dataset(dataset_ref, delete_contents=True)
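Reviewer note: Client.dataset() is deprecated in newer google-cloud-bigquery releases, which is why the fixtures switch to constructing a DatasetReference directly. A minimal sketch of the new pattern (the client setup and dataset name are illustrative):

    # Sketch: build a dataset reference without the deprecated Client.dataset().
    from google.cloud import bigquery

    client = bigquery.Client()  # assumes default credentials and project
    dataset_ref = bigquery.DatasetReference(client.project, "my_dataset")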
docs/source/changelog.rst: 3 additions & 2 deletions
@@ -3,8 +3,8 @@ Changelog

 .. _changelog-0.14.0:

-0.14.0 / TBD
--------------
+0.14.0 / 2020-10-05
+--------------------

 - Add ``dtypes`` argument to ``read_gbq``. Use this argument to override the
   default ``dtype`` for a particular column in the query results. For
@@ -22,6 +22,7 @@ Changelog
 Dependency updates
 ~~~~~~~~~~~~~~~~~~

+- Support ``google-cloud-bigquery-storage`` 2.0 and higher. (:issue:`329`)
 - Update the minimum version of ``pandas`` to 0.20.1.
   (:issue:`331`)
pandas_gbq/exceptions.py: 9 additions & 0 deletions
@@ -12,3 +12,12 @@ class InvalidPrivateKeyFormat(ValueError):
     """

     pass
+
+
+class PerformanceWarning(RuntimeWarning):
+    """
+    Raised when a performance-related feature is requested, but unsupported.
+
+    Such warnings can occur when dependencies for the requested feature
+    aren't up-to-date.
+    """
pandas_gbq/gbq.py: 35 additions & 43 deletions
@@ -14,14 +14,8 @@
     bigquery = None
     google_exceptions = None

-try:
-    # The BigQuery Storage API client is an optional dependency. It is only
-    # required when use_bqstorage_api=True.
-    from google.cloud import bigquery_storage_v1beta1
-except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
-
 from pandas_gbq.exceptions import AccessDenied
+from pandas_gbq.exceptions import PerformanceWarning
 import pandas_gbq.schema
 import pandas_gbq.timestamp
@@ -30,7 +24,9 @@

 BIGQUERY_INSTALLED_VERSION = None
 BIGQUERY_CLIENT_INFO_VERSION = "1.12.0"
+BIGQUERY_BQSTORAGE_VERSION = "1.24.0"
 HAS_CLIENT_INFO = False
+HAS_BQSTORAGE_SUPPORT = False

 try:
     import tqdm  # noqa
@@ -39,26 +35,32 @@


 def _check_google_client_version():
-    global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, SHOW_VERBOSE_DEPRECATION
+    global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, HAS_BQSTORAGE_SUPPORT, SHOW_VERBOSE_DEPRECATION

     try:
         import pkg_resources

     except ImportError:
         raise ImportError("Could not import pkg_resources (setuptools).")

-    # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
+    # https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
     bigquery_minimum_version = pkg_resources.parse_version("1.11.0")
     bigquery_client_info_version = pkg_resources.parse_version(
         BIGQUERY_CLIENT_INFO_VERSION
     )
+    bigquery_bqstorage_version = pkg_resources.parse_version(
+        BIGQUERY_BQSTORAGE_VERSION
+    )
     BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
         "google-cloud-bigquery"
     ).parsed_version

     HAS_CLIENT_INFO = (
         BIGQUERY_INSTALLED_VERSION >= bigquery_client_info_version
     )
+    HAS_BQSTORAGE_SUPPORT = (
+        BIGQUERY_INSTALLED_VERSION >= bigquery_bqstorage_version
+    )

     if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version:
         raise ImportError(
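The new HAS_BQSTORAGE_SUPPORT flag follows the same pkg_resources idiom as the existing checks. A standalone sketch of the comparison (the 1.24.0 gate is the value introduced in this diff):

    # Sketch: gate a feature on the installed google-cloud-bigquery version.
    import pkg_resources

    installed = pkg_resources.get_distribution(
        "google-cloud-bigquery"
    ).parsed_version
    has_bqstorage = installed >= pkg_resources.parse_version("1.24.0")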
@@ -548,14 +550,30 @@ def _download_results(
         if user_dtypes is None:
             user_dtypes = {}

-        try:
-            bqstorage_client = None
-            if max_results is None:
-                # Only use the BigQuery Storage API if the full result set is requested.
-                bqstorage_client = _make_bqstorage_client(
-                    self.use_bqstorage_api, self.credentials
-                )
+        if self.use_bqstorage_api and not HAS_BQSTORAGE_SUPPORT:
+            warnings.warn(
+                (
+                    "use_bqstorage_api was set, but have google-cloud-bigquery "
+                    "version {}. Requires google-cloud-bigquery version "
+                    "{} or later."
+                ).format(
+                    BIGQUERY_INSTALLED_VERSION, BIGQUERY_BQSTORAGE_VERSION
+                ),
+                PerformanceWarning,
+                stacklevel=4,
+            )
+
+        create_bqstorage_client = self.use_bqstorage_api
+        if max_results is not None:
+            create_bqstorage_client = False
+
+        to_dataframe_kwargs = {}
+        if HAS_BQSTORAGE_SUPPORT:
+            to_dataframe_kwargs[
+                "create_bqstorage_client"
+            ] = create_bqstorage_client
+
+        try:
             query_job.result()
             # Get the table schema, so that we can list rows.
             destination = self.client.get_table(query_job.destination)
@@ ... @@
             conversion_dtypes.update(user_dtypes)
             df = rows_iter.to_dataframe(
                 dtypes=conversion_dtypes,
-                bqstorage_client=bqstorage_client,
                 progress_bar_type=progress_bar_type,
+                **to_dataframe_kwargs
             )
         except self.http_error as ex:
             self.process_http_error(ex)
-        finally:
-            if bqstorage_client:
-                # Clean up open socket resources. See:
-                # https://github.com/pydata/pandas-gbq/issues/294
-                bqstorage_client.transport.channel.close()

         if df.empty:
             df = _cast_empty_df_dtypes(schema_fields, df)
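Background for this hunk: starting with google-cloud-bigquery 1.24.0, to_dataframe() accepts create_bqstorage_client and manages the BigQuery Storage client's lifecycle itself, which is why the manual socket cleanup above can go away. A sketch of the underlying call (the table ID is hypothetical; the fast path also needs google-cloud-bigquery-storage installed):

    # Sketch: let google-cloud-bigquery own the storage client's lifecycle.
    from google.cloud import bigquery

    client = bigquery.Client()
    rows = client.list_rows("my-project.my_dataset.my_table")  # hypothetical
    df = rows.to_dataframe(create_bqstorage_client=True)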
@@ -763,27 +776,6 @@ def _cast_empty_df_dtypes(schema_fields, df):
     return df


-def _make_bqstorage_client(use_bqstorage_api, credentials):
-    if not use_bqstorage_api:
-        return None
-
-    if bigquery_storage_v1beta1 is None:
-        raise ImportError(
-            "Install the google-cloud-bigquery-storage and fastavro/pyarrow "
-            "packages to use the BigQuery Storage API."
-        )
-
-    import google.api_core.gapic_v1.client_info
-    import pandas
-
-    client_info = google.api_core.gapic_v1.client_info.ClientInfo(
-        user_agent="pandas-{}".format(pandas.__version__)
-    )
-    return bigquery_storage_v1beta1.BigQueryStorageClient(
-        credentials=credentials, client_info=client_info
-    )
-
-
 def read_gbq(
     query,
     project_id=None,
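Removing _make_bqstorage_client does not change the public API: callers still opt in through read_gbq, and client construction is now deferred to google-cloud-bigquery. A usage sketch (the project ID is a placeholder):

    # Sketch: opting in to the BigQuery Storage API via the public entry point.
    import pandas_gbq

    df = pandas_gbq.read_gbq(
        "SELECT 1 AS x",
        project_id="my-project",  # placeholder
        use_bqstorage_api=True,
    )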
setup.py: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ def readme():
     "pydata-google-auth",
     "google-auth",
     "google-auth-oauthlib",
-    "google-cloud-bigquery[bqstorage,pandas]>=1.11.1,<2.0.0dev",
+    "google-cloud-bigquery[bqstorage,pandas]>=1.11.1,<3.0.0dev",
 ]

 extras = {"tqdm": "tqdm>=4.23.0"}
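One way to sanity-check an environment against the widened pin (the specifier is copied from this diff, extras omitted for brevity; raises if unsatisfied):

    # Sketch: verify the installed client satisfies the new version bound.
    import pkg_resources

    pkg_resources.require("google-cloud-bigquery>=1.11.1,<3.0.0dev")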
tests/system/conftest.py: 6 additions & 2 deletions
@@ -28,7 +28,9 @@ def gbq_connector(project, credentials):
 def random_dataset(bigquery_client, random_dataset_id):
     from google.cloud import bigquery

-    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, random_dataset_id
+    )
     dataset = bigquery.Dataset(dataset_ref)
     bigquery_client.create_dataset(dataset)
     return dataset
@@ -38,7 +40,9 @@
 def tokyo_dataset(bigquery_client, random_dataset_id):
     from google.cloud import bigquery

-    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, random_dataset_id
+    )
     dataset = bigquery.Dataset(dataset_ref)
     dataset.location = "asia-northeast1"
     bigquery_client.create_dataset(dataset)
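For reviewers unfamiliar with this fixture: queries that touch a dataset outside the US multi-region must name its location. A sketch of downstream usage (the table and project are placeholders):

    # Sketch: pass the dataset's region when querying a Tokyo dataset.
    import pandas_gbq

    df = pandas_gbq.read_gbq(
        "SELECT COUNT(*) AS n FROM my_dataset.my_table",  # placeholder table
        project_id="my-project",  # placeholder
        location="asia-northeast1",
    )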
tests/system/test_read_gbq_with_bqstorage.py: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
 import pytest


-pytest.importorskip("google.cloud.bigquery_storage_v1beta1")
+pytest.importorskip("google.cloud.bigquery", minversion="1.24.0")


 @pytest.fixture
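pytest.importorskip skips the whole module at collection time when the import or version requirement fails; minversion is compared against the imported module's __version__, so the skip is now keyed off google-cloud-bigquery itself rather than the old v1beta1 storage module. A sketch:

    # Sketch: skip a test module unless google-cloud-bigquery >= 1.24.0.
    import pytest

    bigquery = pytest.importorskip("google.cloud.bigquery", minversion="1.24.0")
    # The returned module can then be used directly in fixtures below.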
tests/unit/test_gbq.py: 26 additions & 6 deletions
@@ -13,7 +13,7 @@
 from pandas_gbq import gbq


-pytestmark = pytest.mark.filter_warnings(
+pytestmark = pytest.mark.filterwarnings(
     "ignore:credentials from Google Cloud SDK"
 )
 pandas_installed_version = pkg_resources.get_distribution(
@@ -490,9 +490,30 @@ def test_read_gbq_passes_dtypes(

     mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100)

+    _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args
+    assert to_dataframe_kwargs["dtypes"] == {"int_col": "my-custom-dtype"}
+
+
+def test_read_gbq_use_bqstorage_api(
+    mock_bigquery_client, mock_service_account_credentials
+):
+    gbq._check_google_client_version()
+    if not gbq.HAS_BQSTORAGE_SUPPORT:
+        pytest.skip("requires BigQuery Storage API")
+
+    mock_service_account_credentials.project_id = "service_account_project_id"
+    df = gbq.read_gbq(
+        "SELECT 1 AS int_col",
+        dialect="standard",
+        credentials=mock_service_account_credentials,
+        use_bqstorage_api=True,
+    )
+    assert df is not None
+
+    mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100)
     mock_list_rows.to_dataframe.assert_called_once_with(
-        dtypes={"int_col": "my-custom-dtype"},
-        bqstorage_client=mock.ANY,
+        create_bqstorage_client=True,
+        dtypes=mock.ANY,
         progress_bar_type=mock.ANY,
     )

@@ -511,6 +532,5 @@ def test_read_gbq_calls_tqdm(

     mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100)

-    mock_list_rows.to_dataframe.assert_called_once_with(
-        dtypes=mock.ANY, bqstorage_client=mock.ANY, progress_bar_type="foobar"
-    )
+    _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args
+    assert to_dataframe_kwargs["progress_bar_type"] == "foobar"
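The call_args pattern used here asserts only on the kwargs under test, so these tests keep passing as optional to_dataframe() arguments come and go. A self-contained sketch of the idiom:

    # Sketch: inspect just the keyword arguments you care about.
    from unittest import mock

    fake = mock.Mock()
    fake(dtypes={}, progress_bar_type="foobar", create_bqstorage_client=True)
    _, kwargs = fake.call_args
    assert kwargs["progress_bar_type"] == "foobar"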