# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from google.api_core.retry import Retry
import google.auth
import google.datalab
import IPython
from IPython.terminal import interactiveshell
from IPython.testing import tools
import pytest

# Get default project
_, PROJECT_ID = google.auth.default()

# Set Datalab project ID
context = google.datalab.Context.default()
context.set_project_id(PROJECT_ID)


@pytest.fixture(scope="session")
def ipython_interactive():
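    """Create a session-wide IPython shell for running the sample cell magics."""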
config = tools.default_config()
config.TerminalInteractiveShell.simple_prompt = True
shell = interactiveshell.TerminalInteractiveShell.instance(config=config)
    return shell


@pytest.fixture
def to_delete():
from google.cloud import bigquery
client = bigquery.Client()
doomed = []
yield doomed
for dataset_id in doomed:
dataset = client.get_dataset(dataset_id)
        client.delete_dataset(dataset, delete_contents=True)


def _set_up_ipython(extension):
ip = IPython.get_ipython()
ip.extension_manager.load_extension(extension)
    return ip


def _strip_region_tags(sample_text):
    """Remove blank lines and region tags from sample text.

    For example, a sample such as:

        # [START some_tag]
        SELECT 1
        # [END some_tag]

    is reduced to the single line ``SELECT 1``.
    """
magic_lines = [
line for line in sample_text.split("\n") if len(line) > 0 and "# [" not in line
]
return "\n".join(magic_lines)
def test_datalab_query_magic(ipython_interactive):
import google.datalab.bigquery as bq
ip = _set_up_ipython("google.datalab.kernel")
sample = """
# [START bigquery_migration_datalab_query_magic]
%%bq query
SELECT word, SUM(word_count) as count
FROM `bigquery-public-data.samples.shakespeare`
GROUP BY word
ORDER BY count ASC
LIMIT 100
# [END bigquery_migration_datalab_query_magic]
"""
ip.run_cell(_strip_region_tags(sample))
results = ip.user_ns["_"] # Last returned object in notebook session
assert isinstance(results, bq.QueryResultsTable)
df = results.to_dataframe()
    assert len(df) == 100


@pytest.mark.skip("datalab is deprecated, remove tests in sept 2023")
def test_client_library_query_magic(ipython_interactive):
import pandas
ip = _set_up_ipython("google.cloud.bigquery")
sample = """
# [START bigquery_migration_client_library_query_magic]
%%bigquery
SELECT word, SUM(word_count) as count
FROM `bigquery-public-data.samples.shakespeare`
GROUP BY word
ORDER BY count ASC
LIMIT 100
# [END bigquery_migration_client_library_query_magic]
"""
ip.run_cell(_strip_region_tags(sample))
df = ip.user_ns["_"] # Last returned object in notebook session
assert isinstance(df, pandas.DataFrame)
    assert len(df) == 100


@pytest.mark.skip("datalab is deprecated, remove tests in sept 2023")
def test_datalab_query_magic_results_variable(ipython_interactive):
ip = _set_up_ipython("google.datalab.kernel")
sample = """
# [START bigquery_migration_datalab_query_magic_define_query]
%%bq query -n my_query
SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current`
WHERE state = "TX"
LIMIT 100
# [END bigquery_migration_datalab_query_magic_define_query]
"""
ip.run_cell(_strip_region_tags(sample))
sample = """
# [START bigquery_migration_datalab_execute_query]
import google.datalab.bigquery as bq
my_variable = my_query.execute().result().to_dataframe()
# [END bigquery_migration_datalab_execute_query]
"""
ip.run_cell(_strip_region_tags(sample))
variable_name = "my_variable"
assert variable_name in ip.user_ns # verify that variable exists
my_variable = ip.user_ns[variable_name]
assert len(my_variable) == 100
    ip.user_ns.pop(variable_name)  # clean up variable


def test_client_library_query_magic_results_variable(ipython_interactive):
ip = _set_up_ipython("google.cloud.bigquery")
sample = """
# [START bigquery_migration_client_library_query_magic_results_variable]
%%bigquery my_variable
SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current`
WHERE state = "TX"
LIMIT 100
# [END bigquery_migration_client_library_query_magic_results_variable]
"""
ip.run_cell(_strip_region_tags(sample))
variable_name = "my_variable"
assert variable_name in ip.user_ns # verify that variable exists
my_variable = ip.user_ns[variable_name]
assert len(my_variable) == 100
    ip.user_ns.pop(variable_name)  # clean up variable


@pytest.mark.skip("datalab is deprecated, remove tests in sept 2023")
def test_datalab_list_tables_magic(ipython_interactive):
ip = _set_up_ipython("google.datalab.kernel")
sample = """
# [START bigquery_migration_datalab_list_tables_magic]
%bq tables list --dataset bigquery-public-data.samples
# [END bigquery_migration_datalab_list_tables_magic]
"""
ip.run_cell(_strip_region_tags(sample))
# Retrieves last returned object in notebook session
html_element = ip.user_ns["_"]
assert "shakespeare" in html_element.data
@pytest.mark.skip("datalab is deprecated, remove tests in sept 2023")
def test_datalab_query():
# [START bigquery_migration_datalab_query]
import google.datalab.bigquery as bq
sql = """
SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current`
WHERE state = "TX"
LIMIT 100
"""
df = bq.Query(sql).execute().result().to_dataframe()
# [END bigquery_migration_datalab_query]
    assert len(df) == 100


def test_client_library_query():
# [START bigquery_migration_client_library_query]
from google.cloud import bigquery
client = bigquery.Client()
sql = """
SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current`
WHERE state = "TX"
LIMIT 100
"""
df = client.query(sql).to_dataframe()
# [END bigquery_migration_client_library_query]
    assert len(df) == 100


@pytest.mark.skip("datalab is deprecated, remove tests in sept 2023")
def test_datalab_load_table_from_gcs_csv(to_delete):
# [START bigquery_migration_datalab_load_table_from_gcs_csv]
import google.datalab.bigquery as bq
# Create the dataset
dataset_id = "import_sample"
# [END bigquery_migration_datalab_load_table_from_gcs_csv]
# Use unique dataset ID to avoid collisions when running tests
dataset_id = f"test_dataset_{int(time.time() * 1000)}"
to_delete.append(dataset_id)
# [START bigquery_migration_datalab_load_table_from_gcs_csv]
bq.Dataset(dataset_id).create()
# Create the table
schema = [
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
]
table = bq.Table(f"{dataset_id}.us_states").create(schema=schema)
table.load(
"gs://cloud-samples-data/bigquery/us-states/us-states.csv",
mode="append",
source_format="csv",
csv_options=bq.CSVOptions(skip_leading_rows=1),
) # Waits for the job to complete
# [END bigquery_migration_datalab_load_table_from_gcs_csv]
    assert table.length == 50


def test_client_library_load_table_from_gcs_csv(to_delete):
# [START bigquery_migration_client_library_load_table_from_gcs_csv]
from google.cloud import bigquery
client = bigquery.Client(location="US")
# Create the dataset
dataset_id = "import_sample"
# [END bigquery_migration_client_library_load_table_from_gcs_csv]
# Use unique dataset ID to avoid collisions when running tests
dataset_id = f"test_dataset_{int(time.time() * 1000)}"
to_delete.append(dataset_id)
# [START bigquery_migration_client_library_load_table_from_gcs_csv]
dataset = client.create_dataset(dataset_id)
# Create the table
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("post_abbr", "STRING"),
],
skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
)
load_job = client.load_table_from_uri(
"gs://cloud-samples-data/bigquery/us-states/us-states.csv",
dataset.table("us_states"),
job_config=job_config,
)
load_job.result() # Waits for table load to complete.
# [END bigquery_migration_client_library_load_table_from_gcs_csv]
table = client.get_table(dataset.table("us_states"))
    assert table.num_rows == 50


@pytest.mark.skip("datalab is deprecated, remove tests in sept 2023")
def test_datalab_load_table_from_dataframe(to_delete):
"""Wrap test with retries to handle transient errors"""
@Retry()
def datalab_load_table_from_dataframe(to_delete):
# [START bigquery_migration_datalab_load_table_from_dataframe]
import google.datalab.bigquery as bq
import pandas
# Create the dataset
dataset_id = "import_sample"
# [END bigquery_migration_datalab_load_table_from_dataframe]
# Use unique dataset ID to avoid collisions when running tests
dataset_id = f"test_dataset_{int(time.time() * 1000)}"
to_delete.append(dataset_id)
# [START bigquery_migration_datalab_load_table_from_dataframe]
bq.Dataset(dataset_id).create()
# Create the table and load the data
dataframe = pandas.DataFrame(
[
{"title": "The Meaning of Life", "release_year": 1983},
{"title": "Monty Python and the Holy Grail", "release_year": 1975},
{"title": "Life of Brian", "release_year": 1979},
{
"title": "And Now for Something Completely Different",
"release_year": 1971,
},
]
)
schema = bq.Schema.from_data(dataframe)
table = bq.Table(f"{dataset_id}.monty_python").create(schema=schema)
        table.insert(dataframe)  # Starts streaming insert of data
# [END bigquery_migration_datalab_load_table_from_dataframe]
# The Datalab library uses tabledata().insertAll() to load data from
# pandas DataFrames to tables. Because it can take a long time for the rows
# to be available in the table, this test does not assert on the number of
# rows in the destination table after the job is run. If errors are
# encountered during the insertion, this test will fail.
# See https://cloud.google.com/bigquery/streaming-data-into-bigquery
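        # If a row-count assertion were wanted here anyway, one hypothetical
        # approach (assuming table.length reflects the rows already visible
        # to tabledata.list) would be to poll with a deadline:
        #
        #     deadline = time.time() + 60
        #     while table.length < len(dataframe) and time.time() < deadline:
        #         time.sleep(5)
        #     assert table.length == len(dataframe)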
    datalab_load_table_from_dataframe(to_delete)


def test_client_library_load_table_from_dataframe(to_delete):
# [START bigquery_migration_client_library_load_table_from_dataframe]
import pandas
from google.cloud import bigquery
client = bigquery.Client(location="US")
dataset_id = "import_sample"
# [END bigquery_migration_client_library_load_table_from_dataframe]
# Use unique dataset ID to avoid collisions when running tests
dataset_id = f"test_dataset_{int(time.time() * 1000)}"
to_delete.append(dataset_id)
# [START bigquery_migration_client_library_load_table_from_dataframe]
dataset = client.create_dataset(dataset_id)
# Create the table and load the data
dataframe = pandas.DataFrame(
[
{"title": "The Meaning of Life", "release_year": 1983},
{"title": "Monty Python and the Holy Grail", "release_year": 1975},
{"title": "Life of Brian", "release_year": 1979},
{
"title": "And Now for Something Completely Different",
"release_year": 1971,
},
]
)
table_ref = dataset.table("monty_python")
load_job = client.load_table_from_dataframe(dataframe, table_ref)
load_job.result() # Waits for table load to complete.
# [END bigquery_migration_client_library_load_table_from_dataframe]
table = client.get_table(table_ref)
assert table.num_rows == 4