From 96279280452d0d052a676d8930a0e5c35e2c7ac5 Mon Sep 17 00:00:00 2001
From: Ben Thorner
Date: Mon, 2 May 2022 20:57:31 +0100
Subject: [PATCH] Add new output for Google BigQuery

This follows a similar pattern to other output tests by mocking the
end-to-end API calls. While this was a bit harder for the BigQuery
client, in the absence of functional tests the advantages are:

- It clarifies what the error behaviours are.
- It thoroughly verifies the calls are valid.

The documentation is a bit fragmented:

- The old Grafana data source has more extensive guidance on how to
  set up the service user for querying data [^1].
- I can't find a clear reference to the environment variable for the
  credentials file path - just the error message.

Note that v3 of the Google Cloud BigQuery client won't install on a
Raspberry Pi right now, so we need to pin it to a lower version [^2].

[^1]: https://grafana.com/grafana/plugins/doitintl-bigquery-datasource/
[^2]: https://github.com/googleapis/python-bigquery/issues/1142#issuecomment-1117984209
---
 .gitignore                            |   1 +
 docs/examples/contrib.py              |  12 ++-
 docs/extras/web_apis.md               |   1 +
 requirements/google/extra.txt         |   1 +
 requirements/google/tests.txt         |   1 +
 src/snsary/contrib/google.py          |  78 +++++++++++++++++++
 tests/contrib/google/credentials.json |  12 +++
 tests/contrib/google/test_google.py   | 106 ++++++++++++++++++++++++++
 8 files changed, 208 insertions(+), 4 deletions(-)
 create mode 100644 requirements/google/extra.txt
 create mode 100644 requirements/google/tests.txt
 create mode 100644 src/snsary/contrib/google.py
 create mode 100644 tests/contrib/google/credentials.json
 create mode 100644 tests/contrib/google/test_google.py

diff --git a/.gitignore b/.gitignore
index 4ec91e9..e289346 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@ __pycache__/
 *.egg-info
 tmp
 docs/sphinx/autoapi/
+*.json
diff --git a/docs/examples/contrib.py b/docs/examples/contrib.py
index 5d613f5..0cc887e 100644
--- a/docs/examples/contrib.py
+++ b/docs/examples/contrib.py
@@ -12,6 +12,7 @@
 from snsary.contrib.adafruit.sgp30 import SGP30Sensor
 from snsary.contrib.awair import AwairSensor
 from snsary.contrib.datastax import GraphQLOutput
+from snsary.contrib.google import BigQueryOutput
 from snsary.contrib.grafana import GraphiteOutput
 from snsary.contrib.influxdb import InfluxDBOutput
 from snsary.contrib.octopus import OctopusSensor
@@ -25,11 +26,14 @@
 load_dotenv()
 configure_logging()
 
-graphql = GraphQLOutput.from_env()
+# summarization is necessary to minimise the amount of data stored, and also
+# to make long-term GraphQL queries practical in the absence of grouping
 longterm_stream = SimpleStream()
-longterm_stream.summarize(minutes=1).rename(append="/minute").into(graphql)
-longterm_stream.summarize(hours=1).rename(append="/hour").into(graphql)
-longterm_stream.summarize(days=1).rename(append="/day").into(graphql)
+bigquery = BigQueryOutput()
+graphql = GraphQLOutput.from_env()
+longterm_stream.summarize(minutes=1).rename(append="/minute").into(graphql, bigquery)
+longterm_stream.summarize(hours=1).rename(append="/hour").into(graphql, bigquery)
+longterm_stream.summarize(days=1).rename(append="/day").into(graphql, bigquery)
 
 sgp30 = SGP30Sensor(Adafruit_SGP30(i2c))
diff --git a/docs/extras/web_apis.md b/docs/extras/web_apis.md
index 170228f..15a25fe 100644
--- a/docs/extras/web_apis.md
+++ b/docs/extras/web_apis.md
@@ -15,3 +15,4 @@ Outputs for remote / web APIs include:
 - GraphiteOutput ([API docs](https://snsary.readthedocs.io/en/latest/autoapi/snsary/contrib/grafana/index.html))
 - InfluxDBOutput ([API docs](https://snsary.readthedocs.io/en/latest/autoapi/snsary/contrib/influxdb/index.html))
+- BigQueryOutput ([API docs](https://snsary.readthedocs.io/en/latest/autoapi/snsary/contrib/google/index.html)) - requires `sudo apt install cmake`
diff --git a/requirements/google/extra.txt b/requirements/google/extra.txt
new file mode 100644
index 0000000..cd9a4a0
--- /dev/null
+++ b/requirements/google/extra.txt
@@ -0,0 +1 @@
+google-cloud-bigquery < 3.0.0dev
diff --git a/requirements/google/tests.txt b/requirements/google/tests.txt
new file mode 100644
index 0000000..b0284b1
--- /dev/null
+++ b/requirements/google/tests.txt
@@ -0,0 +1 @@
+httpretty
diff --git a/src/snsary/contrib/google.py b/src/snsary/contrib/google.py
new file mode 100644
index 0000000..2f802ce
--- /dev/null
+++ b/src/snsary/contrib/google.py
@@ -0,0 +1,78 @@
+"""
+Sends batches of :mod:`Readings ` as rows to Google BigQuery to be stored in a dataset / table called ``snsary.readings``, using `the BigQuery Python client `_.
+
+Configuration should be in `a JSON credentials file `_, the path to which should be specified using the environment variable ``GOOGLE_APPLICATION_CREDENTIALS``.
+
+Setting up BigQuery
+===================
+
+You can use the BigQuery UI to do most of the setup.
+
+1. Create a dataset called ``snsary``.
+
+   - Do not enable table expiration (this is different to partition expiration).
+
+2. Create a table called ``readings``.
+
+   - Add columns ``timestamp``, ``host``, ``sensor``, ``metric`` and ``value``.
+   - Use ``TIMESTAMP`` for ``timestamp``, ``FLOAT`` for ``value`` and otherwise ``STRING``.
+   - Partition the table **by day** using values of the **timestamp** column.
+
+3. Set up partition expiration, e.g. ::
+
+    ALTER TABLE snsary.readings
+    SET OPTIONS (
+        partition_expiration_days=425
+    )
+
+You will also need to create `a Google Cloud service account `_ and a corresponding API key. The service account should have the "BigQuery Data Editor" role or similar.
+
+Querying the data
+=================
+
+Example query for data in the table: ::
+
+    SELECT $__timeGroup(timestamp,$__interval), sensor, metric, avg(value)
+    FROM `snsary.readings`
+    where $__timeFilter(timestamp)
+    group by $__timeGroup(timestamp,$__interval), sensor, metric
+    order by 1 asc
+
+Note that the ``$__`` functions are `defined by Grafana `_. A service account reading the data will need to have the "BigQuery Data Viewer" and "BigQuery Job User" roles.
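+
+For ad-hoc queries outside Grafana, a rough equivalent without the ``$__`` macros is sketched below (it assumes hourly grouping over the last day; adjust to taste): ::
+
+    SELECT TIMESTAMP_TRUNC(timestamp, HOUR), sensor, metric, avg(value)
+    FROM `snsary.readings`
+    where timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
+    group by 1, sensor, metric
+    order by 1 asc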
+""" +import platform + +import pytz +from google.api_core.retry import Retry +from google.cloud import bigquery + +from snsary.outputs import BatchOutput + + +class BigQueryOutput(BatchOutput): + def __init__(self, retry_deadline=10): + BatchOutput.__init__(self) + self.__retry_deadline = retry_deadline + self.__client = bigquery.Client() + self.__table = self.__client.get_table("snsary.readings") + + def publish_batch(self, readings): + errors = self.__client.insert_rows_json( + self.__table, + [self.__row(reading) for reading in readings], + retry=Retry(deadline=self.__retry_deadline) + ) + + # useful to catch setup errors like incorrect table column names + for row_error in errors: + self.logger.error(f'Error inserting row: #{row_error}') + + def __row(self, reading): + return { + # using UTC ensures test stability when run in different zones + 'timestamp': reading.datetime.astimezone(pytz.utc).isoformat(), + 'host': platform.node(), + 'sensor': reading.sensor_name, + 'metric': reading.name, + 'value': reading.value, + } diff --git a/tests/contrib/google/credentials.json b/tests/contrib/google/credentials.json new file mode 100644 index 0000000..62a6818 --- /dev/null +++ b/tests/contrib/google/credentials.json @@ -0,0 +1,12 @@ +{ + "type": "service_account", + "project_id": "test-project", + "private_key_id": "05079bf59a25e2be4a7ad38dd5f60bc9d61ac906", + "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0J3YBegVaBknk\ndN1She7cgaENFaO/pxMIV774kfR/PmIOyi4KYTTOOtMGvDKRVGikku2Vp+mFvGk+\nRf+Gu1OTUp4pPNWUuQmQ7iKg7laKSKofcWRGmjHdu7/WrxLGtL+N9nqsXpo7U2qE\nSjaZuYM9zlC8OGq4qW8TznKEiAnFYQT6UkzuA5BrgnmEge/1CodA5QqaNYesp/gK\nQPTWzeHPsFnmgzU/+pkHr+ybW/ZEjhoRZbVDQezgouCrSwjP3EAE0Z9N7HhGhQ0i\nnQ/2KtIcxSkIDbTN69JNyeY1V8Al47Msy+lLv0Q9ZpqhE8pV4ZvbPvSlQDsbZDnN\nhq5JZ04XAgMBAAECggEABaHGLdc9G5devMzUFNeP10Z/wzaN+L8MXt8ykfeRVF9q\n4vN6UcpZw3WE61nIauqsnf7jfO46F3tp/txi8WwhxlVEd7cpjI1/hkLFKl38SDTS\nIxfnmDcLbNciipLM9d4b4iBDRxtqzusvEhVDDhzSkYTfP6mApjOfZiS24WKVvtBA\nHvp57iaoyi9bmrOCE4NXwjkw+HXyx6A/cvLcaJJ1gyNUEhbGHAu9wnCTVrkh/vwb\nI/hAKrUJ1eSCilNdR6oOdLcF3pgZPhYoo2CJLxTra4CMXnYLfXGbf2AbJYbrR/qy\nIwIZtrFEtB89TEVZAN8/Ihjnphd5TZE3zySFeD8uoQKBgQDyJR5HbkXqJQFMn7b/\nconrbgMZIKqYvvvDKCBpe9vpyhouW2NjPx7PeyzNmo4Y612pghmdtpxyA4QYOGZn\n7Wgz2DXjFB3WryuqSJ+KzsA1X6v62k9OBIhLL0dgxJVZejtJGvDbkx0MOtZDq81s\n8KTDTv3ppFqlAtMTJNkuvL+qtQKBgQC+dlDL8Ww2tqRwHC6adzq6HbwPDoUH0mem\nI7Z3VX0nZ1fksadXstuPzH7M7EDUb+rmtEJHXGoiocAQ3baYgl+ww3btNFz++0jz\ns/VaayCHS1jFixsFfRTXQ9hFwt59Mw7PUTOHX+VEoZvbkUiBNDm/Fv7AKH2z2q0R\nbA9iKjY5GwKBgQC1P2FluD3u4BoT36zbkkF4DtWU1mW7haDvWDbCnipi2Zs1PcSs\nAhAu35UOpPRd5Lcr7Sz9ZzW9BbK3amgEvKh1vg2/1kvraRz3DfsVrGIk0WLqlsQr\nsqrOFCqKXTspSZvcwaStxnVEtTmyV4OhWpMJejQG0bXjs7SIrNk/6ZpRzQKBgDF0\nVUxvVpLqX85AcxVmqhgTinY6Ze5/AqzQDxvzVbj6i4b1XidWaM5w4efh3H+Mw+i9\nm4weAe0WJOH8P+Jfz+r9Bg9XjmIi63FkI+wjbuHSfa1ljlGhhdzTX//VNqI5tLm+\nMuwOyWBFukuL8NCjt+7XULSijuk5ecDRVzk9DrFLAoGATjdR3Xn7+U9VqN7iOOax\n8YVbmT9vuuge4dbGR3NUTzUITfziOn5poLwFmC2MyKUJjoiwSWAvIv3SbZR5iqkC\nsHs2DxYhwkF5q+m46rGRXkf2obDk0rtLJCD2d9HLAUdxe9EZA3QIhULlZBzu5pOO\nFxegU3PKXvcahnlrMka27DQ=\n-----END PRIVATE KEY-----\n", + "client_email": "test-78@test-project.iam.gserviceaccount.com", + "client_id": "106531388598493814321", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test-78%40test-project.iam.gserviceaccount.com" 
+} diff --git a/tests/contrib/google/test_google.py b/tests/contrib/google/test_google.py new file mode 100644 index 0000000..03d0c3c --- /dev/null +++ b/tests/contrib/google/test_google.py @@ -0,0 +1,106 @@ +import json +import logging +import os +from pathlib import Path + +import httpretty +import pytest + +from snsary.contrib.google import BigQueryOutput + + +@pytest.fixture() +def credentials_path(): + return f"{Path(__file__).parent}/credentials.json" + + +@pytest.fixture +@httpretty.activate(allow_net_connect=False) +def big_query(mocker, credentials_path): + mocker.patch.dict(os.environ, { + 'GOOGLE_APPLICATION_CREDENTIALS': credentials_path, + }) + + # body obtained by inspecting code and errors from returning an empty JSON response + httpretty.register_uri( + httpretty.POST, + 'https://oauth2.googleapis.com/token', + body=json.dumps({"access_token": "1234", "expires_in": 33}) + ) + + # body obtained by inspecting code and errors from returning an empty JSON response + httpretty.register_uri( + httpretty.GET, + ( + 'https://bigquery.googleapis.com/bigquery' + + '/v2/projects/test-project/datasets/snsary/tables/readings?prettyPrint=false' + ), + body=json.dumps({ + "tableReference": {"tableId": "1234", "projectId": "1234", "datasetId": "1234"} + }) + ) + + return BigQueryOutput(retry_deadline=0) + + +@httpretty.activate(allow_net_connect=False) +def test_publish_batch( + mocker, + reading, + big_query +): + mocker.patch('platform.node', return_value='snsary') + + httpretty.register_uri( + httpretty.POST, + ( + 'https://bigquery.googleapis.com/bigquery' + + '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false' + ), + ) + + big_query.publish_batch([reading]) + request = httpretty.last_request() + + assert b'"host": "snsary"' in request.body + assert b'"metric": "myreading"' in request.body + assert b'"sensor": "mysensor"' in request.body + assert b'"timestamp": "2022-04-23T20:25:46+00:00"' in request.body + assert b'"value": 123' in request.body + + +@httpretty.activate(allow_net_connect=False) +def test_publish_batch_error(big_query): + httpretty.register_uri( + httpretty.POST, + ( + 'https://bigquery.googleapis.com/bigquery' + + '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false' + ), + status=500 + ) + + with pytest.raises(Exception) as excinfo: + big_query.publish_batch([]) + + assert 'Deadline of 0.0s exceeded' in str(excinfo.value) + + +@httpretty.activate(allow_net_connect=False) +def test_publish_batch_invalid(caplog, big_query): + caplog.set_level(logging.ERROR) + + httpretty.register_uri( + httpretty.POST, + ( + 'https://bigquery.googleapis.com/bigquery' + + '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false' + ), + body=json.dumps({ + 'insertErrors': [{'index': 0, 'errors': [{'message': 'no such field: abc.'}]}] + }) + ) + + big_query.publish_batch([]) + assert 'Error inserting row' in caplog.text + assert 'no such field' in caplog.text