From 7345159079c980c7aadb791693780f4df064e4b6 Mon Sep 17 00:00:00 2001
From: Matti Remes
Date: Wed, 31 May 2017 14:21:19 +0300
Subject: [PATCH] ENH: Add table_schema parameter for user-defined BigQuery
 schema (#46)

---
 docs/source/changelog.rst    |  3 ++-
 pandas_gbq/gbq.py            | 14 ++++++++++--
 pandas_gbq/tests/test_gbq.py | 42 ++++++++++++++++++++++++++++++++++++
 3 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index dc35067e..5ecef22d 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -1,12 +1,13 @@
 Changelog
 =========
 
-0.2.1 / 2017-??-??
+0.3.0 / 2017-??-??
 ------------------
 
 - :func:`read_gbq` now raises ``QueryTimeout`` if the request exceeds the ``query.timeoutMs`` value specified in the BigQuery configuration. (:issue:`76`)
 - Environment variable ``PANDAS_GBQ_CREDENTIALS_FILE`` can now be used to override the default location where the BigQuery user account credentials are stored. (:issue:`86`)
 - BigQuery user account credentials are now stored in an application-specific hidden user folder on the operating system. (:issue:`41`)
+- Add support for a passed schema in :func:`to_gbq` instead of inferring the schema from the passed ``DataFrame`` with ``DataFrame.dtypes``. (:issue:`46`)
 
 0.2.0 / 2017-07-24
 ------------------
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 9473b082..2c8a9dbd 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -1017,7 +1017,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
 
 def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
            verbose=True, reauth=False, if_exists='fail', private_key=None,
-           auth_local_webserver=False):
+           auth_local_webserver=False, table_schema=None):
     """Write a DataFrame to a Google BigQuery table.
 
     The main method a user calls to export pandas DataFrame contents to
@@ -1075,6 +1075,13 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
         .. [console flow]
            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
         .. versionadded:: 0.2.0
+    table_schema : list of dicts
+        List of BigQuery table fields to which the DataFrame columns
+        should conform, e.g. ``[{'name': 'col1', 'type': 'STRING'},...]``.
+        If a schema is not provided, it will be generated according to
+        the dtypes of the DataFrame columns. See the BigQuery API
+        documentation for the available names of a field.
+        .. versionadded:: 0.3.0
     """
 
     _test_google_api_imports()
@@ -1094,7 +1101,10 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
     table = _Table(project_id, dataset_id,
                    reauth=reauth, private_key=private_key)
 
-    table_schema = _generate_bq_schema(dataframe)
+    if not table_schema:
+        table_schema = _generate_bq_schema(dataframe)
+    else:
+        table_schema = dict(fields=table_schema)
 
     # If table exists, check if_exists parameter
     if table.exists(table_id):
diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py
index 62b72dbc..d7808d11 100644
--- a/pandas_gbq/tests/test_gbq.py
+++ b/pandas_gbq/tests/test_gbq.py
@@ -1312,6 +1312,48 @@ def test_schema_is_subset_fails_if_not_subset(self):
         assert self.sut.schema_is_subset(
             dataset, table_name, tested_schema) is False
 
+    def test_upload_data_with_valid_user_schema(self):
+        # Issue #46; tests upload scenarios with a
+        # user-provided schema
+        df = tm.makeMixedDataFrame()
+        test_id = "15"
+        test_schema = [{'name': 'A', 'type': 'FLOAT'},
+                       {'name': 'B', 'type': 'FLOAT'},
+                       {'name': 'C', 'type': 'STRING'},
+                       {'name': 'D', 'type': 'TIMESTAMP'}]
+        destination_table = self.destination_table + test_id
+        gbq.to_gbq(df, destination_table, _get_project_id(),
+                   private_key=_get_private_key_path(),
+                   table_schema=test_schema)
+        dataset, table = destination_table.split('.')
+        assert self.table.verify_schema(dataset, table,
+                                        dict(fields=test_schema))
+
+    def test_upload_data_with_invalid_user_schema_raises_error(self):
+        df = tm.makeMixedDataFrame()
+        test_id = "16"
+        test_schema = [{'name': 'A', 'type': 'FLOAT'},
+                       {'name': 'B', 'type': 'FLOAT'},
+                       {'name': 'C', 'type': 'FLOAT'},
+                       {'name': 'D', 'type': 'FLOAT'}]
+        destination_table = self.destination_table + test_id
+        with tm.assertRaises(gbq.StreamingInsertError):
+            gbq.to_gbq(df, destination_table, _get_project_id(),
+                       private_key=_get_private_key_path(),
+                       table_schema=test_schema)
+
+    def test_upload_data_with_missing_schema_fields_raises_error(self):
+        df = tm.makeMixedDataFrame()
+        test_id = "17"
+        test_schema = [{'name': 'A', 'type': 'FLOAT'},
+                       {'name': 'B', 'type': 'FLOAT'},
+                       {'name': 'C', 'type': 'FLOAT'}]
+        destination_table = self.destination_table + test_id
+        with tm.assertRaises(gbq.StreamingInsertError):
+            gbq.to_gbq(df, destination_table, _get_project_id(),
+                       private_key=_get_private_key_path(),
+                       table_schema=test_schema)
+
     def test_list_dataset(self):
         dataset_id = self.dataset_prefix + "1"
         assert dataset_id in self.dataset.datasets()
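
For reference, a minimal usage sketch of the new table_schema parameter
(the project ID and destination table names below are placeholders, not
values from the patch; the import style follows the tests above):

    import pandas as pd
    from pandas_gbq import gbq

    df = pd.DataFrame({'col1': ['a', 'b'], 'col2': [1.5, 2.5]})

    # User-defined schema; the field names here match the DataFrame
    # columns, as in the valid-schema test. If table_schema is omitted,
    # to_gbq generates a schema from df.dtypes via _generate_bq_schema.
    schema = [{'name': 'col1', 'type': 'STRING'},
              {'name': 'col2', 'type': 'FLOAT'}]

    gbq.to_gbq(df, 'my_dataset.my_table', 'my-project-id',
               table_schema=schema)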