Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery: Add tqdm progress bar for downloads #7552

Merged
merged 12 commits into from
Mar 28, 2019
2 changes: 1 addition & 1 deletion bigquery/docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@

# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
"python": ("http://python.readthedocs.org/en/latest/", None),
"gax": ("https://gax-python.readthedocs.org/en/latest/", None),
"pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
"python": ("http://python.readthedocs.org/en/latest/", None),
}

# Napoleon settings
Expand Down
12 changes: 11 additions & 1 deletion bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2778,7 +2778,7 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
dest_table = Table(dest_table_ref, schema=schema)
return self._client.list_rows(dest_table, retry=retry)

def to_dataframe(self, bqstorage_client=None, dtypes=None):
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
"""Return a pandas DataFrame from a QueryJob

Args:
Expand All @@ -2805,6 +2805,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.

See
:func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
for details.

.. versionadded:: 1.11.0

Returns:
A :class:`~pandas.DataFrame` populated with row data and column
Expand Down
75 changes: 70 additions & 5 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
except ImportError: # pragma: NO COVER
pandas = None

try:
import tqdm
except ImportError: # pragma: NO COVER
tqdm = None

from google.api_core.page_iterator import HTTPIterator

import google.cloud._helpers
Expand All @@ -44,6 +49,10 @@
"The pandas library is not installed, please install "
"pandas to use the to_dataframe() function."
)
_NO_TQDM_ERROR = (
"A progress bar was requested, but there was an error loading the tqdm "
"library. Please install tqdm to use the progress bar functionality."
)
_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'
_MARKER = object()

Expand Down Expand Up @@ -1330,12 +1339,22 @@ def _to_dataframe_dtypes(self, page, column_names, dtypes):
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
return pandas.DataFrame(columns, columns=column_names)

def _to_dataframe_tabledata_list(self, dtypes):
def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
"""Use (slower, but free) tabledata.list to construct a DataFrame."""
column_names = [field.name for field in self.schema]
frames = []

for page in iter(self.pages):
frames.append(self._to_dataframe_dtypes(page, column_names, dtypes))
current_frame = self._to_dataframe_dtypes(page, column_names, dtypes)
frames.append(current_frame)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(current_frame))

return pandas.concat(frames)

def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
Expand Down Expand Up @@ -1385,10 +1404,34 @@ def get_dataframe(stream):
# the end using manually-parsed schema.
return pandas.concat(frames)[columns]

def to_dataframe(self, bqstorage_client=None, dtypes=None):
def _get_progress_bar(self, progress_bar_type):
    """Build the tqdm progress bar selected by ``progress_bar_type``.

    Args:
        progress_bar_type (Optional[str]):
            ``'tqdm'``, ``'tqdm_notebook'`` or ``'tqdm_gui'`` to pick the
            tqdm function of the same name. Any other value, including
            ``None``, produces no progress bar.

    Returns:
        The object returned by the chosen tqdm function, created with
        ``self.total_rows`` as its total, or ``None`` when no progress
        bar is available.
    """
    if tqdm is None:
        # Stay silent unless the caller explicitly asked for a bar.
        if progress_bar_type is not None:
            warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
        return None

    try:
        # Pick the tqdm flavor first, then build it with shared arguments.
        if progress_bar_type == "tqdm":
            make_bar = tqdm.tqdm
        elif progress_bar_type == "tqdm_notebook":
            make_bar = tqdm.tqdm_notebook
        elif progress_bar_type == "tqdm_gui":
            make_bar = tqdm.tqdm_gui
        else:
            return None
        return make_bar(desc="Downloading", total=self.total_rows, unit="rows")
    except (KeyError, TypeError):
        # A misbehaving tqdm must never break the download itself, so
        # warn and carry on without a progress bar.
        warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
    return None

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
"""Create a pandas DataFrame by loading all pages of a query.


Args:
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
Expand All @@ -1413,6 +1456,26 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.

Possible values of ``progress_bar_type`` include:

``None``
No progress bar.
``'tqdm'``
Use the :func:`tqdm.tqdm` function to print a progress bar
to :data:`sys.stderr`.
``'tqdm_notebook'``
Use the :func:`tqdm.tqdm_notebook` function to display a
progress bar as a Jupyter notebook widget.
``'tqdm_gui'``
Use the :func:`tqdm.tqdm_gui` function to display a
progress bar as a graphical dialog box.

.. versionadded:: 1.11.0

Returns:
pandas.DataFrame:
Expand All @@ -1429,10 +1492,12 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None):
if dtypes is None:
dtypes = {}

progress_bar = self._get_progress_bar(progress_bar_type)

if bqstorage_client is not None:
return self._to_dataframe_bqstorage(bqstorage_client, dtypes)
else:
return self._to_dataframe_tabledata_list(dtypes)
return self._to_dataframe_tabledata_list(dtypes, progress_bar=progress_bar)


class _EmptyRowIterator(object):
Expand Down
4 changes: 2 additions & 2 deletions bigquery/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def default(session):

# Pyarrow does not support Python 3.7
if session.python == '3.7':
dev_install = '.[pandas]'
dev_install = '.[pandas, tqdm]'
else:
dev_install = '.[pandas, pyarrow]'
dev_install = '.[pandas, pyarrow, tqdm]'
session.install('-e', dev_install)

# IPython does not support Python 2 after version 5.x
Expand Down
1 change: 1 addition & 0 deletions bigquery/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
# Exclude PyArrow dependency from Windows Python 2.7.
'pyarrow: platform_system != "Windows" or python_version >= "3.4"':
'pyarrow>=0.4.1',
'tqdm': 'tqdm >= 4.0.0, <5.0.0dev',
'fastparquet': ['fastparquet', 'python-snappy'],
}

Expand Down
138 changes: 129 additions & 9 deletions bigquery/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import itertools
import json
import unittest
import warnings

import mock
import pytest
Expand All @@ -29,6 +30,11 @@
except (ImportError, AttributeError): # pragma: NO COVER
pandas = None

try:
from tqdm import tqdm
except (ImportError, AttributeError): # pragma: NO COVER
tqdm = None

from google.cloud.bigquery.dataset import DatasetReference


Expand Down Expand Up @@ -901,7 +907,6 @@ def test_time_partitioning_setter_none(self):
self.assertIsNone(table.time_partitioning)

def test_partitioning_type_setter(self):
import warnings
from google.cloud.bigquery.table import TimePartitioningType

dataset = DatasetReference(self.PROJECT, self.DS_ID)
Expand All @@ -920,7 +925,6 @@ def test_partitioning_type_setter(self):
self.assertIs(warning.category, PendingDeprecationWarning)

def test_partitioning_type_setter_w_time_partitioning_set(self):
import warnings
from google.cloud.bigquery.table import TimePartitioning

dataset = DatasetReference(self.PROJECT, self.DS_ID)
Expand All @@ -938,7 +942,6 @@ def test_partitioning_type_setter_w_time_partitioning_set(self):
self.assertIs(warning.category, PendingDeprecationWarning)

def test_partitioning_expiration_setter_w_time_partitioning_set(self):
import warnings
from google.cloud.bigquery.table import TimePartitioning

dataset = DatasetReference(self.PROJECT, self.DS_ID)
Expand All @@ -956,8 +959,6 @@ def test_partitioning_expiration_setter_w_time_partitioning_set(self):
self.assertIs(warning.category, PendingDeprecationWarning)

def test_partition_expiration_setter(self):
import warnings

dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)
Expand Down Expand Up @@ -1112,8 +1113,6 @@ def _make_one(self, *args, **kw):
return self._get_target_class()(*args, **kw)

def test_ctor(self):
import warnings

project = "test-project"
dataset_id = "test_dataset"
table_id = "coffee_table"
Expand Down Expand Up @@ -1191,8 +1190,6 @@ def test_ctor_view(self):
self.assertTrue(table.view_use_legacy_sql)

def test_ctor_missing_properties(self):
import warnings

resource = {
"tableReference": {
"projectId": "testproject",
Expand Down Expand Up @@ -1413,6 +1410,129 @@ def test_to_dataframe(self):
self.assertEqual(df.name.dtype.name, "object")
self.assertEqual(df.age.dtype.name, "int64")

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(tqdm is None, "Requires `tqdm`")
@mock.patch("tqdm.tqdm_gui")
@mock.patch("tqdm.tqdm_notebook")
@mock.patch("tqdm.tqdm")
def test_to_dataframe_progress_bar(
    self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock
):
    """Every supported progress bar type creates and updates its tqdm bar."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    people = (
        ("Phred Phlyntstone", "32"),
        ("Bharney Rhubble", "33"),
        ("Wylma Phlyntstone", "29"),
        ("Bhettye Rhubble", "27"),
    )
    rows = [{"f": [{"v": person}, {"v": age}]} for person, age in people]
    path = "/foo"
    api_request = mock.Mock(return_value={"rows": rows})

    bar_types = ("tqdm", "tqdm_notebook", "tqdm_gui")
    bar_mocks = (tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock)

    for bar_type, bar_mock in zip(bar_types, bar_mocks):
        # A fresh iterator per type: each one is consumed by to_dataframe().
        row_iterator = RowIterator(_mock_client(), api_request, path, schema)
        df = row_iterator.to_dataframe(progress_bar_type=bar_type)

        bar_mock.assert_called()
        bar_mock().update.assert_called()
        self.assertEqual(len(df), 4)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@mock.patch("google.cloud.bigquery.table.tqdm", new=None)
def test_to_dataframe_no_tqdm_no_progress_bar(self):
    """Without tqdm and without a requested bar, no warning is emitted."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    people = (
        ("Phred Phlyntstone", "32"),
        ("Bharney Rhubble", "33"),
        ("Wylma Phlyntstone", "29"),
        ("Bhettye Rhubble", "27"),
    )
    rows = [{"f": [{"v": person}, {"v": age}]} for person, age in people]
    path = "/foo"
    api_request = mock.Mock(return_value={"rows": rows})
    row_iterator = RowIterator(_mock_client(), api_request, path, schema)

    with warnings.catch_warnings(record=True) as caught:
        df = row_iterator.to_dataframe()

    self.assertEqual(len(caught), 0)
    self.assertEqual(len(df), 4)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@mock.patch("google.cloud.bigquery.table.tqdm", new=None)
def test_to_dataframe_no_tqdm(self):
    """Requesting a bar without tqdm warns once but still downloads."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    people = (
        ("Phred Phlyntstone", "32"),
        ("Bharney Rhubble", "33"),
        ("Wylma Phlyntstone", "29"),
        ("Bhettye Rhubble", "27"),
    )
    rows = [{"f": [{"v": person}, {"v": age}]} for person, age in people]
    path = "/foo"
    api_request = mock.Mock(return_value={"rows": rows})
    row_iterator = RowIterator(_mock_client(), api_request, path, schema)

    with warnings.catch_warnings(record=True) as caught:
        df = row_iterator.to_dataframe(progress_bar_type="tqdm")

    self.assertEqual(len(caught), 1)
    for entry in caught:
        self.assertIs(entry.category, UserWarning)

    # The missing progress bar must not prevent the rows from being
    # downloaded into the dataframe.
    self.assertEqual(len(df), 4)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(tqdm is None, "Requires `tqdm`")
@mock.patch("tqdm.tqdm_gui", new=None)  # will raise TypeError on call
@mock.patch("tqdm.tqdm_notebook", new=None)  # will raise TypeError on call
@mock.patch("tqdm.tqdm", new=None)  # will raise TypeError on call
def test_to_dataframe_tqdm_error(self):
    """A tqdm failure while building the bar does not break the download."""
    from google.cloud.bigquery.table import RowIterator
    from google.cloud.bigquery.table import SchemaField

    schema = [
        SchemaField("name", "STRING", mode="REQUIRED"),
        SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    people = (
        ("Phred Phlyntstone", "32"),
        ("Bharney Rhubble", "33"),
        ("Wylma Phlyntstone", "29"),
        ("Bhettye Rhubble", "27"),
    )
    rows = [{"f": [{"v": person}, {"v": age}]} for person, age in people]
    path = "/foo"

    for bar_type in ("tqdm", "tqdm_notebook", "tqdm_gui"):
        api_request = mock.Mock(return_value={"rows": rows})
        row_iterator = RowIterator(_mock_client(), api_request, path, schema)
        df = row_iterator.to_dataframe(progress_bar_type=bar_type)

        self.assertEqual(len(df), 4)  # all should be well

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_w_empty_results(self):
from google.cloud.bigquery.table import RowIterator
Expand Down
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,13 @@

# Configuration for intersphinx:
intersphinx_mapping = {
'fastavro': ('https://fastavro.readthedocs.io/en/stable/', None),
'google-auth': ('https://google-auth.readthedocs.io/en/stable', None),
'google-gax': ('https://gax-python.readthedocs.io/en/latest/', None),
'grpc': ('https://grpc.io/grpc/python/', None),
'requests': ('http://docs.python-requests.org/en/master/', None),
'fastavro': ('https://fastavro.readthedocs.io/en/stable/', None),
'pandas': ('https://pandas.pydata.org/pandas-docs/stable/', None),
'python': ('https://docs.python.org/3', None),
'requests': ('http://docs.python-requests.org/en/master/', None),
}

# Static HTML pages, e.g. to support redirects
Expand Down