Skip to content

Commit

Permalink
Add a callback to get_parameter_data to follow data loading (#4688)
Browse files Browse the repository at this point in the history
* Add a callback to get_parameter_data to follow data loading

* Make mypy happy

* Replace CamelCase  by snake case

* Simplify typing

* Typing more precise

* Move callback_percent parameter to qcodesrc.json and add the matching schema file

* Fix some forgotten typing of callback

* Put back `many` and `many_many` previous signature

Moved the appending of data in the `get_parameter_tree_values` function.

* Add newsfragment file  explaining the new callback parameter

* Remove unnecessary loop, fix mypy issue

* add callback to make mypy happy

* Improve description

* Remove useless import

* Delete useless file

* Breakdown logics in smaller functions

iteration is directly taken from config

* Add (incorrect) test of callback

* Run darker

* fix missing type

* Take into account null column

* extend callback test to cover multiple num params and < 100 rows

* Improvement of callback algorithm

* Handle small database with less than 100 rows
* Handle non commensurable tuple of row to be downloaded with percentage of progress
* Adapt test for small database with less than 100 rows

Co-authored-by: Jens H. Nielsen <[email protected]>
Co-authored-by: Jens Hedegaard Nielsen <[email protected]>
  • Loading branch information
3 people authored Nov 28, 2022
1 parent 544295d commit 34715d9
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 32 deletions.
6 changes: 6 additions & 0 deletions docs/changes/newsfragments/4688.new
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Add a callback to ``dataset.get_parameter_data``.
This is useful for tracking the progress of the data download.
Since sqlite3 does not provide a way to track the progress of data loading,
we compute how many sqlite requests correspond to a certain percentage of
progress, which is set by the config parameter "callback_percent".
We then perform several SQL requests instead of one, running the callback after each request.
1 change: 1 addition & 0 deletions qcodes/configuration/qcodesrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"use_threads": false,
"dond_plot": false,
"dond_show_progress": false,
"callback_percent" : 5.0,
"export_automatic": false,
"export_type": null,
"export_prefix": "qcodes_",
Expand Down
5 changes: 5 additions & 0 deletions qcodes/configuration/qcodesrc_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@
"default": false,
"description": "Should dond functions show a progress bar during the measurement"
},
"callback_percent": {
"type": "number",
"default": 5.0,
"description": "If user wants to callback a function while loading data, the callback is done every callback_percent"
},
"export_automatic": {
"type": "boolean",
"default": false,
Expand Down
8 changes: 6 additions & 2 deletions qcodes/dataset/data_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ def get_parameter_data(
*params: str | ParamSpec | ParameterBase,
start: int | None = None,
end: int | None = None,
callback: Callable[[float], None] | None = None,
) -> ParameterData:
"""
Returns the values stored in the :class:`.DataSet` for the specified parameters
Expand Down Expand Up @@ -801,6 +802,8 @@ def get_parameter_data(
if None
end: end value of selection range (by results count); ignored if
None
callback: Function called during the data loading every
config.dataset.callback_percent.
Returns:
Dictionary from requested parameters to Dict of parameter names
Expand All @@ -812,8 +815,9 @@ def get_parameter_data(
for ps in self._rundescriber.interdeps.non_dependencies]
else:
valid_param_names = self._validate_parameters(*params)
return get_parameter_data(self.conn, self.table_name,
valid_param_names, start, end)
return get_parameter_data(
self.conn, self.table_name, valid_param_names, start, end, callback
)

def to_pandas_dataframe_dict(
self,
Expand Down
3 changes: 2 additions & 1 deletion qcodes/dataset/data_set_in_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import warnings
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Callable

import numpy as np

Expand Down Expand Up @@ -844,6 +844,7 @@ def get_parameter_data(
*params: str | ParamSpec | ParameterBase,
start: int | None = None,
end: int | None = None,
callback: Callable[[float], None] | None = None,
) -> ParameterData:
self._warn_if_set(*params, start=start, end=end)
return self.cache.data()
Expand Down
2 changes: 2 additions & 0 deletions qcodes/dataset/data_set_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Protocol,
Expand Down Expand Up @@ -231,6 +232,7 @@ def get_parameter_data(
*params: str | ParamSpec | ParameterBase,
start: int | None = None,
end: int | None = None,
callback: Callable[[float], None] | None = None,
) -> ParameterData:
pass

Expand Down
193 changes: 166 additions & 27 deletions qcodes/dataset/sqlite/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing_extensions import TypedDict

import qcodes as qc
from qcodes import config
from qcodes.dataset.descriptions.dependencies import InterDependencies_
from qcodes.dataset.descriptions.param_spec import ParamSpec, ParamSpecBase
from qcodes.dataset.descriptions.rundescriber import RunDescriber
Expand Down Expand Up @@ -174,6 +175,7 @@ def get_parameter_data(
columns: Sequence[str] = (),
start: int | None = None,
end: int | None = None,
callback: Callable[[float], None] | None = None,
) -> dict[str, dict[str, np.ndarray]]:
"""
Get data for one or more parameters and its dependencies. The data
Expand All @@ -199,6 +201,8 @@ def get_parameter_data(
are returned.
start: start of range; if None, then starts from the top of the table
end: end of range; if None, then ends at the bottom of the table
callback: Function called during the data loading every
config.dataset.callback_percent.
"""
rundescriber = get_rundescriber_from_result_table_name(conn, table_name)

Expand All @@ -209,12 +213,8 @@ def get_parameter_data(
# loop over all the requested parameters
for output_param in columns:
output[output_param] = get_shaped_parameter_data_for_one_paramtree(
conn,
table_name,
rundescriber,
output_param,
start,
end)
conn, table_name, rundescriber, output_param, start, end, callback
)
return output


Expand All @@ -225,6 +225,7 @@ def get_shaped_parameter_data_for_one_paramtree(
output_param: str,
start: int | None,
end: int | None,
callback: Callable[[float], None] | None = None,
) -> dict[str, np.ndarray]:
"""
Get the data for a parameter tree and reshape it according to the
Expand All @@ -236,12 +237,7 @@ def get_shaped_parameter_data_for_one_paramtree(
"""

one_param_output, _ = get_parameter_data_for_one_paramtree(
conn,
table_name,
rundescriber,
output_param,
start,
end
conn, table_name, rundescriber, output_param, start, end, callback
)
if rundescriber.shapes is not None:
shape = rundescriber.shapes.get(output_param)
Expand Down Expand Up @@ -288,10 +284,11 @@ def get_parameter_data_for_one_paramtree(
output_param: str,
start: int | None,
end: int | None,
callback: Callable[[float], None] | None = None,
) -> tuple[dict[str, np.ndarray], int]:
interdeps = rundescriber.interdeps
data, paramspecs, n_rows = _get_data_for_one_param_tree(
conn, table_name, interdeps, output_param, start, end
conn, table_name, interdeps, output_param, start, end, callback
)
if not paramspecs[0].name == output_param:
raise ValueError("output_param should always be the first "
Expand Down Expand Up @@ -389,19 +386,23 @@ def _get_data_for_one_param_tree(
output_param: str,
start: int | None,
end: int | None,
callback: Callable[[float], None] | None = None,
) -> tuple[list[tuple[Any, ...]], list[ParamSpecBase], int]:
output_param_spec = interdeps._id_to_paramspec[output_param]
# find all the dependencies of this param

dependency_params = list(interdeps.dependencies.get(output_param_spec, ()))
dependency_names = [param.name for param in dependency_params]
paramspecs = [output_param_spec] + dependency_params
res = get_parameter_tree_values(conn,
table_name,
output_param,
*dependency_names,
start=start,
end=end)
res = get_parameter_tree_values(
conn,
table_name,
output_param,
*dependency_names,
start=start,
end=end,
callback=callback,
)
n_rows = len(res)
return res, paramspecs, n_rows

Expand Down Expand Up @@ -434,13 +435,106 @@ def get_values(
return res


def get_parameter_db_row(conn: ConnectionPlus, table_name: str, param_name: str) -> int:
    """
    Count the rows of a results table in which a parameter has a value.

    Args:
        conn: Connection to the database
        table_name: Name of the table that holds the data
        param_name: Name of the parameter (column) whose not-null values
            are counted

    Returns:
        The total number of not-null values of ``param_name``
    """
    query = f"""
    SELECT COUNT({param_name}) FROM "{table_name}"
    WHERE {param_name} IS NOT NULL
    """
    cursor = atomic_transaction(conn, query)
    n_not_null = one(cursor, 0)

    return n_not_null


def get_table_max_id(conn: ConnectionPlus, table_name: str) -> int:
    """
    Look up the largest value of the ``id`` column of a table.

    Args:
        conn: Connection to the database
        table_name: Name of the table that holds the data

    Returns:
        The max id of the table
    """
    query = f"""
    SELECT MAX(id)
    FROM "{table_name}"
    """
    cursor = atomic_transaction(conn, query)
    max_id = one(cursor, 0)

    return max_id


def _get_offset_limit_for_callback(
    conn: ConnectionPlus, table_name: str, param_name: str
) -> tuple[np.ndarray, np.ndarray]:
    """
    Split the download of one parameter into chunks for progress reporting.

    Since sqlite3 does not provide a way to track the progress of a running
    query, the data is fetched through several ``LIMIT``/``OFFSET`` requests,
    each one corresponding to a progress of
    ``config.dataset.callback_percent``. This function computes the offsets
    and limits of these requests.

    The offsets count rows in which ``param_name`` is not null, because the
    requests they are used for filter on ``param_name IS NOT NULL`` before
    applying ``LIMIT``/``OFFSET``.

    Args:
        conn: Connection to the database
        table_name: Name of the table that holds the data
        param_name: Name of the parameter whose not-null rows are going to
            be downloaded

    Returns:
        offset: Array of chunk boundaries, starting at 0 and ending at the
            number of not-null rows. ``offset[i]`` is the SQL offset of
            request ``i``; the last element is only an end boundary.
        limit: Array with one element fewer than ``offset``. ``limit[i]`` is
            the SQL limit (number of rows) of request ``i``.

    Raises:
        ValueError: If at least 100 rows are to be downloaded and
            ``config.dataset.callback_percent`` is not strictly positive.
    """

    # Number of rows to be downloaded for the wanted dependent parameter
    nb_row = get_parameter_db_row(conn, table_name, param_name)

    if nb_row >= 100:
        callback_percent = config.dataset.callback_percent
        if callback_percent <= 0:
            raise ValueError(
                "config.dataset.callback_percent must be strictly positive, "
                f"got {callback_percent}"
            )

        # Always at least one chunk: a callback_percent above 100 would
        # otherwise give an empty ``limit`` array and a division by zero
        # when the progress step is computed from ``len(limit)``.
        nb_chunk = max(int(100 / callback_percent), 1)
        # Never more chunks than rows, to avoid requests that fetch nothing
        nb_chunk = min(nb_chunk, nb_row)

        # The boundaries span the not-null rows (not the ids of the table):
        # rows belonging to other parameters are filtered out before OFFSET
        # is applied. Using linspace with dtype=int ensures that the array
        # finishes exactly at nb_row.
        offset = np.linspace(0, nb_row, nb_chunk + 1, dtype=int)

    else:
        # If there are fewer than 100 rows to be downloaded, we override
        # config.dataset.callback_percent to avoid many calls for a small
        # download
        offset = np.array([0, nb_row // 2, nb_row])

    # The number of rows downloaded between two iterations may vary, so we
    # compute the limit corresponding to each offset
    limit = offset[1:] - offset[:-1]

    return offset, limit


def get_parameter_tree_values(
conn: ConnectionPlus,
result_table_name: str,
toplevel_param_name: str,
*other_param_names: str,
start: int | None = None,
end: int | None = None,
callback: Callable[[float], None] | None = None,
) -> list[tuple[Any, ...]]:
"""
Get the values of one or more columns from a data table. The rows
Expand All @@ -461,27 +555,71 @@ def get_parameter_tree_values(
end: The (1-indexed) result to include as the last result to be
returned. None is equivalent to "all the rest". If start > end,
nothing is returned.
callback: Function called during the data loading every
config.dataset.callback_percent.
Returns:
A list of list. The outer list index is row number, the inner list
index is parameter value (first toplevel_param, then other_param_names)
"""

cursor = conn.cursor()

# Without callback: int
# With callback: np.ndarray
offset: int | np.ndarray
limit: int | np.ndarray

offset = max((start - 1), 0) if start is not None else 0
limit = max((end - offset), 0) if end is not None else -1

if start is not None and end is not None and start > end:
limit = 0

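# Chunked loading is only prepared when neither start nor end is given.
# NOTE(review): with a callback AND start/end, offset and limit stay plain
# ints, so the isinstance asserts in the callback branch below will fail.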
if start is None and end is None and callback is not None:
offset, limit = _get_offset_limit_for_callback(
conn, result_table_name, toplevel_param_name
)

# Create the base sql query
columns = [toplevel_param_name] + list(other_param_names)
sql = f"""
SELECT {','.join(columns)} FROM "{result_table_name}"
WHERE {toplevel_param_name} IS NOT NULL
LIMIT ? OFFSET ?
"""
cursor = conn.cursor()
cursor.execute(sql, (limit, offset))
return many_many(cursor, *columns)
SELECT {','.join(columns)} FROM "{result_table_name}"
WHERE {toplevel_param_name} IS NOT NULL
LIMIT ? OFFSET ?
"""

# Request if no callback
if callback is None:

cursor.execute(sql, (limit, offset))
res = many_many(cursor, *columns)

# Request if callback
elif callback is not None:
assert isinstance(offset, np.ndarray)
assert isinstance(limit, np.ndarray)
progress_current = 100 / len(limit)

# 0
progress_total = 0.0
callback(progress_total)

# 1
cursor.execute(sql, (limit[0], offset[0]))
res = many_many(cursor, *columns)
progress_total += progress_current
callback(progress_total)

# others
for i in range(1, len(offset) - 1):
cursor.execute(sql, (limit[i], offset[i]))
res.extend(many_many(cursor, *columns))
progress_total += progress_current
callback(progress_total)

return res


@deprecate(alternative="get_parameter_data")
Expand Down Expand Up @@ -2068,7 +2206,8 @@ def load_new_data_for_rundescriber(
rundescriber=rundescriber,
output_param=meas_parameter,
start=start,
end=None
end=None,
callback=None,
)
new_data_dict[meas_parameter] = new_data
updated_read_status[meas_parameter] = start + n_rows_read - 1
Expand Down
Loading

0 comments on commit 34715d9

Please sign in to comment.