Add new output for Google BigQuery
This follows a similar pattern to other output tests by mocking the
end-to-end API calls. While this was a bit harder for the BigQuery
client, in the absence of functional tests the advantages are:

- It clarifies what the error behaviours are.
- It thoroughly verifies the calls are valid.

The documentation is a bit fragmented:

- The old Grafana data source has more extensive guidance on how to
  set up the service user for querying data [^1].

- I can't find a clear reference to the environment variable for the
credentials file path - just the error message.

Note that v3 of the Google Cloud BigQuery client won't install on a
Raspberry Pi right now, so we need to pin it to a lower version [^2].

[^1]: https://grafana.com/grafana/plugins/doitintl-bigquery-datasource/
[^2]: googleapis/python-bigquery#1142 (comment)
benthorner committed May 4, 2022
1 parent dbf553c commit 5024b28
Showing 8 changed files with 206 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@ __pycache__/
*.egg-info
tmp
docs/sphinx/autoapi/
*.json
7 changes: 6 additions & 1 deletion docs/examples/contrib.py
@@ -12,6 +12,7 @@
from snsary.contrib.adafruit.sgp30 import SGP30Sensor
from snsary.contrib.awair import AwairSensor
from snsary.contrib.datastax import GraphQLOutput
from snsary.contrib.google import BigQueryOutput
from snsary.contrib.grafana import GraphiteOutput
from snsary.contrib.influxdb import InfluxDBOutput
from snsary.contrib.octopus import OctopusSensor
@@ -25,8 +26,12 @@
load_dotenv()
configure_logging()

longterm_stream = SimpleStream()
longterm_stream.into(BigQueryOutput())

# summarization is necessary to minimise the amount of data stored but also
# to make it more practical to query longterm data in the absence of grouping
graphql = GraphQLOutput.from_env()
longterm_stream.summarize(minutes=1).rename(append="/minute").into(graphql)
longterm_stream.summarize(hours=1).rename(append="/hour").into(graphql)
longterm_stream.summarize(days=1).rename(append="/day").into(graphql)
1 change: 1 addition & 0 deletions docs/extras/web_apis.md
@@ -15,3 +15,4 @@ Outputs for remote / web APIs include:

- GraphiteOutput ([API docs](https://snsary.readthedocs.io/en/latest/autoapi/snsary/contrib/grafana/index.html))
- InfluxDBOutput ([API docs](https://snsary.readthedocs.io/en/latest/autoapi/snsary/contrib/influxdb/index.html))
- BigQueryOutput ([API docs](https://snsary.readthedocs.io/en/latest/autoapi/snsary/contrib/google/index.html)) - requires `sudo apt install cmake`
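
A minimal wiring sketch for the new output (not part of this commit; the key file path is illustrative, and the behaviour noted in the comments follows the module code further down):

import os

from snsary.contrib.google import BigQueryOutput

# The BigQuery client reads the service account key from this environment
# variable; the path here is only an example.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/pi/bigquery-key.json"

# Constructing the output looks up the snsary.readings table, so missing
# credentials or a missing table fail at startup rather than on first publish.
output = BigQueryOutput()

The output can then be attached to a stream with .into(...), as in the docs/examples/contrib.py change above.
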
1 change: 1 addition & 0 deletions requirements/google/extra.txt
@@ -0,0 +1 @@
google-cloud-bigquery < 3.0.0dev
1 change: 1 addition & 0 deletions requirements/google/tests.txt
@@ -0,0 +1 @@
httpretty
78 changes: 78 additions & 0 deletions src/snsary/contrib/google.py
@@ -0,0 +1,78 @@
"""
Sends batches of :mod:`Readings <snsary.models.reading>` as rows to Google BigQuery to be stored in a dataset / table called ``snsary.readings``, using `the BigQuery Python client <https://github.com/googleapis/python-bigquery>`_.
Configuration should be in `a JSON credentials file <https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating>`_, the path to which should be specified using the environment variable ``GOOGLE_APPLICATION_CREDENTIALS``.
Setting up BigQuery
===================
You can use the BigQuery UI to do most of the setup.
1. Create a dataset called ``snsary``.
- Do not enable table expiration (this is different to partition expiration).
2. Create a table called ``readings``.
- Add columns ``timestamp``, ``host``, ``sensor``, ``metric`` and ``value``.
- Use ``TIMESTAMP`` for ``timestamp``, ``FLOAT`` for ``value`` and otherwise ``STRING``.
- Partition the table **by day** using values of the **timestamp** column.
3. Set up partition expiration e.g. ::
ALTER TABLE snsary.readings
SET OPTIONS (
partition_expiration_days=425
)
You will also need to create `a Google Cloud service account <https://cloud.google.com/iam/docs/service-accounts>`_ and corresponding API key. The service account should have the "BigQuery Data Editor" role or similar.
Querying the data
=================
Example query for data in the table: ::
SELECT $__timeGroup(timestamp,$__interval), sensor, metric, avg(value)
FROM `snsary.readings`
where $__timeFilter(timestamp)
group by $__timeGroup(timestamp,$__interval), sensor, metric
order by 1 asc
Note that the ``$__`` functions are `defined by Grafana <https://grafana.com/grafana/plugins/grafana-bigquery-datasource/>`_. A service account reading the data will need to have "BigQuery Data Viewer" and "BigQuery Job User" roles.
"""
import platform

import pytz
from google.api_core.retry import Retry
from google.cloud import bigquery

from snsary.outputs import BatchOutput


class BigQueryOutput(BatchOutput):
    def __init__(self, retry_deadline=10):
        BatchOutput.__init__(self)
        self.__retry_deadline = retry_deadline
        self.__client = bigquery.Client()
        self.__table = self.__client.get_table("snsary.readings")

    def publish_batch(self, readings):
        errors = self.__client.insert_rows_json(
            self.__table,
            [self.__row(reading) for reading in readings],
            retry=Retry(deadline=self.__retry_deadline)
        )

        # useful to catch setup errors like incorrect table column names
        for row_error in errors:
            self.logger.error(f'Error inserting row: {row_error}')

    def __row(self, reading):
        return {
            # using UTC ensures test stability when run in different zones
            'timestamp': reading.datetime.astimezone(pytz.utc).isoformat(),
            'host': platform.node(),
            'sensor': reading.sensor_name,
            'metric': reading.name,
            'value': reading.value,
        }
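
As a hedged alternative to the UI steps in the module docstring, the same dataset and table could be created with the BigQuery Python client. This is a sketch only: it assumes the default project from the credentials, and the 425-day expiration simply matches the docstring's ALTER TABLE example.

from google.cloud import bigquery

client = bigquery.Client()
client.create_dataset("snsary", exists_ok=True)

# Schema and day partitioning mirror the docstring's UI steps.
table = bigquery.Table(
    f"{client.project}.snsary.readings",
    schema=[
        bigquery.SchemaField("timestamp", "TIMESTAMP"),
        bigquery.SchemaField("host", "STRING"),
        bigquery.SchemaField("sensor", "STRING"),
        bigquery.SchemaField("metric", "STRING"),
        bigquery.SchemaField("value", "FLOAT"),
    ],
)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="timestamp",
    expiration_ms=425 * 24 * 60 * 60 * 1000,  # 425 days
)
client.create_table(table, exists_ok=True)
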
12 changes: 12 additions & 0 deletions tests/contrib/google/credentials.json
@@ -0,0 +1,12 @@
{
"type": "service_account",
"project_id": "test-project",
"private_key_id": "05079bf59a25e2be4a7ad38dd5f60bc9d61ac906",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0J3YBegVaBknk\ndN1She7cgaENFaO/pxMIV774kfR/PmIOyi4KYTTOOtMGvDKRVGikku2Vp+mFvGk+\nRf+Gu1OTUp4pPNWUuQmQ7iKg7laKSKofcWRGmjHdu7/WrxLGtL+N9nqsXpo7U2qE\nSjaZuYM9zlC8OGq4qW8TznKEiAnFYQT6UkzuA5BrgnmEge/1CodA5QqaNYesp/gK\nQPTWzeHPsFnmgzU/+pkHr+ybW/ZEjhoRZbVDQezgouCrSwjP3EAE0Z9N7HhGhQ0i\nnQ/2KtIcxSkIDbTN69JNyeY1V8Al47Msy+lLv0Q9ZpqhE8pV4ZvbPvSlQDsbZDnN\nhq5JZ04XAgMBAAECggEABaHGLdc9G5devMzUFNeP10Z/wzaN+L8MXt8ykfeRVF9q\n4vN6UcpZw3WE61nIauqsnf7jfO46F3tp/txi8WwhxlVEd7cpjI1/hkLFKl38SDTS\nIxfnmDcLbNciipLM9d4b4iBDRxtqzusvEhVDDhzSkYTfP6mApjOfZiS24WKVvtBA\nHvp57iaoyi9bmrOCE4NXwjkw+HXyx6A/cvLcaJJ1gyNUEhbGHAu9wnCTVrkh/vwb\nI/hAKrUJ1eSCilNdR6oOdLcF3pgZPhYoo2CJLxTra4CMXnYLfXGbf2AbJYbrR/qy\nIwIZtrFEtB89TEVZAN8/Ihjnphd5TZE3zySFeD8uoQKBgQDyJR5HbkXqJQFMn7b/\nconrbgMZIKqYvvvDKCBpe9vpyhouW2NjPx7PeyzNmo4Y612pghmdtpxyA4QYOGZn\n7Wgz2DXjFB3WryuqSJ+KzsA1X6v62k9OBIhLL0dgxJVZejtJGvDbkx0MOtZDq81s\n8KTDTv3ppFqlAtMTJNkuvL+qtQKBgQC+dlDL8Ww2tqRwHC6adzq6HbwPDoUH0mem\nI7Z3VX0nZ1fksadXstuPzH7M7EDUb+rmtEJHXGoiocAQ3baYgl+ww3btNFz++0jz\ns/VaayCHS1jFixsFfRTXQ9hFwt59Mw7PUTOHX+VEoZvbkUiBNDm/Fv7AKH2z2q0R\nbA9iKjY5GwKBgQC1P2FluD3u4BoT36zbkkF4DtWU1mW7haDvWDbCnipi2Zs1PcSs\nAhAu35UOpPRd5Lcr7Sz9ZzW9BbK3amgEvKh1vg2/1kvraRz3DfsVrGIk0WLqlsQr\nsqrOFCqKXTspSZvcwaStxnVEtTmyV4OhWpMJejQG0bXjs7SIrNk/6ZpRzQKBgDF0\nVUxvVpLqX85AcxVmqhgTinY6Ze5/AqzQDxvzVbj6i4b1XidWaM5w4efh3H+Mw+i9\nm4weAe0WJOH8P+Jfz+r9Bg9XjmIi63FkI+wjbuHSfa1ljlGhhdzTX//VNqI5tLm+\nMuwOyWBFukuL8NCjt+7XULSijuk5ecDRVzk9DrFLAoGATjdR3Xn7+U9VqN7iOOax\n8YVbmT9vuuge4dbGR3NUTzUITfziOn5poLwFmC2MyKUJjoiwSWAvIv3SbZR5iqkC\nsHs2DxYhwkF5q+m46rGRXkf2obDk0rtLJCD2d9HLAUdxe9EZA3QIhULlZBzu5pOO\nFxegU3PKXvcahnlrMka27DQ=\n-----END PRIVATE KEY-----\n",
"client_email": "[email protected]",
"client_id": "106531388598493814321",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test-78%40test-project.iam.gserviceaccount.com"
}
106 changes: 106 additions & 0 deletions tests/contrib/google/test_google.py
@@ -0,0 +1,106 @@
import json
import logging
import os
from pathlib import Path

import httpretty
import pytest

from snsary.contrib.google import BigQueryOutput


@pytest.fixture()
def credentials_path():
    return f"{Path(__file__).parent}/credentials.json"


@pytest.fixture
@httpretty.activate(allow_net_connect=False)
def big_query(mocker, credentials_path):
    mocker.patch.dict(os.environ, {
        'GOOGLE_APPLICATION_CREDENTIALS': credentials_path,
    })

    # body obtained by inspecting code and errors from returning an empty JSON response
    httpretty.register_uri(
        httpretty.POST,
        'https://oauth2.googleapis.com/token',
        body=json.dumps({"access_token": "1234", "expires_in": 33})
    )

    # body obtained by inspecting code and errors from returning an empty JSON response
    httpretty.register_uri(
        httpretty.GET,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/test-project/datasets/snsary/tables/readings?prettyPrint=false'
        ),
        body=json.dumps({
            "tableReference": {"tableId": "1234", "projectId": "1234", "datasetId": "1234"}
        })
    )

    return BigQueryOutput(retry_deadline=0)


@httpretty.activate(allow_net_connect=False)
def test_publish_batch(
    mocker,
    reading,
    big_query
):
    mocker.patch('platform.node', return_value='snsary')

    httpretty.register_uri(
        httpretty.POST,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false'
        ),
    )

    big_query.publish_batch([reading])
    request = httpretty.last_request()

    assert b'"host": "snsary"' in request.body
    assert b'"metric": "myreading"' in request.body
    assert b'"sensor": "mysensor"' in request.body
    assert b'"timestamp": "2022-04-23T20:25:46+00:00"' in request.body
    assert b'"value": 123' in request.body


@httpretty.activate(allow_net_connect=False)
def test_publish_batch_error(big_query):
    httpretty.register_uri(
        httpretty.POST,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false'
        ),
        status=500
    )

    with pytest.raises(Exception) as excinfo:
        big_query.publish_batch([])

    assert 'Deadline of 0.0s exceeded' in str(excinfo.value)


@httpretty.activate(allow_net_connect=False)
def test_publish_batch_invalid(caplog, big_query):
    caplog.set_level(logging.ERROR)

    httpretty.register_uri(
        httpretty.POST,
        (
            'https://bigquery.googleapis.com/bigquery' +
            '/v2/projects/1234/datasets/1234/tables/1234/insertAll?prettyPrint=false'
        ),
        body=json.dumps({
            'insertErrors': [{'index': 0, 'errors': [{'message': 'no such field: abc.'}]}]
        })
    )

    big_query.publish_batch([])
    assert 'Error inserting row' in caplog.text
    assert 'no such field' in caplog.text
