From e7bfca333382d7daf6b61742da26bc3a9a37ab5e Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Mon, 2 Sep 2019 16:53:52 +0200
Subject: [PATCH 1/3] Add client.insert_rows_from_dataframe() method

---
 bigquery/google/cloud/bigquery/client.py | 51 ++++++++++++++++
 bigquery/tests/system.py                 | 66 +++++++++++++++++++++
 bigquery/tests/unit/test_client.py       | 75 ++++++++++++++++++++++++
 3 files changed, 192 insertions(+)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index cc53ffa22985..3383463b5953 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -15,6 +15,7 @@
 """Client for interacting with the Google BigQuery API."""
 
 from __future__ import absolute_import
+from __future__ import division
 
 try:
     from collections import abc as collections_abc
@@ -25,7 +26,9 @@
 import functools
 import gzip
 import io
+import itertools
 import json
+import math
 import os
 import tempfile
 import uuid
@@ -2080,6 +2083,54 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):
 
         return self.insert_rows_json(table, json_rows, **kwargs)
 
+    def insert_rows_from_dataframe(
+        self, table, dataframe, selected_fields=None, chunk_size=500, **kwargs
+    ):
+        """Insert rows into a table from a dataframe via the streaming API.
+
+        Args:
+            table (Union[ \
+                :class:`~google.cloud.bigquery.table.Table`, \
+                :class:`~google.cloud.bigquery.table.TableReference`, \
+                str, \
+            ]):
+                The destination table for the row data, or a reference to it.
+            dataframe (pandas.DataFrame):
+                A :class:`~pandas.DataFrame` containing the data to load.
+            selected_fields (Sequence[ \
+                :class:`~google.cloud.bigquery.schema.SchemaField`, \
+            ]):
+                The fields to return. Required if ``table`` is a
+                :class:`~google.cloud.bigquery.table.TableReference`.
+            chunk_size (int):
+                The number of rows to stream in a single chunk. Must be positive.
+            kwargs (dict):
+                Keyword arguments to
+                :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.
+
+        Returns:
+            Sequence[Sequence[Mappings]]:
+                A list with insert errors for each insert chunk. Each element
+                is a list containing one mapping per row with insert errors:
+                the "index" key identifies the row, and the "errors" key
+                contains a list of the mappings describing one or more problems
+                with the row.
+
+        Raises:
+            ValueError: if table's schema is not set
+        """
+        insert_results = []
+
+        chunk_count = int(math.ceil(len(dataframe) / chunk_size))
+        rows_iter = (row._asdict() for row in dataframe.itertuples(index=False))
+
+        for _ in range(chunk_count):
+            rows_chunk = itertools.islice(rows_iter, chunk_size)
+            result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
+            insert_results.append(result)
+
+        return insert_results
+
     def insert_rows_json(
         self,
         table,
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 3593e1ecb609..ca3ae4958a6e 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -1951,6 +1951,72 @@ def test_query_results_to_dataframe_w_bqstorage(self):
                 if not row[col] is None:
                     self.assertIsInstance(row[col], exp_datatypes[col])
 
+    def test_insert_rows_from_dataframe(self):
+        SF = bigquery.SchemaField
+        schema = [
+            SF("float_col", "FLOAT", mode="REQUIRED"),
+            SF("int_col", "INTEGER", mode="REQUIRED"),
+            SF("bool_col", "BOOLEAN", mode="REQUIRED"),
+            SF("string_col", "STRING", mode="NULLABLE"),
+        ]
+
+        dataframe = pandas.DataFrame(
+            [
+                {
+                    "float_col": 1.11,
+                    "bool_col": True,
+                    "string_col": "my string",
+                    "int_col": 10,
+                },
+                {
+                    "float_col": 2.22,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 20,
+                },
+                {
+                    "float_col": 3.33,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 30,
+                },
+                {
+                    "float_col": 4.44,
+                    "bool_col": True,
+                    "string_col": "another string",
+                    "int_col": 40,
+                },
+                {
+                    "float_col": 5.55,
+                    "bool_col": False,
+                    "string_col": "another string",
+                    "int_col": 50,
+                },
+            ]
+        )
+
+        table_id = "test_table"
+        dataset = self.temp_dataset(_make_dataset_id("issue_7553"))
+        table_arg = Table(dataset.table(table_id), schema=schema)
+        table = retry_403(Config.CLIENT.create_table)(table_arg)
+        self.to_delete.insert(0, table)
+
+        Config.CLIENT.insert_rows_from_dataframe(table, dataframe, chunk_size=3)
+
+        retry = RetryResult(_has_rows, max_tries=8)
+        rows = retry(self._fetch_single_page)(table)
+
+        sorted_rows = sorted(rows, key=operator.attrgetter("int_col"))
+        row_tuples = [r.values() for r in sorted_rows]
+        expected = [tuple(data_row) for data_row in dataframe.itertuples(index=False)]
+
+        assert len(row_tuples) == len(expected)
+
+        for row, expected_row in zip(row_tuples, expected):
+            six.assertCountEqual(
+                self, row, expected_row
+            )  # column order does not matter
+
     def test_insert_rows_nested_nested(self):
         # See #2951
         SF = bigquery.SchemaField
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index f31d8587322b..06917cf4cac9 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -4472,6 +4472,81 @@ def test_insert_rows_w_numeric(self):
             data=sent,
         )
 
+    def test_insert_rows_from_dataframe(self):
+        from google.cloud.bigquery.table import SchemaField
+        from google.cloud.bigquery.table import Table
+
+        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
+            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
+        )
+
+        dataframe = pandas.DataFrame(
+            [
+                {"name": u"Little One", "age": 10, "adult": False},
+                {"name": u"Young Gun", "age": 20, "adult": True},
+                {"name": u"Dad", "age": 30, "adult": True},
+                {"name": u"Stranger", "age": 40, "adult": True},
+            ]
+        )
+
+        # create client
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+        conn = client._connection = make_connection({}, {})
+
+        # create table
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+            SchemaField("adult", "BOOLEAN", mode="REQUIRED"),
+        ]
+        table = Table(self.TABLE_REF, schema=schema)
+
+        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
+            error_info = client.insert_rows_from_dataframe(
+                table, dataframe, chunk_size=3
+            )
+
+        self.assertEqual(len(error_info), 2)
+        for chunk_errors in error_info:
+            assert chunk_errors == []
+
+        EXPECTED_SENT_DATA = [
+            {
+                "rows": [
+                    {
+                        "insertId": "0",
+                        "json": {"name": "Little One", "age": "10", "adult": "false"},
+                    },
+                    {
+                        "insertId": "1",
+                        "json": {"name": "Young Gun", "age": "20", "adult": "true"},
+                    },
+                    {
+                        "insertId": "2",
+                        "json": {"name": "Dad", "age": "30", "adult": "true"},
+                    },
+                ]
+            },
+            {
+                "rows": [
+                    {
+                        "insertId": "3",
+                        "json": {"name": "Stranger", "age": "40", "adult": "true"},
+                    }
+                ]
+            },
+        ]
+
+        actual_calls = conn.api_request.call_args_list
+
+        for call, expected_data in six.moves.zip_longest(
+            actual_calls, EXPECTED_SENT_DATA
+        ):
+            expected_call = mock.call(method="POST", path=API_PATH, data=expected_data)
+            assert call == expected_call
+
     def test_insert_rows_json(self):
         from google.cloud.bigquery.table import Table, SchemaField
         from google.cloud.bigquery.dataset import DatasetReference

From 0779509f593f4360d16595123a6b36f0a92ab358 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Wed, 4 Sep 2019 12:59:32 +0200
Subject: [PATCH 2/3] Avoid using namedtuples for dataframe row iteration

dataframe.itertuples() returns plain tuples under certain conditions
(e.g. when the dataframe has more than 255 columns), so this commit
always requests plain tuples and constructs each row dictionary
manually from the tuple.
---
 bigquery/google/cloud/bigquery/client.py |  5 ++-
 bigquery/tests/unit/test_client.py       | 47 ++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 3383463b5953..a3c4189691ed 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -2122,7 +2122,10 @@ def insert_rows_from_dataframe(
         insert_results = []
 
         chunk_count = int(math.ceil(len(dataframe) / chunk_size))
-        rows_iter = (row._asdict() for row in dataframe.itertuples(index=False))
+        rows_iter = (
+            dict(six.moves.zip(dataframe.columns, row))
+            for row in dataframe.itertuples(index=False, name=None)
+        )
 
         for _ in range(chunk_count):
             rows_chunk = itertools.islice(rows_iter, chunk_size)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 06917cf4cac9..2ec1d99034b9 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -4547,6 +4547,53 @@ def test_insert_rows_from_dataframe(self):
             expected_call = mock.call(method="POST", path=API_PATH, data=expected_data)
             assert call == expected_call
 
+    def test_insert_rows_from_dataframe_many_columns(self):
+        from google.cloud.bigquery.table import SchemaField
+        from google.cloud.bigquery.table import Table
+
+        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
+            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
+        )
+        N_COLUMNS = 256  # should be >= 256 to make itertuples() return plain tuples
+
+        dataframe = pandas.DataFrame(
+            [{"foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)}]
+        )
+
+        # create client
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+        conn = client._connection = make_connection({}, {})
+
+        # create table
+        schema = [SchemaField("foo_{}".format(i), "STRING") for i in range(N_COLUMNS)]
+        table = Table(self.TABLE_REF, schema=schema)
+
+        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
+            error_info = client.insert_rows_from_dataframe(
+                table, dataframe, chunk_size=3
+            )
+
+        assert len(error_info) == 1
+        assert error_info[0] == []
+
+        EXPECTED_SENT_DATA = {
+            "rows": [
+                {
+                    "insertId": "0",
+                    "json": {
+                        "foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)
+                    },
+                }
+            ]
+        }
+        expected_call = mock.call(method="POST", path=API_PATH, data=EXPECTED_SENT_DATA)
+
+        actual_calls = conn.api_request.call_args_list
+        assert len(actual_calls) == 1
+        assert actual_calls[0] == expected_call
+
     def test_insert_rows_json(self):
         from google.cloud.bigquery.table import Table, SchemaField
         from google.cloud.bigquery.dataset import DatasetReference

From 2dba8cd722d5e7f32d61d6c77ad57ae60e2218dd Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Wed, 4 Sep 2019 14:02:20 +0200
Subject: [PATCH 3/3] Skip insert_rows_from_dataframe tests if no Pandas

---
 bigquery/tests/system.py           | 1 +
 bigquery/tests/unit/test_client.py | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index ca3ae4958a6e..813aa1d9442c 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -1951,6 +1951,7 @@ def test_query_results_to_dataframe_w_bqstorage(self):
                 if not row[col] is None:
                     self.assertIsInstance(row[col], exp_datatypes[col])
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
     def test_insert_rows_from_dataframe(self):
         SF = bigquery.SchemaField
         schema = [
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 2ec1d99034b9..4cdc413a3263 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -4472,6 +4472,7 @@ def test_insert_rows_w_numeric(self):
             data=sent,
         )
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
     def test_insert_rows_from_dataframe(self):
         from google.cloud.bigquery.table import SchemaField
         from google.cloud.bigquery.table import Table
@@ -4547,6 +4548,7 @@ def test_insert_rows_from_dataframe(self):
             expected_call = mock.call(method="POST", path=API_PATH, data=expected_data)
             assert call == expected_call
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
     def test_insert_rows_from_dataframe_many_columns(self):
         from google.cloud.bigquery.table import SchemaField
         from google.cloud.bigquery.table import Table
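
A minimal usage sketch of the method added in this series, for trying it out
locally. The project, dataset, and table names are placeholders, and the
snippet assumes the destination table already exists with a schema matching
the dataframe columns (the same columns the system test above uses):

    import pandas

    from google.cloud import bigquery

    client = bigquery.Client()

    # Fetch the table so its schema is known; alternatively, pass a
    # TableReference together with selected_fields.
    table_ref = client.dataset("my_dataset").table("my_table")  # placeholder names
    table = client.get_table(table_ref)

    dataframe = pandas.DataFrame(
        [
            {"float_col": 1.11, "int_col": 10, "bool_col": True, "string_col": "a"},
            {"float_col": 2.22, "int_col": 20, "bool_col": False, "string_col": "b"},
        ]
    )

    # Rows are streamed in chunks; each chunk yields its own list of insert errors.
    chunk_errors = client.insert_rows_from_dataframe(table, dataframe, chunk_size=500)

    for errors in chunk_errors:
        if errors:
            print("Some rows failed to insert:", errors)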