This follows a similar pattern to other output tests by mocking the end-to-end API calls. While this was a bit harder for the BigQuery client, in the absence of functional tests the advantages are:

- It clarifies what the error behaviours are.
- It thoroughly verifies that the calls are valid.

The documentation is a bit fragmented:

- The old Grafana data source has more extensive guidance on how to set up the service user for querying data [^1].
- I can't find a clear reference to the environment variable for the credentials file path - just the error message.

Note that v3 of the Google Cloud BigQuery client won't currently install on a Raspberry Pi, so we need to pin it to a lower version [^2].

[^1]: https://grafana.com/grafana/plugins/doitintl-bigquery-datasource/
[^2]: googleapis/python-bigquery#1142 (comment)
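
Since the only documentation of the credentials variable seems to be the error message, here is a minimal sketch of wiring it up; the key file path is a placeholder:

import os

from snsary.contrib.google import BigQueryOutput

# placeholder path to a downloaded service account key file
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/pi/bigquery-key.json'

# the underlying bigquery.Client() reads the variable when constructed
output = BigQueryOutput()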
1 parent 58f6fbe · commit 9627928

Showing 8 changed files with 208 additions and 4 deletions.
.gitignore:

@@ -3,3 +3,4 @@ __pycache__/
*.egg-info
tmp
docs/sphinx/autoapi/
*.json
A new requirements file pins the BigQuery client below v3 (see the Raspberry Pi note above):

@@ -0,0 +1 @@
google-cloud-bigquery < 3.0.0dev
A new test requirements file adds httpretty for mocking HTTP calls:

@@ -0,0 +1 @@
httpretty
snsary/contrib/google.py (new file):

@@ -0,0 +1,78 @@
""" | ||
Sends batches of :mod:`Readings <snsary.models.reading>` as rows to Google BigQuery to be stored in a dataset / table called ``snsary.readings``, using `the BigQuery Python client <https://github.com/googleapis/python-bigquery>`_. | ||
Configuration should be in `a JSON credentials file <https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating>`_, the path to which should be specified using the environment variable ``GOOGLE_APPLICATION_CREDENTIALS``. | ||
Setting up BigQuery | ||
=================== | ||
You can use the BigQuery UI to do most of the setup. | ||
1. Create a dataset called ``snsary``. | ||
- Do not enable table expiration (this is different to partition expiration). | ||
2. Create a table called ``readings``. | ||
- Add columns ``timestamp``, ``host``, ``sensor``, ``metric`` and ``value``. | ||
- Use ``TIMESTAMP`` for ``timestamp``, ``FLOAT`` for ``value`` and otherwise ``STRING``. | ||
- Partition the table **by day** using values of the **timestamp** column. | ||
3. Set up partition expiration e.g. :: | ||
ALTER TABLE snsary.readings | ||
SET OPTIONS ( | ||
partition_expiration_days=425 | ||
) | ||
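
Alternatively, steps 2 and 3 can be combined into a single DDL statement. A sketch (``FLOAT`` in the UI corresponds to ``FLOAT64`` in DDL): ::

    CREATE TABLE snsary.readings (
      timestamp TIMESTAMP,
      host STRING,
      sensor STRING,
      metric STRING,
      value FLOAT64
    )
    PARTITION BY DATE(timestamp)
    OPTIONS (partition_expiration_days = 425)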
You will also need to create `a Google Cloud service account <https://cloud.google.com/iam/docs/service-accounts>`_ and a corresponding API key. The service account should have the "BigQuery Data Editor" role or similar.

Querying the data
=================

Example query for data in the table: ::

    SELECT $__timeGroup(timestamp,$__interval), sensor, metric, avg(value)
    FROM `snsary.readings`
    WHERE $__timeFilter(timestamp)
    GROUP BY $__timeGroup(timestamp,$__interval), sensor, metric
    ORDER BY 1 ASC

Note that the ``$__`` functions are `defined by Grafana <https://grafana.com/grafana/plugins/grafana-bigquery-datasource/>`_. A service account reading the data will need the "BigQuery Data Viewer" and "BigQuery Job User" roles.
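
Outside Grafana, an equivalent ad-hoc query can be run directly against BigQuery. A sketch, assuming hourly buckets over the last day: ::

    SELECT TIMESTAMP_TRUNC(timestamp, HOUR) AS bucket, sensor, metric, avg(value)
    FROM `snsary.readings`
    WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    GROUP BY bucket, sensor, metric
    ORDER BY bucket ASC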
""" | ||
import platform | ||
|
||
import pytz | ||
from google.api_core.retry import Retry | ||
from google.cloud import bigquery | ||
|
||
from snsary.outputs import BatchOutput | ||
|
||
|
||
class BigQueryOutput(BatchOutput): | ||
def __init__(self, retry_deadline=10): | ||
BatchOutput.__init__(self) | ||
self.__retry_deadline = retry_deadline | ||
self.__client = bigquery.Client() | ||
self.__table = self.__client.get_table("snsary.readings") | ||
|
||
def publish_batch(self, readings): | ||
errors = self.__client.insert_rows_json( | ||
self.__table, | ||
[self.__row(reading) for reading in readings], | ||
retry=Retry(deadline=self.__retry_deadline) | ||
) | ||
|
||
# useful to catch setup errors like incorrect table column names | ||
for row_error in errors: | ||
self.logger.error(f'Error inserting row: #{row_error}') | ||
|
||
def __row(self, reading): | ||
return { | ||
# using UTC ensures test stability when run in different zones | ||
'timestamp': reading.datetime.astimezone(pytz.utc).isoformat(), | ||
'host': platform.node(), | ||
'sensor': reading.sensor_name, | ||
'metric': reading.name, | ||
'value': reading.value, | ||
} |
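
For illustration, a minimal usage sketch of the new output. The reading below is a hypothetical stand-in exposing just the attributes BigQueryOutput reads; real Readings come from snsary sensors, and constructing the output needs working credentials plus an existing snsary.readings table.

from datetime import datetime, timezone
from types import SimpleNamespace

from snsary.contrib.google import BigQueryOutput

# hypothetical stand-in for a snsary Reading: only the four
# attributes the output serialises (datetime, sensor_name, name, value)
reading = SimpleNamespace(
    datetime=datetime(2022, 4, 23, 20, 25, 46, tzinfo=timezone.utc),
    sensor_name='mysensor',
    name='myreading',
    value=123,
)

output = BigQueryOutput()  # requires GOOGLE_APPLICATION_CREDENTIALS
output.publish_batch([reading])  # inserts one row into snsary.readings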
credentials.json (a dummy service account key used as a test fixture):

@@ -0,0 +1,12 @@
{
  "type": "service_account",
  "project_id": "test-project",
  "private_key_id": "05079bf59a25e2be4a7ad38dd5f60bc9d61ac906",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0J3YBegVaBknk\ndN1She7cgaENFaO/pxMIV774kfR/PmIOyi4KYTTOOtMGvDKRVGikku2Vp+mFvGk+\nRf+Gu1OTUp4pPNWUuQmQ7iKg7laKSKofcWRGmjHdu7/WrxLGtL+N9nqsXpo7U2qE\nSjaZuYM9zlC8OGq4qW8TznKEiAnFYQT6UkzuA5BrgnmEge/1CodA5QqaNYesp/gK\nQPTWzeHPsFnmgzU/+pkHr+ybW/ZEjhoRZbVDQezgouCrSwjP3EAE0Z9N7HhGhQ0i\nnQ/2KtIcxSkIDbTN69JNyeY1V8Al47Msy+lLv0Q9ZpqhE8pV4ZvbPvSlQDsbZDnN\nhq5JZ04XAgMBAAECggEABaHGLdc9G5devMzUFNeP10Z/wzaN+L8MXt8ykfeRVF9q\n4vN6UcpZw3WE61nIauqsnf7jfO46F3tp/txi8WwhxlVEd7cpjI1/hkLFKl38SDTS\nIxfnmDcLbNciipLM9d4b4iBDRxtqzusvEhVDDhzSkYTfP6mApjOfZiS24WKVvtBA\nHvp57iaoyi9bmrOCE4NXwjkw+HXyx6A/cvLcaJJ1gyNUEhbGHAu9wnCTVrkh/vwb\nI/hAKrUJ1eSCilNdR6oOdLcF3pgZPhYoo2CJLxTra4CMXnYLfXGbf2AbJYbrR/qy\nIwIZtrFEtB89TEVZAN8/Ihjnphd5TZE3zySFeD8uoQKBgQDyJR5HbkXqJQFMn7b/\nconrbgMZIKqYvvvDKCBpe9vpyhouW2NjPx7PeyzNmo4Y612pghmdtpxyA4QYOGZn\n7Wgz2DXjFB3WryuqSJ+KzsA1X6v62k9OBIhLL0dgxJVZejtJGvDbkx0MOtZDq81s\n8KTDTv3ppFqlAtMTJNkuvL+qtQKBgQC+dlDL8Ww2tqRwHC6adzq6HbwPDoUH0mem\nI7Z3VX0nZ1fksadXstuPzH7M7EDUb+rmtEJHXGoiocAQ3baYgl+ww3btNFz++0jz\ns/VaayCHS1jFixsFfRTXQ9hFwt59Mw7PUTOHX+VEoZvbkUiBNDm/Fv7AKH2z2q0R\nbA9iKjY5GwKBgQC1P2FluD3u4BoT36zbkkF4DtWU1mW7haDvWDbCnipi2Zs1PcSs\nAhAu35UOpPRd5Lcr7Sz9ZzW9BbK3amgEvKh1vg2/1kvraRz3DfsVrGIk0WLqlsQr\nsqrOFCqKXTspSZvcwaStxnVEtTmyV4OhWpMJejQG0bXjs7SIrNk/6ZpRzQKBgDF0\nVUxvVpLqX85AcxVmqhgTinY6Ze5/AqzQDxvzVbj6i4b1XidWaM5w4efh3H+Mw+i9\nm4weAe0WJOH8P+Jfz+r9Bg9XjmIi63FkI+wjbuHSfa1ljlGhhdzTX//VNqI5tLm+\nMuwOyWBFukuL8NCjt+7XULSijuk5ecDRVzk9DrFLAoGATjdR3Xn7+U9VqN7iOOax\n8YVbmT9vuuge4dbGR3NUTzUITfziOn5poLwFmC2MyKUJjoiwSWAvIv3SbZR5iqkC\nsHs2DxYhwkF5q+m46rGRXkf2obDk0rtLJCD2d9HLAUdxe9EZA3QIhULlZBzu5pOO\nFxegU3PKXvcahnlrMka27DQ=\n-----END PRIVATE KEY-----\n",
  "client_email": "test-78@test-project.iam.gserviceaccount.com",
  "client_id": "106531388598493814321",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test-78%40test-project.iam.gserviceaccount.com"
}
The new test module, mocking the BigQuery API end to end with httpretty:

@@ -0,0 +1,106 @@
import json
import logging
import os
from pathlib import Path

import httpretty
import pytest

from snsary.contrib.google import BigQueryOutput


@pytest.fixture()
def credentials_path():
    return f"{Path(__file__).parent}/credentials.json"


@pytest.fixture
@httpretty.activate(allow_net_connect=False)
def big_query(mocker, credentials_path):
    mocker.patch.dict(os.environ, {
        'GOOGLE_APPLICATION_CREDENTIALS': credentials_path,
    })

    # body obtained by inspecting code and errors from returning an empty JSON response
    httpretty.register_uri(
        httpretty.POST,
        'https://oauth2.googleapis.com/token',
        body=json.dumps({"access_token": "1234", "expires_in": 33})
    )

    # body obtained by inspecting code and errors from returning an empty JSON response
    httpretty.register_uri(
        httpretty.GET,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/test-project/datasets/snsary/tables/readings?prettyPrint=false'
        ),
        body=json.dumps({
            "tableReference": {"tableId": "1234", "projectId": "1234", "datasetId": "1234"}
        })
    )

    # constructing the output triggers both requests registered above
    return BigQueryOutput(retry_deadline=0)


@httpretty.activate(allow_net_connect=False)
def test_publish_batch(
    mocker,
    reading,
    big_query
):
    mocker.patch('platform.node', return_value='snsary')

    httpretty.register_uri(
        httpretty.POST,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false'
        ),
    )

    big_query.publish_batch([reading])
    request = httpretty.last_request()

    assert b'"host": "snsary"' in request.body
    assert b'"metric": "myreading"' in request.body
    assert b'"sensor": "mysensor"' in request.body
    assert b'"timestamp": "2022-04-23T20:25:46+00:00"' in request.body
    assert b'"value": 123' in request.body


@httpretty.activate(allow_net_connect=False)
def test_publish_batch_error(big_query):
    httpretty.register_uri(
        httpretty.POST,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false'
        ),
        status=500
    )

    with pytest.raises(Exception) as excinfo:
        big_query.publish_batch([])

    assert 'Deadline of 0.0s exceeded' in str(excinfo.value)


@httpretty.activate(allow_net_connect=False)
def test_publish_batch_invalid(caplog, big_query):
    caplog.set_level(logging.ERROR)

    httpretty.register_uri(
        httpretty.POST,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false'
        ),
        body=json.dumps({
            'insertErrors': [{'index': 0, 'errors': [{'message': 'no such field: abc.'}]}]
        })
    )

    big_query.publish_batch([])
    assert 'Error inserting row' in caplog.text
    assert 'no such field' in caplog.text