
When appending to a table, load if the dataframe contains a subset of the existing schema #24

Merged (9 commits) on Jun 13, 2017
4 changes: 3 additions & 1 deletion docs/source/changelog.rst
@@ -6,13 +6,15 @@ Changelog

- Resolve issue where the optional ``--noauth_local_webserver`` command line argument would not be propagated during the authentication process. (:issue:`35`)
- Drop support for Python 3.4 (:issue:`40`)
- The dataframe passed to ``.to_gbq(...., if_exists='append')`` now only needs to contain a subset of the fields in the BigQuery schema. To support this, ``schema_is_subset`` tests whether a local dataframe schema is a subset of the BigQuery schema, and ``schema`` returns the remote schema. (:issue:`24`)


0.1.6 / 2017-05-03
------------------

- All gbq errors will simply be subclasses of ``ValueError`` and no longer inherit from the deprecated ``PandasError``.

0.1.4 / 2017-03-17
Contributor comment:

did you change this? there isn't a 1.5 release

0.1.5 / 2017-04-20
Contributor comment:

move to latest (0.1.7)

Contributor comment:

the issue reference is misformatted, see the other entries

------------------

- ``InvalidIndexColumn`` will be raised instead of ``InvalidColumnOrder`` in ``read_gbq`` when the index column specified does not exist in the BigQuery schema. (:issue:`6`)
2 changes: 1 addition & 1 deletion docs/source/writing.rst
@@ -40,7 +40,7 @@ a ``TableCreationError`` if the destination table already exists.

If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will
be written to the table using the defined table schema and column types. The
dataframe must match the destination table in structure and data types.
dataframe must contain fields (matching name and type) currently in the destination.
If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a
different schema, a delay of 2 minutes will be forced to ensure that the new schema
has propagated in the Google environment. See
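
A minimal usage sketch of the new append behavior (the dataset, table, project, and key path below are placeholders, not part of this PR):

```python
import pandas as pd
from pandas_gbq import gbq

# Suppose 'my_dataset.my_table' already has columns A (FLOAT), B (FLOAT)
# and C (STRING); this dataframe covers only A and B.
df_subset = pd.DataFrame({'A': [1.0, 2.0], 'B': [3.0, 4.0]})

# With this change the append succeeds as long as every dataframe column
# matches a field (name and type) already present in the destination schema.
gbq.to_gbq(df_subset, 'my_dataset.my_table', 'my-project-id',
           if_exists='append', private_key='path/to/key.json')
```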
88 changes: 80 additions & 8 deletions pandas_gbq/gbq.py
@@ -557,7 +557,25 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):

self._print("\n")

def verify_schema(self, dataset_id, table_id, schema):
def schema(self, dataset_id, table_id):
Contributor comment:

can you update the to_gbq doc-string as well (with expl of the new behavior of if_exists)

"""Retrieve the schema of the table

Obtain from BigQuery the field names and field types
for the table defined by the parameters

Parameters
----------
dataset_id : str
Name of the BigQuery dataset for the table
table_id : str
Name of the BigQuery table

Returns
-------
list of dicts
Fields representing the schema
"""

try:
from googleapiclient.errors import HttpError
except:
@@ -573,15 +591,67 @@ def verify_schema(self, dataset_id, table_id, schema):
'type': field_remote['type']}
for field_remote in remote_schema['fields']]

fields_remote = set([json.dumps(field_remote)
for field_remote in remote_fields])
fields_local = set(json.dumps(field_local)
for field_local in schema['fields'])

return fields_remote == fields_local
return remote_fields
except HttpError as ex:
self.process_http_error(ex)
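
For reference, the return value uses the same list-of-dicts field representation as the rest of this module; a hypothetical call might look like:

```python
# Hypothetical example; the connector instance and names are placeholders.
fields = connector.schema('my_dataset', 'my_table')
# e.g. [{'name': 'A', 'type': 'FLOAT'}, {'name': 'B', 'type': 'STRING'}]
```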

def verify_schema(self, dataset_id, table_id, schema):
"""Indicate whether schemas match exactly

Compare the BigQuery table identified in the parameters with
the schema passed in and indicate whether they contain the
same fields. Order is not considered.

Parameters
----------
dataset_id : str
Name of the BigQuery dataset for the table
table_id : str
Name of the BigQuery table
schema : list(dict)
Schema for comparison. Each item should have
a 'name' and a 'type'

Returns
-------
bool
Whether the schemas match
"""

fields_remote = sorted(self.schema(dataset_id, table_id),
key=lambda x: x['name'])
fields_local = sorted(schema['fields'], key=lambda x: x['name'])

return fields_remote == fields_local

def schema_is_subset(self, dataset_id, table_id, schema):
"""Indicate whether the schema to be uploaded is a subset

Compare the BigQuery table identified in the parameters with
the schema passed in and indicate whether all fields in the
latter are present in the former, i.e. whether the passed
schema is a subset of the table's schema. Order is not
considered.

Parameters
----------
dataset_id : str
Name of the BigQuery dataset for the table
table_id : str
Name of the BigQuery table
schema : list(dict)
Schema for comparison. Each item should have
a 'name' and a 'type'

Returns
-------
bool
Whether the passed schema is a subset
"""

fields_remote = self.schema(dataset_id, table_id)
fields_local = schema['fields']

Contributor comment:

these routines should prob just take the result of self.schema(dataset_id, table_id), yes?

return all(field in fields_remote for field in fields_local)
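
As a standalone illustration of how the two checks above behave (hypothetical field lists, no live BigQuery table involved):

```python
remote = [{'name': 'A', 'type': 'FLOAT'},
          {'name': 'B', 'type': 'FLOAT'},
          {'name': 'C', 'type': 'STRING'}]

subset = {'fields': [{'name': 'B', 'type': 'FLOAT'},
                     {'name': 'A', 'type': 'FLOAT'}]}
exact = {'fields': [{'name': 'C', 'type': 'STRING'},
                    {'name': 'A', 'type': 'FLOAT'},
                    {'name': 'B', 'type': 'FLOAT'}]}

# schema_is_subset: every local field (name and type) must appear in the
# remote schema; order does not matter because matching is by membership.
assert all(field in remote for field in subset['fields'])

# verify_schema: the field lists must be identical once sorted by name.
assert sorted(remote, key=lambda x: x['name']) == \
       sorted(exact['fields'], key=lambda x: x['name'])
```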

def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
delay = 0

@@ -844,7 +914,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
connector.delete_and_recreate_table(
dataset_id, table_id, table_schema)
elif if_exists == 'append':
if not connector.verify_schema(dataset_id, table_id, table_schema):
if not connector.schema_is_subset(dataset_id,
table_id,
table_schema):
raise InvalidSchema("Please verify that the structure and "
"data types in the DataFrame match the "
"schema of the destination table.")
85 changes: 85 additions & 0 deletions pandas_gbq/tests/test_gbq.py
@@ -1071,6 +1071,31 @@ def test_upload_data_if_table_exists_append(self):
_get_project_id(), if_exists='append',
private_key=_get_private_key_path())

def test_upload_subset_columns_if_table_exists_append(self):
# Issue 24: Upload is successful if dataframe has columns
# which are a subset of the current schema
test_id = "16"
Contributor comment:

can you add a comment with the issue ref number (this PR number is ok)

test_size = 10
df = make_mixed_dataframe_v2(test_size)
df_subset_cols = df.iloc[:, :2]

# Initialize table with sample data
gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(),
chunksize=10000, private_key=_get_private_key_path())

# Test the if_exists parameter with value 'append'
gbq.to_gbq(df_subset_cols,
self.destination_table + test_id, _get_project_id(),
if_exists='append', private_key=_get_private_key_path())

sleep(30) # <- Curses Google!!!
Contributor comment:

We have a function wait_for_job which we copied from the Google docs; I'll PR it in down the road (it relies on google.cloud.bigquery, so need to move some other code onto that)

Contributor comment:

pls update as @MaximilianR suggests (look at other tests to see how this is done)

Contributor comment:

@MaximilianR hmm, wait_for_job must be old.....


result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
.format(self.destination_table + test_id),
project_id=_get_project_id(),
private_key=_get_private_key_path())
assert result['num_rows'][0] == test_size * 2

def test_upload_data_if_table_exists_replace(self):
test_id = "4"
test_size = 10
@@ -1258,6 +1283,66 @@ def test_verify_schema_ignores_field_mode(self):
assert self.sut.verify_schema(
self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2)

def test_retrieve_schema(self):
# Issue #24: the schema function returns the schema stored in BigQuery
test_id = "15"
Contributor comment:

comment of issue number

test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'STRING'},
{'name': 'D', 'type': 'TIMESTAMP'}]}

self.table.create(TABLE_ID + test_id, test_schema)
actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)
expected = test_schema['fields']
assert expected == actual, 'Expected schema used to create table'

Contributor comment:

can you add some tests that exercise schema_is_subset

def test_schema_is_subset_passes_if_subset(self):
# Issue #24 schema_is_subset indicates whether the schema of the
# dataframe is a subset of the schema of the bigquery table
test_id = '16'

table_name = TABLE_ID + test_id
dataset = self.dataset_prefix + '1'

table_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'B',
'type': 'FLOAT'},
{'name': 'C',
'type': 'STRING'}]}
tested_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'B',
'type': 'FLOAT'}]}

self.table.create(table_name, table_schema)

assert self.sut.schema_is_subset(
dataset, table_name, tested_schema) is True

def test_schema_is_subset_fails_if_not_subset(self):
# For pull request #24
test_id = '17'

table_name = TABLE_ID + test_id
dataset = self.dataset_prefix + '1'

table_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'B',
'type': 'FLOAT'},
{'name': 'C',
'type': 'STRING'}]}
tested_schema = {'fields': [{'name': 'A',
'type': 'FLOAT'},
{'name': 'C',
'type': 'FLOAT'}]}

self.table.create(table_name, table_schema)

assert self.sut.schema_is_subset(
dataset, table_name, tested_schema) is False

def test_list_dataset(self):
dataset_id = self.dataset_prefix + "1"
assert dataset_id in self.dataset.datasets()