Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Org parameter can be specified as ID, Name or Organization Object #264

Merged
merged 3 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## 1.19.0 [unreleased]

### Features
1. [#264](https://github.com/influxdata/influxdb-client-python/pull/264): Org parameter can be specified as ID, Name or Organization Object [write, query]

### Deprecated
1. [#264](https://github.com/influxdata/influxdb-client-python/pull/264): Deprecated `org_id` options BucketsApi.create_bucket in favor of `org` parameter

## 1.18.0 [2021-06-04]

### Breaking Changes
Expand Down
9 changes: 2 additions & 7 deletions examples/buckets_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,19 @@
"""
url = "http://localhost:8086"
token = "my-token"
org = "my-org"

with InfluxDBClient(url=url, token=token) as client:
buckets_api = client.buckets_api()

"""
The Bucket API uses as a parameter the Organization ID. We have to retrieve ID by Organization API.
"""
org_name = "my-org"
org = client.organizations_api().find_organizations(org=org_name)[0]

"""
Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
"""
print(f"------- Create -------\n")
retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
retention_rules=retention_rules,
org_id=org.id)
org=org)
print(created_bucket)

"""
Expand Down
17 changes: 13 additions & 4 deletions influxdb_client/client/bucket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
All buckets have a retention policy, a duration of time that each data point persists.
A bucket belongs to an organization.
"""

import warnings

from influxdb_client import BucketsService, Bucket, PostBucketRequest
from influxdb_client.client.util.helpers import get_org_query_param


class BucketsApi(object):
Expand All @@ -18,15 +19,18 @@ def __init__(self, influxdb_client):
self._buckets_service = BucketsService(influxdb_client.api_client)

def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_rules=None,
description=None) -> Bucket:
description=None, org=None) -> Bucket:
"""Create a bucket.

:param Bucket bucket: bucket to create (required)
:param Bucket bucket: bucket to create
:param bucket_name: bucket name
:param description: bucket description
:param org_id: org_id
:param bucket_name: bucket name
:param retention_rules: retention rules array or single BucketRetentionRules
:param str, Organization org: specifies the organization for create the bucket;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:return: Bucket
If the method is called asynchronously,
returns the request thread.
Expand All @@ -41,11 +45,16 @@ def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_ru
else:
rules.append(retention_rules)

if org_id is not None:
warnings.warn("org_id is deprecated; use org", DeprecationWarning)

if bucket is None:
bucket = PostBucketRequest(name=bucket_name,
retention_rules=rules,
description=description,
org_id=self._influxdb_client.org if org_id is None else org_id)
org_id=get_org_query_param(org=(org_id if org is None else org),
client=self._influxdb_client,
required_id=True))

return self._buckets_service.post_buckets(post_bucket_request=bucket)

Expand Down
43 changes: 27 additions & 16 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
from influxdb_client.client.flux_table import FluxTable, FluxRecord
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.util.helpers import get_org_query_param


class QueryOptions(object):
Expand Down Expand Up @@ -51,14 +52,15 @@ def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect, pa
Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.

:param query: a Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param str, Organization org: specifies the organization for executing the query;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:param dialect: csv dialect format
:param params: bind parameters
:return: The returned object is an iterator. Each iteration returns a row of the CSV file
(which can span multiple input lines).
"""
if org is None:
org = self._influxdb_client.org
org = self._org_param(org)
response = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params),
async_req=False, _preload_content=False)

Expand All @@ -69,13 +71,14 @@ def query_raw(self, query: str, org=None, dialect=default_dialect, params: dict
Execute synchronous Flux query and return result as raw unprocessed result as a str.

:param query: a Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param str, Organization org: specifies the organization for executing the query;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:param dialect: csv dialect format
:param params: bind parameters
:return: str
"""
if org is None:
org = self._influxdb_client.org
org = self._org_param(org)
result = self._query_api.post_query(org=org, query=self._create_query(query, dialect, params), async_req=False,
_preload_content=False)

Expand All @@ -86,12 +89,13 @@ def query(self, query: str, org=None, params: dict = None) -> List['FluxTable']:
Execute synchronous Flux query and return result as a List['FluxTable'].

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param str, Organization org: specifies the organization for executing the query;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org
org = self._org_param(org)

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
Expand All @@ -108,12 +112,13 @@ def query_stream(self, query: str, org=None, params: dict = None) -> Generator['
Execute synchronous Flux query and return stream of FluxRecord as a Generator['FluxRecord'].

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param str, Organization org: specifies the organization for executing the query;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org
org = self._org_param(org)

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
Expand All @@ -129,7 +134,9 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N
Note that if a query returns more then one table than the client generates a DataFrame for each of them.

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param str, Organization org: specifies the organization for executing the query;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:return:
Expand All @@ -153,13 +160,14 @@ def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[s
Note that if a query returns more then one table than the client generates a DataFrame for each of them.

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param str, Organization org: specifies the organization for executing the query;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:param data_frame_index: the list of columns that are used as DataFrame index
:param params: bind parameters
:return:
"""
if org is None:
org = self._influxdb_client.org
org = self._org_param(org)

response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect, params),
async_req=False, _preload_content=False, _return_http_data_only=False)
Expand Down Expand Up @@ -187,6 +195,9 @@ def _create_query(self, query, dialect=default_dialect, params: dict = None):

return q

def _org_param(self, org):
return get_org_query_param(org=org, client=self._influxdb_client)

@staticmethod
def _params_to_extern_ast(params: dict) -> List['OptionStatement']:

Expand Down
39 changes: 39 additions & 0 deletions influxdb_client/client/util/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Functions to share utility across client classes."""
from influxdb_client.rest import ApiException


def _is_id(value):
"""
Check if the value is valid InfluxDB ID.

:param value: to check
:return: True if provided parameter is valid InfluxDB ID.
"""
if value and len(value) == 16:
try:
int(value, 16)
return True
except ValueError:
return False
return False


def get_org_query_param(org, client, required_id=False):
"""
Get required type of Org query parameter.

:param str, Organization org: value provided as a parameter into API (optional)
:param InfluxDBClient client: with default value for Org parameter
:param bool required_id: true if the query param has to be a ID
:return: request type of org query parameter or None
"""
_org = client.org if org is None else org
if 'Organization' in type(_org).__name__:
_org = _org.id
if required_id and _org and not _is_id(_org):
try:
return client.organizations_api().find_organizations(org=_org)[0].id
except ApiException:
return None

return _org
9 changes: 5 additions & 4 deletions influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from rx.subject import Subject

from influxdb_client import WritePrecision, WriteService
from influxdb_client.client.util.helpers import get_org_query_param
from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
from influxdb_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client.client.write.retry import WritesRetry
Expand Down Expand Up @@ -221,8 +222,9 @@ def write(self, bucket: str, org: str = None,
"""
Write time-series data into InfluxDB.

:param str org: specifies the destination organization for writes; take either the ID or Name interchangeably;
if both orgID and org are specified, org takes precedence. (required)
:param str, Organization org: specifies the destination organization for writes;
take the ID, Name or Organization;
if it's not specified then is used default from client.org.
:param str bucket: specifies the destination bucket for writes (required)
:param WritePrecision write_precision: specifies the precision for the unix timestamps within
the body line-protocol. The precision specified on a Point has precedes
Expand All @@ -231,8 +233,7 @@ def write(self, bucket: str, org: str = None,
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
"""
if org is None:
org = self._influxdb_client.org
org = get_org_query_param(org=org, client=self._influxdb_client)

if self._point_settings.defaultTags and record is not None:
for key, val in self._point_settings.defaultTags.items():
Expand Down
8 changes: 2 additions & 6 deletions tests/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,15 @@ def tearDown(self) -> None:

def create_test_bucket(self):
bucket_name = generate_bucket_name()
bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org_id=self.my_organization.id,
bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org=self.my_organization,
description=bucket_name + "description")
return bucket

def delete_test_bucket(self, bucket):
return self.buckets_api.delete_bucket(bucket)

def find_my_org(self) -> Organization:
org_api = influxdb_client.service.organizations_service.OrganizationsService(self.api_client)
orgs = org_api.get_orgs()
for org in orgs.orgs:
if org.name == self.org:
return org
return self.client.organizations_api().find_organizations(org=self.org)[0]

@staticmethod
def log(args):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_AuthorizationApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def test_createAuthorizationBucket(self):
organization = self.client.organizations_api().create_organization(self.generate_name("Auth Organization"))
bucket = self.client.buckets_api().create_bucket(bucket_name=self.generate_name("Auth Bucket"),
retention_rules=BaseTest.retention_rule(),
org_id=self.organization.id)
org=self.organization)
resource = PermissionResource(org_id=organization.id, type="buckets", id=bucket.id)
create_bucket = Permission(action="read", resource=resource)
delete_bucket = Permission(action="write", resource=resource)
Expand Down
12 changes: 6 additions & 6 deletions tests/test_BucketsApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_create_delete_bucket(self):
my_org = self.find_my_org()

bucket_name = generate_bucket_name()
my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org_id=my_org.id)
my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org=my_org)
self.assertEqual(my_bucket.name, bucket_name)
self.assertEqual(my_bucket.org_id, my_org.id)
print(my_bucket)
Expand All @@ -41,7 +41,7 @@ def test_find_by_name(self):
my_org = self.find_my_org()

bucket_name = generate_bucket_name()
my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org_id=my_org.id)
my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org=my_org)

bucket_by_name = self.buckets_api.find_bucket_by_name(bucket_name=my_bucket.name)

Expand All @@ -58,7 +58,7 @@ def test_create_bucket_retention(self):

retention = BucketRetentionRules(type="expire", every_seconds=3600)
desc = "bucket with retention"
my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org_id=my_org.id,
my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org=my_org,
retention_rules=retention, description=desc)

self.assertEqual(my_bucket.description, desc)
Expand All @@ -76,7 +76,7 @@ def test_create_bucket_retention_list(self):
retention.type = "expire"
ret_list.append(retention)

my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org_id=my_org.id,
my_bucket = self.buckets_api.create_bucket(bucket_name=bucket_name, org=my_org,
retention_rules=ret_list)

self.assertEqual(my_bucket.name, bucket_name)
Expand All @@ -89,8 +89,8 @@ def test_pagination(self):
size = len(buckets)

# create 2 buckets
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org_id=my_org.id)
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org_id=my_org.id)
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)
self.buckets_api.create_bucket(bucket_name=generate_bucket_name(), org=my_org)

buckets = self.buckets_api.find_buckets().buckets
self.assertEqual(size + 2, len(buckets))
Expand Down
38 changes: 38 additions & 0 deletions tests/test_Helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from influxdb_client import InfluxDBClient, Organization
# noinspection PyProtectedMember
from influxdb_client.client.util.helpers import get_org_query_param, _is_id
from tests.base_test import BaseTest


class HelpersTest(BaseTest):

def test_is_id(self):
self.assertTrue(_is_id("ffffffffffffffff"))
self.assertTrue(_is_id("020f755c3c082000"))
self.assertTrue(_is_id("ca55e77eca55e77e"))
self.assertTrue(_is_id("02def021097c6000"))
self.assertFalse(_is_id("gggggggggggggggg"))
self.assertFalse(_is_id("abc"))
self.assertFalse(_is_id("abcdabcdabcdabcd0"))
self.assertFalse(_is_id("020f75"))
self.assertFalse(_is_id("020f755c3c082000aaa"))
self.assertFalse(_is_id(None))

def test_organization_as_query_param(self):
organization = Organization(id="org-id", name="org-name")
org = get_org_query_param(organization, self.client)
self.assertEqual("org-id", org)

def test_required_id(self):
org = get_org_query_param(None, self.client, required_id=True)
self.assertEqual(self.my_organization.id, org)

def test_required_id_not_exist(self):
org = get_org_query_param("not_exist_name", self.client, required_id=True)
self.assertIsNone(org)

def test_both_none(self):
self.client.close()
self.client = InfluxDBClient(url=self.client.url, token="my-token")
org = get_org_query_param(None, self.client)
self.assertIsNone(org)