ENH: Add table_schema parameter for user-defined BigQuery schema (#46)
mremes authored and Matti Remes committed Jan 24, 2018
1 parent 336fd78 commit 6792076
Showing 3 changed files with 55 additions and 4 deletions.
3 changes: 1 addition & 2 deletions docs/source/changelog.rst
@@ -1,12 +1,11 @@
Changelog
=========


0.3.1 / [TBD]
------------------

- Fix an issue where Unicode couldn't be uploaded in Python 2 (:issue:`93`)

- Add support for a passed schema in :func:`to_gbq` instead of inferring the schema from the passed ``DataFrame`` with ``DataFrame.dtypes`` (:issue:`46`)

0.3.0 / 2018-01-03
------------------
14 changes: 12 additions & 2 deletions pandas_gbq/gbq.py
@@ -891,7 +891,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,

def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
verbose=True, reauth=False, if_exists='fail', private_key=None,
auth_local_webserver=False):
auth_local_webserver=False, table_schema=None):
"""Write a DataFrame to a Google BigQuery table.
The main method a user calls to export pandas DataFrame contents to
@@ -949,6 +949,13 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
.. [console flow]
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
.. versionadded:: 0.2.0
table_schema : list of dicts
List of BigQuery table fields to which the DataFrame's columns
conform, e.g. ``[{'name': 'col1', 'type': 'STRING'},...]``. If a
schema is not provided, it will be generated according to the
dtypes of the DataFrame columns. See the BigQuery API documentation
on available names of a field.
.. versionadded:: 0.3.1
"""

_test_google_api_imports()
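
A minimal usage sketch of the new table_schema argument (the dataset,
project id, and key path below are hypothetical):

import pandas as pd
from pandas_gbq import gbq

df = pd.DataFrame({'name': ['a', 'b'],
                   'created': pd.Timestamp('2018-01-24')})
schema = [{'name': 'name', 'type': 'STRING'},
          {'name': 'created', 'type': 'TIMESTAMP'}]
# An explicit schema overrides the one inferred from df.dtypes.
gbq.to_gbq(df, 'my_dataset.my_table', 'my-project-id',
           table_schema=schema,
           private_key='path/to/service_account_key.json')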
@@ -968,7 +975,10 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
table = _Table(project_id, dataset_id, reauth=reauth,
private_key=private_key)

table_schema = _generate_bq_schema(dataframe)
if not table_schema:
table_schema = _generate_bq_schema(dataframe)
else:
table_schema = dict(fields=table_schema)

# If table exists, check if_exists parameter
if table.exists(table_id):
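For reference, both branches above leave table_schema in the same shape;
a small sketch of the resulting structure (field names are illustrative):

user_schema = [{'name': 'col1', 'type': 'STRING'}]
normalized = dict(fields=user_schema)
# -> {'fields': [{'name': 'col1', 'type': 'STRING'}]}
# When no schema is passed, _generate_bq_schema(dataframe) builds the
# same {'fields': [...]} mapping from DataFrame.dtypes.
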
42 changes: 42 additions & 0 deletions pandas_gbq/tests/test_gbq.py
@@ -1422,6 +1422,48 @@ def test_schema_is_subset_fails_if_not_subset(self):
assert self.sut.schema_is_subset(
dataset, table_name, tested_schema) is False

def test_upload_data_with_valid_user_schema(self):
# Issue #46; tests scenarios with user-provided
# schemas
df = tm.makeMixedDataFrame()
test_id = "15"
test_schema = [{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'STRING'},
{'name': 'D', 'type': 'TIMESTAMP'}]
destination_table = self.destination_table + test_id
gbq.to_gbq(df, destination_table, _get_project_id(),
private_key=_get_private_key_path(),
table_schema=test_schema)
dataset, table = destination_table.split('.')
assert self.table.verify_schema(dataset, table,
dict(fields=test_schema))

def test_upload_data_with_invalid_user_schema_raises_error(self):
df = tm.makeMixedDataFrame()
test_id = "16"
test_schema = [{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'FLOAT'},
{'name': 'D', 'type': 'FLOAT'}]
destination_table = self.destination_table + test_id
with tm.assertRaises(gbq.StreamingInsertError):
gbq.to_gbq(df, destination_table, _get_project_id(),
private_key=_get_private_key_path(),
table_schema=test_schema)

def test_upload_data_with_missing_schema_fields_raises_error(self):
df = tm.makeMixedDataFrame()
test_id = "16"
test_schema = [{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'FLOAT'}]
destination_table = self.destination_table + test_id
with tm.assertRaises(gbq.StreamingInsertError):
gbq.to_gbq(df, destination_table, _get_project_id(),
private_key=_get_private_key_path(),
table_schema=test_schema)
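
As the two failing tests above suggest, a schema that does not match the
uploaded data surfaces as gbq.StreamingInsertError; a hedged sketch of
guarding against it (table and project names are hypothetical):

import pandas as pd
from pandas_gbq import gbq

df = pd.DataFrame({'A': ['not', 'a', 'float']})
try:
    gbq.to_gbq(df, 'my_dataset.my_table', 'my-project-id',
               table_schema=[{'name': 'A', 'type': 'FLOAT'}],
               private_key='path/to/service_account_key.json')
except gbq.StreamingInsertError:
    # Raised when streamed rows do not conform to the provided schema.
    print('schema mismatch; fix table_schema or the DataFrame dtypes')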

def test_list_dataset(self):
dataset_id = self.dataset_prefix + "1"
assert dataset_id in self.dataset.datasets()
