Skip to content

Commit

Permalink
Factor write_direct_dense out of Cython (#2109)
Browse files Browse the repository at this point in the history
Co-authored-by: nguyenv <[email protected]>
  • Loading branch information
kounelisagis and nguyenv authored Nov 25, 2024
1 parent 4674f14 commit c0e126a
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 159 deletions.
18 changes: 6 additions & 12 deletions tiledb/cc/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,13 @@ void init_query(py::module &m) {
// uint64_t))&Query::set_data_buffer);

.def("set_data_buffer",
[](Query &q, std::string name, py::array a) {
// TODO check_type(a.dtype)
// size_t item_size = a.itemsize();
q.set_data_buffer(name, const_cast<void *>(a.data()), a.size());
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_data_buffer(name, const_cast<void *>(a.data()), buff_size);
})

.def("set_offsets_buffer",
[](Query &q, std::string name, py::array a) {
// TODO check_type(a.dtype)
// size_t item_size = a.itemsize();
q.set_offsets_buffer(name, (uint64_t *)(a.data()), a.size());
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_offsets_buffer(name, (uint64_t *)(a.data()), buff_size);
})

.def("set_subarray",
Expand All @@ -90,10 +86,8 @@ void init_query(py::module &m) {
})

.def("set_validity_buffer",
[](Query &q, std::string name, py::array a) {
// TODO check_type(a.dtype)
// size_t item_size = a.itemsize();
q.set_validity_buffer(name, (uint8_t *)(a.data()), a.size());
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_validity_buffer(name, (uint8_t *)(a.data()), buff_size);
})

.def("_submit", &Query::submit, py::call_guard<py::gil_scoped_release>())
Expand Down
98 changes: 95 additions & 3 deletions tiledb/dense_array.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import warnings
from collections import OrderedDict

import numpy as np
Expand All @@ -14,6 +13,7 @@
replace_ellipsis,
replace_scalars_slice,
)
from .datatypes import DataType
from .query import Query
from .subarray import Subarray

Expand Down Expand Up @@ -601,9 +601,101 @@ def __array__(self, dtype=None, **kw):
return array

def write_direct(self, array: np.ndarray, **kw):
from .libtiledb import write_direct_dense
"""
Write directly to given array attribute with minimal checks,
assumes that the numpy array is the same shape as the array's domain
:param np.ndarray array: Numpy contiguous dense array of the same dtype \
and shape and layout of the DenseArray instance
:raises ValueError: cannot write to multi-attribute DenseArray
:raises ValueError: array is not contiguous
:raises: :py:exc:`tiledb.TileDBError`
"""
append_dim = kw.pop("append_dim", None)
mode = kw.pop("mode", "ingest")
start_idx = kw.pop("start_idx", None)

if not self.isopen or self.mode != "w":
raise tiledb.TileDBError("DenseArray is not opened for writing")
if self.schema.nattr != 1:
raise ValueError("cannot write_direct to a multi-attribute DenseArray")
if not array.flags.c_contiguous and not array.flags.f_contiguous:
raise ValueError("array is not contiguous")

use_global_order = (
self.ctx.config().get("py.use_global_order_1d_write", False) == "true"
)

layout = lt.LayoutType.ROW_MAJOR
if array.ndim == 1 and use_global_order:
layout = lt.LayoutType.GLOBAL_ORDER
elif array.flags.f_contiguous:
layout = lt.LayoutType.COL_MAJOR

range_start_idx = start_idx or 0

subarray_ranges = np.zeros(2 * array.ndim, np.uint64)
for n in range(array.ndim):
subarray_ranges[n * 2] = range_start_idx
subarray_ranges[n * 2 + 1] = array.shape[n] + range_start_idx - 1

if mode == "append":
with Array.load_typed(self.uri) as A:
ned = A.nonempty_domain()

if array.ndim <= append_dim:
raise IndexError("`append_dim` out of range")

if array.ndim != len(ned):
raise ValueError(
"The number of dimension of the TileDB array and "
"Numpy array to append do not match"
)

for n in range(array.ndim):
if n == append_dim:
if start_idx is not None:
range_start_idx = start_idx
range_end_idx = array.shape[n] + start_idx - 1
else:
range_start_idx = ned[n][1] + 1
range_end_idx = array.shape[n] + ned[n][1]

subarray_ranges[n * 2] = range_start_idx
subarray_ranges[n * 2 + 1] = range_end_idx
else:
if array.shape[n] != ned[n][1] - ned[n][0] + 1:
raise ValueError(
"The input Numpy array must be of the same "
"shape as the TileDB array, exluding the "
"`append_dim`, but the Numpy array at index "
f"{n} has {array.shape[n]} dimension(s) and "
f"the TileDB array has {ned[n][1]-ned[n][0]}."
)

ctx = lt.Context(self.ctx)
q = lt.Query(ctx, self.array, lt.QueryType.WRITE)
q.layout = layout

subarray = lt.Subarray(ctx, self.array)
for n in range(array.ndim):
subarray._add_dim_range(
n, (subarray_ranges[n * 2], subarray_ranges[n * 2 + 1])
)
q.set_subarray(subarray)

attr = self.schema.attr(0)
battr_name = attr._internal_name.encode("UTF-8")

tiledb_type = DataType.from_numpy(array.dtype)

if tiledb_type in (lt.DataType.BLOB, lt.DataType.CHAR, lt.DataType.STRING_UTF8):
q.set_data_buffer(battr_name, array, array.nbytes)
else:
q.set_data_buffer(battr_name, array, tiledb_type.ncells * array.size)

write_direct_dense(self, array, **kw)
q._submit()
q.finalize()

def read_direct(self, name=None):
"""Read attribute directly with minimal overhead, returns a numpy ndarray over the entire domain
Expand Down
137 changes: 0 additions & 137 deletions tiledb/libtiledb.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -340,140 +340,3 @@ cdef _raise_ctx_err(tiledb_ctx_t* ctx_ptr, int rc):
raise MemoryError()
raise TileDBError("error retrieving error object from ctx")
_raise_tiledb_error(err_ptr)


def write_direct_dense(self: Array, np.ndarray array not None, **kw):
"""
Write directly to given array attribute with minimal checks,
assumes that the numpy array is the same shape as the array's domain
:param np.ndarray array: Numpy contiguous dense array of the same dtype \
and shape and layout of the DenseArray instance
:raises ValueError: array is not contiguous
:raises: :py:exc:`tiledb.TileDBError`
"""
append_dim = kw.pop("append_dim", None)
mode = kw.pop("mode", "ingest")
start_idx = kw.pop("start_idx", None)

if not self.isopen or self.mode != 'w':
raise TileDBError("DenseArray is not opened for writing")
if self.schema.nattr != 1:
raise ValueError("cannot write_direct to a multi-attribute DenseArray")
if not array.flags.c_contiguous and not array.flags.f_contiguous:
raise ValueError("array is not contiguous")

cdef tiledb_ctx_t* ctx_ptr = safe_ctx_ptr(self.ctx)
cdef tiledb_array_t* array_ptr = <tiledb_array_t*>PyCapsule_GetPointer(self.array.__capsule__(), "array")

# attr name
attr = self.schema.attr(0)
cdef bytes battr_name = attr._internal_name.encode('UTF-8')
cdef const char* attr_name_ptr = PyBytes_AS_STRING(battr_name)

cdef void* buff_ptr = np.PyArray_DATA(array)
cdef uint64_t buff_size = array.nbytes
cdef np.ndarray subarray = np.zeros(2*array.ndim, np.uint64)

try:
use_global_order = self.ctx.config().get(
"py.use_global_order_1d_write") == "true"
except KeyError:
use_global_order = False

cdef tiledb_layout_t layout = TILEDB_ROW_MAJOR
if array.ndim == 1 and use_global_order:
layout = TILEDB_GLOBAL_ORDER
elif array.flags.f_contiguous:
layout = TILEDB_COL_MAJOR

cdef tiledb_query_t* query_ptr = NULL
cdef tiledb_subarray_t* subarray_ptr = NULL
cdef int rc = TILEDB_OK
rc = tiledb_query_alloc(ctx_ptr, array_ptr, TILEDB_WRITE, &query_ptr)
if rc != TILEDB_OK:
tiledb_query_free(&query_ptr)
_raise_ctx_err(ctx_ptr, rc)
try:
rc = tiledb_query_set_layout(ctx_ptr, query_ptr, layout)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

range_start_idx = start_idx or 0
for n in range(array.ndim):
subarray[n*2] = range_start_idx
subarray[n*2 + 1] = array.shape[n] + range_start_idx - 1

if mode == "append":
with Array.load_typed(self.uri) as A:
ned = A.nonempty_domain()

if array.ndim <= append_dim:
raise IndexError("`append_dim` out of range")

if array.ndim != len(ned):
raise ValueError(
"The number of dimension of the TileDB array and "
"Numpy array to append do not match"
)

for n in range(array.ndim):
if n == append_dim:
if start_idx is not None:
range_start_idx = start_idx
range_end_idx = array.shape[n] + start_idx -1
else:
range_start_idx = ned[n][1] + 1
range_end_idx = array.shape[n] + ned[n][1]

subarray[n*2] = range_start_idx
subarray[n*2 + 1] = range_end_idx
else:
if array.shape[n] != ned[n][1] - ned[n][0] + 1:
raise ValueError(
"The input Numpy array must be of the same "
"shape as the TileDB array, exluding the "
"`append_dim`, but the Numpy array at index "
f"{n} has {array.shape[n]} dimension(s) and "
f"the TileDB array has {ned[n][1]-ned[n][0]}."
)

rc = tiledb_subarray_alloc(ctx_ptr, array_ptr, &subarray_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)
rc = tiledb_subarray_set_subarray(
ctx_ptr,
subarray_ptr,
<void*>np.PyArray_DATA(subarray)
)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

rc = tiledb_query_set_subarray_t(ctx_ptr, query_ptr, subarray_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

rc = tiledb_query_set_data_buffer(
ctx_ptr,
query_ptr,
attr_name_ptr,
buff_ptr,
&buff_size
)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

with nogil:
rc = tiledb_query_submit(ctx_ptr, query_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

with nogil:
rc = tiledb_query_finalize(ctx_ptr, query_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)
finally:
tiledb_subarray_free(&subarray_ptr)
tiledb_query_free(&query_ptr)
return
14 changes: 7 additions & 7 deletions tiledb/tests/cc/test_cc.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ def write():
q.layout = lt.LayoutType.UNORDERED
assert q.query_type == lt.QueryType.WRITE

q.set_data_buffer("a", data)
q.set_data_buffer("x", coords)
q.set_data_buffer("a", data, len(data))
q.set_data_buffer("x", coords, len(coords))

assert q._submit() == lt.QueryStatus.COMPLETE

Expand All @@ -358,8 +358,8 @@ def read(uri):
rcoords = np.zeros(10).astype(np.int32)
rdata = np.zeros(10).astype(np.int32)

q.set_data_buffer("a", rdata)
q.set_data_buffer("x", rcoords)
q.set_data_buffer("a", rdata, len(rdata))
q.set_data_buffer("x", rcoords, len(rcoords))

assert q._submit() == lt.QueryStatus.COMPLETE
assert np.all(rcoords == coords)
Expand Down Expand Up @@ -404,8 +404,8 @@ def write():

q.set_subarray(subarray)

q.set_data_buffer("a", data)
# q.set_data_buffer("x", coords)
q.set_data_buffer("a", data, len(data))
# q.set_data_buffer("x", coords, len(coords))

assert q._submit() == lt.QueryStatus.COMPLETE

Expand All @@ -426,7 +426,7 @@ def read(uri):

rdata = np.zeros(10).astype(np.float32)

q.set_data_buffer("a", rdata)
q.set_data_buffer("a", rdata, len(rdata))

assert q._submit() == lt.QueryStatus.COMPLETE
assert np.all(rdata == data)
Expand Down

0 comments on commit c0e126a

Please sign in to comment.