Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor write_direct_dense out of Cython #2109

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions tiledb/cc/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,13 @@ void init_query(py::module &m) {
// uint64_t))&Query::set_data_buffer);

.def("set_data_buffer",
[](Query &q, std::string name, py::array a) {
// TODO check_type(a.dtype)
// size_t item_size = a.itemsize();
q.set_data_buffer(name, const_cast<void *>(a.data()), a.size());
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_data_buffer(name, const_cast<void *>(a.data()), buff_size);
})

.def("set_offsets_buffer",
[](Query &q, std::string name, py::array a) {
// TODO check_type(a.dtype)
// size_t item_size = a.itemsize();
q.set_offsets_buffer(name, (uint64_t *)(a.data()), a.size());
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_offsets_buffer(name, (uint64_t *)(a.data()), buff_size);
})

.def("set_subarray",
Expand All @@ -90,10 +86,8 @@ void init_query(py::module &m) {
})

.def("set_validity_buffer",
[](Query &q, std::string name, py::array a) {
// TODO check_type(a.dtype)
// size_t item_size = a.itemsize();
q.set_validity_buffer(name, (uint8_t *)(a.data()), a.size());
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_validity_buffer(name, (uint8_t *)(a.data()), buff_size);
})

.def("_submit", &Query::submit, py::call_guard<py::gil_scoped_release>())
Expand Down
98 changes: 95 additions & 3 deletions tiledb/dense_array.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import warnings
from collections import OrderedDict

import numpy as np
Expand All @@ -14,6 +13,7 @@
replace_ellipsis,
replace_scalars_slice,
)
from .datatypes import DataType
from .query import Query
from .subarray import Subarray

Expand Down Expand Up @@ -601,9 +601,101 @@ def __array__(self, dtype=None, **kw):
return array

def write_direct(self, array: np.ndarray, **kw):
from .libtiledb import write_direct_dense
"""
Write directly to given array attribute with minimal checks,
assumes that the numpy array is the same shape as the array's domain

:param np.ndarray array: Numpy contiguous dense array of the same dtype \
and shape and layout of the DenseArray instance
:raises ValueError: cannot write to multi-attribute DenseArray
:raises ValueError: array is not contiguous
:raises: :py:exc:`tiledb.TileDBError`
"""
append_dim = kw.pop("append_dim", None)
mode = kw.pop("mode", "ingest")
start_idx = kw.pop("start_idx", None)

if not self.isopen or self.mode != "w":
raise tiledb.TileDBError("DenseArray is not opened for writing")
if self.schema.nattr != 1:
raise ValueError("cannot write_direct to a multi-attribute DenseArray")
if not array.flags.c_contiguous and not array.flags.f_contiguous:
raise ValueError("array is not contiguous")

use_global_order = (
self.ctx.config().get("py.use_global_order_1d_write", False) == "true"
)

layout = lt.LayoutType.ROW_MAJOR
if array.ndim == 1 and use_global_order:
layout = lt.LayoutType.GLOBAL_ORDER
elif array.flags.f_contiguous:
layout = lt.LayoutType.COL_MAJOR

range_start_idx = start_idx or 0

subarray_ranges = np.zeros(2 * array.ndim, np.uint64)
for n in range(array.ndim):
subarray_ranges[n * 2] = range_start_idx
subarray_ranges[n * 2 + 1] = array.shape[n] + range_start_idx - 1

if mode == "append":
with Array.load_typed(self.uri) as A:
ned = A.nonempty_domain()

if array.ndim <= append_dim:
raise IndexError("`append_dim` out of range")

if array.ndim != len(ned):
raise ValueError(
"The number of dimension of the TileDB array and "
"Numpy array to append do not match"
)

for n in range(array.ndim):
if n == append_dim:
if start_idx is not None:
range_start_idx = start_idx
range_end_idx = array.shape[n] + start_idx - 1
else:
range_start_idx = ned[n][1] + 1
range_end_idx = array.shape[n] + ned[n][1]

subarray_ranges[n * 2] = range_start_idx
subarray_ranges[n * 2 + 1] = range_end_idx
else:
if array.shape[n] != ned[n][1] - ned[n][0] + 1:
raise ValueError(
"The input Numpy array must be of the same "
"shape as the TileDB array, exluding the "
"`append_dim`, but the Numpy array at index "
f"{n} has {array.shape[n]} dimension(s) and "
f"the TileDB array has {ned[n][1]-ned[n][0]}."
)

ctx = lt.Context(self.ctx)
q = lt.Query(ctx, self.array, lt.QueryType.WRITE)
q.layout = layout

subarray = lt.Subarray(ctx, self.array)
for n in range(array.ndim):
subarray._add_dim_range(
n, (subarray_ranges[n * 2], subarray_ranges[n * 2 + 1])
)
q.set_subarray(subarray)

attr = self.schema.attr(0)
battr_name = attr._internal_name.encode("UTF-8")

tiledb_type = DataType.from_numpy(array.dtype)

if tiledb_type in (lt.DataType.BLOB, lt.DataType.CHAR, lt.DataType.STRING_UTF8):
q.set_data_buffer(battr_name, array, array.nbytes)
else:
q.set_data_buffer(battr_name, array, tiledb_type.ncells * array.size)

write_direct_dense(self, array, **kw)
q._submit()
q.finalize()

def read_direct(self, name=None):
"""Read attribute directly with minimal overhead, returns a numpy ndarray over the entire domain
Expand Down
137 changes: 0 additions & 137 deletions tiledb/libtiledb.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -341,140 +341,3 @@ cdef _raise_ctx_err(tiledb_ctx_t* ctx_ptr, int rc):
raise MemoryError()
raise TileDBError("error retrieving error object from ctx")
_raise_tiledb_error(err_ptr)


def write_direct_dense(self: Array, np.ndarray array not None, **kw):
"""
Write directly to given array attribute with minimal checks,
assumes that the numpy array is the same shape as the array's domain

:param np.ndarray array: Numpy contiguous dense array of the same dtype \
and shape and layout of the DenseArray instance
:raises ValueError: array is not contiguous
:raises: :py:exc:`tiledb.TileDBError`

"""
append_dim = kw.pop("append_dim", None)
mode = kw.pop("mode", "ingest")
start_idx = kw.pop("start_idx", None)

if not self.isopen or self.mode != 'w':
raise TileDBError("DenseArray is not opened for writing")
if self.schema.nattr != 1:
raise ValueError("cannot write_direct to a multi-attribute DenseArray")
if not array.flags.c_contiguous and not array.flags.f_contiguous:
raise ValueError("array is not contiguous")

cdef tiledb_ctx_t* ctx_ptr = safe_ctx_ptr(self.ctx)
cdef tiledb_array_t* array_ptr = <tiledb_array_t*>PyCapsule_GetPointer(self.array.__capsule__(), "array")

# attr name
attr = self.schema.attr(0)
cdef bytes battr_name = attr._internal_name.encode('UTF-8')
cdef const char* attr_name_ptr = PyBytes_AS_STRING(battr_name)

cdef void* buff_ptr = np.PyArray_DATA(array)
cdef uint64_t buff_size = array.nbytes
cdef np.ndarray subarray = np.zeros(2*array.ndim, np.uint64)

try:
use_global_order = self.ctx.config().get(
"py.use_global_order_1d_write") == "true"
except KeyError:
use_global_order = False

cdef tiledb_layout_t layout = TILEDB_ROW_MAJOR
if array.ndim == 1 and use_global_order:
layout = TILEDB_GLOBAL_ORDER
elif array.flags.f_contiguous:
layout = TILEDB_COL_MAJOR

cdef tiledb_query_t* query_ptr = NULL
cdef tiledb_subarray_t* subarray_ptr = NULL
cdef int rc = TILEDB_OK
rc = tiledb_query_alloc(ctx_ptr, array_ptr, TILEDB_WRITE, &query_ptr)
if rc != TILEDB_OK:
tiledb_query_free(&query_ptr)
_raise_ctx_err(ctx_ptr, rc)
try:
rc = tiledb_query_set_layout(ctx_ptr, query_ptr, layout)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

range_start_idx = start_idx or 0
for n in range(array.ndim):
subarray[n*2] = range_start_idx
subarray[n*2 + 1] = array.shape[n] + range_start_idx - 1

if mode == "append":
with Array.load_typed(self.uri) as A:
ned = A.nonempty_domain()

if array.ndim <= append_dim:
raise IndexError("`append_dim` out of range")

if array.ndim != len(ned):
raise ValueError(
"The number of dimension of the TileDB array and "
"Numpy array to append do not match"
)

for n in range(array.ndim):
if n == append_dim:
if start_idx is not None:
range_start_idx = start_idx
range_end_idx = array.shape[n] + start_idx -1
else:
range_start_idx = ned[n][1] + 1
range_end_idx = array.shape[n] + ned[n][1]

subarray[n*2] = range_start_idx
subarray[n*2 + 1] = range_end_idx
else:
if array.shape[n] != ned[n][1] - ned[n][0] + 1:
raise ValueError(
"The input Numpy array must be of the same "
"shape as the TileDB array, exluding the "
"`append_dim`, but the Numpy array at index "
f"{n} has {array.shape[n]} dimension(s) and "
f"the TileDB array has {ned[n][1]-ned[n][0]}."
)

rc = tiledb_subarray_alloc(ctx_ptr, array_ptr, &subarray_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)
rc = tiledb_subarray_set_subarray(
ctx_ptr,
subarray_ptr,
<void*>np.PyArray_DATA(subarray)
)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

rc = tiledb_query_set_subarray_t(ctx_ptr, query_ptr, subarray_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

rc = tiledb_query_set_data_buffer(
ctx_ptr,
query_ptr,
attr_name_ptr,
buff_ptr,
&buff_size
)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

with nogil:
rc = tiledb_query_submit(ctx_ptr, query_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)

with nogil:
rc = tiledb_query_finalize(ctx_ptr, query_ptr)
if rc != TILEDB_OK:
_raise_ctx_err(ctx_ptr, rc)
finally:
tiledb_subarray_free(&subarray_ptr)
tiledb_query_free(&query_ptr)
return
14 changes: 7 additions & 7 deletions tiledb/tests/cc/test_cc.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ def write():
q.layout = lt.LayoutType.UNORDERED
assert q.query_type == lt.QueryType.WRITE

q.set_data_buffer("a", data)
q.set_data_buffer("x", coords)
q.set_data_buffer("a", data, len(data))
q.set_data_buffer("x", coords, len(coords))

assert q._submit() == lt.QueryStatus.COMPLETE

Expand All @@ -358,8 +358,8 @@ def read(uri):
rcoords = np.zeros(10).astype(np.int32)
rdata = np.zeros(10).astype(np.int32)

q.set_data_buffer("a", rdata)
q.set_data_buffer("x", rcoords)
q.set_data_buffer("a", rdata, len(rdata))
q.set_data_buffer("x", rcoords, len(rcoords))

assert q._submit() == lt.QueryStatus.COMPLETE
assert np.all(rcoords == coords)
Expand Down Expand Up @@ -404,8 +404,8 @@ def write():

q.set_subarray(subarray)

q.set_data_buffer("a", data)
# q.set_data_buffer("x", coords)
q.set_data_buffer("a", data, len(data))
# q.set_data_buffer("x", coords, len(coords))

assert q._submit() == lt.QueryStatus.COMPLETE

Expand All @@ -426,7 +426,7 @@ def read(uri):

rdata = np.zeros(10).astype(np.float32)

q.set_data_buffer("a", rdata)
q.set_data_buffer("a", rdata, len(rdata))

assert q._submit() == lt.QueryStatus.COMPLETE
assert np.all(rdata == data)
Expand Down
Loading