From dca376116170327a25f5b3bb6c7fac7ff2503eb3 Mon Sep 17 00:00:00 2001 From: Agisilaos Kounelis <36283973+kounelisagis@users.noreply.github.com> Date: Mon, 25 Nov 2024 08:55:37 +0200 Subject: [PATCH] Factor `write_direct_dense` out of Cython (#2109) Co-authored-by: nguyenv --- tiledb/cc/query.cc | 18 ++--- tiledb/dense_array.py | 98 +++++++++++++++++++++++++- tiledb/libtiledb.pyx | 137 ------------------------------------- tiledb/tests/cc/test_cc.py | 14 ++-- 4 files changed, 108 insertions(+), 159 deletions(-) diff --git a/tiledb/cc/query.cc b/tiledb/cc/query.cc index db1bfc7917..6be4ad6a81 100644 --- a/tiledb/cc/query.cc +++ b/tiledb/cc/query.cc @@ -71,17 +71,13 @@ void init_query(py::module &m) { // uint64_t))&Query::set_data_buffer); .def("set_data_buffer", - [](Query &q, std::string name, py::array a) { - // TODO check_type(a.dtype) - // size_t item_size = a.itemsize(); - q.set_data_buffer(name, const_cast(a.data()), a.size()); + [](Query &q, std::string name, py::array a, uint32_t buff_size) { + q.set_data_buffer(name, const_cast(a.data()), buff_size); }) .def("set_offsets_buffer", - [](Query &q, std::string name, py::array a) { - // TODO check_type(a.dtype) - // size_t item_size = a.itemsize(); - q.set_offsets_buffer(name, (uint64_t *)(a.data()), a.size()); + [](Query &q, std::string name, py::array a, uint32_t buff_size) { + q.set_offsets_buffer(name, (uint64_t *)(a.data()), buff_size); }) .def("set_subarray", @@ -90,10 +86,8 @@ void init_query(py::module &m) { }) .def("set_validity_buffer", - [](Query &q, std::string name, py::array a) { - // TODO check_type(a.dtype) - // size_t item_size = a.itemsize(); - q.set_validity_buffer(name, (uint8_t *)(a.data()), a.size()); + [](Query &q, std::string name, py::array a, uint32_t buff_size) { + q.set_validity_buffer(name, (uint8_t *)(a.data()), buff_size); }) .def("_submit", &Query::submit, py::call_guard()) diff --git a/tiledb/dense_array.py b/tiledb/dense_array.py index 9957f67cad..8423e9457e 100644 --- a/tiledb/dense_array.py +++ b/tiledb/dense_array.py @@ -1,4 +1,3 @@ -import warnings from collections import OrderedDict import numpy as np @@ -14,6 +13,7 @@ replace_ellipsis, replace_scalars_slice, ) +from .datatypes import DataType from .query import Query from .subarray import Subarray @@ -601,9 +601,101 @@ def __array__(self, dtype=None, **kw): return array def write_direct(self, array: np.ndarray, **kw): - from .libtiledb import write_direct_dense + """ + Write directly to given array attribute with minimal checks, + assumes that the numpy array is the same shape as the array's domain + + :param np.ndarray array: Numpy contiguous dense array of the same dtype \ + and shape and layout of the DenseArray instance + :raises ValueError: cannot write to multi-attribute DenseArray + :raises ValueError: array is not contiguous + :raises: :py:exc:`tiledb.TileDBError` + """ + append_dim = kw.pop("append_dim", None) + mode = kw.pop("mode", "ingest") + start_idx = kw.pop("start_idx", None) + + if not self.isopen or self.mode != "w": + raise tiledb.TileDBError("DenseArray is not opened for writing") + if self.schema.nattr != 1: + raise ValueError("cannot write_direct to a multi-attribute DenseArray") + if not array.flags.c_contiguous and not array.flags.f_contiguous: + raise ValueError("array is not contiguous") + + use_global_order = ( + self.ctx.config().get("py.use_global_order_1d_write", False) == "true" + ) + + layout = lt.LayoutType.ROW_MAJOR + if array.ndim == 1 and use_global_order: + layout = lt.LayoutType.GLOBAL_ORDER + elif array.flags.f_contiguous: + layout = lt.LayoutType.COL_MAJOR + + range_start_idx = start_idx or 0 + + subarray_ranges = np.zeros(2 * array.ndim, np.uint64) + for n in range(array.ndim): + subarray_ranges[n * 2] = range_start_idx + subarray_ranges[n * 2 + 1] = array.shape[n] + range_start_idx - 1 + + if mode == "append": + with Array.load_typed(self.uri) as A: + ned = A.nonempty_domain() + + if array.ndim <= append_dim: + raise IndexError("`append_dim` out of range") + + if array.ndim != len(ned): + raise ValueError( + "The number of dimension of the TileDB array and " + "Numpy array to append do not match" + ) + + for n in range(array.ndim): + if n == append_dim: + if start_idx is not None: + range_start_idx = start_idx + range_end_idx = array.shape[n] + start_idx - 1 + else: + range_start_idx = ned[n][1] + 1 + range_end_idx = array.shape[n] + ned[n][1] + + subarray_ranges[n * 2] = range_start_idx + subarray_ranges[n * 2 + 1] = range_end_idx + else: + if array.shape[n] != ned[n][1] - ned[n][0] + 1: + raise ValueError( + "The input Numpy array must be of the same " + "shape as the TileDB array, exluding the " + "`append_dim`, but the Numpy array at index " + f"{n} has {array.shape[n]} dimension(s) and " + f"the TileDB array has {ned[n][1]-ned[n][0]}." + ) + + ctx = lt.Context(self.ctx) + q = lt.Query(ctx, self.array, lt.QueryType.WRITE) + q.layout = layout + + subarray = lt.Subarray(ctx, self.array) + for n in range(array.ndim): + subarray._add_dim_range( + n, (subarray_ranges[n * 2], subarray_ranges[n * 2 + 1]) + ) + q.set_subarray(subarray) + + attr = self.schema.attr(0) + battr_name = attr._internal_name.encode("UTF-8") + + tiledb_type = DataType.from_numpy(array.dtype) + + if tiledb_type in (lt.DataType.BLOB, lt.DataType.CHAR, lt.DataType.STRING_UTF8): + q.set_data_buffer(battr_name, array, array.nbytes) + else: + q.set_data_buffer(battr_name, array, tiledb_type.ncells * array.size) - write_direct_dense(self, array, **kw) + q._submit() + q.finalize() def read_direct(self, name=None): """Read attribute directly with minimal overhead, returns a numpy ndarray over the entire domain diff --git a/tiledb/libtiledb.pyx b/tiledb/libtiledb.pyx index 6b11d769c0..de540896d0 100644 --- a/tiledb/libtiledb.pyx +++ b/tiledb/libtiledb.pyx @@ -340,140 +340,3 @@ cdef _raise_ctx_err(tiledb_ctx_t* ctx_ptr, int rc): raise MemoryError() raise TileDBError("error retrieving error object from ctx") _raise_tiledb_error(err_ptr) - - -def write_direct_dense(self: Array, np.ndarray array not None, **kw): - """ - Write directly to given array attribute with minimal checks, - assumes that the numpy array is the same shape as the array's domain - - :param np.ndarray array: Numpy contiguous dense array of the same dtype \ - and shape and layout of the DenseArray instance - :raises ValueError: array is not contiguous - :raises: :py:exc:`tiledb.TileDBError` - - """ - append_dim = kw.pop("append_dim", None) - mode = kw.pop("mode", "ingest") - start_idx = kw.pop("start_idx", None) - - if not self.isopen or self.mode != 'w': - raise TileDBError("DenseArray is not opened for writing") - if self.schema.nattr != 1: - raise ValueError("cannot write_direct to a multi-attribute DenseArray") - if not array.flags.c_contiguous and not array.flags.f_contiguous: - raise ValueError("array is not contiguous") - - cdef tiledb_ctx_t* ctx_ptr = safe_ctx_ptr(self.ctx) - cdef tiledb_array_t* array_ptr = PyCapsule_GetPointer(self.array.__capsule__(), "array") - - # attr name - attr = self.schema.attr(0) - cdef bytes battr_name = attr._internal_name.encode('UTF-8') - cdef const char* attr_name_ptr = PyBytes_AS_STRING(battr_name) - - cdef void* buff_ptr = np.PyArray_DATA(array) - cdef uint64_t buff_size = array.nbytes - cdef np.ndarray subarray = np.zeros(2*array.ndim, np.uint64) - - try: - use_global_order = self.ctx.config().get( - "py.use_global_order_1d_write") == "true" - except KeyError: - use_global_order = False - - cdef tiledb_layout_t layout = TILEDB_ROW_MAJOR - if array.ndim == 1 and use_global_order: - layout = TILEDB_GLOBAL_ORDER - elif array.flags.f_contiguous: - layout = TILEDB_COL_MAJOR - - cdef tiledb_query_t* query_ptr = NULL - cdef tiledb_subarray_t* subarray_ptr = NULL - cdef int rc = TILEDB_OK - rc = tiledb_query_alloc(ctx_ptr, array_ptr, TILEDB_WRITE, &query_ptr) - if rc != TILEDB_OK: - tiledb_query_free(&query_ptr) - _raise_ctx_err(ctx_ptr, rc) - try: - rc = tiledb_query_set_layout(ctx_ptr, query_ptr, layout) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - range_start_idx = start_idx or 0 - for n in range(array.ndim): - subarray[n*2] = range_start_idx - subarray[n*2 + 1] = array.shape[n] + range_start_idx - 1 - - if mode == "append": - with Array.load_typed(self.uri) as A: - ned = A.nonempty_domain() - - if array.ndim <= append_dim: - raise IndexError("`append_dim` out of range") - - if array.ndim != len(ned): - raise ValueError( - "The number of dimension of the TileDB array and " - "Numpy array to append do not match" - ) - - for n in range(array.ndim): - if n == append_dim: - if start_idx is not None: - range_start_idx = start_idx - range_end_idx = array.shape[n] + start_idx -1 - else: - range_start_idx = ned[n][1] + 1 - range_end_idx = array.shape[n] + ned[n][1] - - subarray[n*2] = range_start_idx - subarray[n*2 + 1] = range_end_idx - else: - if array.shape[n] != ned[n][1] - ned[n][0] + 1: - raise ValueError( - "The input Numpy array must be of the same " - "shape as the TileDB array, exluding the " - "`append_dim`, but the Numpy array at index " - f"{n} has {array.shape[n]} dimension(s) and " - f"the TileDB array has {ned[n][1]-ned[n][0]}." - ) - - rc = tiledb_subarray_alloc(ctx_ptr, array_ptr, &subarray_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - rc = tiledb_subarray_set_subarray( - ctx_ptr, - subarray_ptr, - np.PyArray_DATA(subarray) - ) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - rc = tiledb_query_set_subarray_t(ctx_ptr, query_ptr, subarray_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - rc = tiledb_query_set_data_buffer( - ctx_ptr, - query_ptr, - attr_name_ptr, - buff_ptr, - &buff_size - ) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - with nogil: - rc = tiledb_query_submit(ctx_ptr, query_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - - with nogil: - rc = tiledb_query_finalize(ctx_ptr, query_ptr) - if rc != TILEDB_OK: - _raise_ctx_err(ctx_ptr, rc) - finally: - tiledb_subarray_free(&subarray_ptr) - tiledb_query_free(&query_ptr) - return diff --git a/tiledb/tests/cc/test_cc.py b/tiledb/tests/cc/test_cc.py index 6d9b509354..d18f170fe9 100644 --- a/tiledb/tests/cc/test_cc.py +++ b/tiledb/tests/cc/test_cc.py @@ -340,8 +340,8 @@ def write(): q.layout = lt.LayoutType.UNORDERED assert q.query_type == lt.QueryType.WRITE - q.set_data_buffer("a", data) - q.set_data_buffer("x", coords) + q.set_data_buffer("a", data, len(data)) + q.set_data_buffer("x", coords, len(coords)) assert q._submit() == lt.QueryStatus.COMPLETE @@ -358,8 +358,8 @@ def read(uri): rcoords = np.zeros(10).astype(np.int32) rdata = np.zeros(10).astype(np.int32) - q.set_data_buffer("a", rdata) - q.set_data_buffer("x", rcoords) + q.set_data_buffer("a", rdata, len(rdata)) + q.set_data_buffer("x", rcoords, len(rcoords)) assert q._submit() == lt.QueryStatus.COMPLETE assert np.all(rcoords == coords) @@ -404,8 +404,8 @@ def write(): q.set_subarray(subarray) - q.set_data_buffer("a", data) - # q.set_data_buffer("x", coords) + q.set_data_buffer("a", data, len(data)) + # q.set_data_buffer("x", coords, len(coords)) assert q._submit() == lt.QueryStatus.COMPLETE @@ -426,7 +426,7 @@ def read(uri): rdata = np.zeros(10).astype(np.float32) - q.set_data_buffer("a", rdata) + q.set_data_buffer("a", rdata, len(rdata)) assert q._submit() == lt.QueryStatus.COMPLETE assert np.all(rdata == data)