From d45612afc5c12592dc4425c64f226734f896bbef Mon Sep 17 00:00:00 2001
From: Willian Fuks
Date: Fri, 28 Jul 2017 14:08:25 -0300
Subject: [PATCH] Added support for schema auto-detection feature in
 `LoadTableFromStorageJob` (#3648)

---
 bigquery/google/cloud/bigquery/job.py | 45 ++++++++++++---
 bigquery/tests/system.py              | 81 +++++++++++++++++++++++---
 bigquery/tests/unit/test_job.py       | 82 +++++++++++++++++++++++++++
 3 files changed, 194 insertions(+), 14 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index c2d1feee7120..953a2c265580 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -80,6 +80,20 @@ def _error_result_to_exception(error_result):
         status_code, error_result.get('message', ''), errors=[error_result])
 
 
+class AutoDetectSchema(_TypedProperty):
+    """Typed Property for ``autodetect`` properties.
+
+    :raises ValueError: on ``set`` operation if ``instance.schema``
+        is already defined.
+    """
+    def __set__(self, instance, value):
+        self._validate(value)
+        if instance.schema:
+            raise ValueError('A schema must not already be defined '
+                             'when using schema auto-detection')
+        setattr(instance._configuration, self._backing_name, value)
+
+
 class Compression(_EnumProperty):
     """Pseudo-enum for ``compression`` properties."""
     GZIP = 'GZIP'
@@ -505,6 +519,7 @@ class _LoadConfiguration(object):
     """
     _allow_jagged_rows = None
     _allow_quoted_newlines = None
+    _autodetect = None
     _create_disposition = None
     _encoding = None
     _field_delimiter = None
@@ -544,9 +559,10 @@ def __init__(self, name, destination, source_uris, client, schema=()):
         super(LoadTableFromStorageJob, self).__init__(name, client)
         self.destination = destination
         self.source_uris = source_uris
-        # Let the @property do validation.
-        self.schema = schema
         self._configuration = _LoadConfiguration()
+        # Let the @property do validation. This must occur after all other
+        # attributes have been set.
+        self.schema = schema
 
     @property
     def schema(self):
@@ -564,12 +580,20 @@ def schema(self, value):
         :type value: list of :class:`SchemaField`
         :param value: fields describing the schema
 
-        :raises: TypeError if 'value' is not a sequence, or ValueError if
-            any item in the sequence is not a SchemaField
+        :raises TypeError: If ``value`` is not a sequence.
+        :raises ValueError: If any item in the sequence is not
+            a ``SchemaField``.
""" - if not all(isinstance(field, SchemaField) for field in value): - raise ValueError('Schema items must be fields') - self._schema = tuple(value) + if not value: + self._schema = () + else: + if not all(isinstance(field, SchemaField) for field in value): + raise ValueError('Schema items must be fields') + if self.autodetect: + raise ValueError( + 'Schema can not be set if `autodetect` property is True') + + self._schema = tuple(value) @property def input_file_bytes(self): @@ -625,6 +649,11 @@ def output_rows(self): https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.allowQuotedNewlines """ + autodetect = AutoDetectSchema('autodetect', bool) + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect + """ + create_disposition = CreateDisposition('create_disposition') """See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition @@ -676,6 +705,8 @@ def _populate_config_resource(self, configuration): configuration['allowJaggedRows'] = self.allow_jagged_rows if self.allow_quoted_newlines is not None: configuration['allowQuotedNewlines'] = self.allow_quoted_newlines + if self.autodetect is not None: + configuration['autodetect'] = self.autodetect if self.create_disposition is not None: configuration['createDisposition'] = self.create_disposition if self.encoding is not None: diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 1d3da3d2a83d..9d3bb7794256 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -13,6 +13,7 @@ # limitations under the License. import base64 +import csv import datetime import json import operator @@ -21,6 +22,8 @@ import unittest import uuid +import six + from google.cloud import bigquery from google.cloud._helpers import UTC from google.cloud.bigquery import dbapi @@ -290,8 +293,6 @@ def test_update_table(self): @staticmethod def _fetch_single_page(table): - import six - iterator = table.fetch_data() page = six.next(iterator.pages) return list(page) @@ -341,7 +342,6 @@ def test_insert_data_then_dump_table(self): sorted(ROWS, key=by_age)) def test_load_table_from_local_file_then_dump_table(self): - import csv from google.cloud._testing import _NamedTemporaryFile ROWS = [ @@ -432,7 +432,6 @@ def test_load_table_from_local_avro_file_then_dump_table(self): sorted(ROWS, key=by_wavelength)) def test_load_table_from_storage_then_dump_table(self): - import csv from google.cloud._testing import _NamedTemporaryFile from google.cloud.storage import Client as StorageClient @@ -448,11 +447,11 @@ def test_load_table_from_storage_then_dump_table(self): ] TABLE_NAME = 'test_table' - s_client = StorageClient() + storage_client = StorageClient() # In the **very** rare case the bucket name is reserved, this # fails with a ConnectionError. 
-        bucket = s_client.create_bucket(BUCKET_NAME)
+        bucket = storage_client.create_bucket(BUCKET_NAME)
         self.to_delete.append(bucket)
 
         blob = bucket.blob(BLOB_NAME)
@@ -501,6 +500,75 @@ def test_load_table_from_storage_then_dump_table(self):
         self.assertEqual(sorted(rows, key=by_age),
                          sorted(ROWS, key=by_age))
 
+    def test_load_table_from_storage_w_autodetect_schema(self):
+        from google.cloud._testing import _NamedTemporaryFile
+        from google.cloud.storage import Client as StorageClient
+        from google.cloud.bigquery import SchemaField
+
+        local_id = unique_resource_id()
+        bucket_name = 'bq_load_test' + local_id
+        blob_name = 'person_ages.csv'
+        gs_url = 'gs://{}/{}'.format(bucket_name, blob_name)
+        rows = [
+            ('Phred Phlyntstone', 32),
+            ('Bharney Rhubble', 33),
+            ('Wylma Phlyntstone', 29),
+            ('Bhettye Rhubble', 27),
+        ] * 100  # BigQuery internally uses the first 100 rows to detect schema
+        table_name = 'test_table'
+
+        storage_client = StorageClient()
+
+        # In the **very** rare case the bucket name is reserved, this
+        # fails with a ConnectionError.
+        bucket = storage_client.create_bucket(bucket_name)
+        self.to_delete.append(bucket)
+
+        blob = bucket.blob(blob_name)
+
+        with _NamedTemporaryFile() as temp:
+            with open(temp.name, 'w') as csv_write:
+                writer = csv.writer(csv_write)
+                writer.writerow(('Full Name', 'Age'))
+                writer.writerows(rows)
+
+            with open(temp.name, 'rb') as csv_read:
+                blob.upload_from_file(csv_read, content_type='text/csv')
+
+        self.to_delete.insert(0, blob)
+
+        dataset = Config.CLIENT.dataset(
+            _make_dataset_name('load_gcs_then_dump'))
+
+        retry_403(dataset.create)()
+        self.to_delete.append(dataset)
+
+        table = dataset.table(table_name)
+        self.to_delete.insert(0, table)
+
+        job = Config.CLIENT.load_table_from_storage(
+            'bq_load_storage_test_' + local_id, table, gs_url)
+        job.autodetect = True
+
+        job.begin()
+
+        # Allow for 90 seconds of "warm up" before rows visible.  See
+        # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability
+        # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
+        retry = RetryInstanceState(_job_done, max_tries=8)
+        retry(job.reload)()
+
+        table.reload()
+        field_name = SchemaField(
+            u'Full_Name', u'string', u'NULLABLE', None, ())
+        field_age = SchemaField(u'Age', u'integer', u'NULLABLE', None, ())
+        self.assertEqual(table.schema, [field_name, field_age])
+
+        actual_rows = self._fetch_single_page(table)
+        by_age = operator.itemgetter(1)
+        self.assertEqual(
+            sorted(actual_rows, key=by_age), sorted(rows, key=by_age))
+
     def test_job_cancel(self):
         DATASET_NAME = _make_dataset_name('job_cancel')
         JOB_NAME = 'fetch_' + DATASET_NAME
@@ -674,7 +742,6 @@ def test_dbapi_w_standard_sql_types(self):
         self.assertIsNone(row)
 
     def _load_table_for_dml(self, rows, dataset_name, table_name):
-        import csv
         from google.cloud._testing import _NamedTemporaryFile
 
         dataset = Config.CLIENT.dataset(dataset_name)
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index d2ec7027d5e6..46326441a5e1 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -189,6 +189,11 @@ def _verifyBooleanConfigProperties(self, job, config):
                              config['allowQuotedNewlines'])
         else:
             self.assertIsNone(job.allow_quoted_newlines)
+        if 'autodetect' in config:
+            self.assertEqual(
+                job.autodetect, config['autodetect'])
+        else:
+            self.assertIsNone(job.autodetect)
         if 'ignoreUnknownValues' in config:
             self.assertEqual(job.ignore_unknown_values,
                              config['ignoreUnknownValues'])
@@ -277,6 +282,7 @@ def test_ctor(self):
         # set/read from resource['configuration']['load']
         self.assertIsNone(job.allow_jagged_rows)
         self.assertIsNone(job.allow_quoted_newlines)
+        self.assertIsNone(job.autodetect)
         self.assertIsNone(job.create_disposition)
         self.assertIsNone(job.encoding)
         self.assertIsNone(job.field_delimiter)
@@ -326,6 +332,41 @@ def test_schema_setter(self):
         job.schema = [full_name, age]
         self.assertEqual(job.schema, [full_name, age])
 
+    def test_schema_setter_w_autodetect(self):
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = _Client(self.PROJECT)
+        table = _Table()
+        full_name = SchemaField('full_name', 'STRING')
+        job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.autodetect = False
+        job.schema = [full_name]
+        self.assertEqual(job.schema, [full_name])
+
+        job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.autodetect = True
+        with self.assertRaises(ValueError):
+            job.schema = [full_name]
+
+    def test_autodetect_setter_w_schema(self):
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = _Client(self.PROJECT)
+        table = _Table()
+        full_name = SchemaField('full_name', 'STRING')
+        job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
+
+        job.autodetect = True
+        job.schema = []
+        self.assertEqual(job.schema, [])
+
+        job.autodetect = False
+        job.schema = [full_name]
+        self.assertEqual(job.autodetect, False)
+
+        with self.assertRaises(ValueError):
+            job.autodetect = True
+
     def test_props_set_by_server(self):
         import datetime
         from google.cloud._helpers import UTC
@@ -491,6 +532,47 @@ def test_begin_w_bound_client(self):
         self.assertEqual(req['data'], SENT)
         self._verifyResourceProperties(job, RESOURCE)
 
+    def test_begin_w_autodetect(self):
+        path = '/projects/{}/jobs'.format(self.PROJECT)
+        resource = self._makeResource()
+        resource['configuration']['load']['autodetect'] = True
+        # Ensure None for missing server-set props
+        del resource['statistics']['creationTime']
+        del resource['etag']
+        del resource['selfLink']
+        del resource['user_email']
+        conn = _Connection(resource)
+        client = _Client(project=self.PROJECT, connection=conn)
+        table = _Table()
+        job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)
+        job.autodetect = True
+        job.begin()
+
+        sent = {
+            'jobReference': {
+                'projectId': self.PROJECT,
+                'jobId': self.JOB_NAME,
+            },
+            'configuration': {
+                'load': {
+                    'sourceUris': [self.SOURCE1],
+                    'destinationTable': {
+                        'projectId': self.PROJECT,
+                        'datasetId': self.DS_NAME,
+                        'tableId': self.TABLE_NAME,
+                    },
+                    'autodetect': True,
+                },
+            },
+        }
+        expected_request = {
+            'method': 'POST',
+            'path': path,
+            'data': sent,
+        }
+        self.assertEqual(conn._requested, [expected_request])
+        self._verifyResourceProperties(job, resource)
+
     def test_begin_w_alternate_client(self):
         from google.cloud.bigquery.schema import SchemaField
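
A minimal usage sketch of the new `autodetect` flag, mirroring the system
test above. The project, dataset, table, bucket, and job names are
placeholders, and the simple polling loop is an illustrative stand-in for
the RetryInstanceState helper the tests use:

    import time

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes default project and credentials
    dataset = client.dataset('my_dataset')  # placeholder dataset
    table = dataset.table('person_ages')    # placeholder table

    # Leave `schema` unset: with this patch, combining an explicit
    # schema with `autodetect` raises ValueError.
    job = client.load_table_from_storage(
        'my-autodetect-job', table, 'gs://my-bucket/person_ages.csv')
    job.autodetect = True

    job.begin()
    while job.state != 'DONE':  # jobs reach DONE even on failure
        time.sleep(1)
        job.reload()

    table.reload()  # table.schema is now populated from the detected fields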