
ENH: Add table_schema parameter for user-defined BigQuery schema #46

Merged: 5 commits, Jan 28, 2018
1 change: 1 addition & 0 deletions docs/source/changelog.rst
@@ -6,6 +6,7 @@ Changelog
------------------

- Fix an issue where Unicode couldn't be uploaded in Python 2 (:issue:`93`)
- Add support for a passed schema in :func:`to_gbq` instead of inferring the schema from the passed ``DataFrame`` with ``DataFrame.dtypes`` (:issue:`46`)


0.3.0 / 2018-01-03
14 changes: 12 additions & 2 deletions pandas_gbq/gbq.py
@@ -891,7 +891,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,

 def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
            verbose=True, reauth=False, if_exists='fail', private_key=None,
-           auth_local_webserver=False):
+           auth_local_webserver=False, table_schema=None):
"""Write a DataFrame to a Google BigQuery table.

The main method a user calls to export pandas DataFrame contents to
@@ -949,6 +949,13 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
.. [console flow]
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
.. versionadded:: 0.2.0
table_schema : list of dicts
    List of BigQuery table fields to which the DataFrame columns
    conform, e.g. ``[{'name': 'col1', 'type': 'STRING'},...]``. If a
    schema is not provided, it will be generated according to the
    dtypes of the DataFrame columns. See the BigQuery API
    documentation for available names of a field.
.. versionadded:: 0.3.0
"""

_test_google_api_imports()
@@ -968,7 +975,10 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
table = _Table(project_id, dataset_id, reauth=reauth,
private_key=private_key)

-    table_schema = _generate_bq_schema(dataframe)
+    if not table_schema:
+        table_schema = _generate_bq_schema(dataframe)
+    else:
+        table_schema = dict(fields=table_schema)
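In effect, a user-supplied list of field dicts is wrapped into the same {'fields': [...]} shape that _generate_bq_schema produces from the DataFrame dtypes; a small sketch:

# Sketch: both branches yield the same top-level structure.
user_schema = [{'name': 'A', 'type': 'FLOAT'}]
assert dict(fields=user_schema) == {'fields': [{'name': 'A', 'type': 'FLOAT'}]}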
Contributor:
I assume validation is now up to BQ. Can you test this, though?

Author:
The current implementation will throw a StreamingInsertError after a chunk is done (see tests), along with printing the error trace from the BQ API, which is OK.


# If table exists, check if_exists parameter
if table.exists(table_id):
42 changes: 42 additions & 0 deletions pandas_gbq/tests/test_gbq.py
@@ -1422,6 +1422,48 @@ def test_schema_is_subset_fails_if_not_subset(self):
assert self.sut.schema_is_subset(
dataset, table_name, tested_schema) is False

def test_upload_data_with_valid_user_schema(self):
# Issue #46; tests scenarios with user-provided schemas
df = tm.makeMixedDataFrame()
Contributor:
add the issue number as a comment

test_id = "15"
test_schema = [{'name': 'A', 'type': 'FLOAT'},
Collaborator:
There's a chance this might fail with version 0.29.0 of google-cloud-bigquery due to googleapis/google-cloud-python#4456

Contributor:
Generated schemas do not include the mode property in the fields either, so this should be fine (see the sketch after this test).

{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'STRING'},
{'name': 'D', 'type': 'TIMESTAMP'}]
destination_table = self.destination_table + test_id
gbq.to_gbq(df, destination_table, _get_project_id(),
private_key=_get_private_key_path(),
table_schema=test_schema)
dataset, table = destination_table.split('.')
assert self.table.verify_schema(dataset, table,
dict(fields=test_schema))

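Regarding the mode discussion above: a BigQuery field may optionally carry a 'mode' key ('NULLABLE', 'REQUIRED', or 'REPEATED'), which the schemas generated from dtypes omit; a hypothetical illustration:

# Hypothetical field dicts; generated schemas leave 'mode' out.
field_explicit = {'name': 'A', 'type': 'FLOAT', 'mode': 'NULLABLE'}
field_generated = {'name': 'A', 'type': 'FLOAT'}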
def test_upload_data_with_invalid_user_schema_raises_error(self):
df = tm.makeMixedDataFrame()
Contributor:
Can you also test with missing keys in the schema?

Author:
Test added.

test_id = "16"
test_schema = [{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'FLOAT'},
{'name': 'D', 'type': 'FLOAT'}]
destination_table = self.destination_table + test_id
with tm.assertRaises(gbq.StreamingInsertError):
Collaborator:
StreamingInsertError was removed in 0.3.0. to_gbq now creates a load job.

Contributor:
Replaced with the generic error. Errors need a better hierarchy in this module, though. (A sketch of the updated assertion follows this test.)

gbq.to_gbq(df, destination_table, _get_project_id(),
private_key=_get_private_key_path(),
table_schema=test_schema)
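Per the exchange above, if the generic error is gbq.GenericGBQException, the assertions in these tests might instead read as follows (a sketch only; the exact class name is an assumption):

# Sketch: the removed StreamingInsertError replaced by a generic
# exception class (assuming it is gbq.GenericGBQException).
with tm.assertRaises(gbq.GenericGBQException):
    gbq.to_gbq(df, destination_table, _get_project_id(),
               private_key=_get_private_key_path(),
               table_schema=test_schema)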

def test_upload_data_with_missing_schema_fields_raises_error(self):
df = tm.makeMixedDataFrame()
test_id = "16"
test_schema = [{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'FLOAT'}]
destination_table = self.destination_table + test_id
with tm.assertRaises(gbq.StreamingInsertError):
gbq.to_gbq(df, destination_table, _get_project_id(),
private_key=_get_private_key_path(),
table_schema=test_schema)

def test_list_dataset(self):
dataset_id = self.dataset_prefix + "1"
assert dataset_id in self.dataset.datasets()