Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query_data_frame_stream method #121

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
1. [#112](https://github.com/influxdata/influxdb-client-python/pull/113): Support timestamp with different timezone in _convert_timestamp
1. [#120](https://github.com/influxdata/influxdb-client-python/pull/120): ciso8601 is an optional dependency and has to be installed separably
1. [#121](https://github.com/influxdata/influxdb-client-python/pull/121): Added query_data_frame_stream method

### Bug Fixes
1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
Expand Down
30 changes: 22 additions & 8 deletions influxdb_client/client/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,27 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N

from ..extras import pd

_generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
_dataFrames = list(_generator)

if len(_dataFrames) == 0:
return pd.DataFrame(columns=[], index=None)
elif len(_dataFrames) == 1:
return _dataFrames[0]
else:
return _dataFrames

def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None):
"""
Synchronously executes the Flux query and return stream of Pandas DataFrame as a Generator['pd.DataFrame'].
Note that if a query returns more then one table than the client generates a DataFrame for each of them.

:param query: the Flux query
:param org: organization name (optional if already specified in InfluxDBClient)
:param data_frame_index: the list of columns that are used as DataFrame index
:return:
"""

if org is None:
org = self._influxdb_client.org

Expand All @@ -113,14 +134,7 @@ def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = N

_parser = FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,
data_frame_index=data_frame_index)
_dataFrames = list(_parser.generator())

if len(_dataFrames) == 0:
return pd.DataFrame(columns=[], index=None)
elif len(_dataFrames) == 1:
return _dataFrames[0]
else:
return _dataFrames
return _parser.generator()

# private helper for c
@staticmethod
Expand Down