Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration file #53

Merged
merged 15 commits into from
Jan 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.4.0 [unreleased]

### Features
1. [#52](https://github.com/influxdata/influxdb-client-python/issues/52): Initialize client library from config file and environmental properties

## 1.3.0 [2020-01-17]

### Features
Expand Down
82 changes: 82 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,50 @@ Please follow the `Installation`_ and then run the following:
for cell in row:
val_count += 1


.. marker-query-end

Client configuration
--------------------

Via File
^^^^^^^^
A client can be configured via ``*.ini`` file in segment ``influx2``.

The following options are supported:

- ``url`` - the url to connect to InfluxDB
- ``org`` - default destination organization for writes and queries
- ``token`` - the token to use for the authorization
- ``timeout`` - socket timeout in ms (default value is 10000)

.. code-block:: python

self.client = InfluxDBClient.from_config_file("config.ini")

.. code-block::

[influx2]
url=http://localhost:9999
org=my-org
token=my-token
timeout=6000

Via Environment Properties
^^^^^^^^^^^^^^^^^^^^^^^^^^
A client can be configured via environment properties.

Supported properties are:

- ``INFLUXDB_V2_URL`` - the url to connect to InfluxDB
- ``INFLUXDB_V2_ORG`` - default destination organization for writes and queries
- ``INFLUXDB_V2_TOKEN`` - the token to use for the authorization
- ``INFLUXDB_V2_TIMEOUT`` - socket timeout in ms (default value is 10000)

.. code-block:: python

self.client = InfluxDBClient.from_env_properties()

.. marker-index-end


Expand Down Expand Up @@ -264,6 +307,9 @@ The expressions:
- ``California Miner`` - static value
- ``${env.hostname}`` - environment property

Via API
_______

.. code-block:: python

point_settings = PointSettings()
Expand All @@ -278,6 +324,42 @@ The expressions:
self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
point_settings=PointSettings(**{"id": "132-987-655",
"customer": "California Miner"}))

Via Configuration file
______________________

In a ini configuration file you are able to specify default tags by ``tags`` segment.

.. code-block:: python

self.client = InfluxDBClient.from_config_file("config.ini")

.. code-block::

[influx2]
url=http://localhost:9999
org=my-org
token=my-token
timeout=6000

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}

Via Environment Properties
__________________________
You are able to specify default tags by environment properties with prefix ``INFLUXDB_V2_TAG_``.

Examples:

- ``INFLUXDB_V2_TAG_ID``
- ``INFLUXDB_V2_TAG_HOSTNAME``

.. code-block:: python

self.client = InfluxDBClient.from_env_properties()

.. marker-default-tags-end

Asynchronous client
Expand Down
52 changes: 51 additions & 1 deletion influxdb_client/client/influxdb_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import absolute_import

import configparser
import os

from influxdb_client import Configuration, ApiClient, HealthCheck, HealthService, Ready, ReadyService
from influxdb_client.client.authorizations_api import AuthorizationsApi
from influxdb_client.client.bucket_api import BucketsApi
Expand All @@ -14,7 +17,8 @@

class InfluxDBClient(object):

def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org: str = None) -> None:
def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org: str = None,
default_tags: dict = None) -> None:
"""
:class:`influxdb_client.InfluxDBClient` is client for HTTP API defined
in https://github.com/influxdata/influxdb/blob/master/http/swagger.yml.
Expand All @@ -33,6 +37,8 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org
self.timeout = timeout
self.org = org

self.default_tags = default_tags

conf = _Configuration()
conf.host = self.url
conf.enable_gzip = enable_gzip
Expand All @@ -45,6 +51,50 @@ def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org
self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,
header_value=auth_header_value)

@classmethod
def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False):
config = configparser.ConfigParser()
config.read(config_file)

url = config['influx2']['url']
token = config['influx2']['token']

timeout = None

if config.has_option('influx2', 'timeout'):
timeout = config['influx2']['timeout']

org = None

if config.has_option('influx2', 'org'):
org = config['influx2']['org']

default_tags = None

if config.has_section('tags'):
default_tags = dict(config.items('tags'))

if timeout:
return cls(url, token, debug=debug, timeout=int(timeout), org=org, default_tags=default_tags,
enable_gzip=enable_gzip)

return cls(url, token, debug=debug, org=org, default_tags=default_tags, enable_gzip=enable_gzip)

@classmethod
def from_env_properties(cls, debug=None, enable_gzip=False):
url = os.getenv('INFLUXDB_V2_URL', "http://localhost:9999")
token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
timeout = os.getenv('INFLUXDB_V2_TIMEOUT', "10000")
org = os.getenv('INFLUXDB_V2_ORG', "my-org")

default_tags = dict()

for key, value in os.environ.items():
if key.startswith("INFLUXDB_V2_TAG_"):
default_tags[key[16:].lower()] = value

return cls(url, token, debug=debug, timeout=int(timeout), org=org, default_tags=default_tags, enable_gzip=enable_gzip)

def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:
"""
Creates a Write API instance
Expand Down
5 changes: 5 additions & 0 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions()
self._write_service = WriteService(influxdb_client.api_client)
self._write_options = write_options
self._point_settings = point_settings

if influxdb_client.default_tags:
for key, value in influxdb_client.default_tags.items():
self._point_settings.add_default_tag(key, value)

if self._write_options.write_type is WriteType.batching:
# Define Subject that listen incoming data and produces writes into InfluxDB
self._subject = Subject()
Expand Down
10 changes: 10 additions & 0 deletions tests/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[influx2]
url=http://192.168.0.2:9999
org=my-org
token=my-token
timeout=6000

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}
94 changes: 87 additions & 7 deletions tests/test_WriteApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import time
from multiprocessing.pool import ApplyResult

from influxdb_client import Point, WritePrecision
from influxdb_client import Point, WritePrecision, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings
from influxdb_client.rest import ApiException
from tests.base_test import BaseTest
Expand Down Expand Up @@ -77,7 +77,6 @@ def test_write_records_list(self):
self.write_client.write(bucket.name, self.org, record_list)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
print(query)

flux_result = self.client.query_api().query(query)

Expand Down Expand Up @@ -109,7 +108,6 @@ def test_write_points_unicode(self):
p.field(field_name, utf8_val)
p.tag(tag, tag_value)
record_list = [p]
print(record_list)

self.write_client.write(bucket.name, self.org, record_list)

Expand Down Expand Up @@ -147,7 +145,6 @@ def test_write_using_default_tags(self):
p2.time(2)

record_list = [p, p2]
print(record_list)

self.write_client.write(bucket.name, self.org, record_list)

Expand Down Expand Up @@ -304,7 +301,6 @@ def test_write_dictionaries(self):
time.sleep(1)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
print(query)

flux_result = self.client.query_api().query(query)

Expand Down Expand Up @@ -344,7 +340,6 @@ def test_use_default_tags_with_dictionaries(self):
time.sleep(1)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
print(query)

flux_result = self.client.query_api().query(query)

Expand Down Expand Up @@ -379,7 +374,6 @@ def test_write_bytes(self):
time.sleep(1)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
print(query)

flux_result = self.client.query_api().query(query)

Expand Down Expand Up @@ -444,5 +438,91 @@ def test_point_settings_with_add(self):
self.assertEqual("LA", default_tags.get("data_center"))


class DefaultTagsConfiguration(BaseTest):

def setUp(self) -> None:
super().setUp()

os.environ['data_center'] = "LA"

self.id_tag = "132-987-655"
self.customer_tag = "California Miner"
self.data_center_key = "data_center"

os.environ['INFLUXDB_V2_TOKEN'] = "my-token"
os.environ['INFLUXDB_V2_TIMEOUT'] = "6000"
os.environ['INFLUXDB_V2_ORG'] = "my-org"

os.environ['INFLUXDB_V2_TAG_ID'] = self.id_tag
os.environ['INFLUXDB_V2_TAG_CUSTOMER'] = self.customer_tag
os.environ['INFLUXDB_V2_TAG_DATA_CENTER'] = "${env.data_center}"

def tearDown(self) -> None:
self.write_client.__del__()
super().tearDown()

def test_connection_option_from_conf_file(self):
self.client.close()
self.client = InfluxDBClient.from_config_file(os.getcwd() + "/tests/config.ini", self.debug)

self._check_connection_settings()

def test_connection_option_from_env(self):
self.client.close()
self.client = InfluxDBClient.from_env_properties(self.debug)

self._check_connection_settings()

def _check_connection_settings(self):
self.write_client = self.client.write_api(write_options=SYNCHRONOUS)

self.assertEqual(os.getenv("INFLUXDB_V2_URL"), self.client.url)
self.assertEqual("my-org", self.client.org)
self.assertEqual("my-token", self.client.token)
self.assertEqual(6000, self.client.timeout)

def test_default_tags_from_conf_file(self):
self.client.close()
self.client = InfluxDBClient.from_config_file(os.getcwd() + "/tests/config.ini", self.debug)

self._write_point()

def test_default_tags_from_env(self):
self.client.close()
self.client = InfluxDBClient.from_env_properties(self.debug)

self._write_point()

def _write_point(self):
self.write_client = self.client.write_api(write_options=SYNCHRONOUS)

bucket = self.create_test_bucket()

measurement = "h2o_feet"
field_name = "water_level"
val = "1.0"
tag = "location"
tag_value = "creek level"

p = Point(measurement)
p.field(field_name, val)
p.tag(tag, tag_value)

record_list = [p]

self.write_client.write(bucket.name, self.org, record_list)

query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)'
flux_result = self.client.query_api().query(query)
self.assertEqual(1, len(flux_result))
rec = flux_result[0].records[0]

self.assertEqual(self.id_tag, rec["id"])
self.assertEqual(self.customer_tag, rec["customer"])
self.assertEqual("LA", rec[self.data_center_key])

self.delete_test_bucket(bucket)


if __name__ == '__main__':
unittest.main()