Added support for schema auto-detection feature in the job method `Lo… #3648

Merged 4 commits on Jul 28, 2017
45 changes: 38 additions & 7 deletions bigquery/google/cloud/bigquery/job.py
@@ -80,6 +80,20 @@ def _error_result_to_exception(error_result):
status_code, error_result.get('message', ''), errors=[error_result])


class AutoDetectSchema(_TypedProperty):
"""Typed Property for ``autodetect`` properties.

:raises ValueError: on ``set`` operation if ``instance.schema``
is already defined.
"""
def __set__(self, instance, value):
self._validate(value)
if instance.schema:
raise ValueError('A schema should not be already defined '
'when using schema auto-detection')
setattr(instance._configuration, self._backing_name, value)


class Compression(_EnumProperty):
"""Pseudo-enum for ``compression`` properties."""
GZIP = 'GZIP'
@@ -505,6 +519,7 @@ class _LoadConfiguration(object):
"""
_allow_jagged_rows = None
_allow_quoted_newlines = None
_autodetect = None
_create_disposition = None
_encoding = None
_field_delimiter = None
@@ -544,9 +559,10 @@ def __init__(self, name, destination, source_uris, client, schema=()):
super(LoadTableFromStorageJob, self).__init__(name, client)
self.destination = destination
self.source_uris = source_uris
# Let the @property do validation.
self.schema = schema
self._configuration = _LoadConfiguration()
# Let the @property do validation. This must occur after all other
# attributes have been set.
self.schema = schema

@property
def schema(self):
@@ -564,12 +580,20 @@ def schema(self, value):
:type value: list of :class:`SchemaField`
:param value: fields describing the schema

:raises: TypeError if 'value' is not a sequence, or ValueError if
any item in the sequence is not a SchemaField
:raises TypeError: If ``value`` is not a sequence.
:raises ValueError: If any item in the sequence is not
a ``SchemaField``.
"""
if not all(isinstance(field, SchemaField) for field in value):
raise ValueError('Schema items must be fields')
self._schema = tuple(value)
if not value:
self._schema = ()
else:
if not all(isinstance(field, SchemaField) for field in value):
raise ValueError('Schema items must be fields')
if self.autodetect:
raise ValueError(
'Schema can not be set if `autodetect` property is True')

self._schema = tuple(value)

@property
def input_file_bytes(self):
@@ -625,6 +649,11 @@ def output_rows(self):
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowQuotedNewlines
"""

autodetect = AutoDetectSchema('autodetect', bool)
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect
"""

create_disposition = CreateDisposition('create_disposition')
"""See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition
@@ -676,6 +705,8 @@ def _populate_config_resource(self, configuration):
configuration['allowJaggedRows'] = self.allow_jagged_rows
if self.allow_quoted_newlines is not None:
configuration['allowQuotedNewlines'] = self.allow_quoted_newlines
if self.autodetect is not None:
configuration['autodetect'] = self.autodetect
if self.create_disposition is not None:
configuration['createDisposition'] = self.create_disposition
if self.encoding is not None:
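Taken together, the `job.py` changes let a caller ask BigQuery to infer the schema instead of supplying one. A minimal usage sketch of the new property follows; the project, dataset, table, and GCS URI are placeholder names, not part of this change:

```python
from google.cloud import bigquery

client = bigquery.Client()
dataset = client.dataset('my_dataset')       # placeholder dataset
table = dataset.table('person_ages')         # destination table

# Leave the schema unset and ask the backend to detect it from the CSV.
job = client.load_table_from_storage(
    'load_with_autodetect_1', table, 'gs://my-bucket/person_ages.csv')
job.autodetect = True

job.begin()   # the request body now carries configuration.load.autodetect = True
```

Because `autodetect` and `schema` are mutually exclusive, no schema is passed here; once the job finishes and `table.reload()` is called, the detected fields are available on `table.schema`.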
81 changes: 74 additions & 7 deletions bigquery/tests/system.py
@@ -13,6 +13,7 @@
# limitations under the License.

import base64
import csv
import datetime
import json
import operator
@@ -21,6 +22,8 @@
import unittest
import uuid

import six

from google.cloud import bigquery
from google.cloud._helpers import UTC
from google.cloud.bigquery import dbapi
@@ -290,8 +293,6 @@ def test_update_table(self):

@staticmethod
def _fetch_single_page(table):
import six

iterator = table.fetch_data()
page = six.next(iterator.pages)
return list(page)
@@ -341,7 +342,6 @@ def test_insert_data_then_dump_table(self):
sorted(ROWS, key=by_age))

def test_load_table_from_local_file_then_dump_table(self):
import csv
from google.cloud._testing import _NamedTemporaryFile

ROWS = [
@@ -432,7 +432,6 @@ def test_load_table_from_local_avro_file_then_dump_table(self):
sorted(ROWS, key=by_wavelength))

def test_load_table_from_storage_then_dump_table(self):
import csv
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.storage import Client as StorageClient

@@ -448,11 +447,11 @@ def test_load_table_from_storage_then_dump_table(self):
]
TABLE_NAME = 'test_table'

s_client = StorageClient()
storage_client = StorageClient()

# In the **very** rare case the bucket name is reserved, this
# fails with a ConnectionError.
bucket = s_client.create_bucket(BUCKET_NAME)
bucket = storage_client.create_bucket(BUCKET_NAME)
self.to_delete.append(bucket)

blob = bucket.blob(BLOB_NAME)
@@ -501,6 +500,75 @@ def test_load_table_from_storage_then_dump_table(self):
self.assertEqual(sorted(rows, key=by_age),
sorted(ROWS, key=by_age))

def test_load_table_from_storage_w_autodetect_schema(self):
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.storage import Client as StorageClient
from google.cloud.bigquery import SchemaField

local_id = unique_resource_id()
bucket_name = 'bq_load_test' + local_id
blob_name = 'person_ages.csv'
gs_url = 'gs://{}/{}'.format(bucket_name, blob_name)
rows = [
('Phred Phlyntstone', 32),
('Bharney Rhubble', 33),
('Wylma Phlyntstone', 29),
('Bhettye Rhubble', 27),
] * 100 # BigQuery internally uses the first 100 rows to detect schema
table_name = 'test_table'

storage_client = StorageClient()

# In the **very** rare case the bucket name is reserved, this
# fails with a ConnectionError.
bucket = storage_client.create_bucket(bucket_name)
self.to_delete.append(bucket)

blob = bucket.blob(blob_name)

with _NamedTemporaryFile() as temp:
with open(temp.name, 'w') as csv_write:
writer = csv.writer(csv_write)
writer.writerow(('Full Name', 'Age'))
writer.writerows(rows)

with open(temp.name, 'rb') as csv_read:
blob.upload_from_file(csv_read, content_type='text/csv')

self.to_delete.insert(0, blob)

dataset = Config.CLIENT.dataset(
_make_dataset_name('load_gcs_then_dump'))

retry_403(dataset.create)()
self.to_delete.append(dataset)

table = dataset.table(table_name)
self.to_delete.insert(0, table)

job = Config.CLIENT.load_table_from_storage(
'bq_load_storage_test_' + local_id, table, gs_url)
job.autodetect = True

job.begin()

# Allow for 90 seconds of "warm up" before rows visible. See
# https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
# 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
retry = RetryInstanceState(_job_done, max_tries=8)
retry(job.reload)()

table.reload()
field_name = SchemaField(
u'Full_Name', u'string', u'NULLABLE', None, ())
field_age = SchemaField(u'Age', u'integer', u'NULLABLE', None, ())
self.assertEqual(table.schema, [field_name, field_age])

actual_rows = self._fetch_single_page(table)
by_age = operator.itemgetter(1)
self.assertEqual(
sorted(actual_rows, key=by_age), sorted(rows, key=by_age))

def test_job_cancel(self):
DATASET_NAME = _make_dataset_name('job_cancel')
JOB_NAME = 'fetch_' + DATASET_NAME
@@ -674,7 +742,6 @@ def test_dbapi_w_standard_sql_types(self):
self.assertIsNone(row)

def _load_table_for_dml(self, rows, dataset_name, table_name):
import csv
from google.cloud._testing import _NamedTemporaryFile

dataset = Config.CLIENT.dataset(dataset_name)
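The new system test waits for the load job with `RetryInstanceState(_job_done, max_tries=8)`, i.e. up to roughly 127 seconds of exponential backoff. Outside the test harness the same wait can be written as a plain poll loop; a rough equivalent sketch, not part of this PR:

```python
import time

def wait_for_job(job, max_tries=8):
    """Poll a BigQuery job until it is DONE, backing off 1, 2, 4, ... seconds."""
    delay = 1
    for _ in range(max_tries):
        job.reload()                # refresh job state from the API
        if job.state == 'DONE':
            if job.error_result:    # surface load failures instead of swallowing them
                raise RuntimeError(job.error_result)
            return job
        time.sleep(delay)
        delay *= 2
    raise RuntimeError('job did not complete after %d tries' % max_tries)
```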
82 changes: 82 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -189,6 +189,11 @@ def _verifyBooleanConfigProperties(self, job, config):
config['allowQuotedNewlines'])
else:
self.assertIsNone(job.allow_quoted_newlines)
if 'autodetect' in config:
self.assertEqual(
job.autodetect, config['autodetect'])
else:
self.assertIsNone(job.autodetect)
if 'ignoreUnknownValues' in config:
self.assertEqual(job.ignore_unknown_values,
config['ignoreUnknownValues'])
@@ -277,6 +282,7 @@ def test_ctor(self):
# set/read from resource['configuration']['load']
self.assertIsNone(job.allow_jagged_rows)
self.assertIsNone(job.allow_quoted_newlines)
self.assertIsNone(job.autodetect)
self.assertIsNone(job.create_disposition)
self.assertIsNone(job.encoding)
self.assertIsNone(job.field_delimiter)
@@ -326,6 +332,41 @@ def test_schema_setter(self):
job.schema = [full_name, age]
self.assertEqual(job.schema, [full_name, age])

def test_schema_setter_w_autodetect(self):
from google.cloud.bigquery.schema import SchemaField

client = _Client(self.PROJECT)
table = _Table()
full_name = SchemaField('full_name', 'STRING')
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
job.autodetect = False
job.schema = [full_name]
self.assertEqual(job.schema, [full_name])

job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
job.autodetect = True
with self.assertRaises(ValueError):
job.schema = [full_name]

def test_autodetect_setter_w_schema(self):
from google.cloud.bigquery.schema import SchemaField

client = _Client(self.PROJECT)
table = _Table()
full_name = SchemaField('full_name', 'STRING')
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)

job.autodetect = True
job.schema = []
self.assertEqual(job.schema, [])

job.autodetect = False
job.schema = [full_name]
self.assertEqual(job.autodetect, False)

with self.assertRaises(ValueError):
job.autodetect = True

def test_props_set_by_server(self):
import datetime
from google.cloud._helpers import UTC
@@ -491,6 +532,47 @@ def test_begin_w_bound_client(self):
self.assertEqual(req['data'], SENT)
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_autodetect(self):
path = '/projects/{}/jobs'.format(self.PROJECT)
resource = self._makeResource()
resource['configuration']['load']['autodetect'] = True
# Ensure None for missing server-set props
del resource['statistics']['creationTime']
del resource['etag']
del resource['selfLink']
del resource['user_email']
conn = _Connection(resource)
client = _Client(project=self.PROJECT, connection=conn)
table = _Table()
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
job.autodetect = True
job.begin()

sent = {
'jobReference': {
'projectId': self.PROJECT,
'jobId': self.JOB_NAME,
},
'configuration': {
'load': {
'sourceUris': [self.SOURCE1],
'destinationTable': {
'projectId': self.PROJECT,
'datasetId': self.DS_NAME,
'tableId': self.TABLE_NAME,
},
'autodetect': True
},
},
}
expected_request = {
'method': 'POST',
'path': path,
'data': sent,
}
self.assertEqual(conn._requested, [expected_request])
self._verifyResourceProperties(job, resource)

def test_begin_w_alternate_client(self):
from google.cloud.bigquery.schema import SchemaField

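The new unit tests pin down the two-way guard between `schema` and `autodetect`. From the caller's side that guarded behavior looks like the sketch below; the dataset, table, and URIs are placeholder names:

```python
from google.cloud import bigquery
from google.cloud.bigquery import SchemaField

client = bigquery.Client()
table = client.dataset('my_dataset').table('people')   # placeholder names

# Enabling autodetect first blocks a later explicit schema ...
job = client.load_table_from_storage(
    'load_job_example_1', table, 'gs://my-bucket/people.csv')
job.autodetect = True
try:
    job.schema = [SchemaField('full_name', 'STRING')]
except ValueError:
    pass  # schema cannot be assigned while autodetect is True

# ... and an explicit schema blocks turning autodetect on afterwards.
other = client.load_table_from_storage(
    'load_job_example_2', table, 'gs://my-bucket/people.csv')
other.schema = [SchemaField('full_name', 'STRING')]
try:
    other.autodetect = True
except ValueError:
    pass  # autodetect cannot be enabled once a schema is defined
```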