From 67920764d24cd17be57249707e9163137b3d28e5 Mon Sep 17 00:00:00 2001
From: Matti Remes
Date: Wed, 31 May 2017 14:21:19 +0300
Subject: [PATCH] ENH: Add table_schema parameter for user-defined BigQuery schema (#46)

---
 docs/source/changelog.rst    |  3 +--
 pandas_gbq/gbq.py            | 14 ++++++++++--
 pandas_gbq/tests/test_gbq.py | 42 ++++++++++++++++++++++++++++++++++++
 3 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index 5d1bb98b..04edb419 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -1,12 +1,11 @@
 Changelog
 =========
 
-
 0.3.1 / [TBD]
 ------------------
 
 - Fix an issue where Unicode couldn't be uploaded in Python 2 (:issue:`93`)
-
+- Add support for a passed schema in :func:`to_gbq` instead of inferring the schema from the passed ``DataFrame`` with ``DataFrame.dtypes`` (:issue:`46`)
 
 0.3.0 / 2018-01-03
 ------------------

diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 67d5ea51..4410b6b1 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -891,7 +891,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
 
 def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
            verbose=True, reauth=False, if_exists='fail', private_key=None,
-           auth_local_webserver=False):
+           auth_local_webserver=False, table_schema=None):
     """Write a DataFrame to a Google BigQuery table.
 
     The main method a user calls to export pandas DataFrame contents to
@@ -949,6 +949,13 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
         .. [console flow]
             http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
         .. versionadded:: 0.2.0
+    table_schema : list of dicts
+        List of BigQuery table fields to which the DataFrame columns
+        conform, e.g. ``[{'name': 'col1', 'type': 'STRING'},...]``. If a
+        schema is not provided, it will be generated from the dtypes of
+        the DataFrame columns. See the BigQuery API documentation for
+        the available field names.
+        .. versionadded:: 0.3.1
 
     """
     _test_google_api_imports()
@@ -968,7 +975,10 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
     table = _Table(project_id, dataset_id, reauth=reauth,
                    private_key=private_key)
 
-    table_schema = _generate_bq_schema(dataframe)
+    if not table_schema:
+        table_schema = _generate_bq_schema(dataframe)
+    else:
+        table_schema = dict(fields=table_schema)
 
     # If table exists, check if_exists parameter
     if table.exists(table_id):

diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py
index 78928a60..975326ef 100644
--- a/pandas_gbq/tests/test_gbq.py
+++ b/pandas_gbq/tests/test_gbq.py
@@ -1422,6 +1422,48 @@ def test_schema_is_subset_fails_if_not_subset(self):
         assert self.sut.schema_is_subset(
             dataset, table_name, tested_schema) is False
 
+    def test_upload_data_with_valid_user_schema(self):
+        # Issue #46; tests scenarios with user-provided
+        # schemas
+        df = tm.makeMixedDataFrame()
+        test_id = "15"
+        test_schema = [{'name': 'A', 'type': 'FLOAT'},
+                       {'name': 'B', 'type': 'FLOAT'},
+                       {'name': 'C', 'type': 'STRING'},
+                       {'name': 'D', 'type': 'TIMESTAMP'}]
+        destination_table = self.destination_table + test_id
+        gbq.to_gbq(df, destination_table, _get_project_id(),
+                   private_key=_get_private_key_path(),
+                   table_schema=test_schema)
+        dataset, table = destination_table.split('.')
+        assert self.table.verify_schema(dataset, table,
+                                        dict(fields=test_schema))
+
+    def test_upload_data_with_invalid_user_schema_raises_error(self):
+        df = tm.makeMixedDataFrame()
+        test_id = "16"
+        test_schema = [{'name': 'A', 'type': 'FLOAT'},
+                       {'name': 'B', 'type': 'FLOAT'},
+                       {'name': 'C', 'type': 'FLOAT'},
+                       {'name': 'D', 'type': 'FLOAT'}]
+        destination_table = self.destination_table + test_id
+        with tm.assertRaises(gbq.StreamingInsertError):
+            gbq.to_gbq(df, destination_table, _get_project_id(),
+                       private_key=_get_private_key_path(),
+                       table_schema=test_schema)
+
+    def test_upload_data_with_missing_schema_fields_raises_error(self):
+        df = tm.makeMixedDataFrame()
+        test_id = "17"
+        test_schema = [{'name': 'A', 'type': 'FLOAT'},
+                       {'name': 'B', 'type': 'FLOAT'},
+                       {'name': 'C', 'type': 'FLOAT'}]
+        destination_table = self.destination_table + test_id
+        with tm.assertRaises(gbq.StreamingInsertError):
+            gbq.to_gbq(df, destination_table, _get_project_id(),
+                       private_key=_get_private_key_path(),
+                       table_schema=test_schema)
+
     def test_list_dataset(self):
         dataset_id = self.dataset_prefix + "1"
         assert dataset_id in self.dataset.datasets()
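
A minimal usage sketch of the new ``table_schema`` parameter follows. The
project ID, destination table, and DataFrame contents are illustrative
placeholders, not values from this patch; only ``to_gbq`` and its
``table_schema`` argument come from the change above.

    import pandas as pd

    from pandas_gbq import gbq

    # Placeholder data; the 'name' keys in the schema below must match
    # the DataFrame column names.
    df = pd.DataFrame({'name': ['alice', 'bob'],
                       'score': [0.5, 1.5]})

    # One dict per column, as in the docstring example.
    schema = [{'name': 'name', 'type': 'STRING'},
              {'name': 'score', 'type': 'FLOAT'}]

    # With table_schema, the passed fields are wrapped in
    # dict(fields=...) and used as-is; omitting the argument falls back
    # to _generate_bq_schema(df), which infers types from df.dtypes.
    # 'my_dataset.new_table' and 'my-project-id' are placeholders.
    gbq.to_gbq(df, 'my_dataset.new_table', 'my-project-id',
               if_exists='fail', table_schema=schema)

As the two negative tests assert, a passed schema whose fields are missing
or mistyped relative to the uploaded data fails at insert time with
``gbq.StreamingInsertError``.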