Skip to content

Commit

Permalink
BigQuery - add get_query_results method. (#3838)
Browse files Browse the repository at this point in the history
This method calls the getQueryResults API directly and returns a
QueryResults object. Note: the response from this API does not include
the query, so I modified the constructor to make query optional in this
case.
  • Loading branch information
tswast authored Aug 18, 2017
1 parent 406ecad commit 7a8f5a9
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
35 changes: 35 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,41 @@ def dataset(self, dataset_name, project=None):
"""
return Dataset(dataset_name, client=self, project=project)

def get_query_results(self, job_id, project=None, timeout_ms=None):
"""Get the query results object for a query job.
:type job_id: str
:param job_id: Name of the query job.
:type project: str
:param project:
(Optional) project ID for the query job (defaults to the project of
the client).
:type timeout_ms: int
:param timeout_ms:
(Optional) number of milliseconds the the API call should wait for
the query to complete before the request times out.
:rtype: :class:`google.cloud.bigquery.query.QueryResults`
:returns: a new ``QueryResults`` instance
"""

extra_params = {'maxResults': 0}

if project is None:
project = self.project

if timeout_ms is not None:
extra_params['timeoutMs'] = timeout_ms

path = '/projects/{}/queries/{}'.format(project, job_id)

resource = self._connection.api_request(
method='GET', path=path, query_params=extra_params)

return QueryResults.from_api_repr(resource, self)

def job_from_resource(self, resource):
"""Detect correct job type from resource and instantiate.
Expand Down
6 changes: 6 additions & 0 deletions bigquery/google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def __init__(self, query, client, udf_resources=(), query_parameters=()):
self.query_parameters = query_parameters
self._job = None

@classmethod
def from_api_repr(cls, api_response, client):
instance = cls(None, client)
instance._set_properties(api_response)
return instance

@classmethod
def from_query_job(cls, job):
"""Factory: construct from an existing job.
Expand Down
7 changes: 7 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,13 @@ def test_job_cancel(self):
# raise an error, and that the job completed (in the `retry()`
# above).

def test_get_query_results(self):
job_id = 'test-get-query-results-' + str(uuid.uuid4())
query_job = Config.CLIENT.run_async_query(job_id, 'SELECT 1')
query_job.begin()
results = Config.CLIENT.get_query_results(job_id)
self.assertEqual(results.total_rows, 1)

def test_sync_query_w_legacy_sql_types(self):
naive = datetime.datetime(2016, 12, 5, 12, 41, 9)
stamp = '%s %s' % (naive.date().isoformat(), naive.time().isoformat())
Expand Down
63 changes: 63 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,64 @@ def test_ctor(self):
self.assertIs(client._connection.credentials, creds)
self.assertIs(client._connection.http, http)

def test_get_job_miss_w_explicit_project_and_timeout(self):
from google.cloud.exceptions import NotFound

project = 'PROJECT'
creds = _make_credentials()
client = self._make_one(project, creds)
conn = client._connection = _Connection()

with self.assertRaises(NotFound):
client.get_query_results(
'nothere', project='other-project', timeout_ms=500)

self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'GET')
self.assertEqual(
req['path'], '/projects/other-project/queries/nothere')
self.assertEqual(
req['query_params'], {'maxResults': 0, 'timeoutMs': 500})

def test_get_query_results_hit(self):
project = 'PROJECT'
job_id = 'query_job'
data = {
'kind': 'bigquery#getQueryResultsResponse',
'etag': 'some-tag',
'schema': {
'fields': [
{
'name': 'title',
'type': 'STRING',
'mode': 'NULLABLE'
},
{
'name': 'unique_words',
'type': 'INTEGER',
'mode': 'NULLABLE'
}
]
},
'jobReference': {
'projectId': project,
'jobId': job_id,
},
'totalRows': '10',
'totalBytesProcessed': '2464625',
'jobComplete': True,
'cacheHit': False,
}

creds = _make_credentials()
client = self._make_one(project, creds)
client._connection = _Connection(data)
query_results = client.get_query_results(job_id)

self.assertEqual(query_results.total_rows, 10)
self.assertTrue(query_results.complete)

def test_list_projects_defaults(self):
import six
from google.cloud.bigquery.client import Project
Expand Down Expand Up @@ -607,6 +665,11 @@ def __init__(self, *responses):
self._requested = []

def api_request(self, **kw):
from google.cloud.exceptions import NotFound
self._requested.append(kw)

if len(self._responses) == 0:
raise NotFound('miss')

response, self._responses = self._responses[0], self._responses[1:]
return response

0 comments on commit 7a8f5a9

Please sign in to comment.