diff --git a/doc/source/python-api.rst b/doc/source/python-api.rst index 27a16d7568..111c7b6419 100644 --- a/doc/source/python-api.rst +++ b/doc/source/python-api.rst @@ -142,7 +142,7 @@ Sparse Array Query --------------- -.. autoclass:: tiledb.libtiledb.Query +.. autoclass:: tiledb.Query :members: Query Condition diff --git a/tiledb/cc/query.cc b/tiledb/cc/query.cc index 145277c33f..db1bfc7917 100644 --- a/tiledb/cc/query.cc +++ b/tiledb/cc/query.cc @@ -96,7 +96,7 @@ void init_query(py::module &m) { q.set_validity_buffer(name, (uint8_t *)(a.data()), a.size()); }) - .def("submit", &Query::submit, py::call_guard()) + .def("_submit", &Query::submit, py::call_guard()) /** hackery from another branch... */ //.def("set_fragment_uri", &Query::set_fragment_uri) diff --git a/tiledb/dense_array.py b/tiledb/dense_array.py index 7d2dd7c6fa..9957f67cad 100644 --- a/tiledb/dense_array.py +++ b/tiledb/dense_array.py @@ -14,7 +14,7 @@ replace_ellipsis, replace_scalars_slice, ) -from .libtiledb import Query +from .query import Query from .subarray import Subarray @@ -173,7 +173,7 @@ def query( attrs=attrs, cond=cond, dims=dims, - coords=coords, + has_coords=coords, order=order, use_arrow=use_arrow, return_arrow=return_arrow, @@ -666,7 +666,7 @@ def read_subarray(self, subarray): if has_labels: label_query = Query(self, self.ctx) label_query.set_subarray(subarray) - label_query.submit() + label_query._submit() if not label_query.is_complete(): raise tiledb.TileDBError("Failed to get dimension ranges from labels") result_subarray = Subarray(self, self.ctx) diff --git a/tiledb/domain_indexer.py b/tiledb/domain_indexer.py index 8796eb84ca..d6c178de65 100644 --- a/tiledb/domain_indexer.py +++ b/tiledb/domain_indexer.py @@ -90,7 +90,7 @@ def __getitem__(self, idx): self.query.attrs if self.query.attrs else attr_names ) # query.attrs might be None -> all attr_cond = self.query.attr_cond - coords = self.query.coords + coords = self.query.has_coords if coords: attr_names = [ diff --git a/tiledb/libtiledb.pxd b/tiledb/libtiledb.pxd index efb4571927..2b49ef672b 100644 --- a/tiledb/libtiledb.pxd +++ b/tiledb/libtiledb.pxd @@ -1100,7 +1100,6 @@ cdef extern from "tiledb/tiledb_experimental.h": ) # Free helper functions -cpdef unicode ustring(object s) cdef _raise_tiledb_error(tiledb_error_t* err_ptr) cdef _raise_ctx_err(tiledb_ctx_t* ctx_ptr, int rc) diff --git a/tiledb/libtiledb.pyx b/tiledb/libtiledb.pyx index bdd8dc389f..081562972f 100644 --- a/tiledb/libtiledb.pyx +++ b/tiledb/libtiledb.pyx @@ -7,15 +7,9 @@ from cpython.version cimport PY_MAJOR_VERSION include "common.pxi" include "indexing.pyx" -import collections.abc -from json import loads as json_loads from .cc import TileDBError from .ctx import Config, Ctx, default_ctx -from .domain_indexer import DomainIndexer -from .vfs import VFS -from .sparse_array import SparseArrayImpl -from .dense_array import DenseArrayImpl from .array import Array ############################################################################### @@ -349,269 +343,6 @@ cdef _raise_ctx_err(tiledb_ctx_t* ctx_ptr, int rc): _raise_tiledb_error(err_ptr) -cpdef unicode ustring(object s): - """Coerce a python object to a unicode string""" - - if type(s) is unicode: - return s - elif PY_MAJOR_VERSION < 3 and isinstance(s, bytes): - return ( s).decode('ascii') - elif isinstance(s, unicode): - return unicode(s) - raise TypeError( - "ustring() must be a string or a bytes-like object" - ", not {0!r}".format(type(s))) - - -cdef bytes unicode_path(object path): - """Returns a UTF-8 encoded byte representation of a given URI path string""" - return ustring(path).encode('UTF-8') - - -############################################################################### -# # -# CLASS DEFINITIONS # -# # -############################################################################### - - -cdef class Query(object): - """ - Proxy object returned by query() to index into original array - on a subselection of attribute in a defined layout order - - See documentation of Array.query - """ - - def __init__(self, array, attrs=None, cond=None, dims=None, - coords=False, index_col=True, order=None, - use_arrow=None, return_arrow=False, return_incomplete=False): - if array.mode not in ('r', 'd'): - raise ValueError("array mode must be read or delete mode") - - if dims is not None and coords == True: - raise ValueError("Cannot pass both dims and coords=True to Query") - - cdef list dims_to_set = list() - - if dims is False: - self.dims = False - elif dims != None and dims != True: - domain = array.schema.domain - for dname in dims: - if not domain.has_dim(dname): - raise TileDBError(f"Selected dimension does not exist: '{dname}'") - self.dims = [unicode(dname) for dname in dims] - elif coords == True or dims == True: - domain = array.schema.domain - self.dims = [domain.dim(i).name for i in range(domain.ndim)] - - if attrs is not None: - for name in attrs: - if not array.schema.has_attr(name): - raise TileDBError(f"Selected attribute does not exist: '{name}'") - self.attrs = attrs - self.cond = cond - - if order == None: - if array.schema.sparse: - self.order = 'U' # unordered - else: - self.order = 'C' # row-major - else: - self.order = order - - # reference to the array we are querying - self.array = array - self.coords = coords - self.index_col = index_col - self.return_arrow = return_arrow - if return_arrow: - if use_arrow is None: - use_arrow = True - if not use_arrow: - raise TileDBError("Cannot initialize return_arrow with use_arrow=False") - self.use_arrow = use_arrow - - if return_incomplete and not array.schema.sparse: - raise TileDBError("Incomplete queries are only supported for sparse arrays at this time") - - self.return_incomplete = return_incomplete - - self.domain_index = DomainIndexer(array, query=self) - - def __getitem__(self, object selection): - if self.return_arrow: - raise TileDBError("`return_arrow=True` requires .df indexer`") - - return self.array.subarray(selection, - attrs=self.attrs, - cond=self.cond, - coords=self.coords if self.coords else self.dims, - order=self.order) - - def agg(self, aggs): - """ - Calculate an aggregate operation for a given attribute. Available - operations are sum, min, max, mean, count, and null_count (for nullable - attributes only). Aggregates may be combined with other query operations - such as query conditions and slicing. - - The input may be a single operation, a list of operations, or a - dictionary with attribute mapping to a single operation or list of - operations. - - For undefined operations on max and min, which can occur when a nullable - attribute contains only nulled data at the given coordinates or when - there is no data read for the given query (e.g. query conditions that do - not match any values or coordinates that contain no data)), invalid - results are represented as np.nan for attributes of floating point types - and None for integer types. - - >>> import tiledb, tempfile, numpy as np - >>> path = tempfile.mkdtemp() - - >>> with tiledb.from_numpy(path, np.arange(1, 10)) as A: - ... pass - - >>> # Note that tiledb.from_numpy creates anonymous attributes, so the - >>> # name of the attribute is represented as an empty string - - >>> with tiledb.open(path, 'r') as A: - ... A.query().agg("sum")[:] - 45 - - >>> with tiledb.open(path, 'r') as A: - ... A.query(cond="attr('') < 5").agg(["count", "mean"])[:] - {'count': 9, 'mean': 2.5} - - >>> with tiledb.open(path, 'r') as A: - ... A.query().agg({"": ["max", "min"]})[2:7] - {'max': 7, 'min': 3} - - :param agg: The input attributes and operations to apply aggregations on - :returns: single value for single operation on one attribute, a dictionary - of attribute keys associated with a single value for a single operation - across multiple attributes, or a dictionary of attribute keys that maps - to a dictionary of operation labels with the associated value - """ - schema = self.array.schema - attr_to_aggs_map = {} - if isinstance(aggs, dict): - attr_to_aggs_map = { - a: ( - tuple([aggs[a]]) - if isinstance(aggs[a], str) - else tuple(aggs[a]) - ) - for a in aggs - } - elif isinstance(aggs, str): - attrs = tuple(schema.attr(i).name for i in range(schema.nattr)) - attr_to_aggs_map = {a: (aggs,) for a in attrs} - elif isinstance(aggs, collections.abc.Sequence): - attrs = tuple(schema.attr(i).name for i in range(schema.nattr)) - attr_to_aggs_map = {a: tuple(aggs) for a in attrs} - - from .aggregation import Aggregation - return Aggregation(self, attr_to_aggs_map) - - @property - def array(self): - return self.array - - @property - def attrs(self): - """List of attributes to include in Query.""" - return self.attrs - - @property - def cond(self): - """QueryCondition used to filter attributes or dimensions in Query.""" - return self.cond - - @property - def dims(self): - """List of dimensions to include in Query.""" - return self.dims - - @property - def coords(self): - """ - True if query should include (return) coordinate values. - - :rtype: bool - """ - return self.coords - - @property - def order(self): - """Return underlying Array order.""" - return self.order - - @property - def index_col(self): - """List of columns to set as index for dataframe queries, or None.""" - return self.index_col - - @property - def use_arrow(self): - return self.use_arrow - - @property - def return_arrow(self): - return self.return_arrow - - @property - def return_incomplete(self): - return self.return_incomplete - - @property - def domain_index(self): - """Apply Array.domain_index with query parameters.""" - return self.domain_index - - def label_index(self, labels): - """Apply Array.label_index with query parameters.""" - from .multirange_indexing import LabelIndexer - return LabelIndexer(self.array, tuple(labels), query=self) - - @property - def multi_index(self): - """Apply Array.multi_index with query parameters.""" - # Delayed to avoid circular import - from .multirange_indexing import MultiRangeIndexer - return MultiRangeIndexer(self.array, query=self) - - @property - def df(self): - """Apply Array.multi_index with query parameters and return result - as a Pandas dataframe.""" - # Delayed to avoid circular import - from .multirange_indexing import DataFrameIndexer - return DataFrameIndexer(self.array, query=self, use_arrow=self.use_arrow) - - def get_stats(self, print_out=True, json=False): - """Retrieves the stats from a TileDB query. - - :param print_out: Print string to console (default True), or return as string - :param json: Return stats JSON object (default: False) - """ - pyquery = self.array.pyquery - if pyquery is None: - return "" - stats = self.array.pyquery.get_stats() - if json: - stats = json_loads(stats) - if print_out: - print(stats) - else: - return stats - - def submit(self): - """An alias for calling the regular indexer [:]""" - return self[:] - def write_direct_dense(self: Array, np.ndarray array not None, **kw): """ Write directly to given array attribute with minimal checks, diff --git a/tiledb/multirange_indexing.py b/tiledb/multirange_indexing.py index 4eb267a12f..4400d55d0a 100644 --- a/tiledb/multirange_indexing.py +++ b/tiledb/multirange_indexing.py @@ -29,10 +29,10 @@ from .array_schema import ArraySchema from .cc import TileDBError from .dataframe_ import check_dataframe_deps -from .libtiledb import Query as QueryProxy from .main import PyAgg, PyQuery, increment_stat, use_stats from .metadata import Metadata from .query import Query +from .query import Query as QueryProxy from .query_condition import QueryCondition from .subarray import Subarray @@ -422,7 +422,7 @@ def __init__( check_dataframe_deps() # we need to use a Query in order to get coords for a dense array if not query: - query = QueryProxy(array, coords=True) + query = QueryProxy(array, has_coords=True) use_arrow = ( bool(importlib.util.find_spec("pyarrow")) if use_arrow is None @@ -586,7 +586,7 @@ def _run_query(self) -> Dict[str, np.ndarray]: # If querying by label and the label query is not yet complete, run the label # query and update the pyquery with the actual dimensions. if self.label_query is not None and not self.label_query.is_complete(): - self.label_query.submit() + self.label_query._submit() if not self.label_query.is_complete(): raise TileDBError("failed to get dimension ranges from labels") @@ -687,7 +687,7 @@ def _iter_dim_names( if query is not None: if query.dims is not None: return iter(query.dims or ()) - if query.coords is False: + if query.has_coords is False: return iter(()) if not schema.sparse: return iter(()) diff --git a/tiledb/query.py b/tiledb/query.py index 5e8676ccd4..a44d0a0c40 100644 --- a/tiledb/query.py +++ b/tiledb/query.py @@ -1,7 +1,13 @@ +from collections.abc import Sequence +from json import loads as json_loads +from typing import Optional, Sequence, Union + import tiledb.cc as lt +from tiledb import TileDBError from .array import Array from .ctx import Ctx, CtxMixin, default_ctx +from .domain_indexer import DomainIndexer from .subarray import Subarray @@ -13,21 +19,295 @@ class Query(CtxMixin, lt.Query): def __init__( self, array: Array, - ctx: Ctx = None, + ctx: Optional[Ctx] = None, + attrs: Optional[Union[Sequence[str], Sequence[int]]] = None, + cond: Optional[str] = None, + dims: Union[bool, Sequence[str]] = False, + has_coords: bool = False, + index_col: Optional[Union[bool, Sequence[int]]] = True, + order: Optional[str] = None, + use_arrow: Optional[bool] = None, + return_arrow: bool = False, + return_incomplete: bool = False, ): """Class representing a query on a TileDB Array. - :param array: tiledb.Array the query is on - :param ctx: A TileDB Context + Allows easy subarray queries of cells for an item or region of the array + across one or more attributes. Optionally subselect over attributes, return + dense result coordinate values, and specify a layout a result layout / cell-order. + + :param array: the Array object to query. + :param ctx: the TileDB context. + :param attrs: the attributes to subselect over. + If attrs is None (default) all array attributes will be returned. + Array attributes can be defined by name or by positional index. + :param cond: the str expression to filter attributes or dimensions on. The expression must be parsable by tiledb.QueryCondition(). See help(tiledb.QueryCondition) for more details. + :param dims: the dimensions to subselect over. If dims is False (default), no specific selection is made. + If True, all dimensions are returned. Otherwise, specify a list of dimension names. + :param has_coords: (deprecated) if True, return array of coordinate value (default False). + :param index_col: For dataframe queries, override the saved index information, + and only set specified index(es) in the final dataframe, or None. + :param order: 'C', 'F', or 'G' (row-major, col-major, tiledb global order) + :param use_arrow: if True, return dataframes via PyArrow if applicable. + :param return_arrow: if True, return results as a PyArrow Table if applicable. + :param return_incomplete: if True, initialize and return an iterable Query object over the indexed range. + Consuming this iterable returns a result set for each TileDB incomplete query. + If False (default), queries will be internally run to completion by resizing buffers and + resubmitting until query is complete. """ + + if array.mode not in ("r", "d"): + raise ValueError("array mode must be read or delete mode") + + if dims not in (False, None) and has_coords == True: + raise ValueError("Cannot pass both dims and has_coords=True to Query") + + if return_incomplete and not array.schema.sparse: + raise TileDBError( + "Incomplete queries are only supported for sparse arrays at this time" + ) + + # reference to the array we are querying self._array = array super().__init__( ctx, lt.Array(ctx if ctx is not None else default_ctx(), array) ) + self._dims = dims + + if dims == True or has_coords == True: + domain = array.schema.domain + self._dims = [domain.dim(i).name for i in range(domain.ndim)] + elif dims: + domain = array.schema.domain + for dname in dims: + if not domain.has_dim(dname): + raise TileDBError(f"Selected dimension does not exist: '{dname}'") + self._dims = dims + else: + self._dims = None + + if attrs is not None: + for name in attrs: + if not array.schema.has_attr(name): + raise TileDBError(f"Selected attribute does not exist: '{name}'") + self._attrs = attrs + self._cond = cond + + if order == None: + if array.schema.sparse: + self._order = "U" # unordered + else: + self._order = "C" # row-major + else: + self._order = order + + self._has_coords = has_coords + self._index_col = index_col + self._return_arrow = return_arrow + if return_arrow: + if use_arrow is None: + use_arrow = True + if not use_arrow: + raise TileDBError("Cannot initialize return_arrow with use_arrow=False") + self._use_arrow = use_arrow + + self._return_incomplete = return_incomplete + self._domain_index = DomainIndexer(array, query=self) + def subarray(self) -> Subarray: """Subarray with the ranges this query is on. :rtype: Subarray """ return Subarray.from_pybind11(self._ctx, self._subarray) + + def __getitem__(self, selection): + if self._return_arrow: + raise TileDBError("`return_arrow=True` requires .df indexer`") + + return self._array.subarray( + selection, + attrs=self._attrs, + cond=self._cond, + coords=self._has_coords if self._has_coords else self._dims, + order=self._order, + ) + + def agg(self, aggs): + """ + Calculate an aggregate operation for a given attribute. Available + operations are sum, min, max, mean, count, and null_count (for nullable + attributes only). Aggregates may be combined with other query operations + such as query conditions and slicing. + + The input may be a single operation, a list of operations, or a + dictionary with attribute mapping to a single operation or list of + operations. + + For undefined operations on max and min, which can occur when a nullable + attribute contains only nulled data at the given coordinates or when + there is no data read for the given query (e.g. query conditions that do + not match any values or coordinates that contain no data)), invalid + results are represented as np.nan for attributes of floating point types + and None for integer types. + + >>> import tiledb, tempfile, numpy as np + >>> path = tempfile.mkdtemp() + + >>> with tiledb.from_numpy(path, np.arange(1, 10)) as A: + ... pass + + >>> # Note that tiledb.from_numpy creates anonymous attributes, so the + >>> # name of the attribute is represented as an empty string + + >>> with tiledb.open(path, 'r') as A: + ... A.query().agg("sum")[:] + 45 + + >>> with tiledb.open(path, 'r') as A: + ... A.query(cond="attr('') < 5").agg(["count", "mean"])[:] + {'count': 9, 'mean': 2.5} + + >>> with tiledb.open(path, 'r') as A: + ... A.query().agg({"": ["max", "min"]})[2:7] + {'max': 7, 'min': 3} + + :param agg: The input attributes and operations to apply aggregations on + :returns: single value for single operation on one attribute, a dictionary + of attribute keys associated with a single value for a single operation + across multiple attributes, or a dictionary of attribute keys that maps + to a dictionary of operation labels with the associated value + """ + schema = self._array.schema + attr_to_aggs_map = {} + if isinstance(aggs, dict): + attr_to_aggs_map = { + a: (tuple([aggs[a]]) if isinstance(aggs[a], str) else tuple(aggs[a])) + for a in aggs + } + elif isinstance(aggs, str): + attrs = tuple(schema.attr(i).name for i in range(schema.nattr)) + attr_to_aggs_map = {a: (aggs,) for a in attrs} + elif isinstance(aggs, Sequence): + attrs = tuple(schema.attr(i).name for i in range(schema.nattr)) + attr_to_aggs_map = {a: tuple(aggs) for a in attrs} + + from .aggregation import Aggregation + + return Aggregation(self, attr_to_aggs_map) + + @property + def array(self): + return self._array + + @array.setter + def array(self, value): + self._array = value + + @property + def attrs(self): + """List of attributes to include in Query.""" + return self._attrs + + @attrs.setter + def attrs(self, value): + self._attrs = value + + @property + def cond(self): + """QueryCondition used to filter attributes or dimensions in Query.""" + return self._cond + + @cond.setter + def cond(self, value): + self._cond = value + + @property + def dims(self): + """List of dimensions to include in Query.""" + return self._dims + + @property + def has_coords(self): + """ + True if query should include (return) coordinate values. + + :rtype: bool + """ + return self._has_coords + + @property + def order(self): + """Return underlying Array order.""" + return self._order + + @order.setter + def order(self, value): + self._order = value + + @property + def index_col(self): + """List of columns to set as index for dataframe queries, or None.""" + return self._index_col + + @property + def use_arrow(self): + return self._use_arrow + + @property + def return_arrow(self): + return self._return_arrow + + @property + def return_incomplete(self): + return self._return_incomplete + + @property + def domain_index(self): + """Apply Array.domain_index with query parameters.""" + return self._domain_index + + def label_index(self, labels): + """Apply Array.label_index with query parameters.""" + from .multirange_indexing import LabelIndexer + + return LabelIndexer(self._array, tuple(labels), query=self) + + @property + def multi_index(self): + """Apply Array.multi_index with query parameters.""" + # Delayed to avoid circular import + from .multirange_indexing import MultiRangeIndexer + + return MultiRangeIndexer(self._array, query=self) + + @property + def df(self): + """Apply Array.multi_index with query parameters and return result + as a Pandas dataframe.""" + # Delayed to avoid circular import + from .multirange_indexing import DataFrameIndexer + + return DataFrameIndexer(self._array, query=self, use_arrow=self._use_arrow) + + def get_stats(self, print_out=True, json=False): + """Retrieves the stats from a TileDB query. + + :param print_out: Print string to console (default True), or return as string + :param json: Return stats JSON object (default: False) + """ + pyquery = self._array.pyquery + if pyquery is None: + return "" + stats = self._array.pyquery.get_stats() + if json: + stats = json_loads(stats) + if print_out: + print(stats) + else: + return stats + + def submit(self): + """An alias for calling the regular indexer [:]""" + return self[:] diff --git a/tiledb/sparse_array.py b/tiledb/sparse_array.py index 94dc8e209e..47bbd5a909 100644 --- a/tiledb/sparse_array.py +++ b/tiledb/sparse_array.py @@ -12,6 +12,7 @@ replace_ellipsis, replace_scalars_slice, ) +from .query import Query # point query index a tiledb array (zips) columnar index vectors @@ -387,12 +388,12 @@ def query( elif dims is None and coords is None: _coords = True - return tiledb.libtiledb.Query( + return Query( self, attrs=attrs, cond=cond, dims=dims, - coords=_coords, + has_coords=_coords, index_col=index_col, order=order, use_arrow=use_arrow, @@ -402,7 +403,6 @@ def query( def read_subarray(self, subarray): from .main import PyQuery - from .subarray import Subarray # Set layout to UNORDERED for sparse query. # cdef tiledb_layout_t layout = TILEDB_UNORDERED @@ -547,7 +547,6 @@ def _read_sparse_subarray(self, subarray, attr_names: list, cond, layout): nattr = len(attr_names) from .main import PyQuery - from .subarray import Subarray q = PyQuery(self.ctx, self, tuple(attr_names), tuple(), layout, False) self.pyquery = q diff --git a/tiledb/tests/cc/test_cc.py b/tiledb/tests/cc/test_cc.py index 683ab580c3..6d9b509354 100644 --- a/tiledb/tests/cc/test_cc.py +++ b/tiledb/tests/cc/test_cc.py @@ -343,7 +343,7 @@ def write(): q.set_data_buffer("a", data) q.set_data_buffer("x", coords) - assert q.submit() == lt.QueryStatus.COMPLETE + assert q._submit() == lt.QueryStatus.COMPLETE return uri @@ -361,7 +361,7 @@ def read(uri): q.set_data_buffer("a", rdata) q.set_data_buffer("x", rcoords) - assert q.submit() == lt.QueryStatus.COMPLETE + assert q._submit() == lt.QueryStatus.COMPLETE assert np.all(rcoords == coords) assert np.all(rdata == data) @@ -407,7 +407,7 @@ def write(): q.set_data_buffer("a", data) # q.set_data_buffer("x", coords) - assert q.submit() == lt.QueryStatus.COMPLETE + assert q._submit() == lt.QueryStatus.COMPLETE return uri @@ -428,7 +428,7 @@ def read(uri): q.set_data_buffer("a", rdata) - assert q.submit() == lt.QueryStatus.COMPLETE + assert q._submit() == lt.QueryStatus.COMPLETE assert np.all(rdata == data) uri = write() diff --git a/tiledb/tests/test_query.py b/tiledb/tests/test_query.py index e2576bd88b..f2d963c470 100644 --- a/tiledb/tests/test_query.py +++ b/tiledb/tests/test_query.py @@ -35,6 +35,6 @@ def test_label_range_query(self): input_subarray.add_label_range("l1", (-8, -6)) query = tiledb.Query(array) query.set_subarray(input_subarray) - query.submit() + query._submit() output_subarray = query.subarray() assert output_subarray.num_dim_ranges(0) == 2