Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move query and wait logic to separate module #720

Merged
merged 5 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pandas_gbq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,12 @@ class PerformanceWarning(RuntimeWarning):
Such warnings can occur when dependencies for the requested feature
aren't up-to-date.
"""


class QueryTimeout(ValueError):
    """
    Raised when the query request exceeds the timeoutMs value specified in the
    BigQuery configuration.
    """
    # NOTE: no `pass` needed — the docstring alone is a valid class body,
    # matching the style of the other exception classes in this module.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we keep the pass here? Other error classes do not have this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct — `pass` is not needed because there is a docstring. I'll remove it.

144 changes: 19 additions & 125 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# license that can be found in the LICENSE file.

import copy
import concurrent.futures
from datetime import datetime
import logging
import re
Expand All @@ -20,8 +19,9 @@
if typing.TYPE_CHECKING: # pragma: NO COVER
import pandas

from pandas_gbq.exceptions import AccessDenied, GenericGBQException
from pandas_gbq.exceptions import GenericGBQException, QueryTimeout
from pandas_gbq.features import FEATURES
import pandas_gbq.query
import pandas_gbq.schema
import pandas_gbq.timestamp

Expand Down Expand Up @@ -130,15 +130,6 @@ class NotFoundException(ValueError):
pass


class QueryTimeout(ValueError):
"""
Raised when the query request exceeds the timeoutMs value specified in the
BigQuery configuration.
"""

pass


class TableCreationError(ValueError):
"""
Raised when the create table method fails
Expand Down Expand Up @@ -340,10 +331,6 @@ def __init__(
self.client = self.get_client()
self.use_bqstorage_api = use_bqstorage_api

# BQ Queries costs $5 per TB. First 1 TB per month is free
# see here for more: https://cloud.google.com/bigquery/pricing
self.query_price_for_TB = 5.0 / 2**40 # USD/TB

def _start_timer(self):
self.start = time.time()

Expand All @@ -355,16 +342,6 @@ def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6):
if sec > overlong:
logger.info("{} {} {}".format(prefix, sec, postfix))

# http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
@staticmethod
def sizeof_fmt(num, suffix="B"):
fmt = "%3.1f %s%s"
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if abs(num) < 1024.0:
return fmt % (num, unit, suffix)
num /= 1024.0
return fmt % (num, "Y", suffix)

def get_client(self):
import google.api_core.client_info
from google.cloud import bigquery
Expand Down Expand Up @@ -421,46 +398,10 @@ def download_table(
user_dtypes=dtypes,
)

def _wait_for_query_job(self, query_reply, timeout_ms):
"""Wait for query to complete, pausing occasionally to update progress.

Args:
query_reply (QueryJob):
A query job which has started.

timeout_ms (Optional[int]):
How long to wait before cancelling the query.
"""
# Wait at most 10 seconds so we can show progress.
# TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
# Include a tqdm progress bar here instead of a stream of log messages.
timeout_sec = 10.0
if timeout_ms:
timeout_sec = min(timeout_sec, timeout_ms / 1000.0)

while query_reply.state != "DONE":
self.log_elapsed_seconds(" Elapsed", "s. Waiting...")

if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
self.client.cancel_job(
query_reply.job_id, location=query_reply.location
)
raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))

try:
query_reply.result(timeout=timeout_sec)
except concurrent.futures.TimeoutError:
# Use our own timeout logic
pass
except self.http_error as ex:
self.process_http_error(ex)

def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
from google.auth.exceptions import RefreshError
from google.cloud import bigquery
import pandas

job_config = {
job_config_dict = {
"query": {
"useLegacySql": self.dialect
== "legacy"
Expand All @@ -470,74 +411,27 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
}
config = kwargs.get("configuration")
if config is not None:
job_config.update(config)
job_config_dict.update(config)

self._start_timer()

try:
logger.debug("Requesting query... ")
query_reply = self.client.query(
query,
job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
location=self.location,
project=self.project_id,
)
logger.debug("Query running...")
except (RefreshError, ValueError) as ex:
if self.private_key:
raise AccessDenied(
f"The service account credentials are not valid: {ex}"
)
else:
raise AccessDenied(
"The credentials have been revoked or expired, "
f"please re-run the application to re-authorize: {ex}"
)
except self.http_error as ex:
self.process_http_error(ex)

job_id = query_reply.job_id
logger.debug("Job ID: %s" % job_id)

timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
"timeoutMs"
)
timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict[
"query"
].get("timeoutMs")
timeout_ms = int(timeout_ms) if timeout_ms else None
self._wait_for_query_job(query_reply, timeout_ms)

if query_reply.cache_hit:
logger.debug("Query done.\nCache hit.\n")
else:
bytes_processed = query_reply.total_bytes_processed or 0
bytes_billed = query_reply.total_bytes_billed or 0
logger.debug(
"Query done.\nProcessed: {} Billed: {}".format(
self.sizeof_fmt(bytes_processed),
self.sizeof_fmt(bytes_billed),
)
)
logger.debug(
"Standard price: ${:,.2f} USD\n".format(
bytes_billed * self.query_price_for_TB
)
)
self._start_timer()
job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
rows_iter = pandas_gbq.query.query_and_wait(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)

dtypes = kwargs.get("dtypes")

# Ensure destination is populated.
try:
query_reply.result()
except self.http_error as ex:
self.process_http_error(ex)

# Avoid attempting to download results from DML queries, which have no
# destination.
if query_reply.destination is None:
return pandas.DataFrame()

rows_iter = self.client.list_rows(
query_reply.destination, max_results=max_results
)
return self._download_results(
rows_iter,
max_results=max_results,
Expand Down
135 changes: 135 additions & 0 deletions pandas_gbq/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (c) 2017 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

from __future__ import annotations

import concurrent.futures
import logging
from typing import Optional

from google.cloud import bigquery

import pandas_gbq.exceptions


logger = logging.getLogger(__name__)


# On-demand BQ Queries costs $6.25 per TB. First 1 TB per month is free
# see here for more: https://cloud.google.com/bigquery/pricing
QUERY_PRICE_FOR_TB = 6.25 / 2**40 # USD/TB


# http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
def sizeof_fmt(num, suffix="B"):
    """Format a byte count as a human-readable string (e.g. ``1.5 KB``).

    Args:
        num: Quantity to format (may be negative; sign is preserved).
        suffix: Unit suffix appended after the binary prefix.

    Returns:
        str: ``num`` scaled down by factors of 1024 until its magnitude is
        below 1024, rendered with one decimal place and the matching prefix.
    """
    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1024.0  # step up to the next 1024-based prefix
    # Values of 1024 ZB or more fall through to the yotta prefix.
    return f"{num:3.1f} Y{suffix}"


def _wait_for_query_job(
    connector,
    client: bigquery.Client,
    query_reply: bigquery.QueryJob,
    timeout_ms: Optional[float],
):
    """Wait for query to complete, pausing occasionally to update progress.

    Args:
        connector:
            Connector object used for elapsed-time logging and HTTP error
            handling (``log_elapsed_seconds``, ``get_elapsed_seconds``,
            ``http_error``, ``process_http_error``).

        client (bigquery.Client):
            Client used to cancel the job if the timeout is exceeded.

        query_reply (QueryJob):
            A query job which has started.

        timeout_ms (Optional[float]):
            How long to wait before cancelling the query, in milliseconds.

    Raises:
        pandas_gbq.exceptions.QueryTimeout:
            If the job does not reach the DONE state within ``timeout_ms``.
    """
    # Wait at most 10 seconds so we can show progress.
    # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
    # Include a tqdm progress bar here instead of a stream of log messages.
    timeout_sec = 10.0
    if timeout_ms:
        timeout_sec = min(timeout_sec, timeout_ms / 1000.0)

    while query_reply.state != "DONE":
        connector.log_elapsed_seconds(" Elapsed", "s. Waiting...")

        if timeout_ms and timeout_ms < connector.get_elapsed_seconds() * 1000:
            client.cancel_job(query_reply.job_id, location=query_reply.location)
            raise pandas_gbq.exceptions.QueryTimeout(
                "Query timeout: {} ms".format(timeout_ms)
            )

        try:
            query_reply.result(timeout=timeout_sec)
        except concurrent.futures.TimeoutError:
            # Use our own timeout logic
            pass
        except connector.http_error as ex:
            connector.process_http_error(ex)


def query_and_wait(
    connector,
    client: bigquery.Client,
    query: str,
    *,
    job_config: bigquery.QueryJobConfig,
    location: Optional[str],
    project_id: Optional[str],
    max_results: Optional[int],
    timeout_ms: Optional[int],
):
    """Start a query and wait for it to finish, returning a row iterator.

    Args:
        connector:
            Connector object providing credentials state (``private_key``)
            and HTTP error handling (``http_error``, ``process_http_error``),
            plus the elapsed-time helpers used while waiting.
        client (bigquery.Client):
            Client with which to start and monitor the query.
        query (str):
            SQL text to execute.
        job_config (bigquery.QueryJobConfig):
            Configuration for the query job.
        location (Optional[str]):
            Location where the query job should run.
        project_id (Optional[str]):
            Project to bill for the query.
        max_results (Optional[int]):
            Maximum number of result rows to fetch.
        timeout_ms (Optional[int]):
            How long to wait before cancelling the query, in milliseconds.

    Returns:
        Row iterator over the query results.

    Raises:
        pandas_gbq.exceptions.AccessDenied:
            If the credentials are invalid, revoked, or expired.
        pandas_gbq.exceptions.QueryTimeout:
            If the query does not complete within ``timeout_ms``.
    """
    from google.auth.exceptions import RefreshError

    try:
        logger.debug("Requesting query... ")
        query_reply = client.query(
            query,
            job_config=job_config,
            location=location,
            project=project_id,
        )
        logger.debug("Query running...")
    except (RefreshError, ValueError) as ex:
        if connector.private_key:
            raise pandas_gbq.exceptions.AccessDenied(
                f"The service account credentials are not valid: {ex}"
            )
        else:
            raise pandas_gbq.exceptions.AccessDenied(
                "The credentials have been revoked or expired, "
                f"please re-run the application to re-authorize: {ex}"
            )
    except connector.http_error as ex:
        connector.process_http_error(ex)

    job_id = query_reply.job_id
    logger.debug("Job ID: %s" % job_id)

    # Pass the `client` parameter, not `connector.client`, so the caller
    # controls which client cancels the job on timeout.
    _wait_for_query_job(connector, client, query_reply, timeout_ms)

    if query_reply.cache_hit:
        logger.debug("Query done.\nCache hit.\n")
    else:
        bytes_processed = query_reply.total_bytes_processed or 0
        bytes_billed = query_reply.total_bytes_billed or 0
        logger.debug(
            "Query done.\nProcessed: {} Billed: {}".format(
                sizeof_fmt(bytes_processed),
                sizeof_fmt(bytes_billed),
            )
        )
        logger.debug(
            "Standard price: ${:,.2f} USD\n".format(bytes_billed * QUERY_PRICE_FOR_TB)
        )

    # As of google-cloud-bigquery 2.3.0, QueryJob.result() uses
    # getQueryResults() instead of tabledata.list, which returns the correct
    # response with DML/DDL queries.
    try:
        return query_reply.result(max_results=max_results)
    except connector.http_error as ex:
        connector.process_http_error(ex)
Loading