Skip to content

Commit

Permalink
[r] Replace SOMAArray read and write calls with ManagedQuery (#3678)
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenv authored Feb 7, 2025
1 parent 4bcda05 commit 2105abc
Show file tree
Hide file tree
Showing 22 changed files with 190 additions and 206 deletions.
5 changes: 1 addition & 4 deletions apis/python/src/tiledbsoma/managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ void load_managed_query(py::module& m) {
py::init([](SOMAArray array,
std::shared_ptr<SOMAContext> ctx,
std::string_view name) {
return ManagedQuery(
std::make_unique<SOMAArray>(array),
ctx->tiledb_ctx(),
name);
return ManagedQuery(array, ctx->tiledb_ctx(), name);
}),
py::arg("array"),
py::arg("ctx"),
Expand Down
2 changes: 1 addition & 1 deletion apis/r/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Description: Interface for working with 'TileDB'-based Stack of Matrices,
like those commonly used for single cell data analysis. It is documented at
<https://github.com/single-cell-data>; a formal specification available is at
<https://github.com/single-cell-data/SOMA/blob/main/abstract_specification.md>.
Version: 1.16.99.2
Version: 1.16.99.3
Authors@R: c(
person(given = "Aaron", family = "Wolen",
role = c("cre", "aut"), email = "[email protected]",
Expand Down
1 change: 1 addition & 0 deletions apis/r/NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* Encode string metadata as `TILEDB_STRING_UTF8` instead of `TILEDB_STRING_ASCII`
* Use S3 method dispatch on `integer64` instead of directly calling the S3 methods
* [c++] Replace `SOMAArray` read and write calls with `ManagedQuery` [#3678](https://github.com/single-cell-data/TileDB-SOMA/pull/3678)

# tiledbsoma 1.15.0

Expand Down
4 changes: 2 additions & 2 deletions apis/r/R/BlockwiseIter.R
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ BlockwiseReadIterBase <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
return(NULL)
}
sr_reset(private$soma_reader_pointer)
mq_reset(private$soma_reader_pointer)
return(invisible(NULL))
},
# @description Re-index an Arrow table
Expand Down Expand Up @@ -237,7 +237,7 @@ BlockwiseReadIterBase <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
return(NULL)
}
sr_set_dim_points(private$soma_reader_pointer, dimname, points)
mq_set_dim_points(private$soma_reader_pointer, dimname, points)
return(invisible(NULL))
}
)
Expand Down
42 changes: 21 additions & 21 deletions apis/r/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,14 @@ c_update_dataframe_schema <- function(uri, ctxxp, column_names_to_drop, add_cols
invisible(.Call(`_tiledbsoma_c_update_dataframe_schema`, uri, ctxxp, column_names_to_drop, add_cols_types, add_cols_enum_value_types, add_cols_enum_ordered))
}

#' Iterator-Style Access to SOMA Array via SOMAArray
#' Iterator-Style Access to SOMA Array via ManagedQuery
#'
#' The `sr_*` functions provide low-level access to an instance of the SOMAArray
#' The `mq_*` functions provide low-level access to an instance of a ManagedQuery
#' class so that iterative access over parts of a (large) array is possible.
#' \describe{
#' \item{\code{sr_setup}}{instantiates and by default also submits a query}
#' \item{\code{sr_complete}}{checks if more data is available}
#' \item{\code{sr_next}}{returns the next chunk}
#' \item{\code{mq_setup}}{instantiates and by default also submits a query}
#' \item{\code{mq_complete}}{checks if more data is available}
#' \item{\code{mq_next}}{returns the next chunk}
#' }
#'
#' @param uri Character value with URI path to a SOMA data set
Expand All @@ -289,20 +289,20 @@ c_update_dataframe_schema <- function(uri, ctxxp, column_names_to_drop, add_cols
#' new logging level.
#' @param timestamprange Optional POSIXct (i.e. Datetime) vector with start
#' and end of interval for which data is considered.
#' @param sr An external pointer to a TileDB SOMAArray object.
#' @param mq An external pointer to a ManagedQuery object.
#'
#' @return \code{sr_setup} returns an external pointer to a SOMAArray.
#' \code{sr_complete} ' returns a boolean, and \code{sr_next} returns an Arrow
#' @return \code{mq_setup} returns an external pointer to a ManagedQuery object.
#' \code{mq_complete} returns a boolean, and \code{mq_next} returns an Arrow
#' array helper object.
#'
#' @examples
#' \dontrun{
#' uri <- extract_dataset("soma-dataframe-pbmc3k-processed-obs")
#' ctxxp <- soma_context()
#' sr <- sr_setup(uri, ctxxp)
#' mq <- mq_setup(uri, ctxxp)
#' rl <- data.frame()
#' while (!sr_complete(sr)) {
#' dat <- sr_next(sr)
#' while (!mq_complete(mq)) {
#' dat <- mq_next(mq)
#' rb <- arrow::RecordBatch$import_from_c(dat$array_data, dat$schema)
#' rl <- rbind(rl, as.data.frame(rb))
#' }
Expand All @@ -311,12 +311,12 @@ c_update_dataframe_schema <- function(uri, ctxxp, column_names_to_drop, add_cols
#' @noRd
NULL

sr_setup <- function(uri, ctxxp, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", timestamprange = NULL, loglevel = "auto") {
.Call(`_tiledbsoma_sr_setup`, uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel)
mq_setup <- function(uri, ctxxp, colnames = NULL, qc = NULL, dim_points = NULL, dim_ranges = NULL, batch_size = "auto", result_order = "auto", timestamprange = NULL, loglevel = "auto") {
.Call(`_tiledbsoma_mq_setup`, uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel)
}

sr_complete <- function(sr) {
.Call(`_tiledbsoma_sr_complete`, sr)
mq_complete <- function(mq) {
.Call(`_tiledbsoma_mq_complete`, mq)
}

#' @noRd
Expand All @@ -325,16 +325,16 @@ create_empty_arrow_table <- function() {
.Call(`_tiledbsoma_create_empty_arrow_table`)
}

sr_next <- function(sr) {
.Call(`_tiledbsoma_sr_next`, sr)
mq_next <- function(mq) {
.Call(`_tiledbsoma_mq_next`, mq)
}

sr_reset <- function(sr) {
invisible(.Call(`_tiledbsoma_sr_reset`, sr))
mq_reset <- function(mq) {
invisible(.Call(`_tiledbsoma_mq_reset`, mq))
}

sr_set_dim_points <- function(sr, dim, points) {
invisible(.Call(`_tiledbsoma_sr_set_dim_points`, sr, dim, points))
mq_set_dim_points <- function(mq, dim, points) {
invisible(.Call(`_tiledbsoma_mq_set_dim_points`, mq, dim, points))
}

#' TileDB SOMA statistics
Expand Down
4 changes: 2 additions & 2 deletions apis/r/R/ReadIter.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ReadIter <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
TRUE
} else {
sr_complete(private$soma_reader_pointer)
mq_complete(private$soma_reader_pointer)
}
},

Expand Down Expand Up @@ -57,7 +57,7 @@ ReadIter <- R6::R6Class(
if (is.null(private$soma_reader_pointer)) {
return(NULL)
}
rl <- sr_next(private$soma_reader_pointer)
rl <- mq_next(private$soma_reader_pointer)
return(private$soma_reader_transform(rl))
},

Expand Down
4 changes: 2 additions & 2 deletions apis/r/R/SOMADataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ SOMADataFrame <- R6::R6Class(
value_filter <- parsed@ptr
}
spdl::debug(
"[SOMADataFrame$read] calling sr_setup for {} at ({},{})", self$uri,
"[SOMADataFrame$read] calling mq_setup for {} at ({},{})", self$uri,
private$tiledb_timestamp[1], private$tiledb_timestamp[2]
)
sr <- sr_setup(
sr <- mq_setup(
uri = self$uri,
private$.soma_context,
colnames = column_names,
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/SOMANDArrayBase.R
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ SOMANDArrayBase <- R6::R6Class(
},

# @description Converts a list of vectors corresponding to coords to a
# format acceptable for sr_setup and soma_array_reader
# format acceptable for mq_setup and soma_array_reader
.convert_coords = function(coords) {
# Ensure coords is a named list, use to select dim points
stopifnot(
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/SOMASparseNDArray.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ SOMASparseNDArray <- R6::R6Class(
coords <- private$.convert_coords(coords)
}

sr <- sr_setup(
sr <- mq_setup(
uri = self$uri,
private$.soma_context,
dim_points = coords,
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/utils-readerTransformers.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#' Transformer function: SOMAArray to Arrow table
#'
#' @description Converts the results of a \link{soma_array_reader} or
#' \link{sr_next} to an arrow::\link[arrow]{Table}
#' \link{mq_next} to an arrow::\link[arrow]{Table}
#' @param x A nanoarrow_array object which is itself a wrapper around the external pointer
#' to the Arrow array data; the schema external pointer is added to it as well
#' @return arrow::\link[arrow]{Table}
Expand Down
2 changes: 1 addition & 1 deletion apis/r/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ uns_hint <- function(type = c("1d", "2d")) {
}
x$reopen("READ", tiledb_timestamp = x$tiledb_timestamp)
dimname <- x$dimnames()[axis + 1L]
sr <- sr_setup(
sr <- mq_setup(
uri = x$uri,
soma_context(),
colnames = dimname,
Expand Down
4 changes: 2 additions & 2 deletions apis/r/src/Makevars.in
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CXX_STD = CXX20
#CXX_STD = CXX20

## We need the TileDB headers, and on macOS (aka Darwin) we need to set the minimum version to 13.3
PKG_CPPFLAGS = -I. -I../inst/include/ @tiledb_include@ @cxx20_macos@ -DSPDLOG_USE_STD_FORMAT
PKG_CPPFLAGS = -std=c++20 -I. -I../inst/include/ @tiledb_include@ @cxx20_macos@ -DSPDLOG_USE_STD_FORMAT

## We also need the TileDB library
PKG_LIBS = @cxx20_macos@ @tiledb_libs@ @tiledb_rpath@
Expand Down
58 changes: 29 additions & 29 deletions apis/r/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,9 @@ BEGIN_RCPP
return R_NilValue;
END_RCPP
}
// sr_setup
Rcpp::XPtr<tdbs::SOMAArray> sr_setup(const std::string& uri, Rcpp::XPtr<somactx_wrap_t> ctxxp, Rcpp::Nullable<Rcpp::CharacterVector> colnames, Rcpp::Nullable<Rcpp::XPtr<tiledb::QueryCondition>> qc, Rcpp::Nullable<Rcpp::List> dim_points, Rcpp::Nullable<Rcpp::List> dim_ranges, std::string batch_size, std::string result_order, Rcpp::Nullable<Rcpp::DatetimeVector> timestamprange, const std::string& loglevel);
RcppExport SEXP _tiledbsoma_sr_setup(SEXP uriSEXP, SEXP ctxxpSEXP, SEXP colnamesSEXP, SEXP qcSEXP, SEXP dim_pointsSEXP, SEXP dim_rangesSEXP, SEXP batch_sizeSEXP, SEXP result_orderSEXP, SEXP timestamprangeSEXP, SEXP loglevelSEXP) {
// mq_setup
Rcpp::XPtr<tdbs::ManagedQuery> mq_setup(const std::string& uri, Rcpp::XPtr<somactx_wrap_t> ctxxp, Rcpp::Nullable<Rcpp::CharacterVector> colnames, Rcpp::Nullable<Rcpp::XPtr<tiledb::QueryCondition>> qc, Rcpp::Nullable<Rcpp::List> dim_points, Rcpp::Nullable<Rcpp::List> dim_ranges, std::string batch_size, std::string result_order, Rcpp::Nullable<Rcpp::DatetimeVector> timestamprange, const std::string& loglevel);
RcppExport SEXP _tiledbsoma_mq_setup(SEXP uriSEXP, SEXP ctxxpSEXP, SEXP colnamesSEXP, SEXP qcSEXP, SEXP dim_pointsSEXP, SEXP dim_rangesSEXP, SEXP batch_sizeSEXP, SEXP result_orderSEXP, SEXP timestamprangeSEXP, SEXP loglevelSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Expand All @@ -643,18 +643,18 @@ BEGIN_RCPP
Rcpp::traits::input_parameter< std::string >::type result_order(result_orderSEXP);
Rcpp::traits::input_parameter< Rcpp::Nullable<Rcpp::DatetimeVector> >::type timestamprange(timestamprangeSEXP);
Rcpp::traits::input_parameter< const std::string& >::type loglevel(loglevelSEXP);
rcpp_result_gen = Rcpp::wrap(sr_setup(uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel));
rcpp_result_gen = Rcpp::wrap(mq_setup(uri, ctxxp, colnames, qc, dim_points, dim_ranges, batch_size, result_order, timestamprange, loglevel));
return rcpp_result_gen;
END_RCPP
}
// sr_complete
bool sr_complete(Rcpp::XPtr<tdbs::SOMAArray> sr);
RcppExport SEXP _tiledbsoma_sr_complete(SEXP srSEXP) {
// mq_complete
bool mq_complete(Rcpp::XPtr<tdbs::ManagedQuery> mq);
RcppExport SEXP _tiledbsoma_mq_complete(SEXP mqSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
rcpp_result_gen = Rcpp::wrap(sr_complete(sr));
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
rcpp_result_gen = Rcpp::wrap(mq_complete(mq));
return rcpp_result_gen;
END_RCPP
}
Expand All @@ -668,36 +668,36 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// sr_next
SEXP sr_next(Rcpp::XPtr<tdbs::SOMAArray> sr);
RcppExport SEXP _tiledbsoma_sr_next(SEXP srSEXP) {
// mq_next
SEXP mq_next(Rcpp::XPtr<tdbs::ManagedQuery> mq);
RcppExport SEXP _tiledbsoma_mq_next(SEXP mqSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
rcpp_result_gen = Rcpp::wrap(sr_next(sr));
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
rcpp_result_gen = Rcpp::wrap(mq_next(mq));
return rcpp_result_gen;
END_RCPP
}
// sr_reset
void sr_reset(Rcpp::XPtr<tdbs::SOMAArray> sr);
RcppExport SEXP _tiledbsoma_sr_reset(SEXP srSEXP) {
// mq_reset
void mq_reset(Rcpp::XPtr<tdbs::ManagedQuery> mq);
RcppExport SEXP _tiledbsoma_mq_reset(SEXP mqSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
sr_reset(sr);
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
mq_reset(mq);
return R_NilValue;
END_RCPP
}
// sr_set_dim_points
void sr_set_dim_points(Rcpp::XPtr<tdbs::SOMAArray> sr, std::string dim, Rcpp::NumericVector points);
RcppExport SEXP _tiledbsoma_sr_set_dim_points(SEXP srSEXP, SEXP dimSEXP, SEXP pointsSEXP) {
// mq_set_dim_points
void mq_set_dim_points(Rcpp::XPtr<tdbs::ManagedQuery> mq, std::string dim, Rcpp::NumericVector points);
RcppExport SEXP _tiledbsoma_mq_set_dim_points(SEXP mqSEXP, SEXP dimSEXP, SEXP pointsSEXP) {
BEGIN_RCPP
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::SOMAArray> >::type sr(srSEXP);
Rcpp::traits::input_parameter< Rcpp::XPtr<tdbs::ManagedQuery> >::type mq(mqSEXP);
Rcpp::traits::input_parameter< std::string >::type dim(dimSEXP);
Rcpp::traits::input_parameter< Rcpp::NumericVector >::type points(pointsSEXP);
sr_set_dim_points(sr, dim, points);
mq_set_dim_points(mq, dim, points);
return R_NilValue;
END_RCPP
}
Expand Down Expand Up @@ -845,12 +845,12 @@ static const R_CallMethodDef CallEntries[] = {
{"_tiledbsoma_tiledbsoma_upgrade_shape", (DL_FUNC) &_tiledbsoma_tiledbsoma_upgrade_shape, 5},
{"_tiledbsoma_upgrade_or_change_domain", (DL_FUNC) &_tiledbsoma_upgrade_or_change_domain, 7},
{"_tiledbsoma_c_update_dataframe_schema", (DL_FUNC) &_tiledbsoma_c_update_dataframe_schema, 6},
{"_tiledbsoma_sr_setup", (DL_FUNC) &_tiledbsoma_sr_setup, 10},
{"_tiledbsoma_sr_complete", (DL_FUNC) &_tiledbsoma_sr_complete, 1},
{"_tiledbsoma_mq_setup", (DL_FUNC) &_tiledbsoma_mq_setup, 10},
{"_tiledbsoma_mq_complete", (DL_FUNC) &_tiledbsoma_mq_complete, 1},
{"_tiledbsoma_create_empty_arrow_table", (DL_FUNC) &_tiledbsoma_create_empty_arrow_table, 0},
{"_tiledbsoma_sr_next", (DL_FUNC) &_tiledbsoma_sr_next, 1},
{"_tiledbsoma_sr_reset", (DL_FUNC) &_tiledbsoma_sr_reset, 1},
{"_tiledbsoma_sr_set_dim_points", (DL_FUNC) &_tiledbsoma_sr_set_dim_points, 3},
{"_tiledbsoma_mq_next", (DL_FUNC) &_tiledbsoma_mq_next, 1},
{"_tiledbsoma_mq_reset", (DL_FUNC) &_tiledbsoma_mq_reset, 1},
{"_tiledbsoma_mq_set_dim_points", (DL_FUNC) &_tiledbsoma_mq_set_dim_points, 3},
{"_tiledbsoma_tiledbsoma_stats_enable", (DL_FUNC) &_tiledbsoma_tiledbsoma_stats_enable, 0},
{"_tiledbsoma_tiledbsoma_stats_disable", (DL_FUNC) &_tiledbsoma_tiledbsoma_stats_disable, 0},
{"_tiledbsoma_tiledbsoma_stats_reset", (DL_FUNC) &_tiledbsoma_tiledbsoma_stats_reset, 0},
Expand Down
54 changes: 16 additions & 38 deletions apis/r/src/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,42 +222,20 @@ void writeArrayFromArrow(
// optional timestamp range
std::optional<tdbs::TimestampRange> tsrng = makeTimestampRange(tsvec);

std::shared_ptr<tdbs::SOMAArray> arrup;
if (arraytype == "SOMADataFrame") {
arrup = tdbs::SOMADataFrame::open(
OpenMode::write,
uri,
somactx,
"unnamed",
{},
"auto",
ResultOrder::automatic,
tsrng);
} else if (arraytype == "SOMADenseNDArray") {
arrup = tdbs::SOMADenseNDArray::open(
OpenMode::write,
uri,
somactx,
"unnamed",
{},
"auto",
ResultOrder::colmajor,
tsrng);
} else if (arraytype == "SOMASparseNDArray") {
arrup = tdbs::SOMASparseNDArray::open(
OpenMode::write,
uri,
somactx,
"unnamed",
{},
"auto",
ResultOrder::automatic,
tsrng);
} else { // not reached
Rcpp::stop(tfm::format("Unexpected array type '%s'", arraytype));
}

arrup.get()->set_array_data(std::move(schema), std::move(array));
arrup.get()->write();
arrup.get()->close();
std::unique_ptr<tdbs::SOMAArray> arrup = tdbs::SOMAArray::open(
OpenMode::write,
uri,
somactx,
"unnamed",
{},
"auto",
ResultOrder::automatic,
tsrng);

auto mq = tdbs::ManagedQuery(*arrup, somactx->tiledb_ctx());
mq.set_layout(arraytype == "SOMADenseNDArray" ?
ResultOrder::colmajor : ResultOrder::automatic);
mq.set_array_data(std::move(schema), std::move(array));
mq.submit_write();
mq.close();
}
Loading

0 comments on commit 2105abc

Please sign in to comment.